# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import random
import re
import time

from autotest_lib.client.bin import utils
from autotest_lib.server import test
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.usb_mux_controller import USBMuxController

# Seconds to sleep after toggling hub power; also used as the poll timeout
# while scanning the multiplexer ports in run_once().
_WAIT_DELAY = 5
# sysfs directory that is listed to discover USB device directories.
_USB_DIR = '/sys/bus/usb/devices'
# Number of multiplexer ports scanned by run_once() (ports 0..MAX_PORTS-1).
MAX_PORTS = 8
# Module-level bookkeeping lists. They are cleared in place at the start of
# every run_once() call so results do not leak from one run into the next.
TMP_FAILED_TEST_LIST = list()
PORT_BEING_TESTED = list()


class kernel_ExternalUsbPeripheralsDetectionStress(test.test):
    """Uses USB multiplexer to repeatedly connect and disconnect USB devices."""
    version = 1


    def set_hub_power(self, on=True):
        """Sets the servo USB hub power state, then waits _WAIT_DELAY seconds.

        The servo control 'dut_hub1_rst1' is written with the inverse of
        |on|: 'off' to power the hub on, 'on' to power it off.

        @param on: To power on the servo-usb hub or not.

        """
        reset = 'off' if on else 'on'
        self.host.servo.set('dut_hub1_rst1', reset)
        time.sleep(_WAIT_DELAY)


    def check_manufacturer_and_product_info(
            self, vId, pId, manufacturer, pName):
        """Check manufacturer and product info from lsusb against dict values.

        The expected strings are matched literally: they are escaped before
        being placed in the regular expression, so characters such as '(',
        '+' or '.' in a device name cannot alter or break the pattern.

        @param vId: Vendor id of the connected USB device.
        @param pId: Product id of the connected USB device.
        @param manufacturer: Manufacturer name of the connected USB device.
        @param pName: Product name of the connected USB device

        @returns True if both the manufacturer and the product name are found
                 in the lsusb output; False otherwise, including when any of
                 the arguments is None.

        """
        # get_product_info() returns None for attributes that are missing on
        # the DUT; there is nothing to build a command or a pattern from.
        if None in (vId, pId, manufacturer, pName):
            logging.debug('Incomplete device attributes: vendor=%s, '
                          'product=%s, manufacturer=%s, name=%s',
                          vId, pId, manufacturer, pName)
            return False

        lsusb_cmd = 'lsusb -v -d %s:%s' % (vId, pId)
        manu_cmd = lsusb_cmd + ' | grep iManufacturer'
        prod_cmd = lsusb_cmd + ' | grep iProduct'

        manu_cmd_output = self.host.run(
                manu_cmd, ignore_status=True).stdout.strip()
        prod_cmd_output = self.host.run(
                prod_cmd, ignore_status=True).stdout.strip()

        manu_verify = 'iManufacturer.*' + re.escape(manufacturer)
        prod_verify = 'iProduct.*' + re.escape(pName)

        match_result_manu = re.search(manu_verify, manu_cmd_output) is not None
        match_result_prod = re.search(prod_verify, prod_cmd_output) is not None

        if not (match_result_manu and match_result_prod):
            logging.debug('Manufacturer or productName do not match.')
            return False

        return True


    def check_driver_symlink_and_dir(self, devicePath, productName):
        """Check driver symlink and dir against devicePath value from dict.

        Lists |devicePath| on the DUT, keeps the entries whose names look
        like USB interface directories and checks that at least one of them
        contains a 'driver' entry.

        @param devicePath: Device driver path.
        @param productName: Product name of the connected USB device.

        @returns True if an interface directory with a driver entry was
                 found, False otherwise.

        """
        result = True

        listing = self.host.run('ls -1 %s' % devicePath,
                                ignore_status=True).stdout.split('\n')
        tmp_list = [device_dir for device_dir in listing
                    if re.match(r'\d-\d.*:\d\.\d', device_dir)]

        if not tmp_list:
            logging.debug('No driver created/loaded for %s', productName)
            result = False

        flag = False
        for device_dir in tmp_list:
            driver_path = os.path.join(devicePath, '%s/driver' % device_dir)
            if self._exists_on(driver_path):
                flag = True
                # The link target is logged for information only.
                link = (self.host.run('ls -l %s | grep ^l'
                                      '| grep driver' % driver_path,
                                      ignore_status=True)
                        .stdout.strip())
                logging.info('%s', link)

        if not flag:
            logging.debug('Device driver not found')
            result = False

        return result


    def check_usb_peripherals_details(self, connected_device_dict):
        """Checks USB peripheral details against the values in dictionary.

        Enables the device's multiplexer port, waits for its device path to
        appear, verifies the lsusb and driver details, disables all ports
        and waits for the device path to disappear again. The port is
        appended to PORT_BEING_TESTED, and to TMP_FAILED_TEST_LIST when any
        step fails.

        @param connected_device_dict: Dictionary of device attributes.

        """
        result = True
        usbPort = connected_device_dict['usb_port']
        PORT_BEING_TESTED.append(usbPort)

        device_info = connected_device_dict['deviceInfo']
        vendorId = device_info['vendorId']
        productId = device_info['productId']
        manufacturer = device_info['manufacturer']
        productName = device_info['productName']
        lsusbOutput = device_info['lsusb']
        devicePath = device_info['devicePath']

        # These values are concatenated or probed on the DUT below; with a
        # None among them the checks cannot run, so record a failure instead
        # of raising TypeError.
        if None in (vendorId, productId, lsusbOutput, devicePath):
            logging.debug('Incomplete device information on port %s: %s',
                          usbPort, device_info)
            logging.debug('Test failed on port %s.', usbPort)
            TMP_FAILED_TEST_LIST.append(usbPort)
            return

        self.usb_mux.enable_port(usbPort)
        try:
            try:
                utils.poll_for_condition(
                        lambda: self.host.path_exists(devicePath),
                        exception=utils.TimeoutError(
                                'Trouble finding USB device '
                                'on port %d' % usbPort),
                        timeout=15, sleep_interval=1)
            except utils.TimeoutError:
                logging.debug('Trouble finding USB device on port %d',
                              usbPort)
                result = False

            vendor_product = vendorId + ':' + productId
            if not (vendor_product in lsusbOutput and
                    self.check_manufacturer_and_product_info(
                            vendorId, productId, manufacturer, productName)
                    and self.check_driver_symlink_and_dir(devicePath,
                                                          productName)):
                result = False
        finally:
            # Always disconnect the device, even if a check raised, so the
            # port is not left enabled.
            self.usb_mux.disable_all_ports()

        try:
            utils.poll_for_condition(
                    lambda: not self.host.path_exists(devicePath),
                    exception=utils.TimeoutError(
                            'Device driver path does not '
                            'disappear after device is disconnected.'),
                    timeout=15, sleep_interval=1)
        except utils.TimeoutError:
            logging.debug('Device driver path is still present after device is '
                          'disconnected from the DUT.')
            result = False

        if not result:
            logging.debug('Test failed on port %s.', usbPort)
            TMP_FAILED_TEST_LIST.append(usbPort)


    def get_usb_device_dirs(self):
        """Gets the usb device dirs from _USB_DIR path.

        @returns list of the full paths of the entries found under _USB_DIR;
                 the list is empty if the directory listing is empty.

        """
        cmd = 'ls -1 %s' % _USB_DIR
        cmd_output = self.host.run(cmd).stdout.strip().split('\n')
        # Splitting an empty listing yields [''], which must not be turned
        # into a bogus '<_USB_DIR>/' entry.
        return [os.path.join(_USB_DIR, d) for d in cmd_output if d]


    def parse_device_dir_for_info(self, dir_list, usb_device_dict):
        """Uses vendorId/device path and to get other device attributes.

        Every directory in |dir_list| that has a non-empty 'idVendor' file
        overwrites the 'deviceInfo' entries, so the values of the last such
        directory are the ones kept.

        @param dir_list: Complete path of directories.
        @param usb_device_dict: Dictionary to store device attributes.

        @returns usb_device_dict with device attributes

        """
        for d_path in dir_list:
            file_name = os.path.join(d_path, 'idVendor')
            if not self._exists_on(file_name):
                continue
            vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
            if not vendor_id:
                continue
            device_info = usb_device_dict['deviceInfo']
            device_info['vendorId'] = vendor_id
            device_info['devicePath'] = d_path
            device_info['productId'] = (
                    self.get_product_info(d_path, 'idProduct'))
            device_info['productName'] = (
                    self.get_product_info(d_path, 'product'))
            device_info['manufacturer'] = (
                    self.get_product_info(d_path, 'manufacturer'))
        return usb_device_dict


    def get_product_info(self, directory, prod_string):
        """Gets the product id, name and manufacturer info from device path.

        @param directory: Driver path for the USB device.
        @param prod_string: Device attribute string.

        @returns the stripped output of the cat command, or None if the
                 attribute file does not exist on the host.

        """
        product_file_name = os.path.join(directory, prod_string)
        if self._exists_on(product_file_name):
            return self.host.run('cat %s' % product_file_name).stdout.strip()
        return None


    def _exists_on(self, path):
        """Checks if file exists on host or not.

        @param path: Path to check on the host.

        @returns True or False

        """
        return self.host.run('ls %s' % path,
                             ignore_status=True).exit_status == 0


    def check_lsusb_diff(self, original_lsusb_output):
        """Compare LSUSB output for before and after connecting each device.

        @param original_lsusb_output: lsusb output prior to connecting device.

        @returns sorted list of the lsusb lines that are present now but not
                 in |original_lsusb_output|; empty if there are none.

        """
        lsusb_output = (self.host.run('lsusb', ignore_status=True).
                        stdout.strip().split('\n'))
        # Sorted so callers that take the first element get a reproducible
        # choice rather than one depending on set iteration order.
        return sorted(set(lsusb_output).difference(original_lsusb_output))


    def run_once(self, host, loop_count):
        """Main function to run the autotest.

        Scans every multiplexer port once to learn which devices are
        connected, then checks a randomly chosen device |loop_count| times.

        @param host: Host object representing the DUT.
        @param loop_count: Number of iteration cycles.

        @raise error.TestError if no connected devices are detected
        @raise error.TestFail if one or more USB devices are not detected

        """
        self.host = host
        self.usb_mux = USBMuxController(self.host)

        # The bookkeeping lists live at module level; clear them in place so
        # a previous run in the same process cannot affect this one.
        del TMP_FAILED_TEST_LIST[:]
        del PORT_BEING_TESTED[:]

        # Make sure all USB ports are disabled prior to starting the test.
        self.usb_mux.disable_all_ports()

        self.host.servo.switch_usbkey('dut')
        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')

        self.set_hub_power(False)
        # Collect the USB devices directories before switching on hub
        usb_list_dir_off = self.get_usb_device_dirs()
        logging.debug('USB device dirs with hub powered off: %s',
                      usb_list_dir_off)

        self.set_hub_power(True)
        # Collect the USB devices directories after switching on hub
        usb_list_dir_on = frozenset(self.get_usb_device_dirs())

        lsusb_original_out = (self.host.run('lsusb', ignore_status=True).
                              stdout.strip().split('\n'))
        list_of_usb_device_dictionaries = list()

        # Extract connected USB device information and store it in a dict.
        for usb_port in range(MAX_PORTS):
            usb_device_dict = {'usb_port': None,
                               'deviceInfo': {'devicePath': None,
                                              'vendorId': None,
                                              'productId': None,
                                              'productName': None,
                                              'manufacturer': None,
                                              'lsusb': None}}
            self.usb_mux.enable_port(usb_port)
            try:
                utils.poll_for_condition(
                        lambda: self.check_lsusb_diff(lsusb_original_out),
                        exception=utils.TimeoutError(
                                'No USB device on port '
                                '%d' % usb_port),
                        timeout=_WAIT_DELAY, sleep_interval=1)
            except utils.TimeoutError:
                logging.debug('No USB device found on port %d', usb_port)

            # Maintain list of associated dirs for each connected USB device
            usb_device_dir_list = [device
                                   for device in self.get_usb_device_dirs()
                                   if device not in usb_list_dir_on]

            usb_device_dict = self.parse_device_dir_for_info(
                    usb_device_dir_list, usb_device_dict)

            lsusb_diff = self.check_lsusb_diff(lsusb_original_out)
            if lsusb_diff:
                usb_device_dict['usb_port'] = usb_port
                usb_device_dict['deviceInfo']['lsusb'] = lsusb_diff[0]
                list_of_usb_device_dictionaries.append(usb_device_dict)

            self.usb_mux.disable_all_ports()
            try:
                utils.poll_for_condition(
                        lambda: not self.check_lsusb_diff(lsusb_original_out),
                        exception=utils.TimeoutError(
                                'Timed out waiting for '
                                'USB device to disappear.'),
                        timeout=_WAIT_DELAY, sleep_interval=1)
            except utils.TimeoutError:
                logging.debug('Timed out waiting for USB device to disappear.')
            logging.info('%s', usb_device_dict)

        if not list_of_usb_device_dictionaries:
            # Fails if no devices detected
            raise error.TestError('No connected devices were detected. Make '
                                  'sure the devices are connected to USB_KEY '
                                  'and DUT_HUB1_USB on the servo board.')
        logging.info('Connected devices list: %s',
                     list_of_usb_device_dictionaries)

        # loop_count defines the number of times the USB peripheral details
        # should be checked and random.choice is used to randomly select one of
        # the elements from the usb device list specifying the device whose
        # details should be checked.
        for _ in range(loop_count):
            self.check_usb_peripherals_details(
                    random.choice(list_of_usb_device_dictionaries))

        logging.info('Sequence of ports tested with random picker: %s',
                     ', '.join(map(str, PORT_BEING_TESTED)))

        if TMP_FAILED_TEST_LIST:
            logging.info('Failed to verify devices on following ports: %s',
                         ', '.join(map(str, TMP_FAILED_TEST_LIST)))
            raise error.TestFail('Failed to do full device verification on '
                                 'some ports.')