# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
from autotest_lib.server.cros.servo import pd_console


class firmware_PDDataSwap(FirmwareTest):
    """
    Servo based USB PD data role swap test

    Pass criteria is all data role swaps complete, or
    a reject control message is received from the DUT in the
    cases where the swap does not complete.

    """
    version = 1

    # Delay (seconds) after a role swap / forced disconnect request
    PD_ROLE_DELAY = 0.5
    # Delay (seconds) allowed for a PD connection to be (re)established
    PD_CONNECT_DELAY = 4
    PLANKTON_PORT = 0
    DATA_SWAP_ITERATIONS = 10
    # Upward facing port data role
    UFP = 'UFP'
    # Downward facing port data role
    DFP = 'DFP'
    # Plankton initiated data swap request
    PLANKTON_SWAP_REQ = 'pd %d swap data' % PLANKTON_PORT
    # Swap Result Tables, keyed by (direction, starting DUT data role).
    # These class level dicts only serve as templates/defaults; run_once()
    # installs fresh per-instance copies so that counts are never shared
    # between test instances or between repeated runs.
    swap_attempt = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }
    swap_failure = {
        ('rx', DFP): 0,
        ('rx', UFP): 0,
        ('tx', DFP): 0,
        ('tx', UFP): 0
    }

    def _verify_plankton_connection(self, port):
        """Verify if DUT to Plankton PD connection

        This method checks for a Plankton PD connection for the
        given port by first verifying if a PD connection is present.
        If found, then it uses a Plankton feature to force a PD disconnect.
        If the port is no longer in the connected state, and following
        a delay, is found to be back in the connected state, then
        a DUT pd to Plankton connection is verified.

        @param port: DUT pd port to test

        @returns True if DUT to Plankton pd connection is verified
        """
        DISCONNECT_TIME_SEC = 2
        # plankton console command to force PD disconnect
        disc_cmd = 'fake_disconnect 100 %d' % (DISCONNECT_TIME_SEC * 1000)
        # Only check for Plankton if DUT has active PD connection
        if self.dut_pd_utils.is_pd_connected(port):
            # Attempt to force PD disconnection
            self.plankton_pd_utils.send_pd_command(disc_cmd)
            time.sleep(self.PD_ROLE_DELAY)
            # Verify that DUT PD port is no longer connected
            if not self.dut_pd_utils.is_pd_connected(port):
                # Wait for disconnect timer and give time to reconnect
                time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
                if self.dut_pd_utils.is_pd_connected(port):
                    logging.info('Plankton connection verfied on port %d',
                                 port)
                    return True
            else:
                # Could have disconnected other port, allow it to reconnect
                # before exiting.
                time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC)
        return False

    def _find_dut_to_plankton_connection(self):
        """Find the PD port which is connected to Plankton

        @returns DUT pd port number if found, None otherwise
        """
        for port in range(self.dut_pd_utils.PD_MAX_PORTS):
            # Check for DUT to Plankton connection on port
            if self._verify_plankton_connection(port):
                # Plankton PD connection found so exit
                return port
        return None

    def _get_data_role(self, console, port):
        """Get data role of PD connection

        The role string reported by the console is expected to look like
        '<power role>-<data role>'; the part after the dash is returned.

        @param console: pd console object for uart access
        @param port: 0/1 pd port of current connection

        @returns: 'DFP' or 'UFP'

        @raises error.TestFail: if the reported role can't be parsed
        """
        role = console.get_pd_role(port)
        m = re.search(r'[\w]+-([\w]+)', role)
        if m is None:
            # Fail with a meaningful message instead of an AttributeError
            raise error.TestFail('Unable to parse PD data role from %r' % role)
        return m.group(1)

    def _get_remote_role(self, local_role):
        """Invert data role

        @param local_role: data role to be flipped

        @returns: flipped data role value
        """
        if local_role == self.DFP:
            return self.UFP
        return self.DFP

    def _change_dut_power_role(self, port):
        """Force power role change via Plankton

        @param port: port of DUT PD connection

        @returns True if power role change is successful
        """
        PLANKTON_SRC_VOLTAGE = 5
        PLANKTON_SNK_VOLTAGE = 0
        pd_state = self.dut_pd_utils.get_pd_state(port)
        if pd_state == self.dut_pd_utils.SRC_CONNECT:
            # DUT is currently a SRC, so change to SNK
            # Use Plankton method to ensure power role change
            self.plankton.charge(PLANKTON_SRC_VOLTAGE)
        else:
            # DUT is currently a SNK, so change it to a SRC.
            self.plankton.charge(PLANKTON_SNK_VOLTAGE)
        # Wait for change to take place
        time.sleep(self.PD_CONNECT_DELAY)
        plankton_state = self.plankton_pd_utils.get_pd_state(
                self.PLANKTON_PORT)
        # After a successful swap, Plankton is in the state the DUT was in
        # when this method was called.
        return pd_state == plankton_state

    def _send_data_swap_get_reply(self, console, port):
        """Send data swap request, get PD control msg reply

        The PD console debug mode is enabled prior to sending
        a pd data role swap request message. This allows the
        control message reply to be extracted. The debug mode
        is disabled prior to exiting, even if the command fails.

        @param console: pd console object for uart access
        @param port: pd port on which the swap request is issued

        @returns: PD control header message
        """
        cmd = 'pd %d swap data' % port
        # Enable PD console debug mode to show control messages
        console.enable_pd_console_debug()
        try:
            m = console.send_pd_command_get_output(cmd, [r'RECV\s([\w]+)'])
        finally:
            # Never leave the console in debug mode, even on error
            console.disable_pd_console_debug()
        # Header is reported in hex; keep only the control message bits
        return int(m[0][1], 16) & console.PD_CONTROL_MSG_MASK

    def _attempt_data_swap(self, pd_port, direction):
        """Perform a data role swap request

        Data swap requests can be either initiated by the DUT or received
        by the DUT. This direction determines which PD console is used
        to initiate the swap command. The data role before and after
        the swap command are compared to determine if it took place.

        Even if data swap capability is advertised, a PD device is allowed
        to reject the request. Therefore, not swapping isn't itself a
        failure. When Plankton is used to initiate the request, the debug
        mode is enabled which allows the control message from the DUT to
        be analyzed. If the swap does not occur, but the request is rejected
        by the DUT then that is not counted as a failure.

        @param pd_port: DUT pd port value 0/1
        @param direction: rx or tx from the DUT perspective

        @returns PD control reply message for rx (Plankton initiated)
                 swaps, 0 for tx (DUT initiated) swaps
        """
        reject_msg = self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        # Get starting DUT data role
        dut_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        self.swap_attempt[(direction, dut_dr)] += 1
        if direction == 'tx':
            # Initiate swap request from the DUT
            self.dut_pd_utils.send_pd_command('pd %d swap data' % pd_port)
            # Not using debug mode, so there is no reply message
            ctrl = 0
        else:
            # Initiate swap request from Plankton
            ctrl = self._send_data_swap_get_reply(self.plankton_pd_utils,
                                                  self.PLANKTON_PORT)

        time.sleep(self.PD_ROLE_DELAY)
        # Get DUT current data role
        swap_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('%s swap attempt: prev = %s, new = %s, msg = %s',
                     direction, dut_dr, swap_dr, ctrl)
        # An unchanged role only counts as a failure when the request was
        # not explicitly rejected.
        if dut_dr == swap_dr and ctrl != reject_msg:
            self.swap_failure[(direction, dut_dr)] += 1
        return ctrl

    def _execute_data_role_swap_test(self, pd_port):
        """Execute a series of data role swaps

        Attempt both rx and tx data swaps, from perspective of DUT.
        Even if the DUT advertises support, it can
        reject swap requests when already in the desired data role. For
        example many devices will not swap if already in DFP mode.
        However, Plankton should always accept a request. Therefore,
        when a swap failed on a rx swap, then that is followed by
        a tx swap attempt.

        @param pd_port: port number of DUT PD connection
        """
        reject_msg = self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']
        for attempt in range(self.DATA_SWAP_ITERATIONS):
            # Use the same direction for every 2 loop iterations
            # (attempts 0,1 -> rx; 2,3 -> tx; 4,5 -> rx; ...)
            direction = 'tx' if attempt & 2 else 'rx'
            ctrl_msg = self._attempt_data_swap(pd_port, direction)
            if direction == 'rx' and ctrl_msg == reject_msg:
                # DUT rejected the Plankton request, so use a DUT
                # initiated swap to change roles
                self._attempt_data_swap(pd_port, 'tx')

    def _test_data_swap_reject(self, pd_port):
        """Verify that data swap request is rejected

        This tests the case where the DUT doesn't advertise support
        for data swaps. A data request is sent by Plankton, and then
        the control message checked to ensure the request was rejected.
        In addition, the data role and connection state are verified
        to remain unchanged.

        @param pd_port: port for DUT pd connection

        @raises error.TestFail: if the request is not rejected, or the
                connection state or data role changed
        """
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        # Send swap command from Plankton and get reply
        ctrl_msg = self._send_data_swap_get_reply(self.plankton_pd_utils,
                                                  self.PLANKTON_PORT)
        if ctrl_msg != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']:
            raise error.TestFail('Data Swap Req not rejected, returned %r' %
                                 ctrl_msg)
        # Get DUT current state
        pd_state = self.dut_pd_utils.get_pd_state(pd_port)
        if pd_state != dut_connect_state:
            raise error.TestFail('PD not connected! pd_state = %r' %
                                 pd_state)
        # Since reject message was received, verify data role didn't change
        curr_dr = self._get_data_role(self.dut_pd_utils, pd_port)
        if curr_dr != dut_data_role:
            raise error.TestFail('Unexpected PD data role change')

    def initialize(self, host, cmdline_args):
        """Set up the test: normal mode, console channel output limited.

        @param host: host object passed through to FirmwareTest
        @param cmdline_args: command line args passed to FirmwareTest
        """
        super(firmware_PDDataSwap, self).initialize(host, cmdline_args)
        # Only run in normal mode
        self.switcher.setup_mode('normal')
        self.usbpd.send_command('chan 0')

    def cleanup(self):
        """Restore the console channel mask and run base class cleanup."""
        self.usbpd.send_command('chan 0xffffffff')
        super(firmware_PDDataSwap, self).cleanup()

    def run_once(self):
        """Execute Data Role swap test.

        1. Verify that pd console is accessible
        2. Verify that DUT has a valid PD contract
        3. Determine if DUT advertises support for data swaps
        4. Test DUT initiated and received data swaps
        5. Swap power roles if supported
        6. Repeat DUT received data swap requests

        @raises error.TestFail: if the pd console or Plankton connection is
                missing, or if any data swap attempt failed
        """
        # Start every run with zeroed, per-instance result tables so results
        # from an earlier run/instance can't leak into this one.
        self.swap_attempt = dict.fromkeys(self.swap_attempt, 0)
        self.swap_failure = dict.fromkeys(self.swap_failure, 0)

        # create objects for pd utilities
        self.dut_pd_utils = pd_console.PDConsoleUtils(self.usbpd)
        self.plankton_pd_utils = pd_console.PDConsoleUtils(self.plankton)

        # Make sure PD support exists in the UART console
        if not self.dut_pd_utils.verify_pd_console():
            raise error.TestFail("pd command not present on console!")

        # Type C connection (PD contract) should exist at this point
        # For this test, the DUT must be connected to a Plankton.
        pd_port = self._find_dut_to_plankton_connection()
        if pd_port is None:
            raise error.TestFail("DUT to Plankton PD connection not found")
        dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port)
        logging.info('Initial DUT connect state = %s', dut_connect_state)

        # Determine if DUT supports data role swaps
        dr_swap_allowed = self.plankton_pd_utils.is_pd_flag_set(
                self.PLANKTON_PORT, 'data_swap')
        # Get current DUT data role
        dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port)
        logging.info('Starting DUT Data Role = %r', dut_data_role)

        # If data swaps are not allowed on the DUT, then still
        # attempt a data swap and verify that the request is
        # rejected by the DUT and that it remains connected and
        # in the same role.
        if not dr_swap_allowed:
            logging.info('Data Swap support not advertised by DUT')
            self._test_data_swap_reject(pd_port)
            logging.info('Data Swap request rejected by DUT as expected')
        else:
            # Data role swap support advertised, test this feature.
            self._execute_data_role_swap_test(pd_port)

            # If DUT supports Power Role swap then attempt to change roles.
            # This way, data role swaps will be tested in both configurations.
            if self.plankton_pd_utils.is_pd_flag_set(
                    self.PLANKTON_PORT, 'power_swap'):
                logging.info('\nDUT advertises Power Swap Support')
                # Attempt to swap power roles
                power_swap = self._change_dut_power_role(pd_port)
                if power_swap:
                    self._execute_data_role_swap_test(pd_port)
                else:
                    logging.warning('Power swap not successful!')
                    logging.warning('Only tested with DUT in %s state',
                                    dut_connect_state)
            else:
                logging.info('DUT does not advertise power swap support')

            logging.info('***************** Swap Results ********************')
            total_attempts = 0
            total_failures = 0
            # Sorted so that the summary is logged in a stable order
            for direction, role in sorted(self.swap_attempt):
                attempts = self.swap_attempt[(direction, role)]
                failures = self.swap_failure[(direction, role)]
                logging.info('%s %s swap attempts = %d, failures = %d',
                             direction, role, attempts, failures)
                total_attempts += attempts
                total_failures += failures

            # If any swap attempts were not successful, flag test as failure
            if total_failures:
                raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' %
                                     (total_attempts, total_failures))