• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import re
6import logging
7import time
8
9from autotest_lib.client.common_lib import error
10
11class PDConsoleUtils(object):
12    """ Provides a set of methods common to USB PD FAFT tests
13
14    Each instance of this class is associated with a particular
15    servo UART console. USB PD tests will typically use the console
16    command 'pd' and its subcommands to control/monitor Type C PD
17    connections. The servo object used for UART operations is
18    passed in and stored when this object is created.
19
20    """
21
22    SRC_CONNECT = 'SRC_READY'
23    SNK_CONNECT = 'SNK_READY'
24    SRC_DISC = 'SRC_DISCONNECTED'
25    SNK_DISC = 'SNK_DISCONNECTED'
26    DRP_AUTO_TOGGLE = 'DRP_AUTO_TOGGLE'
27    PD_MAX_PORTS = 2
28    CONNECT_TIME = 4
29
30    DUALROLE_QUERY_DELAY = 0.25
31    # Dualrole input/ouput values of methods in this class.
32    DUALROLE_VALUES = ['on', 'off', 'snk', 'src']
33    # Strings passing to the console command "pd dualrole"
34    DUALROLE_CMD_ARGS = ['on', 'off', 'sink', 'source']
35    # Strings returned from the console command "pd dualrole"
36    DUALROLE_CMD_RESULTS = ['on', 'off', 'force sink', 'force source']
37
38    # Some old firmware uses a single dualrole setting for all ports; while
39    # some new firmware uses a per port dualrole settting. This flag will be
40    # initialized to True or False.
41    # TODO: Remove this flag when the old setting phases out
42    per_port_dualrole_setting = None
43
44    # Dictionary for 'pd 0/1 state' parsing
45    PD_STATE_DICT = {
46        'port': 'Port\s+([\w]+)',
47        'role': 'Role:\s+([\w]+-[\w]+)',
48        'pd_state': 'State:\s+([\d\w()_]+)',
49        'flags': 'Flags:\s+([\w]+)',
50        'polarity': '(CC\d)'
51    }
52
53    # Regex to match PD state name; work for both old and new formats
54    RE_PD_STATE = r"(\d+)?\(?([\w_]+)?\)?"
55    # Copied from ec repo: common/usb_pd_protocol.c
56    PD_STATE_NAMES = [
57        "DISABLED",                   # index: 0
58        "SUSPENDED",
59        "SNK_DISCONNECTED",
60        "SNK_DISCONNECTED_DEBOUNCE",
61        "SNK_HARD_RESET_RECOVER",
62        "SNK_DISCOVERY",              # index: 5
63        "SNK_REQUESTED",
64        "SNK_TRANSITION",
65        "SNK_READY",
66        "SNK_SWAP_INIT",
67        "SNK_SWAP_SNK_DISABLE",       # index: 10
68        "SNK_SWAP_SRC_DISABLE",
69        "SNK_SWAP_STANDBY",
70        "SNK_SWAP_COMPLETE",
71        "SRC_DISCONNECTED",
72        "SRC_DISCONNECTED_DEBOUNCE",  # index: 15
73        "SRC_HARD_RESET_RECOVER",
74        "SRC_STARTUP",
75        "SRC_DISCOVERY",
76        "SRC_NEGOCIATE",
77        "SRC_ACCEPTED",               # index: 20
78        "SRC_POWERED",
79        "SRC_TRANSITION",
80        "SRC_READY",
81        "SRC_GET_SNK_CAP",
82        "DR_SWAP",                    # index: 25
83        "SRC_SWAP_INIT",
84        "SRC_SWAP_SNK_DISABLE",
85        "SRC_SWAP_SRC_DISABLE",
86        "SRC_SWAP_STANDBY",
87        "VCONN_SWAP_SEND",            # index: 30
88        "VCONN_SWAP_INIT",
89        "VCONN_SWAP_READY",
90        "SOFT_RESET",
91        "HARD_RESET_SEND",
92        "HARD_RESET_EXECUTE",         # index: 35
93        "BIST_RX",
94        "BIST_TX",
95        "DRP_AUTO_TOGGLE",
96    ]
97
98    # Dictionary for PD control message types
99    PD_CONTROL_MSG_MASK = 0x1f
100    PD_CONTROL_MSG_DICT = {
101        'GoodCRC': 1,
102        'GotoMin': 2,
103        'Accept': 3,
104        'Reject': 4,
105        'Ping': 5,
106        'PS_RDY': 6,
107        'Get_Source_Cap': 7,
108        'Get_Sink_Cap': 8,
109        'DR_Swap': 9,
110        'PR_Swap': 10,
111        'VCONN_Swap': 11,
112        'Wait': 12,
113        'Soft_Reset': 13
114    }
115
116    # Dictionary for PD firmware state flags
117    PD_STATE_FLAGS_DICT = {
118        'power_swap': 1 << 1,
119        'data_swap': 1 << 2,
120        'data_swap_active': 1 << 3,
121        'vconn_on': 1 << 12
122    }
123
124    def __init__(self, console):
125        """Console can be either usbpd, ec, or pdtester UART
126        This object with then be used by the class which creates
127        the PDConsoleUtils class to send/receive commands to UART
128        """
129        # save console for UART access functions
130        self.console = console
131
132    def send_pd_command(self, cmd):
133        """Send command to PD console UART
134
135        @param cmd: pd command string
136        """
137        self.console.send_command(cmd)
138
139    def send_pd_command_get_output(self, cmd, regexp):
140        """Send command to PD console, wait for response
141
142        @param cmd: pd command string
143        @param regexp: regular expression for desired output
144        """
145        # Enable PD console debug mode to show control messages
146        self.enable_pd_console_debug()
147        output = self.console.send_command_get_output(cmd, regexp)
148        self.disable_pd_console_debug()
149        return output
150
151    def send_pd_command_get_reply_msg(self, cmd):
152        """Send PD protocol msg, get PD control msg reply
153
154        The PD console debug mode is enabled prior to sending
155        a pd protocol message. This allows the
156        control message reply to be extracted. The debug mode
157        is disabled prior to exiting.
158
159        @param cmd: pd command to issue to the UART console
160
161        @returns: PD control header message
162        """
163        m = self.send_pd_command_get_output(cmd, ['RECV\s([\w]+)\W'])
164        ctrl_msg = int(m[0][1], 16) & self.PD_CONTROL_MSG_MASK
165        return ctrl_msg
166
167    def verify_pd_console(self):
168        """Verify that PD commands exist on UART console
169
170        Send 'help' command to UART console
171        @returns: True if 'pd' is found, False if not
172        """
173
174        l = self.console.send_command_get_output('help', ['(pd)\s+([\w]+)'])
175        if l[0][1] == 'pd':
176            return True
177        else:
178            return False
179
180    def execute_pd_state_cmd(self, port):
181        """Get PD state for specified channel
182
183        pd 0/1 state command gives produces 5 fields. The full response
184        line is captured and then parsed to extract each field to fill
185        the dict containing port, polarity, role, pd_state, and flags.
186
187        @param port: Type C PD port 0 or 1
188
189        @returns: A dict with the 5 fields listed above
190        @raises: TestFail if any field not found
191        """
192        cmd = 'pd'
193        subcmd = 'state'
194        pd_cmd = cmd +" " + str(port) + " " + subcmd
195        # Two FW versions for this command, get full line.
196        m = self.send_pd_command_get_output(pd_cmd,
197                                            ['(Port.*) - (Role:.*)\n'])
198
199        # Extract desired values from result string
200        state_result = {}
201        for key, regexp in self.PD_STATE_DICT.iteritems():
202            value = re.search(regexp, m[0][0])
203            if value:
204                state_result[key] = value.group(1)
205            else:
206                raise error.TestFail('pd %d state: %r value not found' %
207                                     (port, key))
208
209        return state_result
210
211    def _normalize_pd_state(self, state):
212        """Normalize the PD state name which handles both old and new formats.
213
214        The old format is like: "SNK_READY"
215        The new format is like: "8()" if debug_level == 0, or
216                                "8(SNK_READY)" if debug_level > 0
217
218        This method will convert the new format to the old one.
219
220        @param state: The raw PD state text
221
222        @returns: The normalized PD state name
223        @raises: TestFail if unexpected PD state format
224        """
225        m = re.match(self.RE_PD_STATE, state)
226        if m and any(m.groups()):
227            state_index, state_name = m.groups()
228            if state_index is None:
229                # The old format: return the name
230                return state_name
231            # The new format: map the index to a name
232            mapped_name = self.PD_STATE_NAMES[int(state_index)]
233            if state_name is not None:
234                assert mapped_name == state_name
235            return mapped_name
236        else:
237            raise error.TestFail('Unexpected PD state format: %s' % state)
238
239    def get_pd_state(self, port):
240        """Get the current PD state
241
242        @param port: Type C PD port 0/1
243        @returns: current pd state
244        """
245
246        pd_dict = self.execute_pd_state_cmd(port)
247        return self._normalize_pd_state(pd_dict['pd_state'])
248
249    def get_pd_port(self, port):
250        """Get the current PD port
251
252        @param port: Type C PD port 0/1
253        @returns: current pd state
254        """
255        pd_dict = self.execute_pd_state_cmd(port)
256        return pd_dict['port']
257
258    def get_pd_role(self, port):
259        """Get the current PD power role (source or sink)
260
261        @param port: Type C PD port 0/1
262        @returns: current pd state
263        """
264        pd_dict = self.execute_pd_state_cmd(port)
265        return pd_dict['role']
266
267    def get_pd_flags(self, port):
268        """Get the current PD flags
269
270        @param port: Type C PD port 0/1
271        @returns: current pd state
272        """
273        pd_dict = self.execute_pd_state_cmd(port)
274        return pd_dict['flags']
275
276    def get_pd_dualrole(self, port):
277        """Get the current PD dualrole setting
278
279        @param port: Type C PD port 0/1
280        @returns: current PD dualrole setting, one of (on, off, snk, src)
281        """
282        if self.per_port_dualrole_setting is True:
283            cmd = 'pd %d dualrole' % port
284        elif self.per_port_dualrole_setting is False:
285            cmd = 'pd dualrole'
286        else:
287            try:
288                logging.info('The per_port_dualrole_setting is unknown; '
289                             'try the True case')
290                self.per_port_dualrole_setting = True
291                return self.get_pd_dualrole(port)
292            except:
293                logging.info('The per_port_dualrole_setting=True failed; '
294                             'try the False case')
295                self.per_port_dualrole_setting = False
296                return self.get_pd_dualrole(port)
297
298        m = self.send_pd_command_get_output(cmd,
299                ['dual-role toggling:\s+([\w ]+)[\r\n]'])
300        # Find the index according to the output of "pd dualrole" command
301        dual_index = self.DUALROLE_CMD_RESULTS.index(m[0][1])
302        # Map to a string which is the output of this method
303        return self.DUALROLE_VALUES[dual_index]
304
305    def set_pd_dualrole(self, port, value):
306        """Set pd dualrole
307
308        It can be set to either:
309        1. on
310        2. off
311        3. snk (force sink mode)
312        4. src (force source mode)
313        After setting, the current value is read to confirm that it
314        was set properly.
315
316        @param port: Type C PD port 0/1
317        @param value: One of the 4 options listed
318        """
319        # If the dualrole setting is not initialized, call the get method to
320        # initialize it.
321        if self.per_port_dualrole_setting is None:
322            self.get_pd_dualrole(port)
323
324        # Get string required for console command
325        dual_index = self.DUALROLE_VALUES.index(value)
326        # Create console command
327        cmd = 'pd %d dualrole %s' % (port, self.DUALROLE_CMD_ARGS[dual_index])
328        self.console.send_command(cmd)
329        time.sleep(self.DUALROLE_QUERY_DELAY)
330        # Get current setting to verify that command was successful
331        dual = self.get_pd_dualrole(port)
332        # If it doesn't match, then raise error
333        if dual != value:
334            raise error.TestFail("dualrole error: " + value + " != " + dual)
335
336    def query_pd_connection(self):
337        """Determine if PD connection is present
338
339        Try the 'pd 0/1 state' command and see if it's in either
340        expected state of a conneciton. Record the port number
341        that has an active connection
342
343        @returns: dict with params port, connect, and state
344        """
345        status = {}
346        port = 0;
347        status['connect'] = False
348        status['port'] = port
349        state = self.get_pd_state(port)
350        # Check port 0 first
351        if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
352            status['connect'] = True
353            status['role'] = state
354        else:
355            port = 1
356            status['port'] = port
357            state = self.get_pd_state(port)
358            # Check port 1
359            if state == self.SRC_CONNECT or state == self.SNK_CONNECT:
360                status['connect'] = True
361                status['role'] = state
362
363        return status
364
365    def swap_power_role(self, port):
366        """Attempt a power role swap
367
368        This method attempts to execute a power role swap. A check
369        is made to ensure that dualrole mode is enabled and that
370        a PD contract is currently established. If both checks pass,
371        then the power role swap command is issued. After a delay,
372        if a PD contract is established and the current state does
373        not equal the starting state, then it was successful.
374
375        @param port: pd port number
376
377        @returns: True if power swap is successful, False otherwise.
378        """
379        # Get starting state
380        if self.is_pd_dual_role_enabled(port) == False:
381            logging.info('Dualrole Mode not enabled!')
382            return False
383        if self.is_pd_connected(port) == False:
384            logging.info('PD contract not established!')
385            return False
386        current_pr = self.get_pd_state(port)
387        swap_cmd = 'pd %d swap power' % port
388        self.send_pd_command(swap_cmd)
389        time.sleep(self.CONNECT_TIME)
390        new_pr = self.get_pd_state(port)
391        logging.info('Power swap: %s -> %s', current_pr, new_pr)
392        if self.is_pd_connected(port) == False:
393            return False
394        return bool(current_pr != new_pr)
395
396    def disable_pd_console_debug(self):
397        """Turn off PD console debug
398
399        """
400        cmd = 'pd dump 0'
401        self.send_pd_command(cmd)
402
403    def enable_pd_console_debug(self):
404        """Enable PD console debug level 1
405
406        """
407        cmd = 'pd dump 2'
408        self.send_pd_command(cmd)
409
410    def is_pd_flag_set(self, port, key):
411        """Test a bit in PD protocol state flags
412
413        The flag word contains various PD protocol state information.
414        This method allows for a specific flag to be tested.
415
416        @param port: Port which has the active PD connection
417        @param key: dict key to retrieve the flag bit mapping
418
419        @returns True if the bit to be tested is set
420        """
421        pd_flags = self.get_pd_flags(port)
422        return bool(self.PD_STATE_FLAGS_DICT[key] & int(pd_flags, 16))
423
424    def is_pd_connected(self, port):
425        """Check if a PD connection is active
426
427        @param port: port to be used for pd console commands
428
429        @returns True if port is in connected state
430        """
431        state = self.get_pd_state(port)
432        return bool(state == self.SRC_CONNECT or state == self.SNK_CONNECT)
433
434    def is_pd_dual_role_enabled(self, port):
435        """Check if a PD device is in dualrole mode
436
437        @returns True is dualrole mode is active, false otherwise
438        """
439        drp = self.get_pd_dualrole(port)
440        return bool(drp == 'on')
441
442
443class PDConnectionUtils(PDConsoleUtils):
444    """Provides a set of methods common to USB PD FAFT tests
445
446    This class is used for PD utility methods that require access
447    to both PDTester and DUT PD consoles.
448
449    """
450
451    def __init__(self, dut_console, pdtester_console):
452        """
453        @param dut_console: PD console object for DUT
454        @param pdtester_console: PD console object for PDTester
455        """
456        # save console for DUT PD UART access functions
457        self.dut_console = dut_console
458        # save console for PDTester UART access functions
459        self.pdtester_console = pdtester_console
460        super(PDConnectionUtils, self).__init__(dut_console)
461
462    def _verify_pdtester_connection(self, port):
463        """Verify DUT to PDTester PD connection
464
465        This method checks for a PDTester PD connection for the
466        given port by first verifying if a PD connection is present.
467        If found, then it uses a PDTester feature to force a PD disconnect.
468        If the port is no longer in the connected state, and following
469        a delay, is found to be back in the connected state, then
470        a DUT pd to PDTester connection is verified.
471
472        @param port: DUT pd port to test
473
474        @returns True if DUT to PDTester pd connection is verified
475        """
476        DISCONNECT_CHECK_TIME = 0.5
477        DISCONNECT_TIME_SEC = 2
478        # pdtester console command to force PD disconnect
479        disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
480        # Only check for PDTester if DUT has active PD connection
481        if self.dut_console.is_pd_connected(port):
482            # Attempt to force PD disconnection
483            self.pdtester_console.send_pd_command(disc_cmd)
484            time.sleep(DISCONNECT_CHECK_TIME)
485            # Verify that DUT PD port is no longer connected
486            if self.dut_console.is_pd_connected(port) == False:
487                # Wait for disconnect timer and give time to reconnect
488                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
489                if self.dut_console.is_pd_connected(port):
490                    logging.info('PDTester connection verified on port %d',
491                                 port)
492                    return True
493            else:
494                # Could have disconnected other port, allow it to reconnect
495                # before exiting.
496                time.sleep(self.dut_console.CONNECT_TIME + DISCONNECT_TIME_SEC)
497        return False
498
499    def find_dut_to_pdtester_connection(self):
500        """Find the PD port which is connected to PDTester
501
502        @returns DUT pd port number if found, None otherwise
503        """
504        for port in xrange(self.dut_console.PD_MAX_PORTS):
505            # Check for DUT to PDTester connection on port
506            if self._verify_pdtester_connection(port):
507                # PDTester PD connection found so exit
508                return port
509        return None
510