1# Copyright 2016 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import re 7import time 8 9from autotest_lib.client.common_lib import error 10from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 11from autotest_lib.server.cros.servo import pd_console 12 13 14class firmware_PDDataSwap(FirmwareTest): 15 """ 16 Servo based USB PD data role swap test 17 18 Pass critera is all data role swaps complete, or 19 a reject control message is received from the DUT in the 20 cases where the swap does not complete. 21 22 """ 23 version = 1 24 25 PD_ROLE_DELAY = 0.5 26 PD_CONNECT_DELAY = 4 27 DATA_SWAP_ITERATIONS = 10 28 # Upward facing port data role 29 UFP = 'UFP' 30 # Downward facing port data role 31 DFP = 'DFP' 32 # Swap Result Tables 33 swap_attempt = { 34 ('rx', DFP): 0, 35 ('rx', UFP): 0, 36 ('tx', DFP): 0, 37 ('tx', UFP): 0 38 } 39 swap_failure = { 40 ('rx', DFP): 0, 41 ('rx', UFP): 0, 42 ('tx', DFP): 0, 43 ('tx', UFP): 0 44 } 45 46 def _verify_pdtester_connection(self, port): 47 """Verify if DUT to PDTester PD connection 48 49 This method checks for a PDTester PD connection for the 50 given port by first verifying if a PD connection is present. 51 If found, then it uses a PDTester feature to force a PD disconnect. 52 If the port is no longer in the connected state, and following 53 a delay, is found to be back in the connected state, then 54 a DUT pd to PDTester connection is verified. 55 56 @param port: DUT pd port to test 57 58 @returns True if DUT to PDTester pd connection is verified 59 """ 60 DISCONNECT_TIME_SEC = 2 61 # pdtester console command to force PD disconnect 62 disc_cmd = 'fakedisconnect 100 %d' % (DISCONNECT_TIME_SEC*1000) 63 # Only check for PDTester if DUT has active PD connection 64 if self.dut_pd_utils.is_pd_connected(port): 65 # Attempt to force PD disconnection 66 self.pdtester_pd_utils.send_pd_command(disc_cmd) 67 time.sleep(self.PD_ROLE_DELAY) 68 # Verify that DUT PD port is no longer connected 69 if self.dut_pd_utils.is_pd_connected(port) == False: 70 # Wait for disconnect timer and give time to reconnect 71 time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC) 72 if self.dut_pd_utils.is_pd_connected(port): 73 logging.info('PDTester connection verfied on port %d', port) 74 return True 75 else: 76 # Could have disconnected other port, allow it to reconnect 77 # before exiting. 78 time.sleep(self.PD_CONNECT_DELAY + DISCONNECT_TIME_SEC) 79 return False 80 81 def _find_dut_to_pdtester_connection(self): 82 """Find the PD port which is connected to PDTester 83 84 @returns DUT pd port number if found, None otherwise 85 """ 86 for port in xrange(self.dut_pd_utils.PD_MAX_PORTS): 87 # Check for DUT to PDTester connection on port 88 if self._verify_pdtester_connection(port): 89 # PDTester PD connection found so exit 90 return port 91 return None 92 93 def _get_data_role(self, console, port): 94 """Get data role of PD connection 95 96 @param console: pd console object for uart access 97 @param port: 0/1 pd port of current connection 98 99 @returns: 'DFP' or 'UFP' 100 """ 101 role = console.get_pd_role(port) 102 m = re.search('[\w]+-([\w]+)', role) 103 return m.group(1) 104 105 def _get_remote_role(self, local_role): 106 """Invert data role 107 108 @param local_role: data role to be flipped 109 110 @returns: flipped data role value 111 """ 112 if local_role == self.DFP: 113 return self.UFP 114 else: 115 return self.DFP 116 117 def _change_dut_power_role(self, port): 118 """Force power role change via PDTester 119 120 @param port: port of DUT PD connection 121 122 @returns True is power role change is successful 123 """ 124 PDTESTER_SRC_VOLTAGE = 5 125 PDTESTER_SNK_VOLTAGE = 0 126 pd_state = self.dut_pd_utils.get_pd_state(port) 127 if pd_state == self.dut_pd_utils.SRC_CONNECT: 128 # DUT is currently a SRC, so change to SNK 129 # Use PDTester method to ensure power role change 130 self.pdtester.charge(PDTESTER_SRC_VOLTAGE) 131 else: 132 # DUT is currently a SNK, so change it to a SRC. 133 self.pdtester.charge(PDTESTER_SNK_VOLTAGE) 134 # Wait for change to take place 135 time.sleep(self.PD_CONNECT_DELAY) 136 pdtester_state = self.pdtester_pd_utils.get_pd_state(self.pdtester_port) 137 # Current PDTester state should equal DUT state when called 138 return bool(pd_state == pdtester_state) 139 140 def _send_data_swap_get_reply(self, console, port): 141 """Send data swap request, get PD control msg reply 142 143 The PD console debug mode is enabled prior to sending 144 a pd data role swap request message. This allows the 145 control message reply to be extracted. The debug mode 146 is disabled prior to exiting. 147 148 @param console: pd console object for uart access 149 150 @ returns: PD control header message 151 """ 152 # Enable PD console debug mode to show control messages 153 console.enable_pd_console_debug() 154 cmd = 'pd %d swap data' % port 155 m = console.send_pd_command_get_output(cmd, ['RECV\s([\w]+)']) 156 ctrl_msg = int(m[0][1], 16) & console.PD_CONTROL_MSG_MASK 157 console.disable_pd_console_debug() 158 return ctrl_msg 159 160 def _attempt_data_swap(self, pd_port, direction): 161 """Perform a data role swap request 162 163 Data swap requests can be either initiated by the DUT or received 164 by the DUT. This direction determines which PD console is used 165 to initiate the swap command. The data role before and after 166 the swap command are compared to determine if it took place. 167 168 Even if data swap capability is advertised, a PD device is allowed 169 to reject the request. Therefore, not swapping isn't itself a 170 failure. When PDTester is used to initate the request, the debug 171 mode is enabled which allows the control message from the DUT to 172 be analyzed. If the swap does not occur, but the request is rejected 173 by the DUT then that is not counted as a failure. 174 175 @param pd_port: DUT pd port value 0/1 176 @param direction: rx or tx from the DUT perspective 177 178 @returns PD control reply message for tx swaps, 0 otherwise 179 """ 180 # Get starting DUT data role 181 dut_dr = self._get_data_role(self.dut_pd_utils, pd_port) 182 self.swap_attempt[(direction, dut_dr)] += 1 183 if direction == 'tx': 184 # Initiate swap request from the DUT 185 console = self.dut_pd_utils 186 cmd = 'pd %d swap data' % pd_port 187 # Send the 'swap data' command 188 self.dut_pd_utils.send_pd_command(cmd) 189 # Not using debug mode, so there is no reply message 190 ctrl = 0 191 else: 192 # Initiate swap request from PDTester 193 console = self.pdtester_pd_utils 194 ctrl = self._send_data_swap_get_reply(console, self.pdtester_port) 195 196 time.sleep(self.PD_ROLE_DELAY) 197 # Get DUT current data role 198 swap_dr = self._get_data_role(self.dut_pd_utils, pd_port) 199 logging.info('%s swap attempt: prev = %s, new = %s, msg = %s', 200 direction, dut_dr, swap_dr, ctrl) 201 if (dut_dr == swap_dr and 202 ctrl != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']): 203 self.swap_failure[(direction, dut_dr)] += 1 204 return ctrl 205 206 def _execute_data_role_swap_test(self, pd_port): 207 """Execute a series of data role swaps 208 209 Attempt both rx and tx data swaps, from perspective of DUT. 210 Even if the DUT advertises support, it can 211 reject swap requests when already in the desired data role. For 212 example many devices will not swap if already in DFP mode. 213 However, PDTester should always accept a request. Therefore, 214 when a swap failed on a rx swap, then that is followed by 215 a tx swap attempt. 216 217 @param pd_port: port number of DUT PD connection 218 """ 219 for attempt in xrange(self.DATA_SWAP_ITERATIONS): 220 # Use the same direction for every 2 loop iterations 221 if attempt & 2: 222 direction = 'tx' 223 else: 224 direction = 'rx' 225 ctrl_msg = self._attempt_data_swap(pd_port, direction) 226 if (direction == 'rx' and 227 ctrl_msg == 228 self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']): 229 # Use pdtester initated swap to change roles 230 self._attempt_data_swap(pd_port, 'tx') 231 232 def _test_data_swap_reject(self, pd_port): 233 """Verify that data swap request is rejected 234 235 This tests the case where the DUT doesn't advertise support 236 for data swaps. A data request is sent by PDTester, and then 237 the control message checked to ensure the request was rejected. 238 In addition, the data role and connection state are verified 239 to remain unchanged. 240 241 @param pd_port: port for DUT pd connection 242 """ 243 # Get current DUT data role 244 dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port) 245 dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port) 246 # Send swap command from PDTester and get reply 247 ctrl_msg = self._send_data_swap_get_reply(self.pdtester_pd_utils, 248 self.pdtester_port) 249 if ctrl_msg != self.dut_pd_utils.PD_CONTROL_MSG_DICT['Reject']: 250 raise error.TestFail('Data Swap Req not rejected, returned %r' % 251 ctrl_msg) 252 # Get DUT current state 253 pd_state = self.dut_pd_utils.get_pd_state(pd_port) 254 if pd_state != dut_connect_state: 255 raise error.TestFail('PD not connected! pd_state = %r' % 256 pd_state) 257 # Since reject message was received, verify data role didn't change 258 curr_dr = self._get_data_role(self.dut_pd_utils, pd_port) 259 if curr_dr != dut_data_role: 260 raise error.TestFail('Unexpected PD data role change') 261 262 def initialize(self, host, cmdline_args, flip_cc=False): 263 super(firmware_PDDataSwap, self).initialize(host, cmdline_args) 264 self.setup_pdtester(flip_cc) 265 # Only run in normal mode 266 self.switcher.setup_mode('normal') 267 self.usbpd.send_command('chan 0') 268 269 def cleanup(self): 270 self.usbpd.send_command('chan 0xffffffff') 271 super(firmware_PDDataSwap, self).cleanup() 272 273 def run_once(self): 274 """Exectue Data Role swap test. 275 276 1. Verify that pd console is accessible 277 2. Verify that DUT has a valid PD contract 278 3. Determine if DUT advertises support for data swaps 279 4. Test DUT initiated and received data swaps 280 5. Swap power roles if supported 281 6. Repeat DUT received data swap requests 282 283 """ 284 # TODO(b/35573842): Refactor to use PDPortPartner to probe the port 285 self.pdtester_port = 1 if 'servo_v4' in self.pdtester.servo_type else 0 286 287 # create objects for pd utilities 288 self.dut_pd_utils = pd_console.PDConsoleUtils(self.usbpd) 289 self.pdtester_pd_utils = pd_console.PDConsoleUtils(self.pdtester) 290 291 # Make sure PD support exists in the UART console 292 if self.dut_pd_utils.verify_pd_console() == False: 293 raise error.TestFail("pd command not present on console!") 294 295 # Type C connection (PD contract) should exist at this point 296 # For this test, the DUT must be connected to a PDTester. 297 pd_port = self._find_dut_to_pdtester_connection() 298 if pd_port == None: 299 raise error.TestFail("DUT to PDTester PD connection not found") 300 dut_connect_state = self.dut_pd_utils.get_pd_state(pd_port) 301 logging.info('Initial DUT connect state = %s', dut_connect_state) 302 303 # Determine if DUT supports data role swaps 304 dr_swap_allowed = self.pdtester_pd_utils.is_pd_flag_set( 305 self.pdtester_port, 'data_swap') 306 # Get current DUT data role 307 dut_data_role = self._get_data_role(self.dut_pd_utils, pd_port) 308 logging.info('Starting DUT Data Role = %r', dut_data_role) 309 310 # If data swaps are not allowed on the DUT, then still 311 # attempt a data swap and verify that the request is 312 # rejected by the DUT and that it remains connected and 313 # in the same role. 314 if dr_swap_allowed == False: 315 logging.info('Data Swap support not advertised by DUT') 316 self._test_data_swap_reject(pd_port) 317 logging.info('Data Swap request rejected by DUT as expected') 318 else: 319 # Data role swap support advertised, test this feature. 320 self._execute_data_role_swap_test(pd_port) 321 322 # If DUT supports Power Role swap then attempt to change roles. 323 # This way, data role swaps will be tested in both configurations. 324 if self.pdtester_pd_utils.is_pd_flag_set( 325 self.pdtester_port, 'power_swap'): 326 logging.info('\nDUT advertises Power Swap Support') 327 # Attempt to swap power roles 328 power_swap = self._change_dut_power_role(pd_port) 329 if power_swap: 330 try: 331 self._execute_data_role_swap_test(pd_port) 332 finally: 333 # Swap power role, back to the original 334 self._change_dut_power_role(pd_port) 335 else: 336 logging.warn('Power swap not successful!') 337 logging.warn('Only tested with DUT in %s state', 338 dut_connect_state) 339 else: 340 logging.info('DUT does not advertise power swap support') 341 342 logging.info('***************** Swap Results ********************') 343 total_attempts = 0 344 total_failures = 0 345 for direction, role in self.swap_attempt.iterkeys(): 346 logging.info('%s %s swap attempts = %d, failures = %d', 347 direction, role, 348 self.swap_attempt[(direction, role)], 349 self.swap_failure[(direction, role)]) 350 total_attempts += self.swap_attempt[(direction, role)] 351 total_failures += self.swap_failure[(direction, role)] 352 353 # If any swap attempts were not successful, flag test as failure 354 if total_failures: 355 raise error.TestFail('Data Swap Fail: Attempt = %d, Failure = %d' % 356 (total_attempts, total_failures)) 357