• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging, threading, time
6
7from autotest_lib.client.common_lib.cros import crash_detector
8from autotest_lib.server import autotest, test
9from autotest_lib.client.common_lib import error
10
11_CLIENT_TERMINATION_FILE_PATH = '/tmp/simple_login_exit'
12_LONG_TIMEOUT = 200
13_LOWER_USB_PORT = 'usb_mux_sel3'
14_SUSPEND_TIME = 30
15_UPPER_USB_PORT = 'usb_mux_sel1'
16_WAIT_DELAY = 15
17
18
19class platform_ExternalUsbPeripherals(test.test):
20    """Uses servo to repeatedly connect/remove USB devices during boot."""
21    version = 1
22
23
24    def getPluggedUsbDevices(self):
25        """Determines the external USB devices plugged
26
27        @returns plugged_list: List of plugged usb devices names
28
29        """
30        lsusb_output = self.host.run('lsusb').stdout.strip()
31        items = lsusb_output.split('\n')
32        plugged_list = []
33        unnamed_device_count = 1
34        for item in items:
35            columns = item.split(' ')
36            if len(columns) == 6 or len(' '.join(columns[6:]).strip()) == 0:
37                logging.debug('Unnamed device located, adding generic name.')
38                name = 'Unnamed device %d' % unnamed_device_count
39                unnamed_device_count += 1
40            else:
41                name = ' '.join(columns[6:]).strip()
42            plugged_list.append(name)
43        return plugged_list
44
45
46    def plug_peripherals(self, on=True):
47        """Setting USB mux_3 plug status
48
49        @param on: Connect the servo-usb port to DUT(plug) or to servo(unplug)
50
51        """
52        switch = 'dut_sees_usbkey'
53        if not on:
54            switch = 'servo_sees_usbkey'
55        self.host.servo.set(self.plug_port, switch)
56        self.pluged_status = on
57
58
59    def client_login(self, client_exit):
60        """Login i.e. runs running client test
61
62        @exception TestFail failed to login within timeout.
63
64        """
65        self.autotest_client.run_test(self.client_autotest,
66                                      exit_without_logout=client_exit)
67
68
69    def action_login(self, login_client_exit=True):
70        """Login i.e. runs running client login
71
72        @param login_client_exit: Exit after login flag.
73
74        """
75        thread = threading.Thread(target=self.client_login,
76                                  args = (login_client_exit, ))
77        thread.start()
78        time.sleep(_WAIT_DELAY)
79
80
81    def action_logout(self):
82        """Logout i.e. runs running client test
83
84        @exception TestFail failed to logout within timeout.
85
86        """
87        self.host.run('touch %s' % _CLIENT_TERMINATION_FILE_PATH)
88        time.sleep(_WAIT_DELAY)
89
90
91    def wait_for_cmd_output(self, cmd, check, timeout, timeout_msg):
92        """Waits till command output is meta
93
94        @param cmd: executed command
95        @param check: string to be checked for in cmd output
96        @param timeout: max time in sec to wait for output
97        @param timeout_msg: timeout failure message
98
99        @returns True if check is found in command output; False otherwise
100        """
101        start_time = int(time.time())
102        time_delta = 0
103        command = '%s %s' % (cmd, check)
104        logging.debug('Command: %s', command)
105        while(self.host.run(command, ignore_status=True).exit_status != 0):
106            time_delta = int(time.time()) - start_time
107            if time_delta > timeout:
108                self.add_failure('%s - %d sec' % (timeout_msg, timeout))
109                return False
110            time.sleep(0.5)
111        logging.debug('Succeeded in :%d sec', time_delta)
112        return True
113
114
115    def suspend_for_time(self, suspend_time=_SUSPEND_TIME):
116        """Calls the host method suspend with suspend_time argument.
117
118        @param suspend_time: time to suspend the device for.
119
120        """
121        try:
122            self.host.suspend(suspend_time=suspend_time)
123        except error.AutoservSuspendError:
124            pass
125
126
127    def action_suspend(self):
128        """Suspend i.e. powerd_dbus_suspend and wait
129
130        @returns boot_id for the following resume
131        """
132        boot_id = self.host.get_boot_id()
133        thread = threading.Thread(target=self.suspend_for_time)
134        thread.start()
135        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
136        logging.debug('--- Suspended')
137        self.suspend_status = True
138        return boot_id
139
140
141    def action_resume(self, boot_id):
142        """Resume i.e. press power key and wait
143
144        @param boot_id: boot id obtained prior to suspending
145
146        """
147        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
148        logging.debug('--- Resumed')
149        self.suspend_status = False
150
151
152    def close_lid(self):
153        """Close lid through servo to suspend the device."""
154        boot_id = self.host.get_boot_id()
155        logging.info('Closing lid...')
156        self.host.servo.lid_close()
157        self.host.test_wait_for_sleep(_LONG_TIMEOUT)
158        self.suspend_status = True
159        return boot_id
160
161
162    def open_lid(self, boot_id):
163        """Open lid through servo to resume."""
164        logging.info('Opening lid...')
165        self.host.servo.lid_open()
166        self.host.test_wait_for_resume(boot_id, _LONG_TIMEOUT)
167        self.suspend_status = False
168
169
170    def crash_not_detected(self):
171        """Finds new crash files and adds to failures list if any
172
173        @returns True if there were not crashes; False otherwise
174        """
175        crash_files = self.detect_crash.get_new_crash_files()
176        if crash_files:
177            self.add_failure('CRASH DETECTED: %s' % str(crash_files))
178            return False
179        return True
180
181
182    def check_plugged_usb_devices(self):
183        """Checks the plugged peripherals match device list.
184
185        @returns True if expected USB peripherals are detected; False otherwise
186        """
187        result = True
188        if self.pluged_status and self.usb_list != None:
189            # Check for mandatory USb devices passed by usb_list flag
190            for usb_name in self.usb_list:
191                found = self.wait_for_cmd_output(
192                    'lsusb | grep -E ', usb_name, _WAIT_DELAY * 4,
193                    'Not detecting %s' % usb_name)
194                result = result and found
195        time.sleep(_WAIT_DELAY)
196        on_now = self.getPluggedUsbDevices()
197        if self.pluged_status:
198            if not self.diff_list.issubset(on_now):
199                missing = str(self.diff_list.difference(on_now))
200                self.add_failure('Missing connected peripheral(s) '
201                                 'when plugged: %s ' % missing)
202                result = False
203        else:
204            present = self.diff_list.intersection(on_now)
205            if len(present) > 0:
206                self.add_failure('Still presented peripheral(s) '
207                                 'when unplugged: %s ' % str(present))
208                result = False
209        return result
210
211
212    def check_usb_peripherals_details(self):
213        """Checks the effect from plugged in USB peripherals.
214
215        @returns True if command line output is matched successfuly; Else False
216        """
217        usb_check_result = True
218        for cmd in self.usb_checks.keys():
219            out_match_list = self.usb_checks.get(cmd)
220            if cmd.startswith('loggedin:'):
221                if not self.login_status:
222                    continue
223                cmd = cmd.replace('loggedin:', '')
224            board = self.host.get_board().split(':')[1].lower()
225            # Run the usb check command
226            for out_match in out_match_list:
227                # Skip running media_v4l2_test on hana boards
228                # crbug.com/820500
229                if 'media_v4l2_test' in cmd and board in ["hana"]:
230                    continue
231                match_result = self.wait_for_cmd_output(
232                    cmd, out_match, _WAIT_DELAY * 4,
233                    'USB CHECKS DETAILS failed at %s %s:' % (cmd, out_match))
234                usb_check_result = usb_check_result and match_result
235        return usb_check_result
236
237
238    def check_status(self):
239        """Performs checks after each action:
240            - for USB detected devices
241            - for generated crash files
242            - peripherals effect checks on cmd line
243
244        @returns True if all of the iteration checks pass; False otherwise.
245        """
246        result = True
247        if not self.suspend_status:
248            # Detect the USB peripherals
249            result = self.check_plugged_usb_devices()
250            # Check for crash files
251            if self.crash_check:
252                result = result and self.crash_not_detected()
253            if self.pluged_status and (self.usb_checks != None):
254                # Check for plugged USB devices details
255                result = result and self.check_usb_peripherals_details()
256        return result
257
258
259    def add_failure(self, reason):
260        """ Adds a failure reason to list of failures to be reported at end
261
262        @param reason: failure reason to record
263
264        """
265        if self.action_step is not None:
266            self.fail_reasons.append('%s FAILS - %s' %
267                                     (self.action_step, reason))
268
269
270    def check_connected_peripherals(self):
271        """ Verifies there are connected usb devices on servo
272
273        @raise error.TestError: if no peripherals are shown
274        """
275
276        # Collect USB peripherals when unplugged
277        self.plug_peripherals(False)
278        time.sleep(_WAIT_DELAY)
279        off_list = self.getPluggedUsbDevices()
280
281        # Collect USB peripherals when plugged
282        self.plug_peripherals(True)
283        time.sleep(_WAIT_DELAY * 2)
284        on_list = self.getPluggedUsbDevices()
285
286        self.diff_list = set(on_list).difference(set(off_list))
287        if len(self.diff_list) == 0:
288            # Fail if no devices detected after
289            raise error.TestError('No connected devices were detected. Make '
290                                  'sure the devices are connected to USB_KEY '
291                                  'and DUT_HUB1_USB on the servo board.')
292        logging.debug('Connected devices list: %s', self.diff_list)
293
294
295    def prep_servo_for_test(self, stress_rack):
296        """Connects servo to DUT  and sets servo ports
297
298        @param stress_rack: either to prep servo for stress tests, where
299        usb_mux_1 port should be on. For usb peripherals on usb_mux_3,
300        the port is on, and the oe2,oe4 poers are off.
301
302        @returns port as string to plug/unplug the specific port
303        """
304        port = _LOWER_USB_PORT
305        self.host.servo.switch_usbkey('dut')
306        self.host.servo.set('dut_hub1_rst1','off')
307        if stress_rack:
308            port = _UPPER_USB_PORT
309            self.host.servo.set(port, 'dut_sees_usbkey')
310        else:
311            self.host.servo.set(_UPPER_USB_PORT, 'servo_sees_usbkey')
312            self.host.servo.set('usb_mux_oe2', 'off')
313            self.host.servo.set('usb_mux_oe4', 'off')
314        time.sleep(_WAIT_DELAY)
315        return port
316
317
318    def cleanup(self):
319        """Disconnect servo hub"""
320        self.plug_peripherals(False)
321        self.action_logout()
322        self.host.servo.set('dut_hub1_rst1','on')
323        self.host.run('reboot now', ignore_status=True)
324        self.host.test_wait_for_boot()
325
326
327    def run_once(self, host, client_autotest, action_sequence, repeat,
328                 usb_list=None, usb_checks=None,
329                 crash_check=False, stress_rack=False):
330        self.client_autotest = client_autotest
331        self.host = host
332        self.autotest_client = autotest.Autotest(self.host)
333        self.usb_list = usb_list
334        self.usb_checks = usb_checks
335        self.crash_check = crash_check
336
337        self.suspend_status = False
338        self.login_status = False
339        self.fail_reasons = list()
340        self.action_step = None
341
342        self.plug_port = self.prep_servo_for_test(stress_rack)
343
344        # Unplug, plug, compare usb peripherals, and leave plugged.
345        self.check_connected_peripherals()
346
347        action_sequence = action_sequence.upper()
348        # Check for if board type is NOT chromebook and skip lid_close_open tests
349        if (not (host.get_board_type() == 'CHROMEBOOK')) and \
350                ('CLOSELID' in action_sequence):
351            raise error.TestNAError('No lid on DUT. Test Skipped')
352        actions = action_sequence.split(',')
353        boot_id = 0
354        self.detect_crash = crash_detector.CrashDetector(self.host)
355        self.detect_crash.remove_crash_files()
356
357        # Run camera client test to gather media_V4L2_test binary.
358        if 'media_v4l2_test' in str(self.usb_checks):
359            self.autotest_client.run_test("camera_V4L2")
360
361        for iteration in xrange(1, repeat + 1):
362            step = 0
363            for action in actions:
364                step += 1
365                action = action.strip()
366                self.action_step = 'STEP %d.%d. %s' % (iteration, step, action)
367                logging.info(self.action_step)
368
369                if action == 'RESUME':
370                    self.action_resume(boot_id)
371                    time.sleep(_WAIT_DELAY)
372                elif action == 'OPENLID':
373                    self.open_lid(boot_id)
374                    time.sleep(_WAIT_DELAY)
375                elif action == 'UNPLUG':
376                    self.plug_peripherals(False)
377                elif action == 'PLUG':
378                    self.plug_peripherals(True)
379                elif self.suspend_status == False:
380                    if action.startswith('LOGIN'):
381                        if self.login_status:
382                            logging.debug('Skipping login. Already logged in.')
383                            continue
384                        else:
385                            self.action_login('LOGOUT' not in actions)
386                            self.login_status = True
387                    elif action == 'LOGOUT':
388                        if self.login_status:
389                            self.action_logout()
390                            self.login_status = False
391                        else:
392                            logging.debug('Skipping logout. Not logged in.')
393                    elif action == 'REBOOT':
394                        self.host.reboot()
395                        time.sleep(_WAIT_DELAY * 3)
396                        self.login_status = False
397                    elif action == 'SUSPEND':
398                        boot_id = self.action_suspend()
399                    elif action == 'CLOSELID':
400                        boot_id = self.close_lid()
401                else:
402                    logging.info('WRONG ACTION: %s .', self.action_step)
403                self.check_status()
404
405            if self.fail_reasons:
406                raise error.TestFail('Failures reported: %s' %
407                                     str(self.fail_reasons))
408