• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4#
5# Expects to be run in an environment with sudo and no interactive password
6# prompt, such as within the Chromium OS development chroot.
7
8import os
9
10import logging, re, time, xmlrpclib
11
12from autotest_lib.client.common_lib import error
13from autotest_lib.server.cros.servo import firmware_programmer
14
15# Time to wait when probing for a usb device, it takes on avg 17 seconds
16# to do a full probe.
17_USB_PROBE_TIMEOUT = 40
18
19
class _PowerStateController(object):

    """Board-independent power operations built on servo controls.

    Each operation is forwarded to the wrapped servo object, either by
    writing a value to its 'power_state' control or by pulsing its
    'warm_reset' control.  Callers therefore do not need to know the
    board type or the state the board is currently in.

    """

    # Values accepted for the `rec_mode` parameter of power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.

    REC_ON = 'rec'
    REC_OFF = 'on'

    # Seconds to keep warm reset asserted before de-asserting it.
    _RESET_HOLD_TIME = 0.5

    def __init__(self, servo):
        """Remember the servo used to drive the power controls.

        @param servo Servo object providing the underlying `set_nocheck`
                     and `set_get_all` methods for the target controls.

        """
        self._servo = servo

    def _set_power_state(self, state):
        """Write `state` to the 'power_state' control without read-back.

        @param state Value to write to the 'power_state' control.

        """
        self._servo.set_nocheck('power_state', state)

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted.

        """
        self._set_power_state('reset')

    def warm_reset(self):
        """Pulse the 'warm_reset' signal on the DUT.

        The signal is asserted, held for `_RESET_HOLD_TIME` seconds,
        then de-asserted, all in a single `set_get_all` request.
        Generally, this causes the board to restart.

        """
        hold = 'sleep:%.4f' % self._RESET_HOLD_TIME
        sequence = ['warm_reset:on', hold, 'warm_reset:off']
        self._servo.set_get_all(sequence)

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        self._set_power_state('off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on; written as-is to the 'power_state'
                        control.  default: REC_OFF, i.e. 'on'.

        """
        self._set_power_state(rec_mode)
109
110
111class Servo(object):
112
113    """Manages control of a Servo board.
114
    Servo is a board developed by the hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
119    process. It provides both high-level functions for common servo tasks and
120    low-level functions for directly setting and reading gpios.
121
122    """
123
124    # Power button press delays in seconds.
125    #
126    # The EC specification says that 8.0 seconds should be enough
127    # for the long power press.  However, some platforms need a bit
128    # more time.  Empirical testing has found these requirements:
129    #   Alex: 8.2 seconds
130    #   ZGB:  8.5 seconds
131    # The actual value is set to the largest known necessary value.
132    #
133    # TODO(jrbarnette) Being generous is the right thing to do for
134    # existing platforms, but if this code is to be used for
135    # qualification of new hardware, we should be less generous.
136    SHORT_DELAY = 0.1
137
138    # Maximum number of times to re-read power button on release.
139    GET_RETRY_MAX = 10
140
141    # Delays to deal with DUT state transitions.
142    SLEEP_DELAY = 6
143    BOOT_DELAY = 10
144
145    # Default minimum time interval between 'press' and 'release'
146    # keyboard events.
147    SERVO_KEY_PRESS_DELAY = 0.1
148
149    # Time to toggle recovery switch on and off.
150    REC_TOGGLE_DELAY = 0.1
151
152    # Time to toggle development switch on and off.
153    DEV_TOGGLE_DELAY = 0.1
154
155    # Time between an usb disk plugged-in and detected in the system.
156    USB_DETECTION_DELAY = 10
157    # Time to keep USB power off before and after USB mux direction is changed
158    USB_POWEROFF_DELAY = 2
159
160    # Time to wait before timing out on servo initialization.
161    INIT_TIMEOUT_SECS = 10
162
163
164    def __init__(self, servo_host, servo_serial=None):
165        """Sets up the servo communication infrastructure.
166
167        @param servo_host: A ServoHost object representing
168                           the host running servod.
169        @param servo_serial: Serial number of the servo board.
170        """
171        # TODO(fdeng): crbug.com/298379
172        # We should move servo_host object out of servo object
173        # to minimize the dependencies on the rest of Autotest.
174        self._servo_host = servo_host
175        self._servo_serial = servo_serial
176        self._server = servo_host.get_servod_server_proxy()
177        self._power_state = _PowerStateController(self)
178        self._usb_state = None
179        self._programmer = None
180
181
182    @property
183    def servo_serial(self):
184        """Returns the serial number of the servo board."""
185        return self._servo_serial
186
187
188    def get_power_state_controller(self):
189        """Return the power state controller for this Servo.
190
191        The power state controller provides board-independent
192        interfaces for reset, power-on, power-off operations.
193
194        """
195        return self._power_state
196
197
198    def initialize_dut(self, cold_reset=False):
199        """Initializes a dut for testing purposes.
200
201        This sets various servo signals back to default values
202        appropriate for the target board.  By default, if the DUT
203        is already on, it stays on.  If the DUT is powered off
204        before initialization, its state afterward is unspecified.
205
206        Rationale:  Basic initialization of servo sets the lid open,
207        when there is a lid.  This operation won't affect powered on
208        units; however, setting the lid open may power on a unit
209        that's off, depending on the board type and previous state
210        of the device.
211
212        If `cold_reset` is a true value, the DUT and its EC will be
213        reset, and the DUT rebooted in normal mode.
214
215        @param cold_reset If True, cold reset the device after
216                          initialization.
217        """
218        self._server.hwinit()
219        self.set('usb_mux_oe1', 'on')
220        self._usb_state = None
221        self.switch_usbkey('off')
222        if cold_reset:
223            self._power_state.reset()
224        logging.debug('Servo initialized, version is %s',
225                      self._server.get_version())
226
227
228    def is_localhost(self):
229        """Is the servod hosted locally?
230
231        Returns:
232          True if local hosted; otherwise, False.
233        """
234        return self._servo_host.is_localhost()
235
236
237    def power_long_press(self):
238        """Simulate a long power button press."""
239        # After a long power press, the EC may ignore the next power
240        # button press (at least on Alex).  To guarantee that this
241        # won't happen, we need to allow the EC one second to
242        # collect itself.
243        self._server.power_long_press()
244
245
246    def power_normal_press(self):
247        """Simulate a normal power button press."""
248        self._server.power_normal_press()
249
250
251    def power_short_press(self):
252        """Simulate a short power button press."""
253        self._server.power_short_press()
254
255
256    def power_key(self, press_secs=''):
257        """Simulate a power button press.
258
259        @param press_secs : Str. Time to press key.
260        """
261        self._server.power_key(press_secs)
262
263
264    def lid_open(self):
265        """Simulate opening the lid and raise exception if all attempts fail"""
266        self.set('lid_open', 'yes')
267
268
269    def lid_close(self):
270        """Simulate closing the lid and raise exception if all attempts fail
271
272        Waits 6 seconds to ensure the device is fully asleep before returning.
273        """
274        self.set('lid_open', 'no')
275        time.sleep(Servo.SLEEP_DELAY)
276
277    def volume_up(self, timeout=300):
278        """Simulate pushing the volume down button.
279
280        @param timeout: Timeout for setting the volume.
281        """
282        self.set_get_all(['volume_up:yes',
283                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
284                          'volume_up:no'])
285        # we need to wait for commands to take effect before moving on
286        time_left = float(timeout)
287        while time_left > 0.0:
288            value = self.get('volume_up')
289            if value == 'no':
290                return
291            time.sleep(self.SHORT_DELAY)
292            time_left = time_left - self.SHORT_DELAY
293        raise error.TestFail("Failed setting volume_up to no")
294
295    def volume_down(self, timeout=300):
296        """Simulate pushing the volume down button.
297
298        @param timeout: Timeout for setting the volume.
299        """
300        self.set_get_all(['volume_down:yes',
301                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
302                          'volume_down:no'])
303        # we need to wait for commands to take effect before moving on
304        time_left = float(timeout)
305        while time_left > 0.0:
306            value = self.get('volume_down')
307            if value == 'no':
308                return
309            time.sleep(self.SHORT_DELAY)
310            time_left = time_left - self.SHORT_DELAY
311        raise error.TestFail("Failed setting volume_down to no")
312
313    def ctrl_d(self, press_secs=''):
314        """Simulate Ctrl-d simultaneous button presses.
315
316        @param press_secs : Str. Time to press key.
317        """
318        self._server.ctrl_d(press_secs)
319
320
321    def ctrl_u(self):
322        """Simulate Ctrl-u simultaneous button presses.
323
324        @param press_secs : Str. Time to press key.
325        """
326        self._server.ctrl_u()
327
328
329    def ctrl_enter(self, press_secs=''):
330        """Simulate Ctrl-enter simultaneous button presses.
331
332        @param press_secs : Str. Time to press key.
333        """
334        self._server.ctrl_enter(press_secs)
335
336
337    def d_key(self, press_secs=''):
338        """Simulate Enter key button press.
339
340        @param press_secs : Str. Time to press key.
341        """
342        self._server.d_key(press_secs)
343
344
345    def ctrl_key(self, press_secs=''):
346        """Simulate Enter key button press.
347
348        @param press_secs : Str. Time to press key.
349        """
350        self._server.ctrl_key(press_secs)
351
352
353    def enter_key(self, press_secs=''):
354        """Simulate Enter key button press.
355
356        @param press_secs : Str. Time to press key.
357        """
358        self._server.enter_key(press_secs)
359
360
361    def refresh_key(self, press_secs=''):
362        """Simulate Refresh key (F3) button press.
363
364        @param press_secs : Str. Time to press key.
365        """
366        self._server.refresh_key(press_secs)
367
368
369    def ctrl_refresh_key(self, press_secs=''):
370        """Simulate Ctrl and Refresh (F3) simultaneous press.
371
372        This key combination is an alternative of Space key.
373
374        @param press_secs : Str. Time to press key.
375        """
376        self._server.ctrl_refresh_key(press_secs)
377
378
379    def imaginary_key(self, press_secs=''):
380        """Simulate imaginary key button press.
381
382        Maps to a key that doesn't physically exist.
383
384        @param press_secs : Str. Time to press key.
385        """
386        self._server.imaginary_key(press_secs)
387
388
389    def sysrq_x(self, press_secs=''):
390        """Simulate Alt VolumeUp X simulataneous press.
391
392        This key combination is the kernel system request (sysrq) X.
393
394        @param press_secs : Str. Time to press key.
395        """
396        self._server.sysrq_x(press_secs)
397
398
399    def toggle_recovery_switch(self):
400        """Toggle recovery switch on and off."""
401        self.enable_recovery_mode()
402        time.sleep(self.REC_TOGGLE_DELAY)
403        self.disable_recovery_mode()
404
405
406    def enable_recovery_mode(self):
407        """Enable recovery mode on device."""
408        self.set('rec_mode', 'on')
409
410
411    def disable_recovery_mode(self):
412        """Disable recovery mode on device."""
413        self.set('rec_mode', 'off')
414
415
416    def toggle_development_switch(self):
417        """Toggle development switch on and off."""
418        self.enable_development_mode()
419        time.sleep(self.DEV_TOGGLE_DELAY)
420        self.disable_development_mode()
421
422
423    def enable_development_mode(self):
424        """Enable development mode on device."""
425        self.set('dev_mode', 'on')
426
427
428    def disable_development_mode(self):
429        """Disable development mode on device."""
430        self.set('dev_mode', 'off')
431
432    def boot_devmode(self):
433        """Boot a dev-mode device that is powered off."""
434        self.power_short_press()
435        self.pass_devmode()
436
437
438    def pass_devmode(self):
439        """Pass through boot screens in dev-mode."""
440        time.sleep(Servo.BOOT_DELAY)
441        self.ctrl_d()
442        time.sleep(Servo.BOOT_DELAY)
443
444
445    def get_board(self):
446        """Get the board connected to servod."""
447        return self._server.get_board()
448
449
450    def get_base_board(self):
451        """Get the board of the base connected to servod."""
452        try:
453            return self._server.get_base_board()
454        except  xmlrpclib.Fault as e:
455            # TODO(waihong): Remove the following compatibility check when
456            # the new versions of hdctools are deployed.
457            if 'not supported' in str(e):
458                logging.warning('The servod is too old that get_base_board '
459                        'not supported.')
460                return ''
461            raise
462
463
464    def get_ec_active_copy(self):
465        """Get the active copy of the EC image."""
466        return self.get('ec_active_copy')
467
468
469    def _get_xmlrpclib_exception(self, xmlexc):
470        """Get meaningful exception string from xmlrpc.
471
472        Args:
473            xmlexc: xmlrpclib.Fault object
474
475        xmlrpclib.Fault.faultString has the following format:
476
477        <type 'exception type'>:'actual error message'
478
479        Parse and return the real exception from the servod side instead of the
480        less meaningful one like,
481           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
482           attribute 'hw_driver'">
483
484        Returns:
485            string of underlying exception raised in servod.
486        """
487        return re.sub('^.*>:', '', xmlexc.faultString)
488
489
490    def get(self, gpio_name):
491        """Get the value of a gpio from Servod.
492
493        @param gpio_name Name of the gpio.
494        """
495        assert gpio_name
496        try:
497            return self._server.get(gpio_name)
498        except  xmlrpclib.Fault as e:
499            err_msg = "Getting '%s' :: %s" % \
500                (gpio_name, self._get_xmlrpclib_exception(e))
501            raise error.TestFail(err_msg)
502
503
504    def set(self, gpio_name, gpio_value):
505        """Set and check the value of a gpio using Servod.
506
507        @param gpio_name Name of the gpio.
508        @param gpio_value New setting for the gpio.
509        """
510        self.set_nocheck(gpio_name, gpio_value)
511        retry_count = Servo.GET_RETRY_MAX
512        while gpio_value != self.get(gpio_name) and retry_count:
513            logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
514                         retry_count)
515            retry_count -= 1
516            time.sleep(Servo.SHORT_DELAY)
517        if not retry_count:
518            assert gpio_value == self.get(gpio_name), \
519                'Servo failed to set %s to %s' % (gpio_name, gpio_value)
520
521
522    def set_nocheck(self, gpio_name, gpio_value):
523        """Set the value of a gpio using Servod.
524
525        @param gpio_name Name of the gpio.
526        @param gpio_value New setting for the gpio.
527        """
528        assert gpio_name and gpio_value
529        logging.info('Setting %s to %s', gpio_name, gpio_value)
530        try:
531            self._server.set(gpio_name, gpio_value)
532        except  xmlrpclib.Fault as e:
533            err_msg = "Setting '%s' to '%s' :: %s" % \
534                (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
535            raise error.TestFail(err_msg)
536
537
538    def set_get_all(self, controls):
539        """Set &| get one or more control values.
540
541        @param controls: list of strings, controls to set &| get.
542
543        @raise: error.TestError in case error occurs setting/getting values.
544        """
545        rv = []
546        try:
547            logging.info('Set/get all: %s', str(controls))
548            rv = self._server.set_get_all(controls)
549        except xmlrpclib.Fault as e:
550            # TODO(waihong): Remove the following backward compatibility when
551            # the new versions of hdctools are deployed.
552            if 'not supported' in str(e):
553                logging.warning('The servod is too old that set_get_all '
554                        'not supported. Use set and get instead.')
555                for control in controls:
556                    if ':' in control:
557                        (name, value) = control.split(':')
558                        if name == 'sleep':
559                            time.sleep(float(value))
560                        else:
561                            self.set_nocheck(name, value)
562                        rv.append(True)
563                    else:
564                        rv.append(self.get(name))
565            else:
566                err_msg = "Problem with '%s' :: %s" % \
567                    (controls, self._get_xmlrpclib_exception(e))
568                raise error.TestFail(err_msg)
569        return rv
570
571
572    # TODO(waihong) It may fail if multiple servo's are connected to the same
573    # host. Should look for a better way, like the USB serial name, to identify
574    # the USB device.
575    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
576    # updated.
577    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
578        """Probe the USB disk device plugged-in the servo from the host side.
579
580        It uses servod to discover if there is a usb device attached to it.
581
582        @param timeout The timeout period when probing for the usb host device.
583
584        @return: String of USB disk path (e.g. '/dev/sdb') or None.
585        """
586        return self._server.probe_host_usb_dev(timeout) or None
587
588
589    def image_to_servo_usb(self, image_path=None,
590                           make_image_noninteractive=False):
591        """Install an image to the USB key plugged into the servo.
592
593        This method may copy any image to the servo USB key including a
594        recovery image or a test image.  These images are frequently used
595        for test purposes such as restoring a corrupted image or conducting
596        an upgrade of ec/fw/kernel as part of a test of a specific image part.
597
598        @param image_path Path on the host to the recovery image.
599        @param make_image_noninteractive Make the recovery image
600                                   noninteractive, therefore the DUT
601                                   will reboot automatically after
602                                   installation.
603        """
604        # We're about to start plugging/unplugging the USB key.  We
605        # don't know the state of the DUT, or what it might choose
606        # to do to the device after hotplug.  To avoid surprises,
607        # force the DUT to be off.
608        self._server.hwinit()
609        self._power_state.power_off()
610
611        # Set up Servo's usb mux.
612        self.switch_usbkey('host')
613        if image_path:
614            logging.info('Searching for usb device and copying image to it. '
615                         'Please wait a few minutes...')
616            if not self._server.download_image_to_usb(image_path):
617                logging.error('Failed to transfer requested image to USB. '
618                              'Please take a look at Servo Logs.')
619                raise error.AutotestError('Download image to usb failed.')
620            if make_image_noninteractive:
621                logging.info('Making image noninteractive')
622                if not self._server.make_image_noninteractive():
623                    logging.error('Failed to make image noninteractive. '
624                                  'Please take a look at Servo Logs.')
625
626
627    def install_recovery_image(self, image_path=None,
628                               make_image_noninteractive=False):
629        """Install the recovery image specified by the path onto the DUT.
630
631        This method uses google recovery mode to install a recovery image
632        onto a DUT through the use of a USB stick that is mounted on a servo
633        board specified by the usb_dev.  If no image path is specified
634        we use the recovery image already on the usb image.
635
636        @param image_path: Path on the host to the recovery image.
637        @param make_image_noninteractive: Make the recovery image
638                noninteractive, therefore the DUT will reboot automatically
639                after installation.
640        """
641        self.image_to_servo_usb(image_path, make_image_noninteractive)
642        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
643        self.switch_usbkey('dut')
644
645
646    def _scp_image(self, image_path):
647        """Copy image to the servo host.
648
649        When programming a firmware image on the DUT, the image must be
650        located on the host to which the servo device is connected. Sometimes
651        servo is controlled by a remote host, in this case the image needs to
652        be transferred to the remote host.
653
654        @param image_path: a string, name of the firmware image file to be
655               transferred.
656        @return: a string, full path name of the copied file on the remote.
657        """
658
659        dest_path = os.path.join('/tmp', os.path.basename(image_path))
660        self._servo_host.send_file(image_path, dest_path)
661        return dest_path
662
663
664    def system(self, command, timeout=3600):
665        """Execute the passed in command on the servod host.
666
667        @param command Command to be executed.
668        @param timeout Maximum number of seconds of runtime allowed. Default to
669                       1 hour.
670        """
671        logging.info('Will execute on servo host: %s', command)
672        self._servo_host.run(command, timeout=timeout)
673
674
675    def system_output(self, command, timeout=3600,
676                      ignore_status=False, args=()):
677        """Execute the passed in command on the servod host, return stdout.
678
679        @param command a string, the command to execute
680        @param timeout an int, max number of seconds to wait til command
681               execution completes. Default to 1 hour.
682        @param ignore_status a Boolean, if true - ignore command's nonzero exit
683               status, otherwise an exception will be thrown
684        @param args a tuple of strings, each becoming a separate command line
685               parameter for the command
686        @return command's stdout as a string.
687        """
688        return self._servo_host.run(command, timeout=timeout,
689                                    ignore_status=ignore_status,
690                                    args=args).stdout.strip()
691
692
693    def get_servo_version(self):
694        """Get the version of the servo, e.g., servo_v2 or servo_v3.
695
696        @return: The version of the servo.
697
698        """
699        return self._server.get_version()
700
701
702    def _initialize_programmer(self, rw_only=False):
703        """Initialize the firmware programmer.
704
705        @param rw_only: True to initialize a programmer which only
706                        programs the RW portions.
707        """
708        if self._programmer:
709            return
710        # Initialize firmware programmer
711        servo_version = self.get_servo_version()
712        if servo_version.startswith('servo_v2'):
713            self._programmer = firmware_programmer.ProgrammerV2(self)
714            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
715        # Both servo v3 and v4 use the same programming methods so just leverage
716        # ProgrammerV3 for servo v4 as well.
717        elif (servo_version.startswith('servo_v3') or
718              servo_version.startswith('servo_v4')):
719            self._programmer = firmware_programmer.ProgrammerV3(self)
720            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
721        else:
722            raise error.TestError(
723                    'No firmware programmer for servo version: %s' %
724                         servo_version)
725
726
727    def program_bios(self, image, rw_only=False):
728        """Program bios on DUT with given image.
729
730        @param image: a string, file name of the BIOS image to program
731                      on the DUT.
732        @param rw_only: True to only program the RW portion of BIOS.
733
734        """
735        self._initialize_programmer()
736        if not self.is_localhost():
737            image = self._scp_image(image)
738        if rw_only:
739            self._programmer_rw.program_bios(image)
740        else:
741            self._programmer.program_bios(image)
742
743
744    def program_ec(self, image, rw_only=False):
745        """Program ec on DUT with given image.
746
747        @param image: a string, file name of the EC image to program
748                      on the DUT.
749        @param rw_only: True to only program the RW portion of EC.
750
751        """
752        self._initialize_programmer()
753        if not self.is_localhost():
754            image = self._scp_image(image)
755        if rw_only:
756           self._programmer_rw.program_ec(image)
757        else:
758           self._programmer.program_ec(image)
759
760
761    def _switch_usbkey_power(self, power_state, detection_delay=False):
762        """Switch usbkey power.
763
764        This function switches usbkey power by setting the value of
765        'prtctl4_pwren'. If the power is already in the
766        requested state, this function simply returns.
767
768        @param power_state: A string, 'on' or 'off'.
769        @param detection_delay: A boolean value, if True, sleep
770                                for |USB_DETECTION_DELAY| after switching
771                                the power on.
772        """
773        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
774        # handle beaglebones that haven't yet updated and have the
775        # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
776        # have been updated and also think about a better way to handle
777        # situations like this.
778        try:
779            self._server.safe_switch_usbkey_power(power_state)
780        except Exception:
781            self.set('prtctl4_pwren', power_state)
782        if power_state == 'off':
783            time.sleep(self.USB_POWEROFF_DELAY)
784        elif detection_delay:
785            time.sleep(self.USB_DETECTION_DELAY)
786
787
788    def switch_usbkey(self, usb_state):
789        """Connect USB flash stick to either host or DUT, or turn USB port off.
790
791        This function switches the servo multiplexer to provide electrical
792        connection between the USB port J3 and either host or DUT side. It
793        can also be used to turn the USB port off.
794
795        Switching to 'dut' or 'host' is accompanied by powercycling
796        of the USB stick, because it sometimes gets wedged if the mux
797        is switched while the stick power is on.
798
799        @param usb_state: A string, one of 'dut', 'host', or 'off'.
800                          'dut' and 'host' indicate which side the
801                          USB flash device is required to be connected to.
802                          'off' indicates turning the USB port off.
803
804        @raise: error.TestError in case the parameter is not 'dut'
805                'host', or 'off'.
806        """
807        if self.get_usbkey_direction() == usb_state:
808            return
809
810        if usb_state == 'off':
811            self._switch_usbkey_power('off')
812            self._usb_state = usb_state
813            return
814        elif usb_state == 'host':
815            mux_direction = 'servo_sees_usbkey'
816        elif usb_state == 'dut':
817            mux_direction = 'dut_sees_usbkey'
818        else:
819            raise error.TestError('Unknown USB state request: %s' % usb_state)
820
821        self._switch_usbkey_power('off')
822        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
823        # handle beaglebones that haven't yet updated and have the
824        # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
825        # been updated and also think about a better way to handle situations
826        # like this.
827        try:
828            self._server.safe_switch_usbkey(mux_direction)
829        except Exception:
830            self.set('usb_mux_sel1', mux_direction)
831        time.sleep(self.USB_POWEROFF_DELAY)
832        self._switch_usbkey_power('on', usb_state == 'host')
833        self._usb_state = usb_state
834
835
836    def get_usbkey_direction(self):
837        """Get which side USB is connected to or 'off' if usb power is off.
838
839        @return: A string, one of 'dut', 'host', or 'off'.
840        """
841        if not self._usb_state:
842            if self.get('prtctl4_pwren') == 'off':
843                self._usb_state = 'off'
844            elif self.get('usb_mux_sel1').startswith('dut'):
845                self._usb_state = 'dut'
846            else:
847                self._usb_state = 'host'
848        return self._usb_state
849