• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2016 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, os, time
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.client.common_lib.cros import tpm_utils
9from autotest_lib.server import test
10from autotest_lib.server.cros.multimedia import remote_facade_factory
11
12
13_SHORT_TIMEOUT = 2
14_WAIT_DELAY = 15
15_USB_DIR = '/sys/bus/usb/devices'
16
17
18class enterprise_CFM_USBPeripheralHotplugDetect(test.test):
19    """Uses servo to hotplug and detect USB peripherals on CrOS and hotrod. It
20    compares attached audio/video peripheral names on CrOS against what hotrod
21    detects."""
22    version = 1
23
24
25    def _set_hub_power(self, on=True):
26        """Setting USB hub power status
27
28        @param on: To power on the servo usb hub or not.
29
30        """
31        reset = 'off'
32        if not on:
33            reset = 'on'
34        self.client.servo.set('dut_hub1_rst1', reset)
35        time.sleep(_WAIT_DELAY)
36
37
38    def _get_usb_device_dirs(self):
39        """Gets usb device dirs from _USB_DIR path.
40
41        @returns list with number of device dirs else Non
42
43        """
44        usb_dir_list = list()
45        cmd = 'ls %s' % _USB_DIR
46        cmd_output = self.client.run(cmd).stdout.strip().split('\n')
47        for d in cmd_output:
48            usb_dir_list.append(os.path.join(_USB_DIR, d))
49        return usb_dir_list
50
51
52    def _get_usb_device_type(self, vendor_id):
53        """Gets usb device type info from lsusb output based on vendor id.
54
55        @vendor_id: Device vendor id.
56        @returns list of device types associated with vendor id
57
58        """
59        details_list = list()
60        cmd = 'lsusb -v -d ' + vendor_id + ': | head -150'
61        cmd_out = self.client.run(cmd).stdout.strip().split('\n')
62        for line in cmd_out:
63            if (any(phrase in line for phrase in ('bInterfaceClass',
64                    'wTerminalType'))):
65                details_list.append(line.split(None)[2])
66
67        return list(set(details_list))
68
69
70    def _get_product_info(self, directory, prod_string):
71        """Gets the product name from device path.
72
73        @param directory: Driver path for USB device.
74        @param prod_string: Device attribute string.
75        @returns the output of the cat command
76
77        """
78        product_file_name = os.path.join(directory, prod_string)
79        if self._file_exists_on_host(product_file_name):
80            return self.client.run('cat %s' % product_file_name).stdout.strip()
81        return None
82
83
84    def _parse_device_dir_for_info(self, dir_list, peripheral_whitelist_dict):
85        """Uses device path and vendor id to get device type attibutes.
86
87        @param dir_list: Complete list of device directories.
88        @returns cros_peripheral_dict with device names
89
90        """
91        cros_peripheral_dict = {'Camera': None, 'Microphone': None,
92                                'Speaker': None}
93
94        for d_path in dir_list:
95            file_name = os.path.join(d_path, 'idVendor')
96            if self._file_exists_on_host(file_name):
97                vendor_id = self.client.run('cat %s' % file_name).stdout.strip()
98                product_id = self._get_product_info(d_path, 'idProduct')
99                vId_pId = vendor_id + ':' + product_id
100                device_types = self._get_usb_device_type(vendor_id)
101                if vId_pId in peripheral_whitelist_dict:
102                    if 'Microphone' in device_types:
103                        cros_peripheral_dict['Microphone'] = (
104                                peripheral_whitelist_dict.get(vId_pId))
105                    if 'Speaker' in device_types:
106                        cros_peripheral_dict['Speaker'] = (
107                                peripheral_whitelist_dict.get(vId_pId))
108                    if 'Video' in device_types:
109                        cros_peripheral_dict['Camera'] = (
110                                peripheral_whitelist_dict.get(vId_pId))
111
112        for device_type, is_found in cros_peripheral_dict.iteritems():
113            if not is_found:
114                cros_peripheral_dict[device_type] = 'Not Found'
115
116        return cros_peripheral_dict
117
118
119    def _file_exists_on_host(self, path):
120        """Checks if file exists on host.
121
122        @param path: File path
123        @returns True or False
124
125        """
126        return self.client.run('ls %s' % path,
127                             ignore_status=True).exit_status == 0
128
129
130    def _enroll_device_and_skip_oobe(self):
131        """Enroll device into CFM and skip CFM oobe."""
132        self.cfm_facade.enroll_device()
133        self.cfm_facade.restart_chrome_for_cfm()
134        self.cfm_facade.wait_for_telemetry_commands()
135        self.cfm_facade.wait_for_oobe_start_page()
136
137        if not self.cfm_facade.is_oobe_start_page():
138            raise error.TestFail('CFM did not reach oobe screen.')
139
140        self.cfm_facade.skip_oobe_screen()
141        time.sleep(_SHORT_TIMEOUT)
142
143
144    def _set_preferred_peripherals(self, cros_peripherals):
145        """Set perferred peripherals.
146
147        @param cros_peripherals: Dictionary of peripherals
148        """
149        avail_mics = self.cfm_facade.get_mic_devices()
150        avail_speakers = self.cfm_facade.get_speaker_devices()
151        avail_cameras = self.cfm_facade.get_camera_devices()
152
153        if cros_peripherals.get('Microphone') in avail_mics:
154            self.cfm_facade.set_preferred_mic(
155                    cros_peripherals.get('Microphone'))
156        if cros_peripherals.get('Speaker') in avail_speakers:
157            self.cfm_facade.set_preferred_speaker(
158                    cros_peripherals.get('Speaker'))
159        if cros_peripherals.get('Camera') in avail_cameras:
160            self.cfm_facade.set_preferred_camera(
161                    cros_peripherals.get('Camera'))
162
163
164    def _peripheral_detection(self):
165        """Get attached peripheral information."""
166        cfm_peripheral_dict = {'Microphone': None, 'Speaker': None,
167                               'Camera': None}
168
169        cfm_peripheral_dict['Microphone'] = self.cfm_facade.get_preferred_mic()
170        cfm_peripheral_dict['Speaker'] = self.cfm_facade.get_preferred_speaker()
171        cfm_peripheral_dict['Camera'] = self.cfm_facade.get_preferred_camera()
172
173        for device_type, is_found in cfm_peripheral_dict.iteritems():
174            if not is_found:
175                cfm_peripheral_dict[device_type] = 'Not Found'
176
177        return cfm_peripheral_dict
178
179
180    def run_once(self, host, peripheral_whitelist_dict):
181        """Main function to run autotest.
182
183        @param host: Host object representing the DUT.
184
185        """
186        self.client = host
187
188        factory = remote_facade_factory.RemoteFacadeFactory(
189                host, no_chrome=True)
190        self.cfm_facade = factory.create_cfm_facade()
191
192        tpm_utils.ClearTPMOwnerRequest(self.client)
193
194        if self.client.servo:
195            self.client.servo.switch_usbkey('dut')
196            self.client.servo.set('usb_mux_sel3', 'dut_sees_usbkey')
197            time.sleep(_WAIT_DELAY)
198            self._set_hub_power(True)
199
200        usb_list_dir_on = self._get_usb_device_dirs()
201
202        cros_peripheral_dict = self._parse_device_dir_for_info(usb_list_dir_on,
203                peripheral_whitelist_dict)
204        logging.debug('Peripherals detected by CrOS: %s', cros_peripheral_dict)
205
206        try:
207            self._enroll_device_and_skip_oobe()
208            self._set_preferred_peripherals(cros_peripheral_dict)
209            cfm_peripheral_dict = self._peripheral_detection()
210            logging.debug('Peripherals detected by hotrod: %s',
211                          cfm_peripheral_dict)
212        except Exception as e:
213            raise error.TestFail(str(e))
214
215        tpm_utils.ClearTPMOwnerRequest(self.client)
216
217        cros_peripherals = set(cros_peripheral_dict.iteritems())
218        cfm_peripherals = set(cfm_peripheral_dict.iteritems())
219
220        peripheral_diff = cros_peripherals.difference(cfm_peripherals)
221
222        if peripheral_diff:
223            no_match_list = list()
224            for item in peripheral_diff:
225                no_match_list.append(item[0])
226            raise error.TestFail('Following peripherals do not match: %s' %
227                                 ', '.join(no_match_list))
228