1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import re 11import logging 12from six.moves import range 13import time 14 15from autotest_lib.client.common_lib import error 16from autotest_lib.server.cros.servo import pd_console 17 18 19class PDDevice(object): 20 """Base clase for all PD devices 21 22 This class provides a set of APIs for expected Type C PD required actions 23 in TypeC FAFT tests. The base class is specific for Type C devices that 24 do not have any console access. 25 26 """ 27 28 def is_src(self, state=None): 29 """Checks if the port is connected as a source 30 31 """ 32 raise NotImplementedError( 33 'is_src should be implemented in derived class') 34 35 def is_snk(self, state=None): 36 """Checks if the port is connected as a sink 37 38 @returns None 39 """ 40 raise NotImplementedError( 41 'is_snk should be implemented in derived class') 42 43 def is_connected(self, state=None): 44 """Checks if the port is connected 45 46 @returns True if in a connected state, False otherwise 47 """ 48 return self.is_src(state) or self.is_snk(state) 49 50 def is_disconnected(self, state=None): 51 """Checks if the port is disconnected 52 53 """ 54 raise NotImplementedError( 55 'is_disconnected should be implemented in derived class') 56 57 def is_ufp(self): 58 """Checks if data role is UFP 59 60 """ 61 raise NotImplementedError( 62 'is_ufp should be implemented in derived class') 63 64 def is_dfp(self): 65 """Checks if data role is DFP 66 67 """ 68 raise NotImplementedError( 69 'is_dfp should be implemented in derived class') 70 71 def is_drp(self): 72 """Checks if dual role mode is supported 73 74 """ 75 raise NotImplementedError( 76 'is_drp should be implemented in derived class') 77 78 def dr_swap(self): 79 """Attempts a data role swap 80 81 """ 82 raise NotImplementedError( 83 'dr_swap should be implemented in derived class') 84 85 def pr_swap(self): 86 """Attempts a power role swap 87 88 """ 89 raise NotImplementedError( 90 'pr_swap should be implemented in derived class') 91 92 def vbus_request(self, voltage): 93 """Requests a specific VBUS voltage from SRC 94 95 @param voltage: requested voltage level (5, 12, 20) in volts 96 """ 97 raise NotImplementedError( 98 'vbus_request should be implemented in derived class') 99 100 def soft_reset(self): 101 """Initates a PD soft reset sequence 102 103 """ 104 raise NotImplementedError( 105 'soft_reset should be implemented in derived class') 106 107 def hard_reset(self): 108 """Initates a PD hard reset sequence 109 110 """ 111 raise NotImplementedError( 112 'hard_reset should be implemented in derived class') 113 114 def drp_set(self, mode): 115 """Sets dualrole mode 116 117 @param mode: desired dual role setting (on, off, snk, src) 118 """ 119 raise NotImplementedError( 120 'drp_set should be implemented in derived class') 121 122 def drp_get(self): 123 """Gets dualrole mode 124 125 @returns one of the modes (on, off, snk, src) 126 """ 127 raise NotImplementedError( 128 'drp_set should be implemented in derived class') 129 130 def drp_disconnect_connect(self, disc_time_sec): 131 """Force PD disconnect/connect via drp settings 132 133 @param disc_time_sec: Time in seconds between disconnect and reconnect 134 """ 135 raise NotImplementedError( 136 'drp_disconnect_connect should be implemented in derived class' 137 ) 138 139 def cc_disconnect_connect(self, disc_time_sec): 140 """Force PD disconnect/connect 141 142 @param disc_time_sec: Time in seconds between disconnect and reconnect 143 """ 144 raise NotImplementedError( 145 'cc_disconnect_connect should be implemented in derived class') 146 147 def get_connected_state_after_cc_reconnect(self, disc_time_sec): 148 """Get the connected state after disconnect/reconnect 149 150 @param disc_time_sec: Time in seconds for disconnect period. 151 @returns: The connected PD state. 152 """ 153 raise NotImplementedError( 154 'get_connected_state_after_cc_reconnect should be implemented' 155 'in derived class') 156 157 158class PDConsoleDevice(PDDevice): 159 """Class for PD devices that have console access 160 161 This class contains methods for common PD actions for any PD device which 162 has UART console access. It inherits the PD device base class. In addition, 163 it stores both the UART console and port for the PD device. 164 """ 165 166 def __init__(self, console, port, utils): 167 """Initialization method 168 169 @param console: UART console object 170 @param port: USB PD port number 171 """ 172 # Save UART console 173 self.console = console 174 # Instantiate PD utilities used by methods in this class 175 self.utils = utils 176 # Save the PD port number for this device 177 self.port = port 178 # Not a PDTester device 179 self.is_pdtester = False 180 181 def get_pd_state(self): 182 """Get the state of the PD port""" 183 return self.utils.get_pd_state(self.port) 184 185 def get_pd_role(self): 186 """Get the current PD power role (source or sink) 187 188 @returns: current pd state 189 """ 190 return self.utils.get_pd_role(self.port) 191 192 def is_pd_flag_set(self, key): 193 """Test a bit in PD protocol state flags 194 195 The flag word contains various PD protocol state information. 196 This method allows for a specific flag to be tested. 197 198 @param key: dict key to retrieve the flag bit mapping 199 200 @returns True if the bit to be tested is set 201 """ 202 return self.utils.is_pd_flag_set(self.port, key) 203 204 def is_src(self, state=None): 205 """Checks if the port is connected as a source. 206 207 The "state" argument allows the caller to get_pd_state() once, and then 208 evaluate multiple conditions without re-getting the state. 209 210 @param state: the state to check (None to get current state) 211 @returns True if connected as SRC, False otherwise 212 """ 213 return self.utils.is_src_connected(self.port, state) 214 215 def is_snk(self, state=None): 216 """Checks if the port is connected as a sink 217 218 The "state" argument allows the caller to get_pd_state() once, and then 219 evaluate multiple conditions without re-getting the state. 220 221 @param state: the state to check (None to get current state) 222 @returns True if connected as SNK, False otherwise 223 """ 224 return self.utils.is_snk_connected(self.port, state) 225 226 def is_connected(self, state=None): 227 """Checks if the port is connected 228 229 The "state" argument allows the caller to get_pd_state() once, and then 230 evaluate multiple conditions without re-getting the state. 231 232 @param state: the state to check (None to get current state) 233 @returns True if in a connected state, False otherwise 234 """ 235 return self.is_snk(state) or self.is_src(state) 236 237 def is_disconnected(self, state=None): 238 """Checks if the port is disconnected 239 240 @returns True if in a disconnected state, False otherwise 241 """ 242 return self.utils.is_disconnected(self.port, state) 243 244 def __repr__(self): 245 """String representation of the object""" 246 return "<%s %r port %s>" % ( 247 self.__class__.__name__, self.console.name, self.port) 248 249 def is_drp(self): 250 """Checks if dual role mode is supported 251 252 @returns True if dual role mode is 'on', False otherwise 253 """ 254 return self.utils.is_pd_dual_role_enabled(self.port) 255 256 def drp_disconnect_connect(self, disc_time_sec): 257 """Disconnect/reconnect using drp mode settings 258 259 A PD console device doesn't have an explicit connect/disconnect 260 command. Instead, the dualrole mode setting is used to force 261 disconnects in devices which support this feature. To disconnect, 262 force the dualrole mode to be the opposite role of the current 263 connected state. 264 265 @param disc_time_sec: time in seconds to wait to reconnect 266 267 @returns True if device disconnects, then returns to a connected 268 state. False if either step fails. 269 """ 270 # Dualrole mode must be supported 271 if self.is_drp() is False: 272 logging.warning('Device not DRP capable, unabled to force disconnect') 273 return False 274 # Force state will be the opposite of current connect state 275 if self.is_src(): 276 drp_mode = 'snk' 277 swap_state = self.utils.get_snk_connect_states() 278 else: 279 drp_mode = 'src' 280 swap_state = self.utils.get_src_connect_states() 281 # Force disconnect 282 self.drp_set(drp_mode) 283 # Wait for disconnect time 284 time.sleep(disc_time_sec) 285 # Verify that the device is disconnected 286 disconnect = self.is_disconnected() 287 288 # If the other device is dualrole, then forcing dualrole mode will 289 # only cause the disconnect to appear momentarily and reconnect 290 # in the power role forced by the drp_set() call. For this case, 291 # the role swap verifies that a disconnect/connect sequence occurred. 292 if disconnect == False: 293 time.sleep(self.utils.CONNECT_TIME) 294 # Connected, verify if power role swap has occurred 295 if self.utils.get_pd_state(self.port) in swap_state: 296 # Restore default dualrole mode 297 self.drp_set('on') 298 # Restore orignal power role 299 connect = self.pr_swap() 300 if connect == False: 301 logging.warning('DRP on both devices, 2nd power swap failed') 302 return connect 303 304 # Restore default dualrole mode 305 self.drp_set('on') 306 # Allow enough time for protocol state machine 307 time.sleep(self.utils.CONNECT_TIME) 308 # Check if connected 309 connect = self.is_connected() 310 logging.info('Disconnect = %r, Connect = %r', disconnect, connect) 311 return bool(disconnect and connect) 312 313 def drp_set(self, mode): 314 """Sets dualrole mode 315 316 @param mode: desired dual role setting (on, off, snk, src) 317 318 @returns True is set was successful, False otherwise 319 """ 320 # Set desired dualrole mode 321 self.utils.set_pd_dualrole(self.port, mode) 322 # Get current setting 323 current = self.utils.get_pd_dualrole(self.port) 324 # Verify that setting is correct 325 return bool(mode == current) 326 327 def drp_get(self): 328 """Gets dualrole mode 329 330 @returns one of the modes (on, off, snk, src) 331 """ 332 return self.utils.get_pd_dualrole(self.port) 333 334 def try_src(self, enable): 335 """Enables/Disables Try.SRC PD protocol setting 336 337 @param enable: True to enable, False to disable 338 339 @returns True is setting was successful, False if feature not 340 supported by the device, or not set as desired. 341 """ 342 # Create Try.SRC pd command 343 cmd = 'pd trysrc %d' % int(enable) 344 # TCPMv1 indicates Try.SRC is on by returning 'on' 345 # TCPMv2 indicates Try.SRC is on by returning 'Forced ON' 346 on_vals = ('on', 'Forced ON') 347 # TCPMv1 indicates Try.SRC is off by returning 'off' 348 # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF' 349 off_vals = ('off', 'Forced OFF') 350 351 # Try.SRC on/off is output, if supported feature 352 regex = ['Try\.SRC\s(%s)|(Parameter)' % ('|'.join(on_vals + off_vals))] 353 m = self.utils.send_pd_command_get_output(cmd, regex) 354 355 # Determine if Try.SRC feature is supported 356 if 'Try.SRC' not in m[0][0]: 357 logging.warning('Try.SRC not supported on this PD device') 358 return False 359 360 # TrySRC is supported on this PD device, verify setting. 361 trysrc_val = m[0][1] 362 logging.info('Try.SRC mode = %s', trysrc_val) 363 if enable: 364 vals = on_vals 365 else: 366 vals = off_vals 367 368 return trysrc_val in vals 369 370 def soft_reset(self): 371 """Initates a PD soft reset sequence 372 373 To verify that a soft reset sequence was initiated, the 374 reply message is checked to verify that the reset command 375 was acknowledged by its port pair. The connect state should 376 be same as it was prior to issuing the reset command. 377 378 @returns True if the port pair acknowledges the the reset message 379 and if following the command, the device returns to the same 380 connected state. False otherwise. 381 """ 382 RESET_DELAY = 0.5 383 cmd = 'pd %d soft' % self.port 384 state_before = self.utils.get_pd_state(self.port) 385 reply = self.utils.send_pd_command_get_reply_msg(cmd) 386 if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']: 387 return False 388 time.sleep(RESET_DELAY) 389 state_after = self.utils.get_pd_state(self.port) 390 return state_before == state_after 391 392 def hard_reset(self): 393 """Initates a PD hard reset sequence 394 395 To verify that a hard reset sequence was initiated, the 396 console ouput is scanned for HARD RST TX. In addition, the connect 397 state should be same as it was prior to issuing the reset command. 398 399 @returns True if the port pair acknowledges that hard reset was 400 initiated and if following the command, the device returns to the same 401 connected state. False otherwise. 402 """ 403 RESET_DELAY = 1.0 404 cmd = 'pd %d hard' % self.port 405 state_before = self.utils.get_pd_state(self.port) 406 self.utils.enable_pd_console_debug() 407 try: 408 tcpmv1_pattern = '.*(HARD\sRST\sTX)' 409 tcpmv2_pattern = '.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)' 410 pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern)) 411 self.utils.send_pd_command_get_output(cmd, [pattern]) 412 except error.TestFail: 413 logging.warning('HARD RST TX not found') 414 return False 415 finally: 416 self.utils.disable_pd_console_debug() 417 418 time.sleep(RESET_DELAY) 419 state_after = self.utils.get_pd_state(self.port) 420 return state_before == state_after 421 422 def pr_swap(self): 423 """Attempts a power role swap 424 425 In order to attempt a power role swap the device must be 426 connected and support dualrole mode. Once these two criteria 427 are checked a power role command is issued. Following a delay 428 to allow for a reconnection the new power role is checked 429 against the power role prior to issuing the command. 430 431 @returns True if the device has swapped power roles, False otherwise. 432 """ 433 # Get starting state 434 if not self.is_drp(): 435 logging.warning('Dualrole Mode not enabled!') 436 return False 437 if self.is_connected() == False: 438 logging.warning('PD contract not established!') 439 return False 440 current_pr = self.utils.get_pd_state(self.port) 441 swap_cmd = 'pd %d swap power' % self.port 442 self.utils.send_pd_command(swap_cmd) 443 time.sleep(self.utils.CONNECT_TIME) 444 new_pr = self.utils.get_pd_state(self.port) 445 logging.info('Power swap: %s -> %s', current_pr, new_pr) 446 if self.is_connected() == False: 447 logging.warning('Device not connected following PR swap attempt.') 448 return False 449 return current_pr != new_pr 450 451 452class PDTesterDevice(PDConsoleDevice): 453 """Class for PDTester devices 454 455 This class contains methods for PD funtions which are unique to the 456 PDTester board, e.g. Plankton or Servo v4. It inherits all the methods 457 for PD console devices. 458 """ 459 460 def __init__(self, console, port, utils): 461 """Initialization method 462 463 @param console: UART console for this device 464 @param port: USB PD port number 465 """ 466 # Instantiate the PD console object 467 super(PDTesterDevice, self).__init__(console, port, utils) 468 # Indicate this is PDTester device 469 self.is_pdtester = True 470 471 def _toggle_pdtester_drp(self): 472 """Issue 'usbc_action drp' PDTester command 473 474 @returns value of drp_enable in PDTester FW 475 """ 476 drp_cmd = 'usbc_action drp' 477 drp_re = ['DRP\s=\s(\d)'] 478 # Send DRP toggle command to PDTester and get value of 'drp_enable' 479 m = self.utils.send_pd_command_get_output(drp_cmd, drp_re) 480 return int(m[0][1]) 481 482 def _enable_pdtester_drp(self): 483 """Enable DRP mode on PDTester 484 485 DRP mode can only be toggled and is not able to be explicitly 486 enabled/disabled via the console. Therefore, this method will 487 toggle DRP mode until the console reply indicates that this 488 mode is enabled. The toggle happens a maximum of two times 489 in case this is called when it's already enabled. 490 491 @returns True when DRP mode is enabled, False if not successful 492 """ 493 for attempt in range(2): 494 if self._toggle_pdtester_drp() == True: 495 logging.info('PDTester DRP mode enabled') 496 return True 497 logging.error('PDTester DRP mode set failure') 498 return False 499 500 def _verify_state_sequence(self, states_list, console_log): 501 """Compare PD state transitions to expected values 502 503 @param states_list: list of expected PD state transitions 504 @param console_log: console output which contains state names 505 506 @returns True if the sequence matches, False otherwise 507 """ 508 # For each state in the expected state transiton table, build 509 # the regexp and search for it in the state transition log. 510 for state in states_list: 511 state_regx = r'C{0}\s+[\w]+:?\s({1})'.format(self.port, 512 state) 513 if re.search(state_regx, console_log) is None: 514 return False 515 return True 516 517 def cc_disconnect_connect(self, disc_time_sec): 518 """Disconnect/reconnect using PDTester 519 520 PDTester supports a feature which simulates a USB Type C disconnect 521 and reconnect. 522 523 @param disc_time_sec: Time in seconds for disconnect period. 524 """ 525 DISC_DELAY = 100 526 disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, 527 disc_time_sec * 1000) 528 self.utils.send_pd_command(disc_cmd) 529 530 def get_connected_state_after_cc_reconnect(self, disc_time_sec): 531 """Get the connected state after disconnect/reconnect using PDTester 532 533 PDTester supports a feature which simulates a USB Type C disconnect 534 and reconnect. It returns the first connected state (either source or 535 sink) after reconnect. 536 537 @param disc_time_sec: Time in seconds for disconnect period. 538 @returns: The connected PD state. 539 """ 540 DISC_DELAY = 100 541 disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000) 542 state_exp = '(C%d)\s+[\w]+:?\s(%s)' 543 544 disconnected_tuple = self.utils.get_disconnected_states() 545 disconnected_states = '|'.join(disconnected_tuple) 546 disconnected_exp = state_exp % (self.port, disconnected_states) 547 548 src_connected_tuple = self.utils.get_src_connect_states() 549 snk_connected_tuple = self.utils.get_snk_connect_states() 550 connected_states = '|'.join(src_connected_tuple + snk_connected_tuple) 551 connected_exp = state_exp % (self.port, connected_states) 552 553 m = self.utils.send_pd_command_get_output(disc_cmd, [disconnected_exp, 554 connected_exp]) 555 return m[1][2] 556 557 def drp_disconnect_connect(self, disc_time_sec): 558 """Disconnect/reconnect using PDTester 559 560 Utilize PDTester disconnect/connect utility and verify 561 that both disconnect and reconnect actions were successful. 562 563 @param disc_time_sec: Time in seconds for disconnect period. 564 565 @returns True if device disconnects, then returns to a connected 566 state. False if either step fails. 567 """ 568 self.cc_disconnect_connect(disc_time_sec) 569 time.sleep(disc_time_sec / 2) 570 disconnect = self.is_disconnected() 571 time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME) 572 connect = self.is_connected() 573 return disconnect and connect 574 575 def drp_set(self, mode): 576 """Sets dualrole mode 577 578 @param mode: desired dual role setting (on, off, snk, src) 579 580 @returns True if dualrole mode matches the requested value or 581 is successfully set to that value. False, otherwise. 582 """ 583 # Get current value of dualrole 584 drp = self.utils.get_pd_dualrole(self.port) 585 if drp == mode: 586 return True 587 588 if mode == 'on': 589 # Setting dpr_enable on PDTester will set dualrole mode to on 590 return self._enable_pdtester_drp() 591 else: 592 # If desired setting is other than 'on', need to ensure that 593 # drp mode on PDTester is disabled. 594 if drp == 'on': 595 # This will turn off drp_enable flag and set dualmode to 'off' 596 return self._toggle_pdtester_drp() 597 # With drp_enable flag off, can set to desired setting 598 return self.utils.set_pd_dualrole(self.port, mode) 599 600 def _reset(self, cmd, states_list): 601 """Initates a PD reset sequence 602 603 PDTester device has state names available on the console. When 604 a soft reset is issued the console log is extracted and then 605 compared against the expected state transisitons. 606 607 @param cmd: reset type (soft or hard) 608 @param states_list: list of expected PD state transitions 609 610 @returns True if state transitions match, False otherwise 611 """ 612 # Want to grab all output until either SRC_READY or SNK_READY 613 reply_exp = ['(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port] 614 m = self.utils.send_pd_command_get_output(cmd, reply_exp) 615 return self._verify_state_sequence(states_list, m[0][0]) 616 617 def soft_reset(self): 618 """Initates a PD soft reset sequence 619 620 @returns True if state transitions match, False otherwise 621 """ 622 snk_reset_states = [ 623 'SOFT_RESET', 624 'SNK_DISCOVERY', 625 'SNK_REQUESTED', 626 'SNK_TRANSITION', 627 'SNK_READY' 628 ] 629 630 src_reset_states = [ 631 'SOFT_RESET', 632 'SRC_DISCOVERY', 633 'SRC_NEGOCIATE', 634 'SRC_ACCEPTED', 635 'SRC_POWERED', 636 'SRC_TRANSITION', 637 'SRC_READY' 638 ] 639 640 if self.is_src(): 641 states_list = src_reset_states 642 elif self.is_snk(): 643 states_list = snk_reset_states 644 else: 645 raise error.TestFail('Port Pair not in a connected state') 646 647 cmd = 'pd %d soft' % self.port 648 return self._reset(cmd, states_list) 649 650 def hard_reset(self): 651 """Initates a PD hard reset sequence 652 653 @returns True if state transitions match, False otherwise 654 """ 655 snk_reset_states = [ 656 'HARD_RESET_SEND', 657 'HARD_RESET_EXECUTE', 658 'SNK_HARD_RESET_RECOVER', 659 'SNK_DISCOVERY', 660 'SNK_REQUESTED', 661 'SNK_TRANSITION', 662 'SNK_READY' 663 ] 664 665 src_reset_states = [ 666 'HARD_RESET_SEND', 667 'HARD_RESET_EXECUTE', 668 'SRC_HARD_RESET_RECOVER', 669 'SRC_DISCOVERY', 670 'SRC_NEGOCIATE', 671 'SRC_ACCEPTED', 672 'SRC_POWERED', 673 'SRC_TRANSITION', 674 'SRC_READY' 675 ] 676 677 if self.is_src(): 678 states_list = src_reset_states 679 elif self.is_snk(): 680 states_list = snk_reset_states 681 else: 682 raise error.TestFail('Port Pair not in a connected state') 683 684 cmd = 'pd %d hard' % self.port 685 return self._reset(cmd, states_list) 686 687 688class PDPortPartner(object): 689 """Methods used to instantiate PD device objects 690 691 This class is initalized with a list of servo consoles. It 692 contains methods to determine if USB PD devices are accessible 693 via the consoles and attempts to determine USB PD port partners. 694 A PD device is USB PD port specific, a single console may access 695 multiple PD devices. 696 697 """ 698 699 def __init__(self, consoles): 700 """Initialization method 701 702 @param consoles: list of servo consoles 703 """ 704 self.consoles = consoles 705 706 def __repr__(self): 707 """String representation of the object""" 708 return "<%s %r>" % (self.__class__.__name__, self.consoles) 709 710 def _send_pd_state(self, port, console): 711 """Tests if PD device exists on a given port number 712 713 @param port: USB PD port number to try 714 @param console: servo UART console 715 716 @returns True if 'pd <port> state' command gives a valid 717 response, False otherwise 718 """ 719 cmd = 'pd %d state' % port 720 regex = r'(Port C\d)|(Parameter)' 721 m = console.send_command_get_output(cmd, [regex]) 722 # If PD port exists, then output will be Port C0 or C1 723 regex = r'Port C{0}'.format(port) 724 if re.search(regex, m[0][0]): 725 return True 726 return False 727 728 def _find_num_pd_ports(self, console): 729 """Determine number of PD ports for a given console 730 731 @param console: uart console accssed via servo 732 733 @returns: number of PD ports accessible via console 734 """ 735 MAX_PORTS = 2 736 num_ports = 0 737 for port in range(MAX_PORTS): 738 if self._send_pd_state(port, console): 739 num_ports += 1 740 return num_ports 741 742 def _is_pd_console(self, console): 743 """Check if pd option exists in console 744 745 @param console: uart console accssed via servo 746 747 @returns: True if 'pd' is found, False otherwise 748 """ 749 try: 750 m = console.send_command_get_output('help', [r'(pd)\s+']) 751 return True 752 except error.TestFail: 753 return False 754 755 def _is_pdtester_console(self, console): 756 """Check for PDTester console 757 758 This method looks for a console command option 'usbc_action' which 759 is unique to PDTester PD devices. 760 761 @param console: uart console accssed via servo 762 763 @returns True if usbc_action command is present, False otherwise 764 """ 765 try: 766 m = console.send_command_get_output('help', [r'(usbc_action)']) 767 return True 768 except error.TestFail: 769 return False 770 771 def _check_port_pair(self, port1, port2): 772 """Check if two PD devices could be connected 773 774 If two USB PD devices are connected, then they should be in 775 either the SRC_READY or SNK_READY states and have opposite 776 power roles. In addition, they must be on different servo 777 consoles. 778 779 @param: list of two possible PD port parters 780 781 @returns True if not the same console and both PD devices 782 are a plausible pair based only on their PD states. 783 """ 784 # Don't test if on the same servo console 785 if port1.console == port2.console: 786 logging.info("PD Devices are on same platform -> can't be a pair") 787 return False 788 789 state1 = port1.get_pd_state() 790 port1_is_snk = port1.is_snk(state1) 791 port1_is_src = port1.is_src(state1) 792 793 state2 = port2.get_pd_state() 794 port2_is_snk = port2.is_snk(state2) 795 port2_is_src = port2.is_src(state2) 796 797 # Must be SRC <--> SNK or SNK <--> SRC 798 if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src): 799 logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s", 800 port1, state1, state2, port2) 801 return True 802 else: 803 logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s", 804 port1, state1, state2, port2) 805 return False 806 807 def _verify_pdtester_connection(self, tester_port, dut_port): 808 """Verify DUT to PDTester PD connection 809 810 This method checks for a PDTester PD connection for the 811 given port by first verifying if a PD connection is present. 812 If found, then it uses a PDTester feature to force a PD disconnect. 813 If the port is no longer in the connected state, and following 814 a delay, is found to be back in the connected state, then 815 a DUT pd to PDTester connection is verified. 816 817 @param dev_pair: list of two PD devices 818 819 @returns True if DUT to PDTester pd connection is verified 820 """ 821 DISC_CHECK_TIME = 10 822 DISC_WAIT_TIME = 20 823 CONNECT_TIME = 4 824 825 logging.info("Check: %s <--> %s", tester_port, dut_port) 826 827 if not self._check_port_pair(tester_port, dut_port): 828 return False 829 830 # Force PD disconnect 831 logging.debug('Disconnecting to check if devices are partners') 832 tester_port.cc_disconnect_connect(DISC_WAIT_TIME) 833 time.sleep(DISC_CHECK_TIME) 834 835 # Verify that both devices are now disconnected 836 tester_state = tester_port.get_pd_state() 837 dut_state = dut_port.get_pd_state() 838 logging.debug("Recheck: %s (%s) <--> (%s) %s", 839 tester_port, tester_state, dut_state, dut_port) 840 841 if not (tester_port.is_disconnected(tester_state) and 842 dut_port.is_disconnected(dut_state)): 843 logging.info("Ports did not disconnect at the same time, so" 844 " they aren't considered a pair.") 845 # Delay to allow non-pair devices to reconnect 846 time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME) 847 return False 848 849 logging.debug('Pair disconnected. Waiting for reconnect...') 850 851 # Allow enough time for reconnection 852 time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME) 853 if self._check_port_pair(tester_port, dut_port): 854 # Have verified a pd disconnect/reconnect sequence 855 logging.info('PDTester <--> DUT pair found') 856 return True 857 858 logging.info("Ports did not reconnect at the same time, so" 859 " they aren't considered a pair.") 860 return False 861 862 def identify_pd_devices(self): 863 """Instantiate PD devices present in test setup 864 865 @return: list of 2 PD devices if a DUT <-> PDTester found. 866 If not found, then returns an empty list. 867 """ 868 tester_devports = [] 869 dut_devports = [] 870 871 # For each possible uart console, check to see if a PD console 872 # is present and determine the number of PD ports. 873 for console in self.consoles: 874 if self._is_pd_console(console): 875 is_tester = self._is_pdtester_console(console) 876 num_ports = self._find_num_pd_ports(console) 877 # For each PD port that can be accessed via the console, 878 # instantiate either PDConsole or PDTester device. 879 for port in range(num_ports): 880 if is_tester: 881 logging.info('PDTesterDevice on %s port %d', 882 console.name, port) 883 tester_utils = pd_console.create_pd_console_utils( 884 console) 885 tester_devports.append(PDTesterDevice(console, 886 port, tester_utils)) 887 else: 888 logging.info('PDConsoleDevice on %s port %d', 889 console.name, port) 890 dut_utils = pd_console.create_pd_console_utils(console) 891 dut_devports.append(PDConsoleDevice(console, 892 port, dut_utils)) 893 894 if not tester_devports: 895 logging.error('The specified consoles did not include any' 896 ' PD testers: %s', self.consoles) 897 898 if not dut_devports: 899 logging.error('The specified consoles did not contain any' 900 ' DUTs: %s', self.consoles) 901 902 # Determine PD port partners in the list of PD devices. Note that 903 # there can be PD devices which are not accessible via a uart console, 904 # but are connected to a PD port which is accessible. 905 for tester in reversed(tester_devports): 906 for dut in dut_devports: 907 if tester.console == dut.console: 908 # PD Devices are on same servo console -> can't be a pair 909 continue 910 if self._verify_pdtester_connection(tester, dut): 911 dut_devports.remove(dut) 912 return [tester, dut] 913 914 return [] 915