• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6import logging, threading, time
7
8from autotest_lib.client.common_lib.cros import crash_detector
9from autotest_lib.server import autotest, test
10from autotest_lib.client.common_lib import error
11
# File touched on the DUT by action_logout().
_CLIENT_TERMINATION_FILE_PATH = '/tmp/simple_login_exit'
# Passed to host.test_wait_for_sleep()/test_wait_for_resume() as the timeout
# (presumably seconds -- confirm against the host API).
_LONG_TIMEOUT = 200
# Servo control used as the plug/unplug port on servos other than v4.1.
_LOWER_USB_PORT = 'usb_mux_sel3'
# Default suspend_time handed to host.suspend() by suspend_for_time().
_SUSPEND_TIME = 30
# Servo control switched to 'servo_sees_usbkey' while preparing the servo.
_UPPER_USB_PORT = 'usb_mux_sel1'
# Settle time in seconds (passed to time.sleep) after most actions.
_WAIT_DELAY = 15
# Sleep in seconds after the OPENLID action.
_WAIT_OPENLID_DELAY = 30
# Longer sleep in seconds; also used as the wait_for_cmd_output() timeout.
_WAIT_LONG_DELAY = 60

# servo v4.1 controls
# Plug/unplug port used when the servo type contains 'servo_v4p1'.
_AUX_USB_PORT = 'aux_usbkey_mux'
# Set to 'servo_sees_usbkey' while preparing a v4.1 servo.
_IMAGE_USB_PORT = 'image_usbkey_mux'
24
25
26class platform_ExternalUsbPeripherals(test.test):
27    """Uses servo to repeatedly connect/remove USB devices during boot."""
28    version = 1
29
30
31    def getPluggedUsbDevices(self):
32        """Determines the external USB devices plugged
33
34        @returns plugged_list: List of plugged usb devices names
35
36        """
37        lsusb_output = self.host.run('lsusb').stdout.strip()
38        items = lsusb_output.split('\n')
39        plugged_list = []
40        unnamed_device_count = 1
41        for item in items:
42            columns = item.split(' ')
43            if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0:
44                logging.debug('Unnamed device located, adding generic name.')
45                name = 'Unnamed device %d' % unnamed_device_count
46                unnamed_device_count += 1
47            else:
48                name = ' '.join(columns[6:]).strip()
49            plugged_list.append(name)
50        return plugged_list
51
52
53    def plug_peripherals(self, on=True):
54        """Setting USB mux_3 plug status
55
56        @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug)
57
58        """
59        switch = 'dut_sees_usbkey'
60        if not on:
61            switch = 'servo_sees_usbkey'
62        self.host.servo.set(self.plug_port, switch)
63        self.pluged_status = on
64
65
66    def client_login(self, client_exit):
67        """Login i.e. runs running client test
68
69        @exception TestFail failed to login within timeout.
70
71        """
72        self.autotest_client.run_test(self.client_autotest,
73                                      exit_without_logout=client_exit)
74
75
76    def action_login(self, login_client_exit=True):
77        """Login i.e. runs running client login
78
79        @param login_client_exit: Exit after login flag.
80
81        """
82        thread = threading.Thread(target=self.client_login,
83                                  args = (login_client_exit, ))
84        thread.start()
85        time.sleep(_WAIT_DELAY)
86
87
88    def action_logout(self):
89        """Logout i.e. runs running client test
90
91        @exception TestFail failed to logout within timeout.
92
93        """
94        self.host.run('touch %s' % _CLIENT_TERMINATION_FILE_PATH)
95        time.sleep(_WAIT_DELAY)
96
97
98    def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
99        """Waits till command output is meta
100
101        @param cmd: executed command
102        @param check: string to be checked for in cmd output
103        @param timeout: max time in sec to wait for output
104        @param timeout_msg: timeout failure message
105
106        @returns True if check is found in command output; False otherwise
107        """
108        start_time = int(time.time())
109        time_delta = 0
110        command = '%s %s' % (cmd, check)
111        logging.debug('Command: %s', command)
112        while(self.host.run(command, ignore_status=True).exit_status != 0):
113            time_delta = int(time.time()) - start_time
114            if time_delta > timeout:
115                self.add_failure('%s - %d sec' % (timeout_msg, timeout))
116                return False
117            time.sleep(0.5)
118        logging.debug('Succeeded in :%d sec', time_delta)
119        return True
120
121
122    def suspend_for_time(self, suspend_time=_SUSPEND_TIME):
123        """Calls the host method suspend with suspend_time argument.
124
125        @param suspend_time: time to suspend the device for.
126
127        """
128        try:
129            self.host.suspend(suspend_time=suspend_time)
130        except error.AutoservSuspendError:
131            pass
132
133
134    def action_suspend(self):
135        """Suspend i.e. powerd_dbus_suspend and wait
136
137        @returns boot_id for the following resume
138        """
139        boot_id = self.host.get_boot_id()
140        thread = threading.Thread(target=self.suspend_for_time)
141        thread.start()
142        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
143        logging.debug('--- Suspended')
144        self.suspend_status = True
145        return boot_id
146
147
148    def action_resume(self, boot_id):
149        """Resume i.e. press power key and wait
150
151        @param boot_id: boot id obtained prior to suspending
152
153        """
154        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
155        logging.debug('--- Resumed')
156        self.suspend_status = False
157
158
159    def close_lid(self):
160        """Close lid through servo to suspend the device."""
161        boot_id = self.host.get_boot_id()
162        logging.info('Closing lid...')
163        self.host.servo.lid_close()
164        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
165        self.suspend_status = True
166        return boot_id
167
168
169    def open_lid(self, boot_id):
170        """Open lid through servo to resume."""
171        logging.info('Opening lid...')
172        self.host.servo.lid_open()
173        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
174        self.suspend_status = False
175
176
177    def crash_not_detected(self):
178        """Finds new crash files and adds to failures list if any
179
180        @returns True if there were not crashes; False otherwise
181        """
182        crash_files = self.detect_crash.get_new_crash_files()
183        if crash_files:
184            self.add_failure('CRASH DETECTED: %s' % str(crash_files))
185            return False
186        return True
187
188
189    def check_plugged_usb_devices(self):
190        """Checks the plugged peripherals match device list.
191
192        @returns True if expected USB peripherals are detected; False otherwise
193        """
194        result = True
195        if self.pluged_status and self.usb_list != None:
196            # Check for mandatory USb devices passed by usb_list flag
197            for usb_name in self.usb_list:
198                found = self.wait_for_cmd_output(
199                    'lsusb | grep -E ', usb_name, _WAIT_LONG_DELAY,
200                    'Not detecting %s' % usb_name)
201                result = result and found
202        time.sleep(_WAIT_DELAY)
203        on_now = self.getPluggedUsbDevices()
204        if self.pluged_status:
205            if not self.diff_list.issubset(on_now):
206                missing = str(self.diff_list.difference(on_now))
207                self.add_failure('Missing connected peripheral(s) '
208                                 'when plugged: %s ' % missing)
209                result = False
210        else:
211            present = self.diff_list.intersection(on_now)
212            if len(present) > 0:
213                self.add_failure('Still presented peripheral(s) '
214                                 'when unplugged: %s ' % str(present))
215                result = False
216        return result
217
218
219    def check_usb_peripherals_details(self):
220        """Checks the effect from plugged in USB peripherals.
221
222        @returns True if command line output is matched successfuly; Else False
223        """
224        usb_check_result = True
225        for cmd in self.usb_checks.keys():
226            out_match_list = self.usb_checks.get(cmd)
227            if cmd.startswith('loggedin:'):
228                if not self.login_status:
229                    continue
230                cmd = cmd.replace('loggedin:', '')
231            board = self.host.get_board().split(':')[1].lower()
232            # Run the usb check command
233            for out_match in out_match_list:
234                match_result = self.wait_for_cmd_output(
235                    cmd, out_match, _WAIT_LONG_DELAY,
236                    'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match))
237                usb_check_result = usb_check_result and match_result
238        return usb_check_result
239
240
241    def check_status(self):
242        """Performs checks after each action:
243            - for USB detected devices
244            - for generated crash files
245            - peripherals effect checks on cmd line
246
247        @returns True if all of the iteration checks pass; False otherwise.
248        """
249        result = True
250        if not self.suspend_status:
251            # Detect the USB peripherals
252            result = self.check_plugged_usb_devices()
253            # Check for crash files
254            if self.crash_check:
255                result = result and self.crash_not_detected()
256            if self.pluged_status and (self.usb_checks != None):
257                # Check for plugged USB devices details
258                result = result and self.check_usb_peripherals_details()
259        return result
260
261
262    def add_failure(self, reason):
263        """ Adds a failure reason to list of failures to be reported at end
264
265        @param reason: failure reason to record
266
267        """
268        if self.action_step is not None:
269            self.fail_reasons.append('%s FAILS - %s' %
270                                     (self.action_step, reason))
271
272
273    def check_connected_peripherals(self):
274        """ Verifies there are connected usb devices on servo
275
276        @raise error.TestError: if no peripherals are shown
277        """
278
279        # Collect USB peripherals when unplugged
280        self.plug_peripherals(False)
281        time.sleep(_WAIT_DELAY)
282        off_list = self.getPluggedUsbDevices()
283
284        # Collect USB peripherals when plugged
285        self.plug_peripherals(True)
286        time.sleep(_WAIT_LONG_DELAY)
287        on_list = self.getPluggedUsbDevices()
288
289        self.diff_list = set(on_list).difference(set(off_list))
290        if len(self.diff_list) == 0:
291            # Fail if no devices detected after
292            raise error.TestError('No connected devices were detected. Make '
293                                  'sure the devices are connected to USB_KEY '
294                                  'and DUT_HUB1_USB on the servo board.')
295        logging.debug('Connected devices list: %s', self.diff_list)
296
297
298    def prep_servo_for_test(self):
299        """Connects servo to DUT  and sets servo ports
300
301        @returns port as string to plug/unplug the specific port
302        """
303        if 'servo_v4p1' in self.servo_type:
304            port = _AUX_USB_PORT
305            self.host.servo.set(_IMAGE_USB_PORT, 'servo_sees_usbkey')
306        else:
307            port = _LOWER_USB_PORT
308            self.host.servo.switch_usbkey('dut')
309            self.host.servo.set('dut_hub1_rst1','off')
310            self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey')
311            self.host.servo.set('usb_mux_oe2', 'off')
312            self.host.servo.set('usb_mux_oe4', 'off')
313        time.sleep(_WAIT_DELAY)
314        return port
315
316
317    def cleanup(self):
318        """Disconnect servo hub"""
319        self.plug_peripherals(False)
320        self.action_logout()
321        if 'servo_v4p1' not in self.servo_type:
322            self.host.servo.set('dut_hub1_rst1','on')
323        self.host.run('reboot now', ignore_status=True)
324        self.host.test_wait_for_boot()
325
326
327    def run_once(self, host, client_autotest, action_sequence, repeat,
328                 usb_list=None, usb_checks=None,
329                 crash_check=False):
330        self.client_autotest = client_autotest
331        self.host = host
332        self.autotest_client = autotest.Autotest(self.host)
333        self.usb_list = usb_list
334        self.usb_checks = usb_checks
335        self.crash_check = crash_check
336
337        self.suspend_status = False
338        self.login_status = False
339        self.fail_reasons = list()
340        self.action_step = None
341
342        self.servo_type = self.host.servo.get_servo_type()
343
344        self.plug_port = self.prep_servo_for_test()
345
346        # Unplug, plug, compare usb peripherals, and leave plugged.
347        self.check_connected_peripherals()
348
349        action_sequence = action_sequence.upper()
350        # Check for if board type is NOT chromebook and skip lid_close_open tests
351        if (not (host.get_board_type() == 'CHROMEBOOK')) and \
352                ('CLOSELID' in action_sequence):
353            raise error.TestNAError('No lid on DUT. Test Skipped')
354        actions = action_sequence.split(',')
355        boot_id = 0
356        self.detect_crash = crash_detector.CrashDetector(self.host)
357        self.detect_crash.remove_crash_files()
358
359        for iteration in range(1, repeat + 1):
360            step = 0
361            for action in actions:
362                step += 1
363                action = action.strip()
364                self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
365                logging.info(self.action_step)
366
367                if action == 'RESUME':
368                    self.action_resume(boot_id)
369                    time.sleep(_WAIT_DELAY)
370                elif action == 'OPENLID':
371                    self.open_lid(boot_id)
372                    time.sleep(_WAIT_OPENLID_DELAY)
373                elif action == 'UNPLUG':
374                    self.plug_peripherals(False)
375                elif action == 'PLUG':
376                    self.plug_peripherals(True)
377                elif self.suspend_status == False:
378                    if action.startswith('LOGIN'):
379                        if self.login_status:
380                            logging.debug('Skipping login. Already logged in.')
381                            continue
382                        else:
383                            self.action_login('LOGOUT' not in actions)
384                            self.login_status = True
385                    elif action == 'LOGOUT':
386                        if self.login_status:
387                            self.action_logout()
388                            self.login_status = False
389                        else:
390                            logging.debug('Skipping logout. Not logged in.')
391                    elif action == 'REBOOT':
392                        self.host.reboot()
393                        time.sleep(_WAIT_LONG_DELAY)
394                        self.login_status = False
395                    elif action == 'SUSPEND':
396                        boot_id = self.action_suspend()
397                    elif action == 'CLOSELID':
398                        boot_id = self.close_lid()
399                else:
400                    logging.info('WRONG ACTION: %s .', self.action_step)
401                self.check_status()
402
403            if self.fail_reasons:
404                raise error.TestFail('Failures reported: %s' %
405                                     str(self.fail_reasons))
406