# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import re
import logging
import time

from autotest_lib.client.common_lib import error


class PDConsoleUtils(object):
    """Provides a set of methods common to USB PD FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """

    SRC_CONNECT = 'SRC_READY'
    SNK_CONNECT = 'SNK_READY'
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE'
    PD_MAX_PORTS = 2
    # Seconds to wait for a PD connection to (re)establish
    CONNECT_TIME = 4

    # Seconds to wait between setting dualrole and reading it back
    DUALROLE_QUERY_DELAY = 0.25
    # Dualrole input/output values of methods in this class.
    DUALROLE_VALUES = ['on', 'off', 'snk', 'src']
    # Strings passed to the console command "pd dualrole"
    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
    # Strings returned from the console command "pd dualrole"
    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']

    # Some old firmware uses a single dualrole setting for all ports, while
    # some new firmware uses a per-port dualrole setting. This flag will be
    # initialized to True or False the first time get_pd_dualrole() runs.
    # TODO: Remove this flag when the old setting phases out
    per_port_dualrole_setting = None

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\d\w()_]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)'
    }

    # Regex to match PD state name; works for both old and new formats
    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"
    # Copied from ec repo: common/usb_pd_protocol.c
    PD_STATE_NAMES = [
        "DISABLED",                   # index: 0
        "SUSPENDED",
        "SNK_DISCONNECTED",
        "SNK_DISCONNECTED_DEBOUNCE",
        "SNK_HARD_RESET_RECOVER",
        "SNK_DISCOVERY",              # index: 5
        "SNK_REQUESTED",
        "SNK_TRANSITION",
        "SNK_READY",
        "SNK_SWAP_INIT",
        "SNK_SWAP_SNK_DISABLE",       # index: 10
        "SNK_SWAP_SRC_DISABLE",
        "SNK_SWAP_STANDBY",
        "SNK_SWAP_COMPLETE",
        "SRC_DISCONNECTED",
        "SRC_DISCONNECTED_DEBOUNCE",  # index: 15
        "SRC_HARD_RESET_RECOVER",
        "SRC_STARTUP",
        "SRC_DISCOVERY",
        "SRC_NEGOCIATE",
        "SRC_ACCEPTED",               # index: 20
        "SRC_POWERED",
        "SRC_TRANSITION",
        "SRC_READY",
        "SRC_GET_SNK_CAP",
        "DR_SWAP",                    # index: 25
        "SRC_SWAP_INIT",
        "SRC_SWAP_SNK_DISABLE",
        "SRC_SWAP_SRC_DISABLE",
        "SRC_SWAP_STANDBY",
        "VCONN_SWAP_SEND",            # index: 30
        "VCONN_SWAP_INIT",
        "VCONN_SWAP_READY",
        "SOFT_RESET",
        "HARD_RESET_SEND",
        "HARD_RESET_EXECUTE",         # index: 35
        "BIST_RX",
        "BIST_TX",
        "DRP_AUTO_TOGGLE",
    ]

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12
    }

    def __init__(self, console):
        """Console can be either usbpd, ec, or pdtester UART

        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART

        @param console: object providing send_command() and
                        send_command_get_output() for the UART
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp):
        """Send command to PD console, wait for response

        PD console debug mode is switched on around the command and is
        always switched back off afterwards, even when the command raises
        (e.g. the expected output never shows up).

        @param cmd: pd command string
        @param regexp: regular expression for desired output

        @returns: whatever console.send_command_get_output() returns
        """
        # Enable PD console debug mode to show control messages
        self.enable_pd_console_debug()
        try:
            return self.console.send_command_get_output(cmd, regexp)
        finally:
            # Do not leave the console in debug mode on failure
            self.disable_pd_console_debug()

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting.

        @param cmd: pd command to issue to the UART console

        @returns: PD control header message
        """
        m = self.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)\W'])
        # The captured header is hex text; keep only the message type bits
        return int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console

        @returns: True if 'pd' is found, False if not
        """
        matches = self.console.send_command_get_output(
                'help', [r'(pd)\s+([\w]+)'])
        return matches[0][1] == 'pd'

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces 5 fields. The full response
        line is captured and then parsed to extract each field to fill
        the dict containing port, polarity, role, pd_state, and flags.

        @param port: Type C PD port 0 or 1

        @returns: A dict with the 5 fields listed above
        @raises: TestFail if any field not found
        """
        pd_cmd = 'pd' + " " + str(port) + " " + 'state'
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            ['(Port.*) - (Role:.*)\n'])
        # NOTE(review): m[0][0] is presumably the complete matched text;
        # confirm against the console's send_command_get_output().
        response = m[0][0]

        # Extract desired values from result string
        state_result = {}
        for key, regexp in self.PD_STATE_DICT.items():
            value = re.search(regexp, response)
            if not value:
                raise error.TestFail('pd %d state: %r value not found' %
                                     (port, key))
            state_result[key] = value.group(1)

        return state_result

    def _normalize_pd_state(self, state):
        """Normalize the PD state name which handles both old and new formats.

        The old format is like: "SNK_READY"
        The new format is like: "8()" if debug_level == 0, or
                                "8(SNK_READY)" if debug_level > 0

        This method will convert the new format to the old one.

        @param state: The raw PD state text

        @returns: The normalized PD state name
        @raises: TestFail if unexpected PD state format, if the state index
                 is not in PD_STATE_NAMES, or if the index and the name in
                 the text disagree
        """
        m = re.match(self.RE_PD_STATE, state)
        if not (m and any(m.groups())):
            raise error.TestFail('Unexpected PD state format: %s' % state)

        state_index, state_name = m.groups()
        if state_index is None:
            # The old format: return the name
            return state_name

        # The new format: map the index to a name
        index = int(state_index)
        if index >= len(self.PD_STATE_NAMES):
            raise error.TestFail('Unknown PD state index %d in: %s' %
                                 (index, state))
        mapped_name = self.PD_STATE_NAMES[index]
        # Explicit check rather than assert, which is stripped under -O
        if state_name is not None and mapped_name != state_name:
            raise error.TestFail('PD state index/name mismatch: %s '
                                 '(index maps to %s)' % (state, mapped_name))
        return mapped_name

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return self._normalize_pd_state(pd_dict['pd_state'])

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: the 'port' field reported by the pd state command
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: the 'role' field reported by the pd state command
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: the 'flags' field reported by the pd state command, as
                  text (is_pd_flag_set() parses it as hex)
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return pd_dict['flags']

    def get_pd_dualrole(self, port):
        """Get the current PD dualrole setting

        On the first call the command form (per-port or global) is unknown.
        The per-port form is tried first and, if that fails, the global form;
        the form that was selected is remembered on this instance.

        @param port: Type C PD port 0/1
        @returns: current PD dualrole setting, one of (on, off, snk, src)
        """
        if self.per_port_dualrole_setting is True:
            cmd = 'pd %d dualrole' % port
        elif self.per_port_dualrole_setting is False:
            cmd = 'pd dualrole'
        else:
            try:
                logging.info('The per_port_dualrole_setting is unknown; '
                             'try the True case')
                self.per_port_dualrole_setting = True
                return self.get_pd_dualrole(port)
            except Exception:
                # Exception (not a bare except) so that KeyboardInterrupt
                # and SystemExit still abort the test run.
                logging.info('The per_port_dualrole_setting=True failed; '
                             'try the False case')
                self.per_port_dualrole_setting = False
                return self.get_pd_dualrole(port)

        m = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)[\r\n]'])
        # Find the index according to the output of "pd dualrole" command
        dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1])
        # Map to a string which is the output of this method
        return self.DUALROLE_VALUES[dual_index]

    def set_pd_dualrole(self, port, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param port: Type C PD port 0/1
        @param value: One of the 4 options listed

        @raises: ValueError if value is not one of the 4 options
        @raises: TestFail if the value read back does not match
        """
        # If the dualrole setting is not initialized, call the get method to
        # initialize it.
        if self.per_port_dualrole_setting is None:
            self.get_pd_dualrole(port)

        # Get string required for console command
        dual_index = self.DUALROLE_VALUES.index(value)
        cmd_arg = self.DUALROLE_CMD_ARGS[dual_index]
        # Create console command in the form this firmware was found to use
        if self.per_port_dualrole_setting:
            cmd = 'pd %d dualrole %s' % (port, cmd_arg)
        else:
            cmd = 'pd dualrole %s' % cmd_arg
        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole(port)
        # If it doesn't match, then raise error
        if dual != value:
            raise error.TestFail("dualrole error: " + value + " != " + dual)

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd 0/1 state' command on each port in turn and see if it
        is in either expected state of a connection. Record the port number
        that has an active connection.

        @returns: dict with keys 'port' and 'connect', plus 'role' (the
                  connected PD state name) when a connection was found. If
                  no port is connected, 'port' is the last port checked.
        """
        status = {'connect': False, 'port': 0}
        for port in range(self.PD_MAX_PORTS):
            status['port'] = port
            state = self.get_pd_state(port)
            if state in (self.SRC_CONNECT, self.SNK_CONNECT):
                status['connect'] = True
                status['role'] = state
                break

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        # Check preconditions before recording the starting state
        if not self.is_pd_dual_role_enabled(port):
            logging.info('Dualrole Mode not enabled!')
            return False
        if not self.is_pd_connected(port):
            logging.info('PD contract not established!')
            return False
        current_pr = self.get_pd_state(port)
        swap_cmd = 'pd %d swap power' % port
        self.send_pd_command(swap_cmd)
        # Give the port time to renegotiate after the swap request
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_pd_connected(port):
            return False
        return current_pr != new_pr

    def disable_pd_console_debug(self):
        """Turn off PD console debug

        """
        cmd = 'pd dump 0'
        self.send_pd_command(cmd)

    def enable_pd_console_debug(self):
        """Enable PD console debug level 2

        """
        cmd = 'pd dump 2'
        self.send_pd_command(cmd)

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        pd_flags = self.get_pd_flags(port)
        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        state = self.get_pd_state(port)
        return state in (self.SRC_CONNECT, self.SNK_CONNECT)

    def is_pd_dual_role_enabled(self, port):
        """Check if a PD device is in dualrole mode

        @param port: Type C PD port 0/1

        @returns True if dualrole mode is active, False otherwise
        """
        drp = self.get_pd_dualrole(port)
        return drp == 'on'


class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This class is used for PD utility methods that require access
    to both PDTester and DUT PD consoles.

    """

    def __init__(self, dut_console, pdtester_console):
        """
        @param dut_console: PD console object for DUT
        @param pdtester_console: PD console object for PDTester
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for PDTester UART access functions
        self.pdtester_console = pdtester_console
        super(PDConnectionUtils, self).__init__(dut_console)

    def _verify_pdtester_connection(self, port):
        """Verify DUT to PDTester PD connection

        This method checks for a PDTester PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a PDTester feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to PDTester connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to PDTester pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 0.5
        DISCONNECT_TIME_SEC = 2
        # pdtester console command to force PD disconnect
        disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Time for the disconnect timer to expire plus time to reconnect
        recover_time = self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC
        # Only check for PDTester if DUT has active PD connection
        if self.dut_console.is_pd_connected(port):
            # Attempt to force PD disconnection
            self.pdtester_console.send_pd_command(disc_cmd)
            time.sleep(DISCONNECT_CHECK_TIME)
            # Verify that DUT PD port is no longer connected
            if not self.dut_console.is_pd_connected(port):
                # Wait for disconnect timer and give time to reconnect
                time.sleep(recover_time)
                if self.dut_console.is_pd_connected(port):
                    logging.info('PDTester connection verified on port %d',
                                 port)
                    return True
            else:
                # Could have disconnected other port, allow it to reconnect
                # before exiting.
                time.sleep(recover_time)
        return False

    def find_dut_to_pdtester_connection(self):
        """Find the PD port which is connected to PDTester

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to PDTester connection on port
            if self._verify_pdtester_connection(port):
                # PDTester PD connection found so exit
                return port
        return None