• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Lint as: python2, python3
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import re
11import logging
12import six
13from six.moves import range
14import time
15
16from autotest_lib.client.common_lib import error
17
18
class PDConsoleUtils(object):
    """Base class for all PD console utils

    This class provides a set of APIs for expected Type C PD required actions
    in TypeC FAFT tests. The base class is specific for Type C console access.

    Several methods read firmware-specific attributes through self that are
    not defined in this base class (PD_STATE_DICT, PD_CONTROL_MSG_MASK,
    PD_STATE_FLAGS_DICT, DUALROLE_VALUES, DUALROLE_CMD_RESULTS,
    DISCONNECTED_STATES, CONNECT_TIME, CURRENT_STATE_PROBE_DELAY and
    per_port_dualrole_setting); a derived class has to provide them and has
    to implement the methods that raise NotImplementedError here.
    """
    def __init__(self, console):
        """Console can be either usbpd, ec, or pdtester UART

        This object will then be used by the class which creates
        the PDConsoleUtils class to send/receive commands to UART

        @param console: object offering send_command() and
                        send_command_get_output()
        """
        # save console for UART access functions
        self.console = console

    def send_pd_command(self, cmd):
        """Send command to PD console UART

        @param cmd: pd command string
        """
        self.console.send_command(cmd)

    def send_pd_command_get_output(self, cmd, regexp, debug_on=True):
        """Send command to PD console, wait for response

        @param cmd: pd command string
        @param regexp: regular expression for desired output
        @param debug_on: when True, 'pd dump 2' is sent before the command
                         and 'pd dump 0' afterwards

        @returns: whatever console.send_command_get_output() returns
        """
        # Enable PD console debug mode to show control messages
        if debug_on:
            self.enable_pd_console_debug()
        try:
            return self.console.send_command_get_output(cmd, regexp)
        finally:
            # Restore the debug level even when the console call raises
            # (e.g. the regexp never matched), so later commands are not
            # polluted by leftover debug output.
            if debug_on:
                self.disable_pd_console_debug()

    def send_pd_command_get_reply_msg(self, cmd):
        """Send PD protocol msg, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd protocol message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting.

        @param cmd: pd command to issue to the UART console

        @returns: PD control header message (the hex word following 'RECV',
                  masked with PD_CONTROL_MSG_MASK)
        """
        m = self.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)\W'])
        # m[0][1] is read as the hex word captured after 'RECV'
        return int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK

    def verify_pd_console(self):
        """Verify that PD commands exist on UART console

        Send 'help' command to UART console

        @returns: True if 'pd' is found, False if not
        """
        matches = self.console.send_command_get_output(
                'help', [r'(pd)\s+([\w]+)'])
        return matches[0][1] == 'pd'

    def get_pd_version(self):
        """Get the version of the PD stack

        @returns: version of PD stack as an int; 1 when the console does not
                  answer 'pd version' with a number
        """
        # Match a number or an error ("Wrong number of params")
        matches = self.console.send_command_get_output('pd version',
                                                       [r'\s+(\d+|Wrong.*)'])
        if matches:
            result = matches[0][1]
            if result[0].isdigit():
                return int(result)
        return 1

    def execute_pd_state_cmd(self, port):
        """Get PD state for specified channel

        The 'pd 0/1 state' command produces several fields. The full
        response line is captured and then parsed with every regular
        expression in PD_STATE_DICT to fill the returned dict.

        @param port: Type C PD port 0 or 1

        @returns: A dict with one entry per key of PD_STATE_DICT
        @raises: TestFail if any field not found
        """
        pd_cmd = 'pd %s state' % (port,)
        time.sleep(self.CURRENT_STATE_PROBE_DELAY)
        # Two FW versions for this command, get full line.
        m = self.send_pd_command_get_output(pd_cmd,
                                            [r'(Port.*) - (Role:.*)\n'],
                                            debug_on=False)
        # m[0][0] is taken as the complete matched response line
        full_line = m[0][0]

        # Extract desired values from result string
        state_result = {}
        for key, regexp in six.iteritems(self.PD_STATE_DICT):
            value = re.search(regexp, full_line)
            if not value:
                raise error.TestFail('pd %d state: %r value not found' %
                                     (port, key))
            state_result[key] = value.group(1)

        return state_result

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        """
        raise NotImplementedError(
            'should be implemented in derived class')

    def get_pd_port(self, port):
        """Get the current PD port

        @param port: Type C PD port 0/1
        @returns: the 'port' field of the 'pd <port> state' output
        """
        return self.execute_pd_state_cmd(port)['port']

    def get_pd_role(self, port):
        """Get the current PD power role (source or sink)

        @param port: Type C PD port 0/1
        @returns: the 'role' field of the 'pd <port> state' output
        """
        return self.execute_pd_state_cmd(port)['role']

    def get_pd_flags(self, port):
        """Get the current PD flags

        @param port: Type C PD port 0/1
        @returns: the 'flags' field of the 'pd <port> state' output
        """
        return self.execute_pd_state_cmd(port)['flags']

    def get_pd_dualrole(self, port):
        """Get the current PD dualrole setting

        When per_port_dualrole_setting is neither True nor False, the
        per-port command form is tried first; if it raises, the global
        form is used and the choice is remembered on this instance.

        @param port: Type C PD port 0/1
        @returns: current PD dualrole setting, the DUALROLE_VALUES entry
                  matching the console output
        """
        if self.per_port_dualrole_setting is True:
            cmd = 'pd %d dualrole' % port
        elif self.per_port_dualrole_setting is False:
            cmd = 'pd dualrole'
        else:
            try:
                self.per_port_dualrole_setting = True
                return self.get_pd_dualrole(port)
            except Exception:
                # Only ordinary failures trigger the fallback;
                # KeyboardInterrupt/SystemExit must propagate.
                logging.debug('Per port dualrole query failed, '
                              'trying the global dualrole command')
                self.per_port_dualrole_setting = False
                return self.get_pd_dualrole(port)

        m = self.send_pd_command_get_output(
                cmd, [r'dual-role toggling:\s+([\w ]+)[\r\n]'], debug_on=False)
        # Find the index according to the output of "pd dualrole" command
        dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1])
        # Map to a string which is the output of this method
        return self.DUALROLE_VALUES[dual_index]

    def set_pd_dualrole(self, port, value):
        """Set pd dualrole

        @param port: Type C PD port 0/1
        @param value: dualrole setting to apply
        """
        raise NotImplementedError(
            'should be implemented in derived class')

    def query_pd_connection(self):
        """Determine if PD connection is present

        Try the 'pd 0/1 state' command and see if it's in either
        expected state of a connection. Record the port number
        that has an active connection

        @returns: dict with keys 'port' and 'connect', plus 'role' (holding
                  the PD state) when a connection was found. If neither
                  port is connected, 'port' is left at 1.
        """
        status = {'connect': False, 'port': 0}

        # Check port 0 first
        state = self.get_pd_state(0)
        if self.is_pd_connected(0):
            status['connect'] = True
            status['role'] = state
            return status

        # Check port 1
        status['port'] = 1
        state = self.get_pd_state(1)
        logging.info('CHECK PORT 1: %s', state)
        if self.is_pd_connected(1):
            status['connect'] = True
            status['role'] = state

        return status

    def swap_power_role(self, port):
        """Attempt a power role swap

        This method attempts to execute a power role swap. A check
        is made to ensure that dualrole mode is enabled and that
        a PD contract is currently established. If both checks pass,
        then the power role swap command is issued. After a delay,
        if a PD contract is established and the current state does
        not equal the starting state, then it was successful.

        @param port: pd port number

        @returns: True if power swap is successful, False otherwise.
        """
        if not self.is_pd_dual_role_enabled(port):
            logging.info('Dualrole Mode not enabled!')
            return False
        if not self.is_pd_connected(port):
            logging.info('PD contract not established!')
            return False
        # Get starting state
        current_pr = self.get_pd_state(port)
        self.send_pd_command('pd %d swap power' % port)
        time.sleep(self.CONNECT_TIME)
        new_pr = self.get_pd_state(port)
        logging.info('Power swap: %s -> %s', current_pr, new_pr)
        if not self.is_pd_connected(port):
            return False
        return current_pr != new_pr

    def disable_pd_console_debug(self):
        """Turn off PD console debug (sends 'pd dump 0')

        """
        self.send_pd_command('pd dump 0')

    def enable_pd_console_debug(self):
        """Enable PD console debug (sends 'pd dump 2')

        """
        self.send_pd_command('pd dump 2')

    def is_pd_flag_set(self, port, key):
        """Test a bit in PD protocol state flags

        The flag word contains various PD protocol state information.
        This method allows for a specific flag to be tested.

        @param port: Port which has the active PD connection
        @param key: dict key to retrieve the flag bit mapping

        @returns True if the bit to be tested is set
        """
        # The flags field is parsed as a hexadecimal string
        pd_flags = self.get_pd_flags(port)
        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))

    def is_pd_connected(self, port):
        """Check if a PD connection is active

        @param port: port to be used for pd console commands

        @returns True if port is in connected state
        """
        return self.is_src_connected(port) or self.is_snk_connected(port)

    def is_pd_dual_role_enabled(self, port):
        """Check if a PD device is in dualrole mode

        @param port: Type C PD port 0/1

        @returns True if dualrole mode is 'on', False otherwise
        """
        return self.get_pd_dualrole(port) == 'on'

    def is_src_connected(self, port, state=None):
        """Checks if the port is connected as a source

        @param port: Type C PD port 0/1
        @param state: the state to check (None to get current state)

        @returns True if connected as SRC, False otherwise
        """
        if state is None:
            state = self.get_pd_state(port)
        return state in self.get_src_connect_states()

    def is_snk_connected(self, port, state=None):
        """Checks if the port is connected as a sink

        @param port: Type C PD port 0/1
        @param state: the state to check (None to get current state)

        @returns True if connected as SNK, False otherwise
        """
        if state is None:
            state = self.get_pd_state(port)
        return state in self.get_snk_connect_states()

    def is_disconnected(self, port, state=None):
        """Checks if the port is disconnected

        @param port: Type C PD port 0/1
        @param state: the state to check (None to get current state)

        @return True if disconnected
        """
        if state is None:
            state = self.get_pd_state(port)
        return state in self.get_disconnected_states()

    def get_src_connect_states(self):
        """Returns the names of the connected SRC states

        """
        raise NotImplementedError(
            'should be implemented in derived class')

    def get_snk_connect_states(self):
        """Returns the names of the connected SNK states

        """
        raise NotImplementedError(
            'should be implemented in derived class')

    def get_disconnected_states(self):
        """Returns the names of the disconnected states

        """
        return self.DISCONNECTED_STATES

    def is_snk_discovery_state(self, port):
        """Returns true if in snk discovery state, else false

        @param port: Type C PD port 0/1

        @return: True if in SNK Discovery state
        """
        raise NotImplementedError(
            'should be implemented in derived class')
377
class TCPMv1ConsoleUtils(PDConsoleUtils):
    """ Provides a set of methods common to USB PD TCPMv1 FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """
    SRC_CONNECT = ('SRC_READY',)
    SNK_CONNECT = ('SNK_READY',)
    SRC_DISC = 'SRC_DISCONNECTED'
    SNK_DISC = 'SNK_DISCONNECTED'
    SNK_DISCOVERY = 'SNK_DISCOVERY'
    DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE'
    DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE)

    PD_MAX_PORTS = 2
    # Passed to time.sleep() after a power swap request
    CONNECT_TIME = 4

    # Passed to time.sleep() before each 'pd <port> state' query
    CURRENT_STATE_PROBE_DELAY = 2
    # Passed to time.sleep() between setting and re-reading dualrole
    DUALROLE_QUERY_DELAY = 1
    # Dualrole input/output values of methods in this class.
    DUALROLE_VALUES = ['on', 'off', 'snk', 'src']
    # Strings passing to the console command "pd dualrole"
    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
    # Strings returned from the console command "pd dualrole"
    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']

    # Some old firmware uses a single dualrole setting for all ports; while
    # some new firmware uses a per port dualrole setting. This flag starts
    # as None and is set to True or False by the first dualrole query.
    # TODO: Remove this flag when the old setting phases out
    per_port_dualrole_setting = None

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'State:\s+([\w()]+)',
        'flags': r'Flags:\s+([\w]+)',
        'polarity': r'(CC\d)',
    }

    # Regex to match PD state name; work for both old and new formats
    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"
    # Copied from ec repo: common/usb_pd_protocol.c
    PD_STATE_NAMES = [
        "DISABLED",                   # index: 0
        "SUSPENDED",
        "SNK_DISCONNECTED",
        "SNK_DISCONNECTED_DEBOUNCE",
        "SNK_HARD_RESET_RECOVER",
        "SNK_DISCOVERY",              # index: 5
        "SNK_REQUESTED",
        "SNK_TRANSITION",
        "SNK_READY",
        "SNK_SWAP_INIT",
        "SNK_SWAP_SNK_DISABLE",       # index: 10
        "SNK_SWAP_SRC_DISABLE",
        "SNK_SWAP_STANDBY",
        "SNK_SWAP_COMPLETE",
        "SRC_DISCONNECTED",
        "SRC_DISCONNECTED_DEBOUNCE",  # index: 15
        "SRC_HARD_RESET_RECOVER",
        "SRC_STARTUP",
        "SRC_DISCOVERY",
        "SRC_NEGOCIATE",
        "SRC_ACCEPTED",               # index: 20
        "SRC_POWERED",
        "SRC_TRANSITION",
        "SRC_READY",
        "SRC_GET_SNK_CAP",
        "DR_SWAP",                    # index: 25
        "SRC_SWAP_INIT",
        "SRC_SWAP_SNK_DISABLE",
        "SRC_SWAP_SRC_DISABLE",
        "SRC_SWAP_STANDBY",
        "VCONN_SWAP_SEND",            # index: 30
        "VCONN_SWAP_INIT",
        "VCONN_SWAP_READY",
        "SOFT_RESET",
        "HARD_RESET_SEND",
        "HARD_RESET_EXECUTE",         # index: 35
        "BIST_RX",
        "BIST_TX",
        "DRP_AUTO_TOGGLE",
    ]

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13,
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12,
    }

    def _normalize_pd_state(self, state):
        """Normalize the PD state name which handles both old and new formats.

        The old format is like: "SNK_READY"
        The new format is like: "8()" if debug_level == 0, or
                                "8(SNK_READY)" if debug_level > 0

        This method will convert the new format to the old one.

        @param state: The raw PD state text

        @returns: The normalized PD state name
        @raises: TestFail if unexpected PD state format, if the index is
                 not in PD_STATE_NAMES, or if the reported name disagrees
                 with the name mapped from the index
        """
        m = re.match(self.RE_PD_STATE, state)
        # Every part of RE_PD_STATE is optional, so a match with no
        # captured group means nothing useful was recognized.
        if not m or not any(m.groups()):
            raise error.TestFail('Unexpected PD state format: %s' % state)

        state_index, state_name = m.groups()
        if state_index is None:
            # The old format: return the name
            return state_name

        # The new format: map the index to a name
        index = int(state_index)
        if index >= len(self.PD_STATE_NAMES):
            raise error.TestFail('Unexpected PD state format: %s '
                                 '(unknown state index)' % state)
        mapped_name = self.PD_STATE_NAMES[index]
        # Raise explicitly instead of using assert, which is stripped when
        # Python runs with -O.
        if state_name is not None and mapped_name != state_name:
            raise error.TestFail('Unexpected PD state format: %s '
                                 '(index maps to %s)' % (state, mapped_name))
        return mapped_name

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state, normalized to the plain state name
        """
        pd_dict = self.execute_pd_state_cmd(port)
        return self._normalize_pd_state(pd_dict['pd_state'])

    def set_pd_dualrole(self, port, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. snk (force sink mode)
        4. src (force source mode)
        After setting, the current value is read to confirm that it
        was set properly.

        @param port: Type C PD port 0/1
        @param value: One of the 4 options listed
        @raises: TestFail if the value read back differs from the request
        """
        # If the dualrole setting is not initialized, call the get method to
        # initialize it.
        if self.per_port_dualrole_setting is None:
            self.get_pd_dualrole(port)

        # Get string required for console command
        cmd_arg = self.DUALROLE_CMD_ARGS[self.DUALROLE_VALUES.index(value)]
        # Create console command
        if self.per_port_dualrole_setting is True:
            cmd = 'pd %d dualrole %s' % (port, cmd_arg)
        elif self.per_port_dualrole_setting is False:
            cmd = 'pd dualrole %s' % cmd_arg
        else:
            raise error.TestFail("dualrole error")

        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole(port)
        # If it doesn't match, then raise error
        if dual != value:
            raise error.TestFail("dualrole error: " + value + " != " + dual)

    def get_src_connect_states(self):
        """Returns the names of the connected SRC states

        """
        return self.SRC_CONNECT

    def get_snk_connect_states(self):
        """Returns the names of the connected SNK states

        """
        return self.SNK_CONNECT

    def is_snk_discovery_state(self, port):
        """Returns true if in snk discovery state, else false

        @param port: Type C PD port 0/1

        @return: True if in SNK Discovery state
        """
        return self.get_pd_state(port) == self.SNK_DISCOVERY
591
class TCPMv2ConsoleUtils(PDConsoleUtils):
    """ Provides a set of methods common to USB PD TCPMv2 FAFT tests

    Each instance of this class is associated with a particular
    servo UART console. USB PD tests will typically use the console
    command 'pd' and its subcommands to control/monitor Type C PD
    connections. The servo object used for UART operations is
    passed in and stored when this object is created.

    """
    SRC_CONNECT = ('Attached.SRC', 'UnorientedDebugAccessory.SRC')
    SNK_CONNECT = ('Attached.SNK', 'DebugAccessory.SNK')
    SRC_DISC = 'Unattached.SRC'
    SNK_DISC = 'Unattached.SNK'
    DRP_AUTO_TOGGLE = 'DRPAutoToggle'
    LOW_POWER_MODE = 'LowPowerMode'
    SNK_DISCOVERY = 'PE_SNK_DISCOVERY'
    DISCONNECTED_STATES = (SRC_DISC, SNK_DISC, DRP_AUTO_TOGGLE, LOW_POWER_MODE)

    PD_MAX_PORTS = 2
    # Passed to time.sleep() after a power swap request
    CONNECT_TIME = 4

    # Passed to time.sleep() before each 'pd <port> state' query
    CURRENT_STATE_PROBE_DELAY = 2
    # Passed to time.sleep() between setting and re-reading dualrole
    DUALROLE_QUERY_DELAY = 1
    # Dualrole input/output values of methods in this class.
    DUALROLE_VALUES = ['on', 'off', 'sink', 'source']
    # Strings passing to the console command "pd dualrole"
    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
    # Strings returned from the console command "pd dualrole"
    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']

    # Some old firmware uses a single dualrole setting for all ports; while
    # some new firmware uses a per port dualrole setting. This flag starts
    # as None and is set to True or False by the first dualrole query.
    # TODO: Remove this flag when the old setting phases out
    per_port_dualrole_setting = None

    # Dictionary for 'pd 0/1 state' parsing
    PD_STATE_DICT = {
        'port': r'Port\s+([\w]+)',
        'role': r'Role:\s+([\w]+-[\w]+)',
        'pd_state': r'TC State:\s+([\w().]+)',
        'flags': r'Flags:\s+([\w]+)',
        'pe_state': r'PE State:\s+(\w*)',
        'polarity': r'(CC\d)',
    }

    # Regex to match PD state name; work for both old and new formats
    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"

    # Dictionary for PD control message types
    PD_CONTROL_MSG_MASK = 0x1f
    PD_CONTROL_MSG_DICT = {
        'GoodCRC': 1,
        'GotoMin': 2,
        'Accept': 3,
        'Reject': 4,
        'Ping': 5,
        'PS_RDY': 6,
        'Get_Source_Cap': 7,
        'Get_Sink_Cap': 8,
        'DR_Swap': 9,
        'PR_Swap': 10,
        'VCONN_Swap': 11,
        'Wait': 12,
        'Soft_Reset': 13,
    }

    # Dictionary for PD firmware state flags
    PD_STATE_FLAGS_DICT = {
        'power_swap': 1 << 1,
        'data_swap': 1 << 2,
        'data_swap_active': 1 << 3,
        'vconn_on': 1 << 12,
    }

    def get_pe_state(self, port):
        """Get the current Policy Engine state

        @param port: Type C PD port 0/1
        @returns: current pe state ('PE State:' field of the state output)
        """
        return self.execute_pd_state_cmd(port)['pe_state']

    def get_pd_state(self, port):
        """Get the current PD state

        @param port: Type C PD port 0/1
        @returns: current pd state ('TC State:' field of the state output)
        """
        return self.execute_pd_state_cmd(port)['pd_state']

    def set_pd_dualrole(self, port, value):
        """Set pd dualrole

        It can be set to either:
        1. on
        2. off
        3. sink (force sink mode; 'snk' is accepted as an alias)
        4. source (force source mode; 'src' is accepted as an alias)
        After setting, the current value is read to confirm that it
        was set properly.

        @param port: Type C PD port 0/1
        @param value: One of the 4 options listed
        @raises: TestFail if the value read back differs from the request
        """
        # Translate the short aliases to the names used in DUALROLE_VALUES,
        # so the read-back comparison below uses the same vocabulary.
        if value == 'src':
            value = 'source'
        elif value == 'snk':
            value = 'sink'

        # Get string required for console command
        cmd_arg = self.DUALROLE_CMD_ARGS[self.DUALROLE_VALUES.index(value)]
        # Create console command
        cmd = 'pd %d dualrole %s' % (port, cmd_arg)
        self.console.send_command(cmd)
        time.sleep(self.DUALROLE_QUERY_DELAY)
        # Get current setting to verify that command was successful
        dual = self.get_pd_dualrole(port)
        # If it doesn't match, then raise error
        if dual != value:
            raise error.TestFail("dualrole error: " + value + " != " + dual)

    def get_src_connect_states(self):
        """Returns the names of the connected SRC states

        @returns: Tuple of connected source state names
        """
        return self.SRC_CONNECT

    def get_snk_connect_states(self):
        """Returns the names of the connected SNK states

        @returns: Tuple of connected sink state names
        """
        return self.SNK_CONNECT

    def is_snk_discovery_state(self, port):
        """Returns true if in snk discovery state, else false

        The Policy Engine state, not the TC state, is compared here.

        @param port: Type C PD port 0/1

        @return: True if in SNK Discovery state
        """
        return self.get_pe_state(port) == self.SNK_DISCOVERY
744
class PDConnectionUtils(PDConsoleUtils):
    """Provides a set of methods common to USB PD FAFT tests

    This class is used for PD utility methods that require access
    to both PDTester and DUT PD consoles.

    """

    def __init__(self, dut_console, pdtester_console):
        """
        The base class initializer is not called, so self.console is never
        set on instances of this class; only the two consoles below are
        stored.

        @param dut_console: PD console object for DUT
        @param pdtester_console: PD console object for PDTester
        """
        # save console for DUT PD UART access functions
        self.dut_console = dut_console
        # save console for PDTester UART access functions
        self.pdtester_console = pdtester_console

    def _verify_pdtester_connection(self, port):
        """Verify DUT to PDTester PD connection

        This method checks for a PDTester PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a PDTester feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to PDTester connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to PDTester pd connection is verified
        """
        DISCONNECT_CHECK_TIME = 2
        DISCONNECT_TIME_SEC = 10
        # pdtester console command to force PD disconnect
        disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)

        # Only check for PDTester if DUT has active PD connection
        if not self.dut_console.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.pdtester_console.send_pd_command(disc_cmd)
        time.sleep(DISCONNECT_CHECK_TIME)

        if self.dut_console.is_pd_connected(port):
            # This port did not drop. Could have disconnected other port,
            # allow it to reconnect before exiting.
            time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
            return False

        # DUT PD port is no longer connected: wait for disconnect timer and
        # give time to reconnect
        time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
        if self.dut_console.is_pd_connected(port):
            logging.info('PDTester connection verified on port %d',
                         port)
            return True
        return False

    def find_dut_to_pdtester_connection(self):
        """Find the PD port which is connected to PDTester

        Ports 0 .. dut_console.PD_MAX_PORTS - 1 are probed in order.

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_console.PD_MAX_PORTS):
            # Check for DUT to PDTester connection on port
            if self._verify_pdtester_connection(port):
                # PDTester PD connection found so exit
                return port
        return None
811
def create_pd_console_utils(console):
    """Build the PD console utils object matching the DUT's PD stack

    The stack version is queried through a plain PDConsoleUtils wrapped
    around the console, then used to pick the concrete class.

    @param console: DUT PD console

    @returns: An instance of TCPMv1ConsoleUtils or TCPMv2ConsoleUtils
    @raises: KeyError if the reported version is neither 1 nor 2
    """
    detected = PDConsoleUtils(console).get_pd_version()
    logging.debug('%s is TCPM v%s', console, detected)

    # Version number -> utils class implementing that stack's console format
    utils_by_version = dict([
        (1, TCPMv1ConsoleUtils),
        (2, TCPMv2ConsoleUtils),
    ])
    return utils_by_version[detected](console)
828