1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import logging, threading, time 7 8from autotest_lib.client.common_lib.cros import crash_detector 9from autotest_lib.server import autotest, test 10from autotest_lib.client.common_lib import error 11 12_CLIENT_TERMINATION_FILE_PATH = '/tmp/simple_login_exit' 13_LONG_TIMEOUT = 200 14_LOWER_USB_PORT = 'usb_mux_sel3' 15_SUSPEND_TIME = 30 16_UPPER_USB_PORT = 'usb_mux_sel1' 17_WAIT_DELAY = 15 18_WAIT_OPENLID_DELAY = 30 19_WAIT_LONG_DELAY = 60 20 21# servo v4.1 controls 22_AUX_USB_PORT = 'aux_usbkey_mux' 23_IMAGE_USB_PORT = 'image_usbkey_mux' 24 25 26class platform_ExternalUsbPeripherals(test.test): 27 """Uses servo to repeatedly connect/remove USB devices during boot.""" 28 version = 1 29 30 31 def getPluggedUsbDevices(self): 32 """Determines the external USB devices plugged 33 34 @returns plugged_list: List of plugged usb devices names 35 36 """ 37 lsusb_output = self.host.run('lsusb').stdout.strip() 38 items = lsusb_output.split('\n') 39 plugged_list = [] 40 unnamed_device_count = 1 41 for item in items: 42 columns = item.split(' ') 43 if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0: 44 logging.debug('Unnamed device located, adding generic name.') 45 name = 'Unnamed device %d' % unnamed_device_count 46 unnamed_device_count += 1 47 else: 48 name = ' '.join(columns[6:]).strip() 49 plugged_list.append(name) 50 return plugged_list 51 52 53 def plug_peripherals(self, on=True): 54 """Setting USB mux_3 plug status 55 56 @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug) 57 58 """ 59 switch = 'dut_sees_usbkey' 60 if not on: 61 switch = 'servo_sees_usbkey' 62 self.host.servo.set(self.plug_port, switch) 63 self.pluged_status = on 64 65 66 def client_login(self, client_exit): 67 """Login i.e. runs running client test 68 69 @exception TestFail failed to login within timeout. 70 71 """ 72 self.autotest_client.run_test(self.client_autotest, 73 exit_without_logout=client_exit) 74 75 76 def action_login(self, login_client_exit=True): 77 """Login i.e. runs running client login 78 79 @param login_client_exit: Exit after login flag. 80 81 """ 82 thread = threading.Thread(target=self.client_login, 83 args = (login_client_exit, )) 84 thread.start() 85 time.sleep(_WAIT_DELAY) 86 87 88 def action_logout(self): 89 """Logout i.e. runs running client test 90 91 @exception TestFail failed to logout within timeout. 92 93 """ 94 self.host.run('touch %s' % _CLIENT_TERMINATION_FILE_PATH) 95 time.sleep(_WAIT_DELAY) 96 97 98 def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg): 99 """Waits till command output is meta 100 101 @param cmd: executed command 102 @param check: string to be checked for in cmd output 103 @param timeout: max time in sec to wait for output 104 @param timeout_msg: timeout failure message 105 106 @returns True if check is found in command output; False otherwise 107 """ 108 start_time = int(time.time()) 109 time_delta = 0 110 command = '%s %s' % (cmd, check) 111 logging.debug('Command: %s', command) 112 while(self.host.run(command, ignore_status=True).exit_status != 0): 113 time_delta = int(time.time()) - start_time 114 if time_delta > timeout: 115 self.add_failure('%s - %d sec' % (timeout_msg, timeout)) 116 return False 117 time.sleep(0.5) 118 logging.debug('Succeeded in :%d sec', time_delta) 119 return True 120 121 122 def suspend_for_time(self, suspend_time=_SUSPEND_TIME): 123 """Calls the host method suspend with suspend_time argument. 124 125 @param suspend_time: time to suspend the device for. 126 127 """ 128 try: 129 self.host.suspend(suspend_time=suspend_time) 130 except error.AutoservSuspendError: 131 pass 132 133 134 def action_suspend(self): 135 """Suspend i.e. powerd_dbus_suspend and wait 136 137 @returns boot_id for the following resume 138 """ 139 boot_id = self.host.get_boot_id() 140 thread = threading.Thread(target=self.suspend_for_time) 141 thread.start() 142 self.host.test_wait_for_sleep(_LONG_TIMEOUT) 143 logging.debug('--- Suspended') 144 self.suspend_status = True 145 return boot_id 146 147 148 def action_resume(self, boot_id): 149 """Resume i.e. press power key and wait 150 151 @param boot_id: boot id obtained prior to suspending 152 153 """ 154 self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT) 155 logging.debug('--- Resumed') 156 self.suspend_status = False 157 158 159 def close_lid(self): 160 """Close lid through servo to suspend the device.""" 161 boot_id = self.host.get_boot_id() 162 logging.info('Closing lid...') 163 self.host.servo.lid_close() 164 self.host.test_wait_for_sleep(_LONG_TIMEOUT) 165 self.suspend_status = True 166 return boot_id 167 168 169 def open_lid(self, boot_id): 170 """Open lid through servo to resume.""" 171 logging.info('Opening lid...') 172 self.host.servo.lid_open() 173 self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT) 174 self.suspend_status = False 175 176 177 def crash_not_detected(self): 178 """Finds new crash files and adds to failures list if any 179 180 @returns True if there were not crashes; False otherwise 181 """ 182 crash_files = self.detect_crash.get_new_crash_files() 183 if crash_files: 184 self.add_failure('CRASH DETECTED: %s' % str(crash_files)) 185 return False 186 return True 187 188 189 def check_plugged_usb_devices(self): 190 """Checks the plugged peripherals match device list. 191 192 @returns True if expected USB peripherals are detected; False otherwise 193 """ 194 result = True 195 if self.pluged_status and self.usb_list != None: 196 # Check for mandatory USb devices passed by usb_list flag 197 for usb_name in self.usb_list: 198 found = self.wait_for_cmd_output( 199 'lsusb | grep -E ', usb_name, _WAIT_LONG_DELAY, 200 'Not detecting %s' % usb_name) 201 result = result and found 202 time.sleep(_WAIT_DELAY) 203 on_now = self.getPluggedUsbDevices() 204 if self.pluged_status: 205 if not self.diff_list.issubset(on_now): 206 missing = str(self.diff_list.difference(on_now)) 207 self.add_failure('Missing connected peripheral(s) ' 208 'when plugged: %s ' % missing) 209 result = False 210 else: 211 present = self.diff_list.intersection(on_now) 212 if len(present) > 0: 213 self.add_failure('Still presented peripheral(s) ' 214 'when unplugged: %s ' % str(present)) 215 result = False 216 return result 217 218 219 def check_usb_peripherals_details(self): 220 """Checks the effect from plugged in USB peripherals. 221 222 @returns True if command line output is matched successfuly; Else False 223 """ 224 usb_check_result = True 225 for cmd in self.usb_checks.keys(): 226 out_match_list = self.usb_checks.get(cmd) 227 if cmd.startswith('loggedin:'): 228 if not self.login_status: 229 continue 230 cmd = cmd.replace('loggedin:', '') 231 board = self.host.get_board().split(':')[1].lower() 232 # Run the usb check command 233 for out_match in out_match_list: 234 match_result = self.wait_for_cmd_output( 235 cmd, out_match, _WAIT_LONG_DELAY, 236 'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match)) 237 usb_check_result = usb_check_result and match_result 238 return usb_check_result 239 240 241 def check_status(self): 242 """Performs checks after each action: 243 - for USB detected devices 244 - for generated crash files 245 - peripherals effect checks on cmd line 246 247 @returns True if all of the iteration checks pass; False otherwise. 248 """ 249 result = True 250 if not self.suspend_status: 251 # Detect the USB peripherals 252 result = self.check_plugged_usb_devices() 253 # Check for crash files 254 if self.crash_check: 255 result = result and self.crash_not_detected() 256 if self.pluged_status and (self.usb_checks != None): 257 # Check for plugged USB devices details 258 result = result and self.check_usb_peripherals_details() 259 return result 260 261 262 def add_failure(self, reason): 263 """ Adds a failure reason to list of failures to be reported at end 264 265 @param reason: failure reason to record 266 267 """ 268 if self.action_step is not None: 269 self.fail_reasons.append('%s FAILS - %s' % 270 (self.action_step, reason)) 271 272 273 def check_connected_peripherals(self): 274 """ Verifies there are connected usb devices on servo 275 276 @raise error.TestError: if no peripherals are shown 277 """ 278 279 # Collect USB peripherals when unplugged 280 self.plug_peripherals(False) 281 time.sleep(_WAIT_DELAY) 282 off_list = self.getPluggedUsbDevices() 283 284 # Collect USB peripherals when plugged 285 self.plug_peripherals(True) 286 time.sleep(_WAIT_LONG_DELAY) 287 on_list = self.getPluggedUsbDevices() 288 289 self.diff_list = set(on_list).difference(set(off_list)) 290 if len(self.diff_list) == 0: 291 # Fail if no devices detected after 292 raise error.TestError('No connected devices were detected. Make ' 293 'sure the devices are connected to USB_KEY ' 294 'and DUT_HUB1_USB on the servo board.') 295 logging.debug('Connected devices list: %s', self.diff_list) 296 297 298 def prep_servo_for_test(self): 299 """Connects servo to DUT and sets servo ports 300 301 @returns port as string to plug/unplug the specific port 302 """ 303 if 'servo_v4p1' in self.servo_type: 304 port = _AUX_USB_PORT 305 self.host.servo.set(_IMAGE_USB_PORT, 'servo_sees_usbkey') 306 else: 307 port = _LOWER_USB_PORT 308 self.host.servo.switch_usbkey('dut') 309 self.host.servo.set('dut_hub1_rst1','off') 310 self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey') 311 self.host.servo.set('usb_mux_oe2', 'off') 312 self.host.servo.set('usb_mux_oe4', 'off') 313 time.sleep(_WAIT_DELAY) 314 return port 315 316 317 def cleanup(self): 318 """Disconnect servo hub""" 319 self.plug_peripherals(False) 320 self.action_logout() 321 if 'servo_v4p1' not in self.servo_type: 322 self.host.servo.set('dut_hub1_rst1','on') 323 self.host.run('reboot now', ignore_status=True) 324 self.host.test_wait_for_boot() 325 326 327 def run_once(self, host, client_autotest, action_sequence, repeat, 328 usb_list=None, usb_checks=None, 329 crash_check=False): 330 self.client_autotest = client_autotest 331 self.host = host 332 self.autotest_client = autotest.Autotest(self.host) 333 self.usb_list = usb_list 334 self.usb_checks = usb_checks 335 self.crash_check = crash_check 336 337 self.suspend_status = False 338 self.login_status = False 339 self.fail_reasons = list() 340 self.action_step = None 341 342 self.servo_type = self.host.servo.get_servo_type() 343 344 self.plug_port = self.prep_servo_for_test() 345 346 # Unplug, plug, compare usb peripherals, and leave plugged. 347 self.check_connected_peripherals() 348 349 action_sequence = action_sequence.upper() 350 # Check for if board type is NOT chromebook and skip lid_close_open tests 351 if (not (host.get_board_type() == 'CHROMEBOOK')) and \ 352 ('CLOSELID' in action_sequence): 353 raise error.TestNAError('No lid on DUT. Test Skipped') 354 actions = action_sequence.split(',') 355 boot_id = 0 356 self.detect_crash = crash_detector.CrashDetector(self.host) 357 self.detect_crash.remove_crash_files() 358 359 for iteration in range(1, repeat + 1): 360 step = 0 361 for action in actions: 362 step += 1 363 action = action.strip() 364 self.action_step = 'STEP %d.%d. %s' % (iteration, step, action) 365 logging.info(self.action_step) 366 367 if action == 'RESUME': 368 self.action_resume(boot_id) 369 time.sleep(_WAIT_DELAY) 370 elif action == 'OPENLID': 371 self.open_lid(boot_id) 372 time.sleep(_WAIT_OPENLID_DELAY) 373 elif action == 'UNPLUG': 374 self.plug_peripherals(False) 375 elif action == 'PLUG': 376 self.plug_peripherals(True) 377 elif self.suspend_status == False: 378 if action.startswith('LOGIN'): 379 if self.login_status: 380 logging.debug('Skipping login. Already logged in.') 381 continue 382 else: 383 self.action_login('LOGOUT' not in actions) 384 self.login_status = True 385 elif action == 'LOGOUT': 386 if self.login_status: 387 self.action_logout() 388 self.login_status = False 389 else: 390 logging.debug('Skipping logout. Not logged in.') 391 elif action == 'REBOOT': 392 self.host.reboot() 393 time.sleep(_WAIT_LONG_DELAY) 394 self.login_status = False 395 elif action == 'SUSPEND': 396 boot_id = self.action_suspend() 397 elif action == 'CLOSELID': 398 boot_id = self.close_lid() 399 else: 400 logging.info('WRONG ACTION: %s .', self.action_step) 401 self.check_status() 402 403 if self.fail_reasons: 404 raise error.TestFail('Failures reported: %s' % 405 str(self.fail_reasons)) 406