# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import threading
import time

from autotest_lib.client.common_lib.cros import crash_detector
from autotest_lib.server import autotest, test
from autotest_lib.client.common_lib import error

# Base delay (seconds) used for settling after servo/DUT state changes.
_WAIT_DELAY = 15
# Timeout (seconds) passed to the host's wait-for-sleep/wait-for-resume calls.
_LONG_TIMEOUT = 200
# Servo control used as the plug port when stress_rack is False.
_LOWER_USB_PORT = 'usb_mux_sel3'
# Default suspend duration (seconds) passed to host.suspend().
_SUSPEND_TIME = 30
# Servo control used as the plug port when stress_rack is True.
_UPPER_USB_PORT = 'usb_mux_sel1'


class platform_ExternalUsbPeripherals(test.test):
    """Uses servo to plug/unplug USB peripherals around a list of actions.

    run_once() walks a comma separated action sequence (login, reboot,
    suspend/resume, lid close/open, plug/unplug) and after each action
    checks the USB devices seen by the DUT, optionally new crash files and
    optionally the output of user supplied commands. Failures are collected
    in self.fail_reasons and reported through error.TestFail.
    """
    version = 1


    def getPluggedUsbDevices(self):
        """Determines the USB devices currently listed by lsusb on the DUT.

        Each lsusb line is split on single spaces and everything from the
        seventh field on is used as the device name. Lines that carry no
        name get a generated 'Unnamed device N' name. Blank lines (e.g. when
        lsusb prints nothing at all) are skipped so they are not reported as
        a phantom unnamed device.

        @returns plugged_list: List of plugged usb devices names

        """
        lsusb_output = self.host.run('lsusb').stdout.strip()
        plugged_list = []
        unnamed_device_count = 1
        for item in lsusb_output.split('\n'):
            if not item.strip():
                continue
            columns = item.split(' ')
            # Empty when the line has six or fewer fields, or when the
            # remaining fields are only whitespace.
            name = ' '.join(columns[6:]).strip()
            if not name:
                logging.debug('Unnamed device located, adding generic name.')
                name = 'Unnamed device %d' % unnamed_device_count
                unnamed_device_count += 1
            plugged_list.append(name)
        return plugged_list


    def plug_peripherals(self, on=True):
        """Sets the plug status of the servo port stored in self.plug_port.

        Also records the requested state in self.pluged_status, which the
        check methods use to decide what to expect.

        @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug)

        """
        switch = 'dut_sees_usbkey' if on else 'servo_sees_usbkey'
        self.host.servo.set(self.plug_port, switch)
        self.pluged_status = on


    def action_login(self):
        """Login i.e. runs the client test named by self.client_autotest.

        The client test is run with exit_without_logout=True.

        @exception TestFail failed to login within timeout.

        """
        self.autotest_client.run_test(self.client_autotest,
                                      exit_without_logout=True)


    def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
        """Waits till the command '<cmd> <check>' exits with status 0.

        The command is re-run on the DUT every 0.5 sec. When more than
        timeout seconds have elapsed a failure is recorded via add_failure().

        @param cmd: executed command
        @param check: string appended to cmd (separated by one space), e.g.
                the pattern for a trailing grep
        @param timeout: max time in sec to wait for output
        @param timeout_msg: timeout failure message

        @returns True if the command succeeded in time; False otherwise
        """
        start_time = int(time.time())
        time_delta = 0
        command = '%s %s' % (cmd, check)
        logging.debug('Command: %s', command)
        while self.host.run(command, ignore_status=True).exit_status != 0:
            time_delta = int(time.time()) - start_time
            if time_delta > timeout:
                self.add_failure('%s - %d sec' % (timeout_msg, timeout))
                return False
            time.sleep(0.5)
        logging.debug('Succeeded in :%d sec', time_delta)
        return True


    def suspend_for_time(self, suspend_time=_SUSPEND_TIME):
        """Calls the host method suspend with suspend_time argument.

        error.AutoservSuspendError is deliberately not propagated (best
        effort); it is only logged at debug level.

        @param suspend_time: time to suspend the device for.

        """
        try:
            self.host.suspend(suspend_time=suspend_time)
        except error.AutoservSuspendError as e:
            logging.debug('Ignoring AutoservSuspendError from suspend: %s', e)


    def action_suspend(self):
        """Suspends the DUT and waits until it is asleep.

        The suspend call is issued from a separate thread (which is not
        joined here) while this method waits for the DUT to go to sleep.

        @returns boot_id for the following resume
        """
        boot_id = self.host.get_boot_id()
        # NOTE(review): presumably host.suspend() blocks for the suspend
        # duration, hence the thread - confirm against the host class.
        thread = threading.Thread(target=self.suspend_for_time)
        thread.start()
        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
        logging.debug('--- Suspended')
        self.suspend_status = True
        return boot_id


    def action_resume(self, boot_id):
        """Waits for the DUT to resume and clears the suspend flag.

        @param boot_id: boot id obtained prior to suspending

        """
        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
        logging.debug('--- Resumed')
        self.suspend_status = False


    def close_lid(self):
        """Close lid through servo to suspend the device.

        @returns boot_id obtained before closing the lid
        """
        boot_id = self.host.get_boot_id()
        logging.info('Closing lid...')
        self.host.servo.lid_close()
        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
        self.suspend_status = True
        return boot_id


    def open_lid(self, boot_id):
        """Open lid through servo to resume.

        @param boot_id: boot id obtained prior to closing the lid

        """
        logging.info('Opening lid...')
        self.host.servo.lid_open()
        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
        self.suspend_status = False


    def crash_not_detected(self):
        """Finds new crash files and adds to failures list if any

        @returns True if there were not crashes; False otherwise
        """
        crash_files = self.detect_crash.get_new_crash_files()
        if crash_files:
            self.add_failure('CRASH DETECTED: %s' % str(crash_files))
            return False
        return True


    def check_plugged_usb_devices(self):
        """Checks the plugged peripherals match device list.

        When plugged, every entry of self.usb_list (if given) must show up
        in 'lsusb | grep -E' and every device in self.diff_list must be
        listed. When unplugged, none of self.diff_list may be listed.

        @returns True if expected USB peripherals are detected; False otherwise
        """
        result = True
        if self.pluged_status and self.usb_list is not None:
            # Check for mandatory USB devices passed by usb_list flag
            for usb_name in self.usb_list:
                found = self.wait_for_cmd_output(
                        'lsusb | grep -E ', usb_name, _WAIT_DELAY * 4,
                        'Not detecting %s' % usb_name)
                result = result and found
        # NOTE(review): the settle delay is assumed to apply in both the
        # plugged and the unplugged case - confirm original indentation.
        time.sleep(_WAIT_DELAY)
        on_now = self.getPluggedUsbDevices()
        if self.pluged_status:
            if not self.diff_list.issubset(on_now):
                missing = str(self.diff_list.difference(on_now))
                self.add_failure('Missing connected peripheral(s) '
                                 'when plugged: %s ' % missing)
                result = False
        else:
            present = self.diff_list.intersection(on_now)
            if present:
                self.add_failure('Still presented peripheral(s) '
                                 'when unplugged: %s ' % str(present))
                result = False
        return result


    def check_usb_peripherals_details(self):
        """Checks the effect from plugged in USB peripherals.

        self.usb_checks maps a command to the list of strings it is run
        with (see wait_for_cmd_output). Commands prefixed with 'loggedin:'
        are only run while logged in; the prefix is removed before running.

        @returns True if command line output is matched successfuly; Else False
        """
        usb_check_result = True
        for cmd, out_match_list in self.usb_checks.items():
            if cmd.startswith('loggedin:'):
                if not self.login_status:
                    continue
                cmd = cmd.replace('loggedin:', '')
            # Run the usb check command
            for out_match in out_match_list:
                match_result = self.wait_for_cmd_output(
                        cmd, out_match, _WAIT_DELAY * 4,
                        'USB CHECKS DETAILS failed at %s %s:' %
                        (cmd, out_match))
                usb_check_result = usb_check_result and match_result
        return usb_check_result


    def check_status(self):
        """Performs checks after each action:
            - for USB detected devices
            - for generated crash files
            - peripherals effect checks on cmd line

        Nothing is checked while the DUT is marked as suspended. Because of
        the 'and' short-circuit, a later check is skipped once an earlier
        one has failed.

        @returns True if all of the iteration checks pass; False otherwise.
        """
        result = True
        if not self.suspend_status:
            # Detect the USB peripherals
            result = self.check_plugged_usb_devices()
            # Check for crash files
            if self.crash_check:
                result = result and self.crash_not_detected()
            if self.pluged_status and self.usb_checks is not None:
                # Check for plugged USB devices details
                result = result and self.check_usb_peripherals_details()
        return result


    def add_failure(self, reason):
        """ Adds a failure reason to list of failures to be reported at end

        The reason is prefixed with the current action step. Nothing is
        recorded while self.action_step is None.

        @param reason: failure reason to record

        """
        if self.action_step is not None:
            self.fail_reasons.append('%s FAILS - %s' %
                                     (self.action_step, reason))


    def check_connected_peripherals(self):
        """ Verifies there are connected usb devices on servo

        Stores in self.diff_list the set of device names that are listed
        when plugged but not when unplugged. The peripherals are left
        plugged.

        @raise error.TestError: if no peripherals are shown
        """
        # Collect USB peripherals when unplugged
        self.plug_peripherals(False)
        time.sleep(_WAIT_DELAY)
        off_list = self.getPluggedUsbDevices()

        # Collect USB peripherals when plugged
        self.plug_peripherals(True)
        time.sleep(_WAIT_DELAY * 2)
        on_list = self.getPluggedUsbDevices()

        self.diff_list = set(on_list).difference(set(off_list))
        if not self.diff_list:
            # Fail if no devices detected after
            raise error.TestError('No connected devices were detected. Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.debug('Connected devices list: %s', self.diff_list)


    def prep_servo_for_test(self, stress_rack):
        """Connects servo to DUT and sets servo ports

        @param stress_rack: either to prep servo for stress tests, where
                usb_mux_1 port should be on. For usb peripherals on usb_mux_3,
                the port is on, and the oe2,oe4 ports are off.

        @returns port as string to plug/unplug the specific port
        """
        port = _LOWER_USB_PORT
        self.host.servo.switch_usbkey('dut')
        self.host.servo.set('dut_hub1_rst1', 'off')
        if stress_rack:
            port = _UPPER_USB_PORT
            self.host.servo.set(port, 'dut_sees_usbkey')
        else:
            self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey')
            self.host.servo.set('usb_mux_oe2', 'off')
            self.host.servo.set('usb_mux_oe4', 'off')
        time.sleep(_WAIT_DELAY)
        return port


    def cleanup(self):
        """Disconnect servo hub

        Unplugs the peripherals, sets 'dut_hub1_rst1' to 'on' and reboots
        the DUT.
        """
        self.plug_peripherals(False)
        self.host.servo.set('dut_hub1_rst1', 'on')
        self.host.reboot()


    def run_once(self, host, client_autotest, action_sequence, repeat,
                 usb_list=None, usb_checks=None,
                 crash_check=False, stress_rack=False):
        """Runs the action sequence 'repeat' times, checking after each step.

        @param host: host object of the DUT; its servo is used for plugging
        @param client_autotest: name of the client test run for LOGIN
        @param action_sequence: comma separated actions (case insensitive):
                LOGIN*, REBOOT, SUSPEND, RESUME, CLOSELID, OPENLID, PLUG,
                UNPLUG. LOGIN, REBOOT, SUSPEND and CLOSELID are ignored
                while the DUT is marked as suspended.
        @param repeat: number of times the whole sequence is run
        @param usb_list: optional list of patterns for 'lsusb | grep -E'
                that must match while plugged
        @param usb_checks: optional dict of command -> list of strings the
                command is run with; see check_usb_peripherals_details()
        @param crash_check: whether to look for new crash files
        @param stress_rack: selects the servo port setup; see
                prep_servo_for_test()

        @raise error.TestFail: if any check recorded a failure
        @raise error.TestError: if no peripherals are detected on the servo
        """
        self.client_autotest = client_autotest
        self.host = host
        self.autotest_client = autotest.Autotest(self.host)
        self.usb_list = usb_list
        self.usb_checks = usb_checks
        self.crash_check = crash_check

        self.suspend_status = False
        self.login_status = False
        self.fail_reasons = []
        self.action_step = None

        self.plug_port = self.prep_servo_for_test(stress_rack)

        # Unplug, plug, compare usb peripherals, and leave plugged.
        self.check_connected_peripherals()

        action_sequence = action_sequence.upper()
        actions = action_sequence.split(',')
        boot_id = 0
        self.detect_crash = crash_detector.CrashDetector(self.host)
        self.detect_crash.remove_crash_files()

        # Run camera client test to gather media_V4L2_test binary.
        if 'media_v4l2_test' in str(self.usb_checks):
            self.autotest_client.run_test("camera_V4L2")

        # range() rather than xrange() so the loop works on Python 2 and 3.
        for iteration in range(1, repeat + 1):
            step = 0
            for action in actions:
                step += 1
                action = action.strip()
                self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
                logging.info(self.action_step)

                if action == 'RESUME':
                    self.action_resume(boot_id)
                    time.sleep(_WAIT_DELAY)
                elif action == 'OPENLID':
                    self.open_lid(boot_id)
                    time.sleep(_WAIT_DELAY)
                elif action == 'UNPLUG':
                    self.plug_peripherals(False)
                elif action == 'PLUG':
                    self.plug_peripherals(True)
                elif not self.suspend_status:
                    if action.startswith('LOGIN'):
                        if self.login_status:
                            logging.debug('Skipping login. Already logged in.')
                            # Note: this also skips check_status() for the
                            # step.
                            continue
                        else:
                            self.action_login()
                            self.login_status = True
                    elif action == 'REBOOT':
                        self.host.reboot()
                        time.sleep(_WAIT_DELAY * 3)
                        self.login_status = False
                    elif action == 'SUSPEND':
                        boot_id = self.action_suspend()
                    elif action == 'CLOSELID':
                        boot_id = self.close_lid()
                    else:
                        logging.info('WRONG ACTION: %s .', self.action_step)
                self.check_status()

            # NOTE(review): assumed to run once per iteration, so the test
            # stops after the first failing iteration - confirm original
            # indentation.
            if self.fail_reasons:
                raise error.TestFail('Failures reported: %s' %
                                     str(self.fail_reasons))