1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging, threading, time 6 7from autotest_lib.client.common_lib.cros import crash_detector 8from autotest_lib.server import autotest, test 9from autotest_lib.client.common_lib import error 10 11_CLIENT_TERMINATION_FILE_PATH = '/tmp/simple_login_exit' 12_LONG_TIMEOUT = 200 13_LOWER_USB_PORT = 'usb_mux_sel3' 14_SUSPEND_TIME = 30 15_UPPER_USB_PORT = 'usb_mux_sel1' 16_WAIT_DELAY = 15 17 18 19class platform_ExternalUsbPeripherals(test.test): 20 """Uses servo to repeatedly connect/remove USB devices during boot.""" 21 version = 1 22 23 24 def getPluggedUsbDevices(self): 25 """Determines the external USB devices plugged 26 27 @returns plugged_list: List of plugged usb devices names 28 29 """ 30 lsusb_output = self.host.run('lsusb').stdout.strip() 31 items = lsusb_output.split('\n') 32 plugged_list = [] 33 unnamed_device_count = 1 34 for item in items: 35 columns = item.split(' ') 36 if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0: 37 logging.debug('Unnamed device located, adding generic name.') 38 name = 'Unnamed device %d' % unnamed_device_count 39 unnamed_device_count += 1 40 else: 41 name = ' '.join(columns[6:]).strip() 42 plugged_list.append(name) 43 return plugged_list 44 45 46 def plug_peripherals(self, on=True): 47 """Setting USB mux_3 plug status 48 49 @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug) 50 51 """ 52 switch = 'dut_sees_usbkey' 53 if not on: 54 switch = 'servo_sees_usbkey' 55 self.host.servo.set(self.plug_port, switch) 56 self.pluged_status = on 57 58 59 def client_login(self, client_exit): 60 """Login i.e. runs running client test 61 62 @exception TestFail failed to login within timeout. 63 64 """ 65 self.autotest_client.run_test(self.client_autotest, 66 exit_without_logout=client_exit) 67 68 69 def action_login(self, login_client_exit=True): 70 """Login i.e. runs running client login 71 72 @param login_client_exit: Exit after login flag. 73 74 """ 75 thread = threading.Thread(target=self.client_login, 76 args = (login_client_exit, )) 77 thread.start() 78 time.sleep(_WAIT_DELAY) 79 80 81 def action_logout(self): 82 """Logout i.e. runs running client test 83 84 @exception TestFail failed to logout within timeout. 85 86 """ 87 self.host.run('touch %s' % _CLIENT_TERMINATION_FILE_PATH) 88 time.sleep(_WAIT_DELAY) 89 90 91 def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg): 92 """Waits till command output is meta 93 94 @param cmd: executed command 95 @param check: string to be checked for in cmd output 96 @param timeout: max time in sec to wait for output 97 @param timeout_msg: timeout failure message 98 99 @returns True if check is found in command output; False otherwise 100 """ 101 start_time = int(time.time()) 102 time_delta = 0 103 command = '%s %s' % (cmd, check) 104 logging.debug('Command: %s', command) 105 while(self.host.run(command, ignore_status=True).exit_status != 0): 106 time_delta = int(time.time()) - start_time 107 if time_delta > timeout: 108 self.add_failure('%s - %d sec' % (timeout_msg, timeout)) 109 return False 110 time.sleep(0.5) 111 logging.debug('Succeeded in :%d sec', time_delta) 112 return True 113 114 115 def suspend_for_time(self, suspend_time=_SUSPEND_TIME): 116 """Calls the host method suspend with suspend_time argument. 117 118 @param suspend_time: time to suspend the device for. 119 120 """ 121 try: 122 self.host.suspend(suspend_time=suspend_time) 123 except error.AutoservSuspendError: 124 pass 125 126 127 def action_suspend(self): 128 """Suspend i.e. powerd_dbus_suspend and wait 129 130 @returns boot_id for the following resume 131 """ 132 boot_id = self.host.get_boot_id() 133 thread = threading.Thread(target=self.suspend_for_time) 134 thread.start() 135 self.host.test_wait_for_sleep(_LONG_TIMEOUT) 136 logging.debug('--- Suspended') 137 self.suspend_status = True 138 return boot_id 139 140 141 def action_resume(self, boot_id): 142 """Resume i.e. press power key and wait 143 144 @param boot_id: boot id obtained prior to suspending 145 146 """ 147 self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT) 148 logging.debug('--- Resumed') 149 self.suspend_status = False 150 151 152 def close_lid(self): 153 """Close lid through servo to suspend the device.""" 154 boot_id = self.host.get_boot_id() 155 logging.info('Closing lid...') 156 self.host.servo.lid_close() 157 self.host.test_wait_for_sleep(_LONG_TIMEOUT) 158 self.suspend_status = True 159 return boot_id 160 161 162 def open_lid(self, boot_id): 163 """Open lid through servo to resume.""" 164 logging.info('Opening lid...') 165 self.host.servo.lid_open() 166 self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT) 167 self.suspend_status = False 168 169 170 def crash_not_detected(self): 171 """Finds new crash files and adds to failures list if any 172 173 @returns True if there were not crashes; False otherwise 174 """ 175 crash_files = self.detect_crash.get_new_crash_files() 176 if crash_files: 177 self.add_failure('CRASH DETECTED: %s' % str(crash_files)) 178 return False 179 return True 180 181 182 def check_plugged_usb_devices(self): 183 """Checks the plugged peripherals match device list. 184 185 @returns True if expected USB peripherals are detected; False otherwise 186 """ 187 result = True 188 if self.pluged_status and self.usb_list != None: 189 # Check for mandatory USb devices passed by usb_list flag 190 for usb_name in self.usb_list: 191 found = self.wait_for_cmd_output( 192 'lsusb | grep -E ', usb_name, _WAIT_DELAY * 4, 193 'Not detecting %s' % usb_name) 194 result = result and found 195 time.sleep(_WAIT_DELAY) 196 on_now = self.getPluggedUsbDevices() 197 if self.pluged_status: 198 if not self.diff_list.issubset(on_now): 199 missing = str(self.diff_list.difference(on_now)) 200 self.add_failure('Missing connected peripheral(s) ' 201 'when plugged: %s ' % missing) 202 result = False 203 else: 204 present = self.diff_list.intersection(on_now) 205 if len(present) > 0: 206 self.add_failure('Still presented peripheral(s) ' 207 'when unplugged: %s ' % str(present)) 208 result = False 209 return result 210 211 212 def check_usb_peripherals_details(self): 213 """Checks the effect from plugged in USB peripherals. 214 215 @returns True if command line output is matched successfuly; Else False 216 """ 217 usb_check_result = True 218 for cmd in self.usb_checks.keys(): 219 out_match_list = self.usb_checks.get(cmd) 220 if cmd.startswith('loggedin:'): 221 if not self.login_status: 222 continue 223 cmd = cmd.replace('loggedin:', '') 224 board = self.host.get_board().split(':')[1].lower() 225 # Run the usb check command 226 for out_match in out_match_list: 227 # Skip running media_v4l2_test on hana boards 228 # crbug.com/820500 229 if 'media_v4l2_test' in cmd and board in ["hana"]: 230 continue 231 match_result = self.wait_for_cmd_output( 232 cmd, out_match, _WAIT_DELAY * 4, 233 'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match)) 234 usb_check_result = usb_check_result and match_result 235 return usb_check_result 236 237 238 def check_status(self): 239 """Performs checks after each action: 240 - for USB detected devices 241 - for generated crash files 242 - peripherals effect checks on cmd line 243 244 @returns True if all of the iteration checks pass; False otherwise. 245 """ 246 result = True 247 if not self.suspend_status: 248 # Detect the USB peripherals 249 result = self.check_plugged_usb_devices() 250 # Check for crash files 251 if self.crash_check: 252 result = result and self.crash_not_detected() 253 if self.pluged_status and (self.usb_checks != None): 254 # Check for plugged USB devices details 255 result = result and self.check_usb_peripherals_details() 256 return result 257 258 259 def add_failure(self, reason): 260 """ Adds a failure reason to list of failures to be reported at end 261 262 @param reason: failure reason to record 263 264 """ 265 if self.action_step is not None: 266 self.fail_reasons.append('%s FAILS - %s' % 267 (self.action_step, reason)) 268 269 270 def check_connected_peripherals(self): 271 """ Verifies there are connected usb devices on servo 272 273 @raise error.TestError: if no peripherals are shown 274 """ 275 276 # Collect USB peripherals when unplugged 277 self.plug_peripherals(False) 278 time.sleep(_WAIT_DELAY) 279 off_list = self.getPluggedUsbDevices() 280 281 # Collect USB peripherals when plugged 282 self.plug_peripherals(True) 283 time.sleep(_WAIT_DELAY * 2) 284 on_list = self.getPluggedUsbDevices() 285 286 self.diff_list = set(on_list).difference(set(off_list)) 287 if len(self.diff_list) == 0: 288 # Fail if no devices detected after 289 raise error.TestError('No connected devices were detected. Make ' 290 'sure the devices are connected to USB_KEY ' 291 'and DUT_HUB1_USB on the servo board.') 292 logging.debug('Connected devices list: %s', self.diff_list) 293 294 295 def prep_servo_for_test(self, stress_rack): 296 """Connects servo to DUT and sets servo ports 297 298 @param stress_rack: either to prep servo for stress tests, where 299 usb_mux_1 port should be on. For usb peripherals on usb_mux_3, 300 the port is on, and the oe2,oe4 poers are off. 301 302 @returns port as string to plug/unplug the specific port 303 """ 304 port = _LOWER_USB_PORT 305 self.host.servo.switch_usbkey('dut') 306 self.host.servo.set('dut_hub1_rst1','off') 307 if stress_rack: 308 port = _UPPER_USB_PORT 309 self.host.servo.set(port, 'dut_sees_usbkey') 310 else: 311 self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey') 312 self.host.servo.set('usb_mux_oe2', 'off') 313 self.host.servo.set('usb_mux_oe4', 'off') 314 time.sleep(_WAIT_DELAY) 315 return port 316 317 318 def cleanup(self): 319 """Disconnect servo hub""" 320 self.plug_peripherals(False) 321 self.action_logout() 322 self.host.servo.set('dut_hub1_rst1','on') 323 self.host.run('reboot now', ignore_status=True) 324 self.host.test_wait_for_boot() 325 326 327 def run_once(self, host, client_autotest, action_sequence, repeat, 328 usb_list=None, usb_checks=None, 329 crash_check=False, stress_rack=False): 330 self.client_autotest = client_autotest 331 self.host = host 332 self.autotest_client = autotest.Autotest(self.host) 333 self.usb_list = usb_list 334 self.usb_checks = usb_checks 335 self.crash_check = crash_check 336 337 self.suspend_status = False 338 self.login_status = False 339 self.fail_reasons = list() 340 self.action_step = None 341 342 self.plug_port = self.prep_servo_for_test(stress_rack) 343 344 # Unplug, plug, compare usb peripherals, and leave plugged. 345 self.check_connected_peripherals() 346 347 action_sequence = action_sequence.upper() 348 # Check for if board type is NOT chromebook and skip lid_close_open tests 349 if (not (host.get_board_type() == 'CHROMEBOOK')) and \ 350 ('CLOSELID' in action_sequence): 351 raise error.TestNAError('No lid on DUT. Test Skipped') 352 actions = action_sequence.split(',') 353 boot_id = 0 354 self.detect_crash = crash_detector.CrashDetector(self.host) 355 self.detect_crash.remove_crash_files() 356 357 # Run camera client test to gather media_V4L2_test binary. 358 if 'media_v4l2_test' in str(self.usb_checks): 359 self.autotest_client.run_test("camera_V4L2") 360 361 for iteration in xrange(1, repeat + 1): 362 step = 0 363 for action in actions: 364 step += 1 365 action = action.strip() 366 self.action_step = 'STEP %d.%d. %s' % (iteration, step, action) 367 logging.info(self.action_step) 368 369 if action == 'RESUME': 370 self.action_resume(boot_id) 371 time.sleep(_WAIT_DELAY) 372 elif action == 'OPENLID': 373 self.open_lid(boot_id) 374 time.sleep(_WAIT_DELAY) 375 elif action == 'UNPLUG': 376 self.plug_peripherals(False) 377 elif action == 'PLUG': 378 self.plug_peripherals(True) 379 elif self.suspend_status == False: 380 if action.startswith('LOGIN'): 381 if self.login_status: 382 logging.debug('Skipping login. Already logged in.') 383 continue 384 else: 385 self.action_login('LOGOUT' not in actions) 386 self.login_status = True 387 elif action == 'LOGOUT': 388 if self.login_status: 389 self.action_logout() 390 self.login_status = False 391 else: 392 logging.debug('Skipping logout. Not logged in.') 393 elif action == 'REBOOT': 394 self.host.reboot() 395 time.sleep(_WAIT_DELAY * 3) 396 self.login_status = False 397 elif action == 'SUSPEND': 398 boot_id = self.action_suspend() 399 elif action == 'CLOSELID': 400 boot_id = self.close_lid() 401 else: 402 logging.info('WRONG ACTION: %s .', self.action_step) 403 self.check_status() 404 405 if self.fail_reasons: 406 raise error.TestFail('Failures reported: %s' % 407 str(self.fail_reasons)) 408