# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging, threading, time

from autotest_lib.client.cros.crash.crash_test import CrashTest
from autotest_lib.server import autotest, test
from autotest_lib.client.common_lib import error

# Base delay (seconds) used to let the DUT settle after an action.
_WAIT_DELAY = 15
# Timeout (seconds) passed to the host sleep/resume wait helpers.
_LONG_TIMEOUT = 200
# Default duration (seconds) of a suspend requested through the host.
_SUSPEND_TIME = 30
# Parent directories of the crash directories; the trailing "/crash" is
# removed here because crash_not_detected() appends it again.
_CRASH_PATHS = [CrashTest._SYSTEM_CRASH_DIR.replace("/crash",""),
                CrashTest._FALLBACK_USER_CRASH_DIR.replace("/crash",""),
                CrashTest._USER_CRASH_DIRS.replace("/crash","")]


class platform_ExternalUsbPeripherals(test.test):
    """Uses servo to repeatedly connect/remove USB devices during boot."""
    version = 1


    def getPluggedUsbDevices(self):
        """Determines the external USB devices plugged

        Parses the `lsusb` output of the DUT. Everything after the sixth
        space-separated column of a line is taken as the device name;
        lines without a name get a generated 'Unnamed device N' name.

        @returns plugged_list: List of plugged usb devices names

        """
        lsusb_output = self.host.run('lsusb').stdout.strip()
        plugged_list = []
        unnamed_device_count = 1
        for item in lsusb_output.split('\n'):
            if not item.strip():
                # Blank output is not a device; without this guard an empty
                # lsusb result would be reported as 'Unnamed device 1'.
                continue
            columns = item.split(' ')
            name = ' '.join(columns[6:]).strip()
            if not name:
                logging.debug('Unnamed device located, adding generic name.')
                name = 'Unnamed device %d' % unnamed_device_count
                unnamed_device_count += 1
            # Avoid servo components
            if not name.startswith('Standard Microsystems Corp'):
                plugged_list.append(name)
        return plugged_list


    def set_hub_power(self, on=True):
        """Setting USB hub power status

        @param on: To power on the servo-usb hub or not

        """
        # The servo control is written with the inverse of the requested
        # power state: 'off' to power the hub, 'on' to cut it.
        reset = 'off' if on else 'on'
        self.host.servo.set('dut_hub1_rst1', reset)
        self.pluged_status = on


    def action_login(self):
        """Login i.e. runs the client test and stays logged in

        The client test named by self.client_autotest is run with
        exit_without_logout=True.

        @exception TestFail failed to login within timeout.

        """
        self.autotest_client.run_test(self.client_autotest,
                                      exit_without_logout=True)


    def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
        """Waits till command output is met

        Runs '<cmd> <check>' on the DUT every 0.5 sec until it exits with
        status 0 or the timeout expires. On timeout a failure is recorded
        through add_failure().

        @param cmd: executed command
        @param check: string to be checked for in cmd output
        @param timeout: max time in sec to wait for output
        @param timeout_msg: timeout failure message

        @returns True if check is found in command output; False otherwise

        """
        start_time = int(time.time())
        time_delta = 0
        command = '%s %s' % (cmd, check)
        logging.debug('Command: %s', command)
        while self.host.run(command, ignore_status=True).exit_status != 0:
            time_delta = int(time.time()) - start_time
            if time_delta > timeout:
                self.add_failure('%s - %d sec' % (timeout_msg, timeout))
                return False
            time.sleep(0.5)
        logging.debug('Succeeded in :%d sec', time_delta)
        return True


    def suspend_for_time(self, suspend_time=_SUSPEND_TIME):
        """Calls the host method suspend with suspend_time argument.

        AutoservSuspendError is deliberately ignored here; this method is
        run in a background thread by action_suspend().

        @param suspend_time: time to suspend the device for.

        """
        try:
            self.host.suspend(suspend_time=suspend_time)
        except error.AutoservSuspendError:
            pass


    def action_suspend(self):
        """Suspend i.e. powerd_dbus_suspend and wait

        The suspend request is issued from a separate thread so this method
        can wait for the DUT to go to sleep in the meantime.

        @returns boot_id for the following resume

        """
        boot_id = self.host.get_boot_id()
        thread = threading.Thread(target=self.suspend_for_time)
        thread.start()
        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
        logging.debug('--- Suspended')
        self.suspend_status = True
        return boot_id


    def action_resume(self, boot_id):
        """Resume i.e. wait for the DUT to come back from suspend

        @param boot_id: boot id obtained prior to suspending

        """
        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
        logging.debug('--- Resumed')
        self.suspend_status = False


    def close_lid(self):
        """Close lid through servo to suspend the device.

        @returns boot_id obtained before closing the lid

        """
        boot_id = self.host.get_boot_id()
        logging.info('Closing lid...')
        self.host.servo.lid_close()
        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
        self.suspend_status = True
        return boot_id


    def open_lid(self, boot_id):
        """Open lid through servo to resume.

        @param boot_id: boot id obtained prior to closing the lid

        """
        logging.info('Opening lid...')
        self.host.servo.lid_open()
        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
        self.suspend_status = False


    def crash_not_detected(self, crash_path):
        """Check for kernel, browser, process crashes

        Any '.meta' file in <crash_path>/crash counts as a crash, except
        those whose name contains 'kernel_warning'. Each one is recorded
        through add_failure().

        @param crash_path: Crash files path

        @returns True if there were not crashes; False otherwise

        """
        result = True
        # Only the directory listing is inspected (not the whole result
        # object), so unrelated text cannot trigger a false match.
        listing = self.host.run('ls %s' % crash_path,
                                ignore_status=True).stdout
        if 'crash' in listing:
            crash_out = self.host.run('ls %s/crash/' % crash_path,
                                      ignore_status=True).stdout
            for crash_file in crash_out.strip().split('\n'):
                if ('.meta' in crash_file and
                        'kernel_warning' not in crash_file):
                    self.add_failure('CRASH DETECTED in %s/crash: %s' %
                                     (crash_path, crash_file))
                    result = False
        return result


    def check_plugged_usb_devices(self):
        """Checks the plugged peripherals match device list.

        When the hub is powered, every entry of usb_list must show up in
        lsusb and every device of diff_list must be present. When the hub
        is not powered, none of the diff_list devices may be present.

        @returns True if expected USB peripherals are detected; False otherwise

        """
        result = True
        if self.pluged_status and self.usb_list is not None:
            # Check for mandatory USB devices passed by usb_list flag
            for usb_name in self.usb_list:
                found = self.wait_for_cmd_output(
                    'lsusb | grep -E ', usb_name, _WAIT_DELAY * 4,
                    'Not detecting %s' % usb_name)
                result = result and found
        time.sleep(_WAIT_DELAY)
        on_now = self.getPluggedUsbDevices()
        if self.pluged_status:
            if not self.diff_list.issubset(on_now):
                missing = str(self.diff_list.difference(on_now))
                self.add_failure('Missing connected peripheral(s) '
                                 'when plugged: %s ' % missing)
                result = False
        else:
            present = self.diff_list.intersection(on_now)
            if len(present) > 0:
                self.add_failure('Still presented peripheral(s) '
                                 'when unplugged: %s ' % str(present))
                result = False
        return result


    def check_usb_peripherals_details(self):
        """Checks the effect from plugged in USB peripherals.

        usb_checks maps a command to the list of strings to look for. A
        command prefixed with 'loggedin:' is only run while logged in.

        @returns True if command line output is matched successfully;
                 Else False

        """
        usb_check_result = True
        for cmd, out_match_list in self.usb_checks.items():
            if cmd.startswith('loggedin:'):
                if not self.login_status:
                    continue
                # Strip the leading marker only.
                cmd = cmd.replace('loggedin:', '', 1)
            # Run the usb check command
            for out_match in out_match_list:
                match_result = self.wait_for_cmd_output(
                    cmd, out_match, _WAIT_DELAY * 4,
                    'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match))
                usb_check_result = usb_check_result and match_result
        return usb_check_result


    def check_status(self):
        """Performs checks after each action:
            - for USB detected devices
            - for generated crash files
            - peripherals effect checks on cmd line

        Nothing is checked while the DUT is suspended.

        @returns True if all of the iteration checks pass; False otherwise.

        """
        result = True
        if not self.suspend_status:
            # Detect the USB peripherals
            result = self.check_plugged_usb_devices()
            # Check for crash files
            if self.crash_check:
                for crash_path in _CRASH_PATHS:
                    # Run the check before and-ing it in, so an earlier
                    # failure cannot short-circuit it and hide a crash
                    # from the failure report.
                    no_crash = self.crash_not_detected(crash_path)
                    result = result and no_crash
            if self.pluged_status and (self.usb_checks is not None):
                # Check for plugged USB devices details. Skipped when a
                # previous check already failed (each detail check may wait
                # up to its full timeout).
                result = result and self.check_usb_peripherals_details()
        return result


    def remove_crash_data(self):
        """Delete crash meta files if present"""
        for crash_path in _CRASH_PATHS:
            if not self.crash_not_detected(crash_path):
                self.host.run('rm -rf %s/crash' % crash_path,
                              ignore_status=True)


    def add_failure(self, reason):
        """ Adds a failure reason to list of failures to be reported at end

        Reasons are dropped while no action step is set (i.e. before the
        action sequence starts), so the initial crash clean-up does not
        register failures.

        @param reason: failure reason to record

        """
        if self.action_step is not None:
            self.fail_reasons.append('%s FAILS - %s' %
                                     (self.action_step, reason))


    def cleanup(self):
        """Disconnect servo hub and reboot the DUT"""
        self.set_hub_power(False)
        self.host.servo.set('usb_mux_sel3', 'servo_sees_usbkey')
        self.host.reboot()


    def run_once(self, host, client_autotest, action_sequence, repeat,
                 usb_list=None, usb_checks=None,
                 crash_check=False, stress_rack=False):
        """Runs the action sequence and checks the USB peripherals.

        Recognized actions: RESUME, OPENLID, UNPLUG, PLUG, and - only while
        not suspended - LOGIN*, REBOOT, SUSPEND, CLOSELID.

        @param host: host object of the DUT; must have a servo attached
        @param client_autotest: name of the client test used to login
        @param action_sequence: comma separated actions (case insensitive)
        @param repeat: number of times the action sequence is run
        @param usb_list: patterns that must be matched in lsusb when plugged
        @param usb_checks: dict of command -> list of strings to match
        @param crash_check: True to look for crash files after each action
        @param stress_rack: selects the usb_mux_sel1 servo setting

        @exception TestError no device shows up when the hub is powered
        @exception TestFail failures were recorded during an iteration

        """
        self.client_autotest = client_autotest
        self.host = host
        self.autotest_client = autotest.Autotest(self.host)
        self.usb_list = usb_list
        self.usb_checks = usb_checks
        self.crash_check = crash_check

        self.suspend_status = False
        self.login_status = False
        self.fail_reasons = list()
        self.action_step = None

        self.host.servo.switch_usbkey('dut')
        self.set_hub_power(False)
        if stress_rack:
            self.host.servo.set('usb_mux_sel1', 'dut_sees_usbkey')
        else:
            self.host.servo.set('usb_mux_sel1', 'servo_sees_usbkey')
            # NOTE(review): nesting of the two usb_mux_oe* calls under this
            # branch is reconstructed - confirm against the servo setup.
            self.host.servo.set('usb_mux_oe2', 'off')
            self.host.servo.set('usb_mux_oe4', 'off')
        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
        time.sleep(_WAIT_DELAY)

        # Collect USB peripherals when unplugged
        self.set_hub_power(False)
        time.sleep(_WAIT_DELAY)
        off_list = self.getPluggedUsbDevices()

        # Collect USB peripherals when plugged
        self.set_hub_power(True)
        time.sleep(_WAIT_DELAY * 2)
        on_list = self.getPluggedUsbDevices()

        # Devices that only appear when the hub is powered are the ones
        # under test.
        self.diff_list = set(on_list).difference(set(off_list))
        if len(self.diff_list) == 0:
            # Fail if no devices detected after
            raise error.TestError('No connected devices were detected. Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.debug('Connected devices list: %s', self.diff_list)

        action_sequence = action_sequence.upper()
        actions = action_sequence.split(',')
        boot_id = 0
        self.remove_crash_data()

        for iteration in range(1, repeat + 1):
            step = 0
            for action in actions:
                step += 1
                action = action.strip()
                self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
                logging.info(self.action_step)

                if action == 'RESUME':
                    self.action_resume(boot_id)
                    time.sleep(_WAIT_DELAY)
                elif action == 'OPENLID':
                    self.open_lid(boot_id)
                    time.sleep(_WAIT_DELAY)
                elif action == 'UNPLUG':
                    self.set_hub_power(False)
                elif action == 'PLUG':
                    self.set_hub_power(True)
                elif self.suspend_status == False:
                    if action.startswith('LOGIN'):
                        if self.login_status:
                            logging.debug('Skipping login. Already logged in.')
                            continue
                        else:
                            self.action_login()
                            self.login_status = True
                    elif action == 'REBOOT':
                        self.host.reboot()
                        time.sleep(_WAIT_DELAY * 3)
                        self.login_status = False
                    elif action == 'SUSPEND':
                        boot_id = self.action_suspend()
                    elif action == 'CLOSELID':
                        boot_id = self.close_lid()
                    else:
                        # Unknown action name.
                        logging.info('WRONG ACTION: %s .', self.action_step)
                else:
                    # Action is not allowed while the DUT is suspended.
                    logging.info('WRONG ACTION: %s .', self.action_step)

                self.check_status()

            # NOTE(review): reporting at the end of each iteration is
            # reconstructed nesting - confirm it was not meant to run once
            # after all iterations.
            if self.fail_reasons:
                raise error.TestFail('Failures reported: %s' %
                                     str(self.fail_reasons))