1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import re 11import logging 12import six 13from six.moves import range 14import time 15 16from autotest_lib.client.common_lib import error 17 18 19class PDConsoleUtils(object): 20 """Base clase for all PD console utils 21 22 This class provides a set of APIs for expected Type C PD required actions 23 in TypeC FAFT tests. The base class is specific for Type C console access. 24 25 """ 26 def __init__(self, console): 27 """Console can be either usbpd, ec, or pdtester UART 28 29 This object with then be used by the class which creates 30 the PDConsoleUtils class to send/receive commands to UART 31 """ 32 # save console for UART access functions 33 self.console = console 34 35 def send_pd_command(self, cmd): 36 """Send command to PD console UART 37 38 @param cmd: pd command string 39 """ 40 self.console.send_command(cmd) 41 42 def send_pd_command_get_output(self, cmd, regexp, debug_on=True): 43 """Send command to PD console, wait for response 44 45 @param cmd: pd command string 46 @param regexp: regular expression for desired output 47 """ 48 # Enable PD console debug mode to show control messages 49 if debug_on: 50 self.enable_pd_console_debug() 51 output = self.console.send_command_get_output(cmd, regexp) 52 if debug_on: 53 self.disable_pd_console_debug() 54 return output 55 56 def send_pd_command_get_reply_msg(self, cmd): 57 """Send PD protocol msg, get PD control msg reply 58 59 The PD console debug mode is enabled prior to sending 60 a pd protocol message. This allows the 61 control message reply to be extracted. The debug mode 62 is disabled prior to exiting. 63 64 @param cmd: pd command to issue to the UART console 65 66 @returns: PD control header message 67 """ 68 m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)\W']) 69 ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK 70 return ctrl_msg 71 72 def verify_pd_console(self): 73 """Verify that PD commands exist on UART console 74 75 Send 'help' command to UART console 76 77 @returns: True if 'pd' is found, False if not 78 """ 79 80 l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)']) 81 if l[0][1] == 'pd': 82 return True 83 else: 84 return False 85 86 def get_pd_version(self): 87 """Get the version of the PD stack 88 89 @returns: version of PD stack, one of (1, 2) 90 """ 91 # Match a number or an error ("Wrong number of params") 92 matches = self.console.send_command_get_output('pd version', 93 [r'\s+(\d+|Wrong.*)']) 94 if matches: 95 result = matches[0][1] 96 if result[0].isdigit(): 97 return int(result) 98 return 1 99 100 def execute_pd_state_cmd(self, port): 101 """Get PD state for specified channel 102 103 pd 0/1 state command gives produces 5 fields. The full response 104 line is captured and then parsed to extract each field to fill 105 the dict containing port, polarity, role, pd_state, and flags. 106 107 @param port: Type C PD port 0 or 1 108 109 @returns: A dict with the 5 fields listed above 110 @raises: TestFail if any field not found 111 """ 112 cmd = 'pd' 113 subcmd = 'state' 114 pd_cmd = cmd +" " + str(port) + " " + subcmd 115 time.sleep(self.CURRENT_STATE_PROBE_DELAY) 116 # Two FW versions for this command, get full line. 117 m = self.send_pd_command_get_output(pd_cmd, ['(Port.*) - (Role:.*)\n'], 118 debug_on=False) 119 120 # Extract desired values from result string 121 state_result = {} 122 pd_state_dict = self.PD_STATE_DICT 123 124 for key, regexp in six.iteritems(pd_state_dict): 125 value = re.search(regexp, m[0][0]) 126 if value: 127 state_result[key] = value.group(1) 128 else: 129 raise error.TestFail('pd %d state: %r value not found' % 130 (port, key)) 131 132 return state_result 133 134 def get_pd_state(self, port): 135 """Get the current PD state 136 137 """ 138 raise NotImplementedError( 139 'should be implemented in derived class') 140 141 def get_pd_port(self, port): 142 """Get the current PD port 143 144 @param port: Type C PD port 0/1 145 @returns: current pd state 146 """ 147 pd_dict = self.execute_pd_state_cmd(port) 148 return pd_dict['port'] 149 150 def get_pd_role(self, port): 151 """Get the current PD power role (source or sink) 152 153 @param port: Type C PD port 0/1 154 @returns: current pd state 155 """ 156 pd_dict = self.execute_pd_state_cmd(port) 157 return pd_dict['role'] 158 159 def get_pd_flags(self, port): 160 """Get the current PD flags 161 162 @param port: Type C PD port 0/1 163 @returns: current pd state 164 """ 165 pd_dict = self.execute_pd_state_cmd(port) 166 return pd_dict['flags'] 167 168 def get_pd_dualrole(self, port): 169 """Get the current PD dualrole setting 170 171 @param port: Type C PD port 0/1 172 @returns: current PD dualrole setting, one of (on, off, snk, src) 173 """ 174 175 if self.per_port_dualrole_setting is True: 176 cmd = 'pd %d dualrole' % port 177 elif self.per_port_dualrole_setting is False: 178 cmd = 'pd dualrole' 179 else: 180 try: 181 self.per_port_dualrole_setting = True 182 return self.get_pd_dualrole(port) 183 except: 184 self.per_port_dualrole_setting = False 185 return self.get_pd_dualrole(port) 186 187 dualrole_values = self.DUALROLE_VALUES 188 189 m = self.send_pd_command_get_output( 190 cmd, ['dual-role toggling:\s+([\w ]+)[\r\n]'], debug_on=False) 191 # Find the index according to the output of "pd dualrole" command 192 dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1]) 193 # Map to a string which is the output of this method 194 return dualrole_values[dual_index] 195 196 def set_pd_dualrole(self, port, value): 197 """Set pd dualrole 198 199 """ 200 raise NotImplementedError( 201 'should be implemented in derived class') 202 203 def query_pd_connection(self): 204 """Determine if PD connection is present 205 206 Try the 'pd 0/1 state' command and see if it's in either 207 expected state of a connection. Record the port number 208 that has an active connection 209 210 @returns: dict with params port, connect, and state 211 """ 212 status = {} 213 port = 0; 214 status['connect'] = False 215 status['port'] = port 216 state = self.get_pd_state(port) 217 # Check port 0 first 218 219 if self.is_pd_connected(port): 220 status['connect'] = True 221 status['role'] = state 222 else: 223 port = 1 224 status['port'] = port 225 state = self.get_pd_state(port) 226 logging.info('CHECK PORT 1: %s', state) 227 # Check port 1 228 if self.is_pd_connected(port): 229 status['connect'] = True 230 status['role'] = state 231 232 return status 233 234 def swap_power_role(self, port): 235 """Attempt a power role swap 236 237 This method attempts to execute a power role swap. A check 238 is made to ensure that dualrole mode is enabled and that 239 a PD contract is currently established. If both checks pass, 240 then the power role swap command is issued. After a delay, 241 if a PD contract is established and the current state does 242 not equal the starting state, then it was successful. 243 244 @param port: pd port number 245 246 @returns: True if power swap is successful, False otherwise. 247 """ 248 # Get starting state 249 if self.is_pd_dual_role_enabled(port) == False: 250 logging.info('Dualrole Mode not enabled!') 251 return False 252 if self.is_pd_connected(port) == False: 253 logging.info('PD contract not established!') 254 return False 255 current_pr = self.get_pd_state(port) 256 swap_cmd = 'pd %d swap power' % port 257 self.send_pd_command(swap_cmd) 258 time.sleep(self.CONNECT_TIME) 259 new_pr = self.get_pd_state(port) 260 logging.info('Power swap: %s -> %s', current_pr, new_pr) 261 if self.is_pd_connected(port) == False: 262 return False 263 return bool(current_pr != new_pr) 264 265 def disable_pd_console_debug(self): 266 """Turn off PD console debug 267 268 """ 269 cmd = 'pd dump 0' 270 self.send_pd_command(cmd) 271 272 def enable_pd_console_debug(self): 273 """Enable PD console debug level 1 274 275 """ 276 cmd = 'pd dump 2' 277 self.send_pd_command(cmd) 278 279 def is_pd_flag_set(self, port, key): 280 """Test a bit in PD protocol state flags 281 282 The flag word contains various PD protocol state information. 283 This method allows for a specific flag to be tested. 284 285 @param port: Port which has the active PD connection 286 @param key: dict key to retrieve the flag bit mapping 287 288 @returns True if the bit to be tested is set 289 """ 290 pd_flags = self.get_pd_flags(port) 291 return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16)) 292 293 def is_pd_connected(self, port): 294 """Check if a PD connection is active 295 296 @param port: port to be used for pd console commands 297 298 @returns True if port is in connected state 299 """ 300 return self.is_src_connected(port) or self.is_snk_connected(port) 301 302 def is_pd_dual_role_enabled(self, port): 303 """Check if a PD device is in dualrole mode 304 305 @param port: Type C PD port 0/1 306 307 @returns True is dualrole mode is active, false otherwise 308 """ 309 drp = self.get_pd_dualrole(port) 310 return drp == 'on' 311 312 def is_src_connected(self, port, state=None): 313 """Checks if the port is connected as a source 314 315 @param port: Type C PD port 0/1 316 @param state: the state to check (None to get current state) 317 318 @returns True if connected as SRC, False otherwise 319 """ 320 if state is None: 321 state = self.get_pd_state(port) 322 return state in self.get_src_connect_states() 323 324 def is_snk_connected(self, port, state=None): 325 """Checks if the port is connected as a sink 326 327 @param port: Type C PD port 0/1 328 @param state: the state to check (None to get current state) 329 330 @returns True if connected as SNK, False otherwise 331 """ 332 if state is None: 333 state = self.get_pd_state(port) 334 return state in self.get_snk_connect_states() 335 336 def is_disconnected(self, port, state=None): 337 """Checks if the port is disconnected 338 339 @param port: Type C PD port 0/1 340 @param state: the state to check (None to get current state) 341 342 @return True if disconnected 343 """ 344 if state is None: 345 state = self.get_pd_state(port) 346 return state in self.get_disconnected_states() 347 348 def get_src_connect_states(self): 349 """Returns the name of the SRC state 350 351 """ 352 raise NotImplementedError( 353 'should be implemented in derived class') 354 355 def get_snk_connect_states(self): 356 """Returns the name of the SNK state 357 358 """ 359 raise NotImplementedError( 360 'should be implemented in derived class') 361 362 def get_disconnected_states(self): 363 """Returns the names of the disconnected states 364 365 """ 366 return self.DISCONNECTED_STATES 367 368 def is_snk_discovery_state(self, port): 369 """Returns true if in snk discovery state, else false 370 371 @param port: Type C PD port 0/1 372 373 @return: True if in SNK Discovery state 374 """ 375 raise NotImplementedError( 376 'should be implemented in derived class') 377 378class TCPMv1ConsoleUtils(PDConsoleUtils): 379 """ Provides a set of methods common to USB PD TCPMv1 FAFT tests 380 381 Each instance of this class is associated with a particular 382 servo UART console. USB PD tests will typically use the console 383 command 'pd' and its subcommands to control/monitor Type C PD 384 connections. The servo object used for UART operations is 385 passed in and stored when this object is created. 386 387 """ 388 SRC_CONNECT = ('SRC_READY',) 389 SNK_CONNECT = ('SNK_READY',) 390 SRC_DISC = 'SRC_DISCONNECTED' 391 SNK_DISC = 'SNK_DISCONNECTED' 392 SNK_DISCOVERY = 'SNK_DISCOVERY' 393 DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE' 394 DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE) 395 396 PD_MAX_PORTS = 2 397 CONNECT_TIME = 4 398 399 CURRENT_STATE_PROBE_DELAY = 2 400 DUALROLE_QUERY_DELAY = 1 401 # Dualrole input/output values of methods in this class. 402 DUALROLE_VALUES = ['on', 'off', 'snk', 'src'] 403 # Strings passing to the console command "pd dualrole" 404 DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source'] 405 # Strings returned from the console command "pd dualrole" 406 DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source'] 407 408 # Some old firmware uses a single dualrole setting for all ports; while 409 # some new firmware uses a per port dualrole setting. This flag will be 410 # initialized to True or False. 411 # TODO: Remove this flag when the old setting phases out 412 per_port_dualrole_setting = None 413 414 # Dictionary for 'pd 0/1 state' parsing 415 PD_STATE_DICT = { 416 'port': 'Port\s+([\w]+)', 417 'role': 'Role:\s+([\w]+-[\w]+)', 418 'pd_state': 'State:\s+([\w()]+)', 419 'flags': 'Flags:\s+([\w]+)', 420 'polarity': '(CC\d)' 421 } 422 423 # Regex to match PD state name; work for both old and new formats 424 RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?" 425 # Copied from ec repo: common/usb_pd_protocol.c 426 PD_STATE_NAMES = [ 427 "DISABLED", # index: 0 428 "SUSPENDED", 429 "SNK_DISCONNECTED", 430 "SNK_DISCONNECTED_DEBOUNCE", 431 "SNK_HARD_RESET_RECOVER", 432 "SNK_DISCOVERY", # index: 5 433 "SNK_REQUESTED", 434 "SNK_TRANSITION", 435 "SNK_READY", 436 "SNK_SWAP_INIT", 437 "SNK_SWAP_SNK_DISABLE", # index: 10 438 "SNK_SWAP_SRC_DISABLE", 439 "SNK_SWAP_STANDBY", 440 "SNK_SWAP_COMPLETE", 441 "SRC_DISCONNECTED", 442 "SRC_DISCONNECTED_DEBOUNCE", # index: 15 443 "SRC_HARD_RESET_RECOVER", 444 "SRC_STARTUP", 445 "SRC_DISCOVERY", 446 "SRC_NEGOCIATE", 447 "SRC_ACCEPTED", # index: 20 448 "SRC_POWERED", 449 "SRC_TRANSITION", 450 "SRC_READY", 451 "SRC_GET_SNK_CAP", 452 "DR_SWAP", # index: 25 453 "SRC_SWAP_INIT", 454 "SRC_SWAP_SNK_DISABLE", 455 "SRC_SWAP_SRC_DISABLE", 456 "SRC_SWAP_STANDBY", 457 "VCONN_SWAP_SEND", # index: 30 458 "VCONN_SWAP_INIT", 459 "VCONN_SWAP_READY", 460 "SOFT_RESET", 461 "HARD_RESET_SEND", 462 "HARD_RESET_EXECUTE", # index: 35 463 "BIST_RX", 464 "BIST_TX", 465 "DRP_AUTO_TOGGLE", 466 ] 467 468 # Dictionary for PD control message types 469 PD_CONTROL_MSG_MASK = 0x1f 470 PD_CONTROL_MSG_DICT = { 471 'GoodCRC': 1, 472 'GotoMin': 2, 473 'Accept': 3, 474 'Reject': 4, 475 'Ping': 5, 476 'PS_RDY': 6, 477 'Get_Source_Cap': 7, 478 'Get_Sink_Cap': 8, 479 'DR_Swap': 9, 480 'PR_Swap': 10, 481 'VCONN_Swap': 11, 482 'Wait': 12, 483 'Soft_Reset': 13 484 } 485 486 # Dictionary for PD firmware state flags 487 PD_STATE_FLAGS_DICT = { 488 'power_swap': 1 << 1, 489 'data_swap': 1 << 2, 490 'data_swap_active': 1 << 3, 491 'vconn_on': 1 << 12 492 } 493 494 def _normalize_pd_state(self, state): 495 """Normalize the PD state name which handles both old and new formats. 496 497 The old format is like: "SNK_READY" 498 The new format is like: "8()" if debug_level == 0, or 499 "8(SNK_READY)" if debug_level > 0 500 501 This method will convert the new format to the old one. 502 503 @param state: The raw PD state text 504 505 @returns: The normalized PD state name 506 @raises: TestFail if unexpected PD state format 507 """ 508 m = re.match(self.RE_PD_STATE, state) 509 if m and any(m.groups()): 510 state_index, state_name = m.groups() 511 if state_index is None: 512 # The old format: return the name 513 return state_name 514 # The new format: map the index to a name 515 mapped_name = self.PD_STATE_NAMES[int(state_index)] 516 if state_name is not None: 517 assert mapped_name == state_name 518 return mapped_name 519 else: 520 raise error.TestFail('Unexpected PD state format: %s' % state) 521 522 def get_pd_state(self, port): 523 """Get the current PD state 524 525 @param port: Type C PD port 0/1 526 @returns: current pd state 527 """ 528 529 pd_dict = self.execute_pd_state_cmd(port) 530 return self._normalize_pd_state(pd_dict['pd_state']) 531 532 def set_pd_dualrole(self, port, value): 533 """Set pd dualrole 534 535 It can be set to either: 536 1. on 537 2. off 538 3. snk (force sink mode) 539 4. src (force source mode) 540 After setting, the current value is read to confirm that it 541 was set properly. 542 543 @param port: Type C PD port 0/1 544 @param value: One of the 4 options listed 545 """ 546 dualrole_values = self.DUALROLE_VALUES 547 # If the dualrole setting is not initialized, call the get method to 548 # initialize it. 549 if self.per_port_dualrole_setting is None: 550 self.get_pd_dualrole(port) 551 552 # Get string required for console command 553 dual_index = dualrole_values.index(value) 554 # Create console command 555 if self.per_port_dualrole_setting is True: 556 cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index]) 557 elif self.per_port_dualrole_setting is False: 558 cmd = 'pd dualrole %s' % (self.DUALROLE_CMD_ARGS[dual_index]) 559 else: 560 raise error.TestFail("dualrole error") 561 562 self.console.send_command(cmd) 563 time.sleep(self.DUALROLE_QUERY_DELAY) 564 # Get current setting to verify that command was successful 565 dual = self.get_pd_dualrole(port) 566 # If it doesn't match, then raise error 567 if dual != value: 568 raise error.TestFail("dualrole error: " + value + " != " + dual) 569 570 def get_src_connect_states(self): 571 """Returns the name of the SRC state 572 573 """ 574 return self.SRC_CONNECT 575 576 def get_snk_connect_states(self): 577 """Returns the name of the SRC state 578 579 """ 580 return self.SNK_CONNECT 581 582 def is_snk_discovery_state(self, port): 583 """Returns true if in snk discovery state, else false 584 585 @param port: Type C PD port 0/1 586 587 @return: True if in SNK Discovery state 588 """ 589 state = self.get_pd_state(port) 590 return state == self.SNK_DISCOVERY 591 592class TCPMv2ConsoleUtils(PDConsoleUtils): 593 """ Provides a set of methods common to USB PD TCPMv1 FAFT tests 594 595 Each instance of this class is associated with a particular 596 servo UART console. USB PD tests will typically use the console 597 command 'pd' and its subcommands to control/monitor Type C PD 598 connections. The servo object used for UART operations is 599 passed in and stored when this object is created. 600 601 """ 602 SRC_CONNECT = ('Attached.SRC', 'UnorientedDebugAccessory.SRC') 603 SNK_CONNECT = ('Attached.SNK', 'DebugAccessory.SNK') 604 SRC_DISC = 'Unattached.SRC' 605 SNK_DISC = 'Unattached.SNK' 606 DRP_AUTO_TOGGLE = 'DRPAutoToggle' 607 LOW_POWER_MODE = 'LowPowerMode' 608 SNK_DISCOVERY = 'PE_SNK_DISCOVERY' 609 DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE, LOW_POWER_MODE) 610 611 PD_MAX_PORTS = 2 612 CONNECT_TIME = 4 613 614 CURRENT_STATE_PROBE_DELAY = 2 615 DUALROLE_QUERY_DELAY = 1 616 # Dualrole input/output values of methods in this class. 617 DUALROLE_VALUES = ['on', 'off', 'sink', 'source'] 618 # Strings passing to the console command "pd dualrole" 619 DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source'] 620 # Strings returned from the console command "pd dualrole" 621 DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source'] 622 623 # Some old firmware uses a single dualrole setting for all ports; while 624 # some new firmware uses a per port dualrole setting. This flag will be 625 # initialized to True or False. 626 # TODO: Remove this flag when the old setting phases out 627 per_port_dualrole_setting = None 628 629 # Dictionary for 'pd 0/1 state' parsing 630 PD_STATE_DICT = { 631 'port': 'Port\s+([\w]+)', 632 'role': 'Role:\s+([\w]+-[\w]+)', 633 'pd_state': 'TC State:\s+([\w().]+)', 634 'flags': 'Flags:\s+([\w]+)', 635 'pe_state': 'PE State:\s+(\w*)', 636 'polarity': '(CC\d)' 637 } 638 639 # Regex to match PD state name; work for both old and new formats 640 RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?" 641 642 # Dictionary for PD control message types 643 PD_CONTROL_MSG_MASK = 0x1f 644 PD_CONTROL_MSG_DICT = { 645 'GoodCRC': 1, 646 'GotoMin': 2, 647 'Accept': 3, 648 'Reject': 4, 649 'Ping': 5, 650 'PS_RDY': 6, 651 'Get_Source_Cap': 7, 652 'Get_Sink_Cap': 8, 653 'DR_Swap': 9, 654 'PR_Swap': 10, 655 'VCONN_Swap': 11, 656 'Wait': 12, 657 'Soft_Reset': 13 658 } 659 660 # Dictionary for PD firmware state flags 661 PD_STATE_FLAGS_DICT = { 662 'power_swap': 1 << 1, 663 'data_swap': 1 << 2, 664 'data_swap_active': 1 << 3, 665 'vconn_on': 1 << 12 666 } 667 668 def get_pe_state(self, port): 669 """Get the current Policy Engine state 670 671 @param port: Type C PD port 0/1 672 @returns: current pe state 673 """ 674 675 pd_dict = self.execute_pd_state_cmd(port) 676 return pd_dict['pe_state'] 677 678 def get_pd_state(self, port): 679 """Get the current PD state 680 681 @param port: Type C PD port 0/1 682 @returns: current pd state 683 """ 684 685 pd_dict = self.execute_pd_state_cmd(port) 686 return pd_dict['pd_state'] 687 688 def set_pd_dualrole(self, port, value): 689 """Set pd dualrole 690 691 It can be set to either: 692 1. on 693 2. off 694 3. snk (force sink mode) 695 4. src (force source mode) 696 After setting, the current value is read to confirm that it 697 was set properly. 698 699 @param port: Type C PD port 0/1 700 @param value: One of the 4 options listed 701 """ 702 dualrole_values = self.DUALROLE_VALUES 703 704 if value == 'src': 705 value = 'source' 706 elif value == 'snk': 707 value = 'sink' 708 709 # Get string required for console command 710 dual_index = dualrole_values.index(value) 711 # Create console command 712 cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index]) 713 self.console.send_command(cmd) 714 time.sleep(self.DUALROLE_QUERY_DELAY) 715 # Get current setting to verify that command was successful 716 dual = self.get_pd_dualrole(port) 717 # If it doesn't match, then raise error 718 if dual != value: 719 raise error.TestFail("dualrole error: " + value + " != " + dual) 720 721 def get_src_connect_states(self): 722 """Returns the name of the SRC states 723 724 @returns: List of connected source state names 725 """ 726 return self.SRC_CONNECT 727 728 def get_snk_connect_states(self): 729 """Returns the name of the SRC states 730 731 @returns: List of connected sink state names 732 """ 733 return self.SNK_CONNECT 734 735 def is_snk_discovery_state(self, port): 736 """Returns true if in snk discovery state, else false 737 738 @param port: Type C PD port 0/1 739 740 @return: True if in SNK Discovery state 741 """ 742 state = self.get_pe_state(port) 743 return state == self.SNK_DISCOVERY 744 745class PDConnectionUtils(PDConsoleUtils): 746 """Provides a set of methods common to USB PD FAFT tests 747 748 This class is used for PD utility methods that require access 749 to both PDTester and DUT PD consoles. 750 751 """ 752 753 def __init__(self, dut_console, pdtester_console): 754 """ 755 @param dut_console: PD console object for DUT 756 @param pdtester_console: PD console object for PDTester 757 """ 758 # save console for DUT PD UART access functions 759 self.dut_console = dut_console 760 # save console for PDTester UART access functions 761 self.pdtester_console = pdtester_console 762 763 def _verify_pdtester_connection(self, port): 764 """Verify DUT to PDTester PD connection 765 766 This method checks for a PDTester PD connection for the 767 given port by first verifying if a PD connection is present. 768 If found, then it uses a PDTester feature to force a PD disconnect. 769 If the port is no longer in the connected state, and following 770 a delay, is found to be back in the connected state, then 771 a DUT pd to PDTester connection is verified. 772 773 @param port: DUT pd port to test 774 775 @returns True if DUT to PDTester pd connection is verified 776 """ 777 DISCONNECT_CHECK_TIME = 2 778 DISCONNECT_TIME_SEC = 10 779 # pdtester console command to force PD disconnect 780 disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000) 781 # Only check for PDTester if DUT has active PD connection 782 if self.dut_console.is_pd_connected(port): 783 # Attempt to force PD disconnection 784 self.pdtester_console.send_pd_command(disc_cmd) 785 time.sleep(DISCONNECT_CHECK_TIME) 786 # Verify that DUT PD port is no longer connected 787 if self.dut_console.is_pd_connected(port) == False: 788 # Wait for disconnect timer and give time to reconnect 789 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) 790 if self.dut_console.is_pd_connected(port): 791 logging.info('PDTester connection verified on port %d', 792 port) 793 return True 794 else: 795 # Could have disconnected other port, allow it to reconnect 796 # before exiting. 797 time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC) 798 return False 799 800 def find_dut_to_pdtester_connection(self): 801 """Find the PD port which is connected to PDTester 802 803 @returns DUT pd port number if found, None otherwise 804 """ 805 for port in range(self.dut_console.PD_MAX_PORTS): 806 # Check for DUT to PDTester connection on port 807 if self._verify_pdtester_connection(port): 808 # PDTester PD connection found so exit 809 return port 810 return None 811 812def create_pd_console_utils(console): 813 """Factory that detects the proper PDConsole Utils to use for DUT 814 815 @param console: DUT PD console 816 817 @returns: An instance of TCPMv1ConsoleUtils or TCPMv2ConsoleUtils 818 """ 819 pd_console_utils = { 820 1: TCPMv1ConsoleUtils, 821 2: TCPMv2ConsoleUtils, 822 } 823 824 version = PDConsoleUtils(console).get_pd_version() 825 logging.debug('%s is TCPM v%s', console, version) 826 cls = pd_console_utils[version] 827 return cls(console) 828