• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, re, time
6
7from autotest_lib.server import test
8from autotest_lib.client.common_lib import error
9
10_WAIT_DELAY = 25
11_USB_DIR = '/sys/bus/usb/devices'
12
13class kernel_ExternalUsbPeripheralsDetectionTest(test.test):
14    """Uses servo to repeatedly connect/remove USB devices during boot."""
15    version = 1
16
17
18    def set_hub_power(self, on=True):
19        """Setting USB hub power status
20
21        @param: on To power on the servo-usb hub or not
22
23        """
24        reset = 'off'
25        if not on:
26            reset = 'on'
27        self.host.servo.set('dut_hub1_rst1', reset)
28        self.pluged_status = on
29        time.sleep(_WAIT_DELAY)
30
31
32    def check_usb_peripherals_details(self):
33        """Checks the effect from plugged in USB peripherals.
34
35        @returns True if command line output is matched successfuly; Else False
36        """
37        failed = list()
38        for cmd in self.usb_checks.keys():
39            out_match_list = self.usb_checks.get(cmd)
40            logging.info('Running %s',  cmd)
41
42            # Run the usb check command
43            cmd_out_lines = (self.host.run(cmd, ignore_status=True).
44                             stdout.strip().split('\n'))
45            for out_match in out_match_list:
46                match_result = False
47                for cmd_out_line in cmd_out_lines:
48                    match_result = (match_result or
49                        re.search(out_match, cmd_out_line) != None)
50                if not match_result:
51                    failed.append((cmd,out_match))
52        return failed
53
54
55    def get_usb_device_dirs(self):
56        """Gets the usb device dirs from _USB_DIR path.
57
58        @returns list with number of device dirs else None
59        """
60        usb_dir_list = []
61        cmd = 'ls -1 %s' % _USB_DIR
62        tmp = self.host.run(cmd).stdout.strip().split('\n')
63        for d in tmp:
64            usb_dir_list.append(os.path.join(_USB_DIR, d))
65        return usb_dir_list
66
67
68    def get_vendor_id_dict_from_dut(self, dir_list):
69        """Finds the vendor id from provided dir list.
70
71        @param dir_list: full path of directories
72        @returns dict of all vendor ids vs file path
73        """
74        vendor_id_dict = dict()
75        for d in dir_list:
76            file_name = os.path.join(d, 'idVendor')
77            if self._exists_on(file_name):
78                vendor_id = self.host.run('cat %s' % file_name).stdout.strip()
79                if vendor_id:
80                    vendor_id_dict[vendor_id] = d
81        logging.info('%s', vendor_id_dict)
82        return vendor_id_dict
83
84
85    def _exists_on(self, path):
86        """Checks if file exists on host or not.
87
88        @returns True or False
89        """
90        return self.host.run('ls %s' % path,
91                             ignore_status=True).exit_status == 0
92
93
94
95    def run_once(self, host, usb_checks=None,
96                 vendor_id_dict_control_file=None):
97        """Main function to run the autotest.
98
99        @param host: name of the host
100        @param usb_checks: dictionary defined in control file
101        @param vendor_id_list: dictionary defined in control file
102        """
103        self.host = host
104        self.usb_checks = usb_checks
105
106        self.host.servo.switch_usbkey('dut')
107        self.host.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
108        time.sleep(_WAIT_DELAY)
109
110        self.set_hub_power(False)
111        # Collect the USB devices directories before switching on hub
112        usb_list_dir_off = self.get_usb_device_dirs()
113
114        self.set_hub_power(True)
115        # Collect the USB devices directories after switching on hub
116        usb_list_dir_on = self.get_usb_device_dirs()
117
118        diff_list = list(set(usb_list_dir_on).difference(set(usb_list_dir_off)))
119        if len(diff_list) == 0:
120            # Fail if no devices detected after
121            raise error.TestError('No connected devices were detected. Make '
122                                  'sure the devices are connected to USB_KEY '
123                                  'and DUT_HUB1_USB on the servo board.')
124        logging.debug('Connected devices list: %s', diff_list)
125
126        # Test 1: check USB peripherals info in detail
127        failed = self.check_usb_peripherals_details()
128        if len(failed)> 0:
129            raise error.TestError('USB device not detected %s', str(failed))
130
131        # Test 2: check USB device dir under /sys/bus/usb/devices
132        vendor_ids = {}
133        # Gets a dict idVendor: dir_path
134        vendor_ids = self.get_vendor_id_dict_from_dut(diff_list)
135        for vid in vendor_id_dict_control_file.keys():
136            peripheral = vendor_id_dict_control_file[vid]
137            if vid not in vendor_ids.keys():
138                raise error.TestFail('%s is not detected at %s dir'
139                                     % (peripheral, _USB_DIR))
140            else:
141            # Test 3: check driver symlink and dir for each USB device
142                tmp_list = [device_dir for device_dir in
143                            self.host.run('ls -1 %s' % vendor_ids[vid],
144                            ignore_status=True).stdout.split('\n')
145                            if re.match(r'\d-\d.*:\d\.\d', device_dir)]
146                if not tmp_list:
147                    raise error.TestFail('No driver created/loaded for %s'
148                                         % peripheral)
149                logging.info('---- Drivers for %s ----', peripheral)
150                flag = False
151                for device_dir in tmp_list:
152                    driver_path = os.path.join(vendor_ids[vid],
153                                               '%s/driver' % device_dir)
154                    if self._exists_on(driver_path):
155                        flag = True
156                        link = (self.host.run('ls -l %s | grep ^l'
157                                              '| grep driver'
158                                              % driver_path, ignore_status=True)
159                                              .stdout.strip())
160                        logging.info('%s', link)
161                if not flag:
162                    raise error.TestFail('Driver for %s is not loaded - %s'
163                                         % (peripheral, driver_path))
164