• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2016 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import re
11import logging
12from six.moves import range
13import time
14
15from autotest_lib.client.common_lib import error
16from autotest_lib.server.cros.servo import pd_console
17
18
19class PDDevice(object):
20    """Base clase for all PD devices
21
22    This class provides a set of APIs for expected Type C PD required actions
23    in TypeC FAFT tests. The base class is specific for Type C devices that
24    do not have any console access.
25
26    """
27
28    def is_src(self, state=None):
29        """Checks if the port is connected as a source
30
31        """
32        raise NotImplementedError(
33                'is_src should be implemented in derived class')
34
35    def is_snk(self, state=None):
36        """Checks if the port is connected as a sink
37
38        @returns None
39        """
40        raise NotImplementedError(
41                'is_snk should be implemented in derived class')
42
43    def is_connected(self, state=None):
44        """Checks if the port is connected
45
46        @returns True if in a connected state, False otherwise
47        """
48        return self.is_src(state) or self.is_snk(state)
49
50    def is_disconnected(self, state=None):
51        """Checks if the port is disconnected
52
53        """
54        raise NotImplementedError(
55                'is_disconnected should be implemented in derived class')
56
57    def is_ufp(self):
58        """Checks if data role is UFP
59
60        """
61        raise NotImplementedError(
62                'is_ufp should be implemented in derived class')
63
64    def is_dfp(self):
65        """Checks if data role is DFP
66
67        """
68        raise NotImplementedError(
69                'is_dfp should be implemented in derived class')
70
71    def is_drp(self):
72        """Checks if dual role mode is supported
73
74        """
75        raise NotImplementedError(
76                'is_drp should be implemented in derived class')
77
78    def dr_swap(self):
79        """Attempts a data role swap
80
81        """
82        raise NotImplementedError(
83               'dr_swap should be implemented in derived class')
84
85    def pr_swap(self):
86        """Attempts a power role swap
87
88        """
89        raise NotImplementedError(
90                'pr_swap should be implemented in derived class')
91
92    def vbus_request(self, voltage):
93        """Requests a specific VBUS voltage from SRC
94
95        @param voltage: requested voltage level (5, 12, 20) in volts
96        """
97        raise NotImplementedError(
98                'vbus_request should be implemented in derived class')
99
100    def soft_reset(self):
101        """Initates a PD soft reset sequence
102
103        """
104        raise NotImplementedError(
105                'soft_reset should be implemented in derived class')
106
107    def hard_reset(self):
108        """Initates a PD hard reset sequence
109
110        """
111        raise NotImplementedError(
112                'hard_reset should be implemented in derived class')
113
114    def drp_set(self, mode):
115        """Sets dualrole mode
116
117        @param mode: desired dual role setting (on, off, snk, src)
118        """
119        raise NotImplementedError(
120                'drp_set should be implemented in derived class')
121
122    def drp_get(self):
123        """Gets dualrole mode
124
125        @returns one of the modes (on, off, snk, src)
126        """
127        raise NotImplementedError(
128                'drp_set should be implemented in derived class')
129
130    def drp_disconnect_connect(self, disc_time_sec):
131        """Force PD disconnect/connect via drp settings
132
133        @param disc_time_sec: Time in seconds between disconnect and reconnect
134        """
135        raise NotImplementedError(
136                'drp_disconnect_connect should be implemented in derived class'
137        )
138
139    def cc_disconnect_connect(self, disc_time_sec):
140        """Force PD disconnect/connect
141
142        @param disc_time_sec: Time in seconds between disconnect and reconnect
143        """
144        raise NotImplementedError(
145                'cc_disconnect_connect should be implemented in derived class')
146
147    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
148        """Get the connected state after disconnect/reconnect
149
150        @param disc_time_sec: Time in seconds for disconnect period.
151        @returns: The connected PD state.
152        """
153        raise NotImplementedError(
154                'get_connected_state_after_cc_reconnect should be implemented'
155                'in derived class')
156
157
158class PDConsoleDevice(PDDevice):
159    """Class for PD devices that have console access
160
161    This class contains methods for common PD actions for any PD device which
162    has UART console access. It inherits the PD device base class. In addition,
163    it stores both the UART console and port for the PD device.
164    """
165
166    def __init__(self, console, port, utils):
167        """Initialization method
168
169        @param console: UART console object
170        @param port: USB PD port number
171        """
172        # Save UART console
173        self.console = console
174        # Instantiate PD utilities used by methods in this class
175        self.utils = utils
176        # Save the PD port number for this device
177        self.port = port
178        # Not a PDTester device
179        self.is_pdtester = False
180
181    def get_pd_state(self):
182        """Get the state of the PD port"""
183        return self.utils.get_pd_state(self.port)
184
185    def get_pd_role(self):
186        """Get the current PD power role (source or sink)
187
188        @returns: current pd state
189        """
190        return self.utils.get_pd_role(self.port)
191
192    def is_pd_flag_set(self, key):
193        """Test a bit in PD protocol state flags
194
195        The flag word contains various PD protocol state information.
196        This method allows for a specific flag to be tested.
197
198        @param key: dict key to retrieve the flag bit mapping
199
200        @returns True if the bit to be tested is set
201        """
202        return self.utils.is_pd_flag_set(self.port, key)
203
204    def is_src(self, state=None):
205        """Checks if the port is connected as a source.
206
207        The "state" argument allows the caller to get_pd_state() once, and then
208        evaluate multiple conditions without re-getting the state.
209
210        @param state: the state to check (None to get current state)
211        @returns True if connected as SRC, False otherwise
212        """
213        return self.utils.is_src_connected(self.port, state)
214
215    def is_snk(self, state=None):
216        """Checks if the port is connected as a sink
217
218        The "state" argument allows the caller to get_pd_state() once, and then
219        evaluate multiple conditions without re-getting the state.
220
221        @param state: the state to check (None to get current state)
222        @returns True if connected as SNK, False otherwise
223        """
224        return self.utils.is_snk_connected(self.port, state)
225
226    def is_connected(self, state=None):
227        """Checks if the port is connected
228
229        The "state" argument allows the caller to get_pd_state() once, and then
230        evaluate multiple conditions without re-getting the state.
231
232        @param state: the state to check (None to get current state)
233        @returns True if in a connected state, False otherwise
234        """
235        return self.is_snk(state) or self.is_src(state)
236
237    def is_disconnected(self, state=None):
238        """Checks if the port is disconnected
239
240        @returns True if in a disconnected state, False otherwise
241        """
242        return self.utils.is_disconnected(self.port, state)
243
244    def __repr__(self):
245        """String representation of the object"""
246        return "<%s %r port %s>" % (
247            self.__class__.__name__, self.console.name, self.port)
248
249    def is_drp(self):
250        """Checks if dual role mode is supported
251
252        @returns True if dual role mode is 'on', False otherwise
253        """
254        return self.utils.is_pd_dual_role_enabled(self.port)
255
256    def drp_disconnect_connect(self, disc_time_sec):
257        """Disconnect/reconnect using drp mode settings
258
259        A PD console device doesn't have an explicit connect/disconnect
260        command. Instead, the dualrole mode setting is used to force
261        disconnects in devices which support this feature. To disconnect,
262        force the dualrole mode to be the opposite role of the current
263        connected state.
264
265        @param disc_time_sec: time in seconds to wait to reconnect
266
267        @returns True if device disconnects, then returns to a connected
268        state. False if either step fails.
269        """
270        # Dualrole mode must be supported
271        if self.is_drp() is False:
272            logging.warning('Device not DRP capable, unabled to force disconnect')
273            return False
274        # Force state will be the opposite of current connect state
275        if self.is_src():
276            drp_mode = 'snk'
277            swap_state = self.utils.get_snk_connect_states()
278        else:
279            drp_mode = 'src'
280            swap_state = self.utils.get_src_connect_states()
281        # Force disconnect
282        self.drp_set(drp_mode)
283        # Wait for disconnect time
284        time.sleep(disc_time_sec)
285        # Verify that the device is disconnected
286        disconnect = self.is_disconnected()
287
288        # If the other device is dualrole, then forcing dualrole mode will
289        # only cause the disconnect to appear momentarily and reconnect
290        # in the power role forced by the drp_set() call. For this case,
291        # the role swap verifies that a disconnect/connect sequence occurred.
292        if disconnect == False:
293            time.sleep(self.utils.CONNECT_TIME)
294            # Connected, verify if power role swap has occurred
295            if self.utils.get_pd_state(self.port) in swap_state:
296                # Restore default dualrole mode
297                self.drp_set('on')
298                # Restore orignal power role
299                connect = self.pr_swap()
300                if connect == False:
301                    logging.warning('DRP on both devices, 2nd power swap failed')
302                return connect
303
304        # Restore default dualrole mode
305        self.drp_set('on')
306        # Allow enough time for protocol state machine
307        time.sleep(self.utils.CONNECT_TIME)
308        # Check if connected
309        connect = self.is_connected()
310        logging.info('Disconnect = %r, Connect = %r', disconnect, connect)
311        return bool(disconnect and connect)
312
313    def drp_set(self, mode):
314        """Sets dualrole mode
315
316        @param mode: desired dual role setting (on, off, snk, src)
317
318        @returns True is set was successful, False otherwise
319        """
320        # Set desired dualrole mode
321        self.utils.set_pd_dualrole(self.port, mode)
322        # Get current setting
323        current = self.utils.get_pd_dualrole(self.port)
324        # Verify that setting is correct
325        return bool(mode == current)
326
327    def drp_get(self):
328        """Gets dualrole mode
329
330        @returns one of the modes (on, off, snk, src)
331        """
332        return self.utils.get_pd_dualrole(self.port)
333
334    def try_src(self, enable):
335        """Enables/Disables Try.SRC PD protocol setting
336
337        @param enable: True to enable, False to disable
338
339        @returns True is setting was successful, False if feature not
340        supported by the device, or not set as desired.
341        """
342        # Create Try.SRC pd command
343        cmd = 'pd trysrc %d' % int(enable)
344        # TCPMv1 indicates Try.SRC is on by returning 'on'
345        # TCPMv2 indicates Try.SRC is on by returning 'Forced ON'
346        on_vals = ('on', 'Forced ON')
347        # TCPMv1 indicates Try.SRC is off by returning 'off'
348        # TCPMv2 indicates Try.SRC is off by returning 'Forced OFF'
349        off_vals = ('off', 'Forced OFF')
350
351        # Try.SRC on/off is output, if supported feature
352        regex = ['Try\.SRC\s(%s)|(Parameter)' % ('|'.join(on_vals + off_vals))]
353        m = self.utils.send_pd_command_get_output(cmd, regex)
354
355        # Determine if Try.SRC feature is supported
356        if 'Try.SRC' not in m[0][0]:
357            logging.warning('Try.SRC not supported on this PD device')
358            return False
359
360        # TrySRC is supported on this PD device, verify setting.
361        trysrc_val = m[0][1]
362        logging.info('Try.SRC mode = %s', trysrc_val)
363        if enable:
364            vals = on_vals
365        else:
366            vals = off_vals
367
368        return trysrc_val in vals
369
370    def soft_reset(self):
371        """Initates a PD soft reset sequence
372
373        To verify that a soft reset sequence was initiated, the
374        reply message is checked to verify that the reset command
375        was acknowledged by its port pair. The connect state should
376        be same as it was prior to issuing the reset command.
377
378        @returns True if the port pair acknowledges the the reset message
379        and if following the command, the device returns to the same
380        connected state. False otherwise.
381        """
382        RESET_DELAY = 0.5
383        cmd = 'pd %d soft' % self.port
384        state_before = self.utils.get_pd_state(self.port)
385        reply = self.utils.send_pd_command_get_reply_msg(cmd)
386        if reply != self.utils.PD_CONTROL_MSG_DICT['Accept']:
387            return False
388        time.sleep(RESET_DELAY)
389        state_after = self.utils.get_pd_state(self.port)
390        return state_before == state_after
391
392    def hard_reset(self):
393        """Initates a PD hard reset sequence
394
395        To verify that a hard reset sequence was initiated, the
396        console ouput is scanned for HARD RST TX. In addition, the connect
397        state should be same as it was prior to issuing the reset command.
398
399        @returns True if the port pair acknowledges that hard reset was
400        initiated and if following the command, the device returns to the same
401        connected state. False otherwise.
402        """
403        RESET_DELAY = 1.0
404        cmd = 'pd %d hard' % self.port
405        state_before = self.utils.get_pd_state(self.port)
406        self.utils.enable_pd_console_debug()
407        try:
408            tcpmv1_pattern = '.*(HARD\sRST\sTX)'
409            tcpmv2_pattern = '.*(PE_SNK_Hard_Reset)|.*(PE_SRC_Hard_Reset)'
410            pattern = '|'.join((tcpmv1_pattern, tcpmv2_pattern))
411            self.utils.send_pd_command_get_output(cmd, [pattern])
412        except error.TestFail:
413            logging.warning('HARD RST TX not found')
414            return False
415        finally:
416            self.utils.disable_pd_console_debug()
417
418        time.sleep(RESET_DELAY)
419        state_after = self.utils.get_pd_state(self.port)
420        return state_before == state_after
421
422    def pr_swap(self):
423        """Attempts a power role swap
424
425        In order to attempt a power role swap the device must be
426        connected and support dualrole mode. Once these two criteria
427        are checked a power role command is issued. Following a delay
428        to allow for a reconnection the new power role is checked
429        against the power role prior to issuing the command.
430
431        @returns True if the device has swapped power roles, False otherwise.
432        """
433        # Get starting state
434        if not self.is_drp():
435            logging.warning('Dualrole Mode not enabled!')
436            return False
437        if self.is_connected() == False:
438            logging.warning('PD contract not established!')
439            return False
440        current_pr = self.utils.get_pd_state(self.port)
441        swap_cmd = 'pd %d swap power' % self.port
442        self.utils.send_pd_command(swap_cmd)
443        time.sleep(self.utils.CONNECT_TIME)
444        new_pr = self.utils.get_pd_state(self.port)
445        logging.info('Power swap: %s -> %s', current_pr, new_pr)
446        if self.is_connected() == False:
447            logging.warning('Device not connected following PR swap attempt.')
448            return False
449        return current_pr != new_pr
450
451
452class PDTesterDevice(PDConsoleDevice):
453    """Class for PDTester devices
454
455    This class contains methods for PD funtions which are unique to the
456    PDTester board, e.g. Plankton or Servo v4. It inherits all the methods
457    for PD console devices.
458    """
459
460    def __init__(self, console, port, utils):
461        """Initialization method
462
463        @param console: UART console for this device
464        @param port: USB PD port number
465        """
466        # Instantiate the PD console object
467        super(PDTesterDevice, self).__init__(console, port, utils)
468        # Indicate this is PDTester device
469        self.is_pdtester = True
470
471    def _toggle_pdtester_drp(self):
472        """Issue 'usbc_action drp' PDTester command
473
474        @returns value of drp_enable in PDTester FW
475        """
476        drp_cmd = 'usbc_action drp'
477        drp_re = ['DRP\s=\s(\d)']
478        # Send DRP toggle command to PDTester and get value of 'drp_enable'
479        m = self.utils.send_pd_command_get_output(drp_cmd, drp_re)
480        return int(m[0][1])
481
482    def _enable_pdtester_drp(self):
483        """Enable DRP mode on PDTester
484
485        DRP mode can only be toggled and is not able to be explicitly
486        enabled/disabled via the console. Therefore, this method will
487        toggle DRP mode until the console reply indicates that this
488        mode is enabled. The toggle happens a maximum of two times
489        in case this is called when it's already enabled.
490
491        @returns True when DRP mode is enabled, False if not successful
492        """
493        for attempt in range(2):
494            if self._toggle_pdtester_drp() == True:
495                logging.info('PDTester DRP mode enabled')
496                return True
497        logging.error('PDTester DRP mode set failure')
498        return False
499
500    def _verify_state_sequence(self, states_list, console_log):
501        """Compare PD state transitions to expected values
502
503        @param states_list: list of expected PD state transitions
504        @param console_log: console output which contains state names
505
506        @returns True if the sequence matches, False otherwise
507        """
508        # For each state in the expected state transiton table, build
509        # the regexp and search for it in the state transition log.
510        for state in states_list:
511            state_regx = r'C{0}\s+[\w]+:?\s({1})'.format(self.port,
512                                                         state)
513            if re.search(state_regx, console_log) is None:
514                return False
515        return True
516
517    def cc_disconnect_connect(self, disc_time_sec):
518        """Disconnect/reconnect using PDTester
519
520        PDTester supports a feature which simulates a USB Type C disconnect
521        and reconnect.
522
523        @param disc_time_sec: Time in seconds for disconnect period.
524        """
525        DISC_DELAY = 100
526        disc_cmd = 'fakedisconnect %d  %d' % (DISC_DELAY,
527                                              disc_time_sec * 1000)
528        self.utils.send_pd_command(disc_cmd)
529
530    def get_connected_state_after_cc_reconnect(self, disc_time_sec):
531        """Get the connected state after disconnect/reconnect using PDTester
532
533        PDTester supports a feature which simulates a USB Type C disconnect
534        and reconnect. It returns the first connected state (either source or
535        sink) after reconnect.
536
537        @param disc_time_sec: Time in seconds for disconnect period.
538        @returns: The connected PD state.
539        """
540        DISC_DELAY = 100
541        disc_cmd = 'fakedisconnect %d %d' % (DISC_DELAY, disc_time_sec * 1000)
542        state_exp = '(C%d)\s+[\w]+:?\s(%s)'
543
544        disconnected_tuple = self.utils.get_disconnected_states()
545        disconnected_states = '|'.join(disconnected_tuple)
546        disconnected_exp = state_exp % (self.port, disconnected_states)
547
548        src_connected_tuple = self.utils.get_src_connect_states()
549        snk_connected_tuple = self.utils.get_snk_connect_states()
550        connected_states = '|'.join(src_connected_tuple + snk_connected_tuple)
551        connected_exp = state_exp % (self.port, connected_states)
552
553        m = self.utils.send_pd_command_get_output(disc_cmd, [disconnected_exp,
554            connected_exp])
555        return m[1][2]
556
557    def drp_disconnect_connect(self, disc_time_sec):
558        """Disconnect/reconnect using PDTester
559
560        Utilize PDTester disconnect/connect utility and verify
561        that both disconnect and reconnect actions were successful.
562
563        @param disc_time_sec: Time in seconds for disconnect period.
564
565        @returns True if device disconnects, then returns to a connected
566        state. False if either step fails.
567        """
568        self.cc_disconnect_connect(disc_time_sec)
569        time.sleep(disc_time_sec / 2)
570        disconnect = self.is_disconnected()
571        time.sleep(disc_time_sec / 2 + self.utils.CONNECT_TIME)
572        connect = self.is_connected()
573        return disconnect and connect
574
575    def drp_set(self, mode):
576        """Sets dualrole mode
577
578        @param mode: desired dual role setting (on, off, snk, src)
579
580        @returns True if dualrole mode matches the requested value or
581        is successfully set to that value. False, otherwise.
582        """
583        # Get current value of dualrole
584        drp = self.utils.get_pd_dualrole(self.port)
585        if drp == mode:
586            return True
587
588        if mode == 'on':
589            # Setting dpr_enable on PDTester will set dualrole mode to on
590            return self._enable_pdtester_drp()
591        else:
592            # If desired setting is other than 'on', need to ensure that
593            # drp mode on PDTester is disabled.
594            if drp == 'on':
595                # This will turn off drp_enable flag and set dualmode to 'off'
596                return self._toggle_pdtester_drp()
597            # With drp_enable flag off, can set to desired setting
598            return self.utils.set_pd_dualrole(self.port, mode)
599
600    def _reset(self, cmd, states_list):
601        """Initates a PD reset sequence
602
603        PDTester device has state names available on the console. When
604        a soft reset is issued the console log is extracted and then
605        compared against the expected state transisitons.
606
607        @param cmd: reset type (soft or hard)
608        @param states_list: list of expected PD state transitions
609
610        @returns True if state transitions match, False otherwise
611        """
612        # Want to grab all output until either SRC_READY or SNK_READY
613        reply_exp = ['(.*)(C%d)\s+[\w]+:?\s([\w]+_READY)' % self.port]
614        m = self.utils.send_pd_command_get_output(cmd, reply_exp)
615        return self._verify_state_sequence(states_list, m[0][0])
616
617    def soft_reset(self):
618        """Initates a PD soft reset sequence
619
620        @returns True if state transitions match, False otherwise
621        """
622        snk_reset_states = [
623            'SOFT_RESET',
624            'SNK_DISCOVERY',
625            'SNK_REQUESTED',
626            'SNK_TRANSITION',
627            'SNK_READY'
628        ]
629
630        src_reset_states = [
631            'SOFT_RESET',
632            'SRC_DISCOVERY',
633            'SRC_NEGOCIATE',
634            'SRC_ACCEPTED',
635            'SRC_POWERED',
636            'SRC_TRANSITION',
637            'SRC_READY'
638        ]
639
640        if self.is_src():
641            states_list = src_reset_states
642        elif self.is_snk():
643            states_list = snk_reset_states
644        else:
645            raise error.TestFail('Port Pair not in a connected state')
646
647        cmd = 'pd %d soft' % self.port
648        return self._reset(cmd, states_list)
649
650    def hard_reset(self):
651        """Initates a PD hard reset sequence
652
653        @returns True if state transitions match, False otherwise
654        """
655        snk_reset_states = [
656            'HARD_RESET_SEND',
657            'HARD_RESET_EXECUTE',
658            'SNK_HARD_RESET_RECOVER',
659            'SNK_DISCOVERY',
660            'SNK_REQUESTED',
661            'SNK_TRANSITION',
662            'SNK_READY'
663        ]
664
665        src_reset_states = [
666            'HARD_RESET_SEND',
667            'HARD_RESET_EXECUTE',
668            'SRC_HARD_RESET_RECOVER',
669            'SRC_DISCOVERY',
670            'SRC_NEGOCIATE',
671            'SRC_ACCEPTED',
672            'SRC_POWERED',
673            'SRC_TRANSITION',
674            'SRC_READY'
675        ]
676
677        if self.is_src():
678            states_list = src_reset_states
679        elif self.is_snk():
680            states_list = snk_reset_states
681        else:
682            raise error.TestFail('Port Pair not in a connected state')
683
684        cmd = 'pd %d hard' % self.port
685        return self._reset(cmd, states_list)
686
687
688class PDPortPartner(object):
689    """Methods used to instantiate PD device objects
690
691    This class is initalized with a list of servo consoles. It
692    contains methods to determine if USB PD devices are accessible
693    via the consoles and attempts to determine USB PD port partners.
694    A PD device is USB PD port specific, a single console may access
695    multiple PD devices.
696
697    """
698
699    def __init__(self, consoles):
700        """Initialization method
701
702        @param consoles: list of servo consoles
703        """
704        self.consoles = consoles
705
706    def __repr__(self):
707        """String representation of the object"""
708        return "<%s %r>" % (self.__class__.__name__, self.consoles)
709
710    def _send_pd_state(self, port, console):
711        """Tests if PD device exists on a given port number
712
713        @param port: USB PD port number to try
714        @param console: servo UART console
715
716        @returns True if 'pd <port> state' command gives a valid
717        response, False otherwise
718        """
719        cmd = 'pd %d state' % port
720        regex = r'(Port C\d)|(Parameter)'
721        m = console.send_command_get_output(cmd, [regex])
722        # If PD port exists, then output will be Port C0 or C1
723        regex = r'Port C{0}'.format(port)
724        if re.search(regex, m[0][0]):
725            return True
726        return False
727
728    def _find_num_pd_ports(self, console):
729        """Determine number of PD ports for a given console
730
731        @param console: uart console accssed via servo
732
733        @returns: number of PD ports accessible via console
734        """
735        MAX_PORTS = 2
736        num_ports = 0
737        for port in range(MAX_PORTS):
738            if self._send_pd_state(port, console):
739                num_ports += 1
740        return num_ports
741
742    def _is_pd_console(self, console):
743        """Check if pd option exists in console
744
745        @param console: uart console accssed via servo
746
747        @returns: True if 'pd' is found, False otherwise
748        """
749        try:
750            m = console.send_command_get_output('help', [r'(pd)\s+'])
751            return True
752        except error.TestFail:
753            return False
754
755    def _is_pdtester_console(self, console):
756        """Check for PDTester console
757
758        This method looks for a console command option 'usbc_action' which
759        is unique to PDTester PD devices.
760
761        @param console: uart console accssed via servo
762
763        @returns True if usbc_action command is present, False otherwise
764        """
765        try:
766            m = console.send_command_get_output('help', [r'(usbc_action)'])
767            return True
768        except error.TestFail:
769            return False
770
771    def _check_port_pair(self, port1, port2):
772        """Check if two PD devices could be connected
773
774        If two USB PD devices are connected, then they should be in
775        either the SRC_READY or SNK_READY states and have opposite
776        power roles. In addition, they must be on different servo
777        consoles.
778
779        @param: list of two possible PD port parters
780
781        @returns True if not the same console and both PD devices
782        are a plausible pair based only on their PD states.
783        """
784        # Don't test if on the same servo console
785        if port1.console == port2.console:
786            logging.info("PD Devices are on same platform -> can't be a pair")
787            return False
788
789        state1 = port1.get_pd_state()
790        port1_is_snk = port1.is_snk(state1)
791        port1_is_src = port1.is_src(state1)
792
793        state2 = port2.get_pd_state()
794        port2_is_snk = port2.is_snk(state2)
795        port2_is_src = port2.is_src(state2)
796
797        # Must be SRC <--> SNK or SNK <--> SRC
798        if (port1_is_src and port2_is_snk) or (port1_is_snk and port2_is_src):
799            logging.debug("SRC+SNK pair: %s (%s) <--> (%s) %s",
800                          port1, state1, state2, port2)
801            return True
802        else:
803            logging.debug("Not a SRC+SNK pair: %s (%s) <--> (%s) %s",
804                          port1, state1, state2, port2)
805            return False
806
807    def _verify_pdtester_connection(self, tester_port, dut_port):
808        """Verify DUT to PDTester PD connection
809
810        This method checks for a PDTester PD connection for the
811        given port by first verifying if a PD connection is present.
812        If found, then it uses a PDTester feature to force a PD disconnect.
813        If the port is no longer in the connected state, and following
814        a delay, is found to be back in the connected state, then
815        a DUT pd to PDTester connection is verified.
816
817        @param dev_pair: list of two PD devices
818
819        @returns True if DUT to PDTester pd connection is verified
820        """
821        DISC_CHECK_TIME = 10
822        DISC_WAIT_TIME = 20
823        CONNECT_TIME = 4
824
825        logging.info("Check: %s <--> %s", tester_port, dut_port)
826
827        if not self._check_port_pair(tester_port, dut_port):
828            return False
829
830        # Force PD disconnect
831        logging.debug('Disconnecting to check if devices are partners')
832        tester_port.cc_disconnect_connect(DISC_WAIT_TIME)
833        time.sleep(DISC_CHECK_TIME)
834
835        # Verify that both devices are now disconnected
836        tester_state = tester_port.get_pd_state()
837        dut_state = dut_port.get_pd_state()
838        logging.debug("Recheck: %s (%s) <--> (%s) %s",
839                      tester_port, tester_state, dut_state, dut_port)
840
841        if not (tester_port.is_disconnected(tester_state) and
842                dut_port.is_disconnected(dut_state)):
843            logging.info("Ports did not disconnect at the same time, so"
844                         " they aren't considered a pair.")
845            # Delay to allow non-pair devices to reconnect
846            time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME)
847            return False
848
849        logging.debug('Pair disconnected.  Waiting for reconnect...')
850
851        # Allow enough time for reconnection
852        time.sleep(DISC_WAIT_TIME - DISC_CHECK_TIME + CONNECT_TIME)
853        if self._check_port_pair(tester_port, dut_port):
854            # Have verified a pd disconnect/reconnect sequence
855            logging.info('PDTester <--> DUT pair found')
856            return True
857
858        logging.info("Ports did not reconnect at the same time, so"
859                     " they aren't considered a pair.")
860        return False
861
862    def identify_pd_devices(self):
863        """Instantiate PD devices present in test setup
864
865        @return: list of 2 PD devices if a DUT <-> PDTester found.
866                 If not found, then returns an empty list.
867        """
868        tester_devports = []
869        dut_devports = []
870
871        # For each possible uart console, check to see if a PD console
872        # is present and determine the number of PD ports.
873        for console in self.consoles:
874            if self._is_pd_console(console):
875                is_tester = self._is_pdtester_console(console)
876                num_ports = self._find_num_pd_ports(console)
877                # For each PD port that can be accessed via the console,
878                # instantiate either PDConsole or PDTester device.
879                for port in range(num_ports):
880                    if is_tester:
881                        logging.info('PDTesterDevice on %s port %d',
882                                     console.name, port)
883                        tester_utils = pd_console.create_pd_console_utils(
884                                       console)
885                        tester_devports.append(PDTesterDevice(console,
886                                                    port, tester_utils))
887                    else:
888                        logging.info('PDConsoleDevice on %s port %d',
889                                     console.name, port)
890                        dut_utils = pd_console.create_pd_console_utils(console)
891                        dut_devports.append(PDConsoleDevice(console,
892                                                    port, dut_utils))
893
894        if not tester_devports:
895            logging.error('The specified consoles did not include any'
896                          ' PD testers: %s', self.consoles)
897
898        if not dut_devports:
899            logging.error('The specified consoles did not contain any'
900                          ' DUTs: %s', self.consoles)
901
902        # Determine PD port partners in the list of PD devices. Note that
903        # there can be PD devices which are not accessible via a uart console,
904        # but are connected to a PD port which is accessible.
905        for tester in reversed(tester_devports):
906            for dut in dut_devports:
907                if tester.console == dut.console:
908                    # PD Devices are on same servo console -> can't be a pair
909                    continue
910                if self._verify_pdtester_connection(tester, dut):
911                    dut_devports.remove(dut)
912                    return [tester, dut]
913
914        return []
915