• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, threading, time
6
7from autotest_lib.client.common_lib.cros import crash_detector
8from autotest_lib.server import autotest, test
9from autotest_lib.client.common_lib import error
10
11_WAIT_DELAY = 15
12_LONG_TIMEOUT = 200
13_LOWER_USB_PORT = 'usb_mux_sel3'
14_SUSPEND_TIME = 30
15_UPPER_USB_PORT = 'usb_mux_sel1'
16
17class platform_ExternalUsbPeripherals(test.test):
18    """Uses servo to repeatedly connect/remove USB devices during boot."""
19    version = 1
20
21
22    def getPluggedUsbDevices(self):
23        """Determines the external USB devices plugged
24
25        @returns plugged_list: List of plugged usb devices names
26
27        """
28        lsusb_output = self.host.run('lsusb').stdout.strip()
29        items = lsusb_output.split('\n')
30        plugged_list = []
31        unnamed_device_count = 1
32        for item in items:
33            columns = item.split(' ')
34            if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0:
35                logging.debug('Unnamed device located, adding generic name.')
36                name = 'Unnamed device %d' % unnamed_device_count
37                unnamed_device_count += 1
38            else:
39                name = ' '.join(columns[6:]).strip()
40            plugged_list.append(name)
41        return plugged_list
42
43
44    def plug_peripherals(self, on=True):
45        """Setting USB mux_3 plug status
46
47        @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug)
48
49        """
50        switch = 'dut_sees_usbkey'
51        if not on:
52            switch = 'servo_sees_usbkey'
53        self.host.servo.set(self.plug_port, switch)
54        self.pluged_status = on
55
56
57    def action_login(self):
58        """Login i.e. runs running client test
59
60        @exception TestFail failed to login within timeout.
61
62        """
63        self.autotest_client.run_test(self.client_autotest,
64                                      exit_without_logout=True)
65
66
67    def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
68        """Waits till command output is meta
69
70        @param cmd: executed command
71        @param check: string to be checked for in cmd output
72        @param timeout: max time in sec to wait for output
73        @param timeout_msg: timeout failure message
74
75        @returns True if check is found in command output; False otherwise
76        """
77        start_time = int(time.time())
78        time_delta = 0
79        command = '%s %s' % (cmd, check)
80        logging.debug('Command: %s', command)
81        while(self.host.run(command, ignore_status=True).exit_status != 0):
82            time_delta = int(time.time()) - start_time
83            if time_delta > timeout:
84                self.add_failure('%s - %d sec' % (timeout_msg, timeout))
85                return False
86            time.sleep(0.5)
87        logging.debug('Succeeded in :%d sec', time_delta)
88        return True
89
90
91    def suspend_for_time(self, suspend_time=_SUSPEND_TIME):
92        """Calls the host method suspend with suspend_time argument.
93
94        @param suspend_time: time to suspend the device for.
95
96        """
97        try:
98            self.host.suspend(suspend_time=suspend_time)
99        except error.AutoservSuspendError:
100            pass
101
102
103    def action_suspend(self):
104        """Suspend i.e. powerd_dbus_suspend and wait
105
106        @returns boot_id for the following resume
107        """
108        boot_id = self.host.get_boot_id()
109        thread = threading.Thread(target=self.suspend_for_time)
110        thread.start()
111        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
112        logging.debug('--- Suspended')
113        self.suspend_status = True
114        return boot_id
115
116
117    def action_resume(self, boot_id):
118        """Resume i.e. press power key and wait
119
120        @param boot_id: boot id obtained prior to suspending
121
122        """
123        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
124        logging.debug('--- Resumed')
125        self.suspend_status = False
126
127    def close_lid(self):
128        """Close lid through servo to suspend the device."""
129        boot_id = self.host.get_boot_id()
130        logging.info('Closing lid...')
131        self.host.servo.lid_close()
132        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
133        self.suspend_status = True
134        return boot_id
135
136
137    def open_lid(self, boot_id):
138        """Open lid through servo to resume."""
139        logging.info('Opening lid...')
140        self.host.servo.lid_open()
141        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
142        self.suspend_status = False
143
144    def crash_not_detected(self):
145        """Finds new crash files and adds to failures list if any
146
147        @returns True if there were not crashes; False otherwise
148        """
149        crash_files = self.detect_crash.get_new_crash_files()
150        if crash_files:
151            self.add_failure('CRASH DETECTED: %s' % str(crash_files))
152            return False
153        return True
154
155
156    def check_plugged_usb_devices(self):
157        """Checks the plugged peripherals match device list.
158
159        @returns True if expected USB peripherals are detected; False otherwise
160        """
161        result = True
162        if self.pluged_status and self.usb_list != None:
163            # Check for mandatory USb devices passed by usb_list flag
164            for usb_name in self.usb_list:
165                found = self.wait_for_cmd_output(
166                    'lsusb | grep -E ', usb_name, _WAIT_DELAY * 4,
167                    'Not detecting %s' % usb_name)
168                result = result and found
169        time.sleep(_WAIT_DELAY)
170        on_now = self.getPluggedUsbDevices()
171        if self.pluged_status:
172            if not self.diff_list.issubset(on_now):
173                missing = str(self.diff_list.difference(on_now))
174                self.add_failure('Missing connected peripheral(s) '
175                                 'when plugged: %s ' % missing)
176                result = False
177        else:
178            present = self.diff_list.intersection(on_now)
179            if len(present) > 0:
180                self.add_failure('Still presented peripheral(s) '
181                                 'when unplugged: %s ' % str(present))
182                result = False
183        return result
184
185
186    def check_usb_peripherals_details(self):
187        """Checks the effect from plugged in USB peripherals.
188
189        @returns True if command line output is matched successfuly; Else False
190        """
191        usb_check_result = True
192        for cmd in self.usb_checks.keys():
193            out_match_list = self.usb_checks.get(cmd)
194            if cmd.startswith('loggedin:'):
195                if not self.login_status:
196                    continue
197                cmd = cmd.replace('loggedin:','')
198            # Run the usb check command
199            for out_match in out_match_list:
200                match_result = self.wait_for_cmd_output(
201                    cmd, out_match, _WAIT_DELAY * 4,
202                    'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match))
203                usb_check_result = usb_check_result and match_result
204        return usb_check_result
205
206
207    def check_status(self):
208        """Performs checks after each action:
209            - for USB detected devices
210            - for generated crash files
211            - peripherals effect checks on cmd line
212
213        @returns True if all of the iteration checks pass; False otherwise.
214        """
215        result = True
216        if not self.suspend_status:
217            # Detect the USB peripherals
218            result = self.check_plugged_usb_devices()
219            # Check for crash files
220            if self.crash_check:
221                result = result and self.crash_not_detected()
222            if self.pluged_status and (self.usb_checks != None):
223                # Check for plugged USB devices details
224                result = result and self.check_usb_peripherals_details()
225        return result
226
227
228    def add_failure(self, reason):
229        """ Adds a failure reason to list of failures to be reported at end
230
231        @param reason: failure reason to record
232
233        """
234        if self.action_step is not None:
235            self.fail_reasons.append('%s FAILS - %s' %
236                                     (self.action_step, reason))
237
238
239    def check_connected_peripherals(self):
240        """ Verifies there are connected usb devices on servo
241
242        @raise error.TestError: if no peripherals are shown
243        """
244
245        # Collect USB peripherals when unplugged
246        self.plug_peripherals(False)
247        time.sleep(_WAIT_DELAY)
248        off_list = self.getPluggedUsbDevices()
249
250        # Collect USB peripherals when plugged
251        self.plug_peripherals(True)
252        time.sleep(_WAIT_DELAY * 2)
253        on_list = self.getPluggedUsbDevices()
254
255        self.diff_list = set(on_list).difference(set(off_list))
256        if len(self.diff_list) == 0:
257            # Fail if no devices detected after
258            raise error.TestError('No connected devices were detected. Make '
259                                  'sure the devices are connected to USB_KEY '
260                                  'and DUT_HUB1_USB on the servo board.')
261        logging.debug('Connected devices list: %s', self.diff_list)
262
263
264    def prep_servo_for_test(self, stress_rack):
265        """Connects servo to DUT  and sets servo ports
266
267        @param stress_rack: either to prep servo for stress tests, where
268        usb_mux_1 port should be on. For usb peripherals on usb_mux_3,
269        the port is on, and the oe2,oe4 poers are off.
270
271        @returns port as string to plug/unplug the specific port
272        """
273        port = _LOWER_USB_PORT
274        self.host.servo.switch_usbkey('dut')
275        self.host.servo.set('dut_hub1_rst1','off')
276        if stress_rack:
277            port = _UPPER_USB_PORT
278            self.host.servo.set(port, 'dut_sees_usbkey')
279        else:
280            self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey')
281            self.host.servo.set('usb_mux_oe2', 'off')
282            self.host.servo.set('usb_mux_oe4', 'off')
283        time.sleep(_WAIT_DELAY)
284        return port
285
286
287    def cleanup(self):
288        """Disconnect servo hub"""
289        self.plug_peripherals(False)
290        self.host.servo.set('dut_hub1_rst1','on')
291        self.host.reboot()
292
293
294    def run_once(self, host, client_autotest, action_sequence, repeat,
295                 usb_list=None, usb_checks=None,
296                 crash_check=False, stress_rack=False):
297        self.client_autotest = client_autotest
298        self.host = host
299        self.autotest_client = autotest.Autotest(self.host)
300        self.usb_list = usb_list
301        self.usb_checks = usb_checks
302        self.crash_check = crash_check
303
304        self.suspend_status = False
305        self.login_status = False
306        self.fail_reasons = list()
307        self.action_step = None
308
309        self.plug_port = self.prep_servo_for_test(stress_rack)
310
311        # Unplug, plug, compare usb peripherals, and leave plugged.
312        self.check_connected_peripherals()
313
314        action_sequence = action_sequence.upper()
315        actions = action_sequence.split(',')
316        boot_id = 0
317        self. detect_crash = crash_detector.CrashDetector(self.host)
318        self.detect_crash.remove_crash_files()
319
320        # Run camera client test to gather media_V4L2_test binary.
321        if 'media_v4l2_test' in str(self.usb_checks):
322            self.autotest_client.run_test("camera_V4L2")
323
324        for iteration in xrange(1, repeat + 1):
325            step = 0
326            for action in actions:
327                step += 1
328                action = action.strip()
329                self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
330                logging.info(self.action_step)
331
332                if action == 'RESUME':
333                    self.action_resume(boot_id)
334                    time.sleep(_WAIT_DELAY)
335                elif action == 'OPENLID':
336                    self.open_lid(boot_id)
337                    time.sleep(_WAIT_DELAY)
338                elif action == 'UNPLUG':
339                    self.plug_peripherals(False)
340                elif action == 'PLUG':
341                    self.plug_peripherals(True)
342                elif self.suspend_status == False:
343                    if action.startswith('LOGIN'):
344                        if self.login_status:
345                            logging.debug('Skipping login. Already logged in.')
346                            continue
347                        else:
348                            self.action_login()
349                            self.login_status = True
350                    elif action == 'REBOOT':
351                        self.host.reboot()
352                        time.sleep(_WAIT_DELAY * 3)
353                        self.login_status = False
354                    elif action == 'SUSPEND':
355                        boot_id = self.action_suspend()
356                    elif action == 'CLOSELID':
357                        boot_id = self.close_lid()
358                else:
359                    logging.info('WRONG ACTION: %s .', self.action_step)
360                self.check_status()
361
362            if self.fail_reasons:
363                raise error.TestFail('Failures reported: %s' %
364                                     str(self.fail_reasons))
365