• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import logging
6import re
7import time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
11from autotest_lib.server.cros.servo import pd_console
12
13
class firmware_PDDataSwap(FirmwareTest):
    """
    Servo based USB PD data role swap test

    Pass criteria is all data role swaps complete, or
    a reject control message is received from the DUT in the
    cases where the swap does not complete.

    Throughout this test 'tx' means the swap request is sent by the DUT
    and 'rx' means the swap request is sent by PDTester (i.e. received
    by the DUT).

    """
    version = 1

    # Seconds to wait after a swap request or forced disconnect before
    # sampling the DUT port again
    PD_ROLE_DELAY = 0.5
    # Seconds to wait for a PD connection / power role change to settle
    PD_CONNECT_DELAY = 4
    # Number of swap attempts made by each _execute_data_role_swap_test call
    DATA_SWAP_ITERATIONS = 10
    # Upward facing port data role
    UFP = 'UFP'
    # Downward facing port data role
    DFP = 'DFP'
    # Swap Result Tables
    # Keyed by (direction, DUT data role before the swap request).
    # The class level tables only act as templates/defaults; run_once()
    # rebinds fresh per-instance tables so that counts are never shared
    # between test instances or carried over from a previous run.
    swap_attempt = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }
    swap_failure = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }

    def _reset_swap_results(self):
        """Start this instance with zeroed swap result tables

        The tables are rebound on the instance (rather than mutated in
        place) so the class level dictionaries are left untouched.
        """
        cls = type(self)
        self.swap_attempt = dict.fromkeys(cls.swap_attempt, 0)
        self.swap_failure = dict.fromkeys(cls.swap_failure, 0)

    def _verify_pdtester_connection(self, port):
        """Verify if DUT to PDTester PD connection

        This method checks for a PDTester PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a PDTester feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to PDTester connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to PDTester pd connection is verified
        """
        DISCONNECT_TIME_SEC = 2
        # pdtester console command to force PD disconnect
        disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        reconnect_wait = self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC

        # Only check for PDTester if DUT has active PD connection
        if not self.dut_pd_utils.is_pd_connected(port):
            return False

        # Attempt to force PD disconnection
        self.pdtester_pd_utils.send_pd_command(disc_cmd)
        time.sleep(self.PD_ROLE_DELAY)
        dropped = not self.dut_pd_utils.is_pd_connected(port)

        # Wait for disconnect timer and give time to reconnect. This wait
        # is needed in both cases: if this port did not drop, the forced
        # disconnect could have hit the other port, which must be allowed
        # to reconnect before exiting.
        time.sleep(reconnect_wait)

        if dropped and self.dut_pd_utils.is_pd_connected(port):
            logging.info('PDTester connection verfied on port %d', port)
            return True
        return False

    def _find_dut_to_pdtester_connection(self):
        """Find the PD port which is connected to PDTester

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_pd_utils.PD_MAX_PORTS):
            # Check for DUT to PDTester connection on port
            if self._verify_pdtester_connection(port):
                # PDTester PD connection found so exit
                return port
        return None

    def _get_data_role(self, console, port):
        """Get data role of PD connection

        The role string reported by the console is expected to have the
        form '<word>-<word>'; the word following the dash is returned.

        @param console: pd console object for uart access
        @param port: 0/1 pd port of current connection

        @returns: 'DFP' or 'UFP'

        @raises error.TestFail: if the role string can't be parsed
        """
        role = console.get_pd_role(port)
        m = re.search(r'[\w]+-([\w]+)', role)
        if m is None:
            # Report the offending string instead of failing later with an
            # AttributeError on None.
            raise error.TestFail('Unable to parse PD data role from %r' %
                                 role)
        return m.group(1)

    def _get_remote_role(self, local_role):
        """Invert data role

        @param local_role: data role to be flipped

        @returns: flipped data role value
        """
        if local_role == self.DFP:
            return self.UFP
        return self.DFP

    def _change_dut_power_role(self, port):
        """Force power role change via PDTester

        @param port: port of DUT PD connection

        @returns True if power role change is successful
        """
        PDTESTER_SRC_VOLTAGE = 5
        PDTESTER_SNK_VOLTAGE = 0
        pd_state = self.dut_pd_utils.get_pd_state(port)
        if pd_state == self.dut_pd_utils.SRC_CONNECT:
            # DUT is currently a SRC, so change to SNK
            # Use PDTester method to ensure power role change
            self.pdtester.charge(PDTESTER_SRC_VOLTAGE)
        else:
            # DUT is currently a SNK, so change it to a SRC.
            self.pdtester.charge(PDTESTER_SNK_VOLTAGE)
        # Wait for change to take place
        time.sleep(self.PD_CONNECT_DELAY)
        pdtester_state = self.pdtester_pd_utils.get_pd_state(
                self.pdtester_port)
        # After a successful swap, PDTester should now be in the state the
        # DUT was in when this method was called.
        return bool(pd_state == pdtester_state)

    def _send_data_swap_get_reply(self, console, port):
        """Send data swap request, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd data role swap request message. This allows the
        control message reply to be extracted. The debug mode
        is always disabled prior to exiting, even if sending the
        command or parsing the reply fails.

        @param console: pd console object for uart access
        @param port: pd port, on the given console, to send the request on

        @returns: PD control header message
        """
        cmd = 'pd %d swap data' % port
        # Enable PD console debug mode to show control messages
        console.enable_pd_console_debug()
        try:
            m = console.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
            # m[0][1] is treated as the hex header word captured after
            # 'RECV'; only the control message type bits are kept.
            ctrl_msg = int(m[0][1], 16) & console.PD_CONTROL_MSG_MASK
        finally:
            # Don't leave the console in debug mode if the request failed
            console.disable_pd_console_debug()
        return ctrl_msg

    def _attempt_data_swap(self, pd_port, direction):
        """Perform a data role swap request

        Data swap requests can be either initiated by the DUT or received
        by the DUT. This direction determines which PD console is used
        to initiate the swap command. The data role before and after
        the swap command are compared to determine if it took place.

        Even if data swap capability is advertised, a PD device is allowed
        to reject the request. Therefore, not swapping isn't itself a
        failure. When PDTester is used to initiate the request, the debug
        mode is enabled which allows the control message from the DUT to
        be analyzed. If the swap does not occur, but the request is rejected
        by the DUT then that is not counted as a failure.

        @param pd_port: DUT pd port value 0/1
        @param direction: rx or tx from the DUT perspective

        @returns PD control reply message for rx (PDTester initiated)
                 swaps, 0 for tx (DUT initiated) swaps
        """
        # Get starting DUT data role
        dut_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        self.swap_attempt[(direction, dut_dr)] += 1
        if direction == 'tx':
            # Initiate swap request from the DUT
            self.dut_pd_utils.send_pd_command('pd %d swap data' % pd_port)
            # Not using debug mode, so there is no reply message
            ctrl = 0
        else:
            # Initiate swap request from PDTester
            ctrl = self._send_data_swap_get_reply(self.pdtester_pd_utils,
                                                  self.pdtester_port)

        time.sleep(self.PD_ROLE_DELAY)
        # Get DUT current data role
        swap_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('%s swap attempt: prev = %s, new = %s, msg = %s',
                     direction, dut_dr, swap_dr, ctrl)
        # An unchanged role only counts as a failure when the request was
        # not explicitly rejected.
        rejected = ctrl == self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        if dut_dr == swap_dr and not rejected:
            self.swap_failure[(direction, dut_dr)] += 1
        return ctrl

    def _execute_data_role_swap_test(self, pd_port):
        """Execute a series of data role swaps

        Attempt both rx and tx data swaps, from perspective of DUT.
        Even if the DUT advertises support, it can
        reject swap requests when already in the desired data role. For
        example many devices will not swap if already in DFP mode.
        However, PDTester should always accept a request. Therefore,
        when a swap failed on a rx swap, then that is followed by
        a tx swap attempt.

        @param pd_port: port number of DUT PD connection
        """
        reject = self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        for attempt in range(self.DATA_SWAP_ITERATIONS):
            # Use the same direction for every 2 loop iterations
            # (attempts 0,1 -> rx; 2,3 -> tx; 4,5 -> rx; ...)
            direction = 'tx' if attempt & 2 else 'rx'
            ctrl_msg = self._attempt_data_swap(pd_port, direction)
            if direction == 'rx' and ctrl_msg == reject:
                # The DUT rejected PDTester's request, so use a DUT
                # initiated swap to change roles.
                self._attempt_data_swap(pd_port, 'tx')

    def _test_data_swap_reject(self, pd_port):
        """Verify that data swap request is rejected

        This tests the case where the DUT doesn't advertise support
        for data swaps. A data request is sent by PDTester, and then
        the control message checked to ensure the request was rejected.
        In addition, the data role and connection state are verified
        to remain unchanged.

        @param pd_port: port for DUT pd connection

        @raises error.TestFail: if the request is not rejected, or the
                connection state or data role changed
        """
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        # Send swap command from PDTester and get reply
        ctrl_msg = self._send_data_swap_get_reply(self.pdtester_pd_utils,
                                                  self.pdtester_port)
        if ctrl_msg != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']:
            raise error.TestFail('Data Swap Req not rejected, returned %r' %
                                 ctrl_msg)
        # Get DUT current state
        pd_state = self.dut_pd_utils.get_pd_state(pd_port)
        if pd_state != dut_connect_state:
            raise error.TestFail('PD not connected! pd_state = %r' %
                                 pd_state)
        # Since reject message was received, verify data role didn't change
        curr_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        if curr_dr != dut_data_role:
            raise error.TestFail('Unexpected PD data role change')

    def initialize(self, host, cmdline_args, flip_cc=False):
        """Set up PDTester and put the DUT in normal mode

        @param host: passed through to FirmwareTest.initialize
        @param cmdline_args: passed through to FirmwareTest.initialize
        @param flip_cc: passed to setup_pdtester
        """
        super(firmware_PDDataSwap, self).initialize(host, cmdline_args)
        self.setup_pdtester(flip_cc)
        # Only run in normal mode
        self.switcher.setup_mode('normal')
        self.usbpd.send_command('chan 0')

    def cleanup(self):
        """Restore the console channel mask and run the base class cleanup"""
        try:
            self.usbpd.send_command('chan 0xffffffff')
        finally:
            # The base class cleanup must run even if the console command
            # fails (e.g. because initialize did not complete).
            super(firmware_PDDataSwap, self).cleanup()

    def run_once(self):
        """Execute Data Role swap test.

        1. Verify that pd console is accessible
        2. Verify that DUT has a valid PD contract
        3. Determine if DUT advertises support for data swaps
        4. Test DUT initiated and received data swaps
        5. Swap power roles if supported
        6. Repeat DUT received data swap requests

        @raises error.TestFail: if the pd console or PDTester connection is
                missing, or if any swap attempt is counted as a failure
        """
        # Results must only reflect this run
        self._reset_swap_results()

        # TODO(b/35573842): Refactor to use PDPortPartner to probe the port
        self.pdtester_port = 1 if 'servo_v4' in self.pdtester.servo_type else 0

        # create objects for pd utilities
        self.dut_pd_utils = pd_console.PDConsoleUtils(self.usbpd)
        self.pdtester_pd_utils = pd_console.PDConsoleUtils(self.pdtester)

        # Make sure PD support exists in the UART console
        if not self.dut_pd_utils.verify_pd_console():
            raise error.TestFail("pd command not present on console!")

        # Type C connection (PD contract) should exist at this point
        # For this test, the DUT must be connected to a PDTester.
        pd_port = self._find_dut_to_pdtester_connection()
        if pd_port is None:
            raise error.TestFail("DUT to PDTester PD connection not found")
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        logging.info('Initial DUT connect state = %s', dut_connect_state)

        # Determine if DUT supports data role swaps
        dr_swap_allowed = self.pdtester_pd_utils.is_pd_flag_set(
                self.pdtester_port, 'data_swap')
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('Starting DUT Data Role = %r', dut_data_role)

        # If data swaps are not allowed on the DUT, then still
        # attempt a data swap and verify that the request is
        # rejected by the DUT and that it remains connected and
        # in the same role.
        if not dr_swap_allowed:
            logging.info('Data Swap support not advertised by DUT')
            self._test_data_swap_reject(pd_port)
            logging.info('Data Swap request rejected by DUT as expected')
            return

        # Data role swap support advertised, test this feature.
        self._execute_data_role_swap_test(pd_port)

        # If DUT supports Power Role swap then attempt to change roles.
        # This way, data role swaps will be tested in both configurations.
        if self.pdtester_pd_utils.is_pd_flag_set(
                self.pdtester_port, 'power_swap'):
            logging.info('\nDUT advertises Power Swap Support')
            # Attempt to swap power roles
            power_swap = self._change_dut_power_role(pd_port)
            if power_swap:
                try:
                    self._execute_data_role_swap_test(pd_port)
                finally:
                    # Swap power role, back to the original
                    self._change_dut_power_role(pd_port)
            else:
                logging.warning('Power swap not successful!')
                logging.warning('Only tested with DUT in %s state',
                                dut_connect_state)
        else:
            logging.info('DUT does not advertise power swap support')

        logging.info('***************** Swap Results ********************')
        total_attempts = 0
        total_failures = 0
        # Sorted so the report order is the same on every run
        for key in sorted(self.swap_attempt):
            direction, role = key
            attempts = self.swap_attempt[key]
            failures = self.swap_failure[key]
            logging.info('%s %s swap attempts = %d, failures = %d',
                         direction, role, attempts, failures)
            total_attempts += attempts
            total_failures += failures

        # If any swap attempts were not successful, flag test as failure
        if total_failures:
            raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' %
                                 (total_attempts, total_failures))
357