# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.

import logging
import os
import re
import time
import xmlrpclib

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import firmware_programmer

# Time to wait when probing for a usb device, it takes on avg 17 seconds
# to do a full probe.
_USB_PROBE_TIMEOUT = 40


class _PowerStateController(object):

    """Class to provide board-specific power operations.

    This class is responsible for "power on" and "power off"
    operations that can operate without making assumptions in
    advance about board state.  It offers an interface that
    abstracts out the different sequences required for different
    board types.

    """

    # Constants acceptable to be passed for the `rec_mode` parameter
    # to power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.

    REC_ON = 'rec'
    REC_OFF = 'on'

    # Delay in seconds needed between asserting and de-asserting
    # warm reset.
    _RESET_HOLD_TIME = 0.5

    def __init__(self, servo):
        """Initialize the power state control.

        @param servo Servo object providing the underlying `set` and `get`
                     methods for the target controls.

        """
        self._servo = servo

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software.  This also guarantees that the EC has
        been restarted.

        """
        self._servo.set_nocheck('power_state', 'reset')

    def warm_reset(self):
        """Apply warm reset to the DUT.

        This asserts, then de-asserts the 'warm_reset' signal.
        Generally, this causes the board to restart.

        """
        self._servo.set_get_all(['warm_reset:on',
                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
                                 'warm_reset:off'])

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        self._servo.set_nocheck('power_state', 'off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on; one of REC_ON or REC_OFF.
                        default: REC_OFF (normal boot, i.e. the
                        'power_state' control is set to 'on').

        """
        self._servo.set_nocheck('power_state', rec_mode)


class Servo(object):

    """Manages control of a Servo board.

    Servo is a board developed by hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
    process. It provides both high-level functions for common servo tasks and
    low-level functions for directly setting and reading gpios.

    """

    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    #
    # NOTE(review): no long-press constant is defined in this class any
    # more; power_long_press() simply delegates to servod.  Only the
    # short polling delay below remains.
    SHORT_DELAY = 0.1

    # Maximum number of times to re-read power button on release.
    GET_RETRY_MAX = 10

    # Delays to deal with DUT state transitions.
    SLEEP_DELAY = 6
    BOOT_DELAY = 10

    # Default minimum time interval between 'press' and 'release'
    # keyboard events.
    SERVO_KEY_PRESS_DELAY = 0.1

    # Time to toggle recovery switch on and off.
    REC_TOGGLE_DELAY = 0.1

    # Time to toggle development switch on and off.
    DEV_TOGGLE_DELAY = 0.1

    # Time between an usb disk plugged-in and detected in the system.
    USB_DETECTION_DELAY = 10
    # Time to keep USB power off before and after USB mux direction is changed
    USB_POWEROFF_DELAY = 2

    # Time to wait before timing out on servo initialization.
    INIT_TIMEOUT_SECS = 10


    def __init__(self, servo_host, servo_serial=None):
        """Sets up the servo communication infrastructure.

        @param servo_host: A ServoHost object representing
                           the host running servod.
        @param servo_serial: Serial number of the servo board.
        """
        # TODO(fdeng): crbug.com/298379
        # We should move servo_host object out of servo object
        # to minimize the dependencies on the rest of Autotest.
        self._servo_host = servo_host
        self._servo_serial = servo_serial
        self._server = servo_host.get_servod_server_proxy()
        self._power_state = _PowerStateController(self)
        # Cached USB key direction; None means "unknown, query servod".
        self._usb_state = None
        # Firmware programmers are created lazily by
        # _initialize_programmer().
        self._programmer = None
        self._programmer_rw = None


    @property
    def servo_serial(self):
        """Returns the serial number of the servo board."""
        return self._servo_serial


    def get_power_state_controller(self):
        """Return the power state controller for this Servo.

        The power state controller provides board-independent
        interfaces for reset, power-on, power-off operations.

        """
        return self._power_state


    def initialize_dut(self, cold_reset=False):
        """Initializes a dut for testing purposes.

        This sets various servo signals back to default values
        appropriate for the target board.  By default, if the DUT
        is already on, it stays on.  If the DUT is powered off
        before initialization, its state afterward is unspecified.

        Rationale:  Basic initialization of servo sets the lid open,
        when there is a lid.  This operation won't affect powered on
        units; however, setting the lid open may power on a unit
        that's off, depending on the board type and previous state
        of the device.

        If `cold_reset` is a true value, the DUT and its EC will be
        reset, and the DUT rebooted in normal mode.

        @param cold_reset If True, cold reset the device after
                          initialization.
        """
        self._server.hwinit()
        self.set('usb_mux_oe1', 'on')
        # Drop the cached direction so switch_usbkey() re-reads the
        # real state from servod after hwinit.
        self._usb_state = None
        self.switch_usbkey('off')
        if cold_reset:
            self._power_state.reset()
        logging.debug('Servo initialized, version is %s',
                      self._server.get_version())


    def is_localhost(self):
        """Is the servod hosted locally?

        @return: True if local hosted; otherwise, False.
        """
        return self._servo_host.is_localhost()


    def power_long_press(self):
        """Simulate a long power button press."""
        # After a long power press, the EC may ignore the next power
        # button press (at least on Alex).  To guarantee that this
        # won't happen, we need to allow the EC one second to
        # collect itself.
        #
        # NOTE(review): no delay is applied in this method; presumably
        # servod handles the settle time -- confirm.
        self._server.power_long_press()


    def power_normal_press(self):
        """Simulate a normal power button press."""
        self._server.power_normal_press()


    def power_short_press(self):
        """Simulate a short power button press."""
        self._server.power_short_press()


    def power_key(self, press_secs=''):
        """Simulate a power button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.power_key(press_secs)


    def lid_open(self):
        """Simulate opening the lid and raise exception if all attempts fail"""
        self.set('lid_open', 'yes')


    def lid_close(self):
        """Simulate closing the lid and raise exception if all attempts fail

        Waits SLEEP_DELAY (6) seconds to ensure the device is fully asleep
        before returning.
        """
        self.set('lid_open', 'no')
        time.sleep(Servo.SLEEP_DELAY)


    def _press_volume_button(self, control, timeout):
        """Press and release a volume button, then wait for the release.

        @param control: Name of the servod control, e.g. 'volume_up'.
        @param timeout: Maximum time in seconds to wait for the control
                        to read back as 'no'.

        @raise: error.TestFail if the control never reads back 'no'
                within the timeout.
        """
        self.set_get_all(['%s:yes' % control,
                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
                          '%s:no' % control])
        # we need to wait for commands to take effect before moving on
        time_left = float(timeout)
        while time_left > 0.0:
            if self.get(control) == 'no':
                return
            time.sleep(self.SHORT_DELAY)
            time_left = time_left - self.SHORT_DELAY
        raise error.TestFail("Failed setting %s to no" % control)


    def volume_up(self, timeout=300):
        """Simulate pushing the volume up button.

        @param timeout: Timeout for setting the volume.
        """
        self._press_volume_button('volume_up', timeout)


    def volume_down(self, timeout=300):
        """Simulate pushing the volume down button.

        @param timeout: Timeout for setting the volume.
        """
        self._press_volume_button('volume_down', timeout)


    def ctrl_d(self, press_secs=''):
        """Simulate Ctrl-d simultaneous button presses.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_d(press_secs)


    def ctrl_u(self):
        """Simulate Ctrl-u simultaneous button presses."""
        self._server.ctrl_u()


    def ctrl_enter(self, press_secs=''):
        """Simulate Ctrl-enter simultaneous button presses.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_enter(press_secs)


    def d_key(self, press_secs=''):
        """Simulate D key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.d_key(press_secs)


    def ctrl_key(self, press_secs=''):
        """Simulate Ctrl key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_key(press_secs)


    def enter_key(self, press_secs=''):
        """Simulate Enter key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.enter_key(press_secs)


    def refresh_key(self, press_secs=''):
        """Simulate Refresh key (F3) button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.refresh_key(press_secs)


    def ctrl_refresh_key(self, press_secs=''):
        """Simulate Ctrl and Refresh (F3) simultaneous press.

        This key combination is an alternative of Space key.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_refresh_key(press_secs)


    def imaginary_key(self, press_secs=''):
        """Simulate imaginary key button press.

        Maps to a key that doesn't physically exist.

        @param press_secs : Str. Time to press key.
        """
        self._server.imaginary_key(press_secs)


    def sysrq_x(self, press_secs=''):
        """Simulate Alt VolumeUp X simultaneous press.

        This key combination is the kernel system request (sysrq) X.

        @param press_secs : Str. Time to press key.
        """
        self._server.sysrq_x(press_secs)


    def toggle_recovery_switch(self):
        """Toggle recovery switch on and off."""
        self.enable_recovery_mode()
        time.sleep(self.REC_TOGGLE_DELAY)
        self.disable_recovery_mode()


    def enable_recovery_mode(self):
        """Enable recovery mode on device."""
        self.set('rec_mode', 'on')


    def disable_recovery_mode(self):
        """Disable recovery mode on device."""
        self.set('rec_mode', 'off')


    def toggle_development_switch(self):
        """Toggle development switch on and off."""
        self.enable_development_mode()
        time.sleep(self.DEV_TOGGLE_DELAY)
        self.disable_development_mode()


    def enable_development_mode(self):
        """Enable development mode on device."""
        self.set('dev_mode', 'on')


    def disable_development_mode(self):
        """Disable development mode on device."""
        self.set('dev_mode', 'off')


    def boot_devmode(self):
        """Boot a dev-mode device that is powered off."""
        self.power_short_press()
        self.pass_devmode()


    def pass_devmode(self):
        """Pass through boot screens in dev-mode."""
        time.sleep(Servo.BOOT_DELAY)
        self.ctrl_d()
        time.sleep(Servo.BOOT_DELAY)


    def get_board(self):
        """Get the board connected to servod."""
        return self._server.get_board()


    def _get_xmlrpclib_exception(self, xmlexc):
        """Get meaningful exception string from xmlrpc.

        xmlrpclib.Fault.faultString has the following format:

        <type 'exception type'>:'actual error message'

        Parse and return the real exception from the servod side instead of the
        less meaningful one like,
           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
           attribute 'hw_driver'">

        @param xmlexc: xmlrpclib.Fault object.

        @return: string of underlying exception raised in servod.
        """
        # Strip everything up to and including the "<type ...>:" prefix.
        return re.sub('^.*>:', '', xmlexc.faultString)


    def get(self, gpio_name):
        """Get the value of a gpio from Servod.

        @param gpio_name Name of the gpio.

        @return: The value reported by servod for the gpio.

        @raise: error.TestFail if servod reports a fault.
        """
        assert gpio_name
        try:
            return self._server.get(gpio_name)
        except xmlrpclib.Fault as e:
            err_msg = "Getting '%s' :: %s" % \
                (gpio_name, self._get_xmlrpclib_exception(e))
            raise error.TestFail(err_msg)


    def set(self, gpio_name, gpio_value):
        """Set and check the value of a gpio using Servod.

        After setting, the gpio is read back up to GET_RETRY_MAX times
        (SHORT_DELAY apart) until it matches the requested value.

        @param gpio_name Name of the gpio.
        @param gpio_value New setting for the gpio.

        @raise: AssertionError if the gpio still does not read back the
                requested value after all retries.
        """
        self.set_nocheck(gpio_name, gpio_value)
        retry_count = Servo.GET_RETRY_MAX
        while gpio_value != self.get(gpio_name) and retry_count:
            logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
                            retry_count)
            retry_count -= 1
            time.sleep(Servo.SHORT_DELAY)
        if not retry_count:
            # NOTE: this final check is an assert, so it is skipped when
            # Python runs with -O.  Kept as-is so callers see the same
            # exception type as before.
            assert gpio_value == self.get(gpio_name), \
                'Servo failed to set %s to %s' % (gpio_name, gpio_value)


    def set_nocheck(self, gpio_name, gpio_value):
        """Set the value of a gpio using Servod.

        Unlike set(), the value is not read back for verification.

        @param gpio_name Name of the gpio.
        @param gpio_value New setting for the gpio.

        @raise: error.TestFail if servod reports a fault.
        """
        assert gpio_name and gpio_value
        logging.info('Setting %s to %s', gpio_name, gpio_value)
        try:
            self._server.set(gpio_name, gpio_value)
        except xmlrpclib.Fault as e:
            err_msg = "Setting '%s' to '%s' :: %s" % \
                (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
            raise error.TestFail(err_msg)


    def set_get_all(self, controls):
        """Set &| get one or more control values.

        Each entry of `controls` is either 'name:value' (set the control;
        the special name 'sleep' pauses for `value` seconds) or a bare
        'name' (get the control).

        @param controls: list of strings, controls to set &| get.

        @return: list of results as returned by servod; in the
                 backward-compatibility path, True for each set and the
                 read value for each get (sleeps add no entry).

        @raise: error.TestFail in case error occurs setting/getting values.
        """
        rv = []
        try:
            logging.info('Set/get all: %s', str(controls))
            rv = self._server.set_get_all(controls)
        except xmlrpclib.Fault as e:
            # TODO(waihong): Remove the following backward compatibility when
            # the new versions of hdctools are deployed.
            if 'not supported' in str(e):
                logging.warning('The servod is too old that set_get_all '
                                'not supported. Use set and get instead.')
                for control in controls:
                    if ':' in control:
                        (name, value) = control.split(':')
                        if name == 'sleep':
                            time.sleep(float(value))
                        else:
                            self.set_nocheck(name, value)
                            rv.append(True)
                    else:
                        # A bare control name is a "get" request.  The
                        # control itself is the name to read; `name` is
                        # only bound by the 'name:value' branch above.
                        rv.append(self.get(control))
            else:
                err_msg = "Problem with '%s' :: %s" % \
                    (controls, self._get_xmlrpclib_exception(e))
                raise error.TestFail(err_msg)
        return rv


    # TODO(waihong) It may fail if multiple servo's are connected to the same
    # host. Should look for a better way, like the USB serial name, to identify
    # the USB device.
    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    # updated.
    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
        """Probe the USB disk device plugged-in the servo from the host side.

        It uses servod to discover if there is a usb device attached to it.

        @param timeout The timeout period when probing for the usb host device.

        @return: String of USB disk path (e.g. '/dev/sdb') or None.
        """
        # Normalize any falsy servod answer (e.g. empty string) to None.
        return self._server.probe_host_usb_dev(timeout) or None


    def image_to_servo_usb(self, image_path=None,
                           make_image_noninteractive=False):
        """Install an image to the USB key plugged into the servo.

        This method may copy any image to the servo USB key including a
        recovery image or a test image.  These images are frequently used
        for test purposes such as restoring a corrupted image or conducting
        an upgrade of ec/fw/kernel as part of a test of a specific image part.

        @param image_path Path on the host to the recovery image.
        @param make_image_noninteractive Make the recovery image
                                   noninteractive, therefore the DUT
                                   will reboot automatically after
                                   installation.

        @raise: error.AutotestError if downloading the image to the USB
                key fails.
        """
        # We're about to start plugging/unplugging the USB key.  We
        # don't know the state of the DUT, or what it might choose
        # to do to the device after hotplug.  To avoid surprises,
        # force the DUT to be off.
        self._server.hwinit()
        self._power_state.power_off()

        # Set up Servo's usb mux.
        self.switch_usbkey('host')
        if image_path:
            logging.info('Searching for usb device and copying image to it. '
                         'Please wait a few minutes...')
            if not self._server.download_image_to_usb(image_path):
                logging.error('Failed to transfer requested image to USB. '
                              'Please take a look at Servo Logs.')
                raise error.AutotestError('Download image to usb failed.')
            if make_image_noninteractive:
                logging.info('Making image noninteractive')
                # Failure here is only logged, not raised.
                if not self._server.make_image_noninteractive():
                    logging.error('Failed to make image noninteractive. '
                                  'Please take a look at Servo Logs.')


    def install_recovery_image(self, image_path=None,
                               make_image_noninteractive=False):
        """Install the recovery image specified by the path onto the DUT.

        This method uses google recovery mode to install a recovery image
        onto a DUT through the use of a USB stick that is mounted on a servo
        board specified by the usb_dev.  If no image path is specified
        we use the recovery image already on the usb image.

        @param image_path: Path on the host to the recovery image.
        @param make_image_noninteractive: Make the recovery image
                noninteractive, therefore the DUT will reboot automatically
                after installation.
        """
        self.image_to_servo_usb(image_path, make_image_noninteractive)
        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
        self.switch_usbkey('dut')


    def _scp_image(self, image_path):
        """Copy image to the servo host.

        When programming a firmware image on the DUT, the image must be
        located on the host to which the servo device is connected. Sometimes
        servo is controlled by a remote host, in this case the image needs to
        be transferred to the remote host.

        @param image_path: a string, name of the firmware image file to be
               transferred.
        @return: a string, full path name of the copied file on the remote.
        """
        dest_path = os.path.join('/tmp', os.path.basename(image_path))
        self._servo_host.send_file(image_path, dest_path)
        return dest_path


    def system(self, command, timeout=3600):
        """Execute the passed in command on the servod host.

        @param command Command to be executed.
        @param timeout Maximum number of seconds of runtime allowed. Default to
                       1 hour.
        """
        logging.info('Will execute on servo host: %s', command)
        self._servo_host.run(command, timeout=timeout)


    def system_output(self, command, timeout=3600,
                      ignore_status=False, args=()):
        """Execute the passed in command on the servod host, return stdout.

        @param command a string, the command to execute
        @param timeout an int, max number of seconds to wait til command
               execution completes. Default to 1 hour.
        @param ignore_status a Boolean, if true - ignore command's nonzero exit
               status, otherwise an exception will be thrown
        @param args a tuple of strings, each becoming a separate command line
               parameter for the command
        @return command's stdout as a string.
        """
        return self._servo_host.run(command, timeout=timeout,
                                    ignore_status=ignore_status,
                                    args=args).stdout.strip()


    def get_servo_version(self):
        """Get the version of the servo, e.g., servo_v2 or servo_v3.

        @return: The version of the servo.

        """
        return self._server.get_version()


    def _initialize_programmer(self, rw_only=False):
        """Initialize the firmware programmer.

        Both the full programmer and the RW-only programmer are created
        on the first call; later calls are no-ops.

        @param rw_only: Currently unused; kept for interface
                        compatibility.

        @raise: error.TestError if the servo version has no known
                firmware programmer.
        """
        if self._programmer:
            return
        # Initialize firmware programmer
        servo_version = self.get_servo_version()
        if servo_version.startswith('servo_v2'):
            self._programmer = firmware_programmer.ProgrammerV2(self)
            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
        # Both servo v3 and v4 use the same programming methods so just leverage
        # ProgrammerV3 for servo v4 as well.
        elif (servo_version.startswith('servo_v3') or
              servo_version.startswith('servo_v4')):
            self._programmer = firmware_programmer.ProgrammerV3(self)
            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
        else:
            raise error.TestError(
                    'No firmware programmer for servo version: %s' %
                    servo_version)


    def program_bios(self, image, rw_only=False):
        """Program bios on DUT with given image.

        @param image: a string, file name of the BIOS image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of BIOS.

        """
        self._initialize_programmer()
        # The image must live on the machine running servod.
        if not self.is_localhost():
            image = self._scp_image(image)
        if rw_only:
            self._programmer_rw.program_bios(image)
        else:
            self._programmer.program_bios(image)


    def program_ec(self, image, rw_only=False):
        """Program ec on DUT with given image.

        @param image: a string, file name of the EC image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of EC.

        """
        self._initialize_programmer()
        # The image must live on the machine running servod.
        if not self.is_localhost():
            image = self._scp_image(image)
        if rw_only:
            self._programmer_rw.program_ec(image)
        else:
            self._programmer.program_ec(image)


    def _switch_usbkey_power(self, power_state, detection_delay=False):
        """Switch usbkey power.

        This function switches usbkey power, preferring servod's
        safe_switch_usbkey_power RPC and falling back to setting
        'prtctl4_pwren' directly when that RPC fails.

        @param power_state: A string, 'on' or 'off'.
        @param detection_delay: A boolean value, if True, sleep
                                for |USB_DETECTION_DELAY| after switching
                                the power on.
        """
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet updated and have the
        # safe_switch_usbkey_power RPC.  I'll remove this once all beaglebones
        # have been updated and also think about a better way to handle
        # situations like this.
        try:
            self._server.safe_switch_usbkey_power(power_state)
        except Exception:
            self.set('prtctl4_pwren', power_state)
        if power_state == 'off':
            time.sleep(self.USB_POWEROFF_DELAY)
        elif detection_delay:
            time.sleep(self.USB_DETECTION_DELAY)


    def switch_usbkey(self, usb_state):
        """Connect USB flash stick to either host or DUT, or turn USB port off.

        This function switches the servo multiplexer to provide electrical
        connection between the USB port J3 and either host or DUT side. It
        can also be used to turn the USB port off.

        Switching to 'dut' or 'host' is accompanied by powercycling
        of the USB stick, because it sometimes gets wedged if the mux
        is switched while the stick power is on.

        @param usb_state: A string, one of 'dut', 'host', or 'off'.
                          'dut' and 'host' indicate which side the
                          USB flash device is required to be connected to.
                          'off' indicates turning the USB port off.

        @raise: error.TestError in case the parameter is not 'dut'
                'host', or 'off'.
        """
        # Nothing to do if the key is already where the caller wants it.
        if self.get_usbkey_direction() == usb_state:
            return

        if usb_state == 'off':
            self._switch_usbkey_power('off')
            self._usb_state = usb_state
            return
        elif usb_state == 'host':
            mux_direction = 'servo_sees_usbkey'
        elif usb_state == 'dut':
            mux_direction = 'dut_sees_usbkey'
        else:
            raise error.TestError('Unknown USB state request: %s' % usb_state)

        self._switch_usbkey_power('off')
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet updated and have the
        # safe_switch_usbkey RPC.  I'll remove this once all beaglebones have
        # been updated and also think about a better way to handle situations
        # like this.
        try:
            self._server.safe_switch_usbkey(mux_direction)
        except Exception:
            self.set('usb_mux_sel1', mux_direction)
        time.sleep(self.USB_POWEROFF_DELAY)
        # Only wait for device detection when the host side needs to see
        # the key.
        self._switch_usbkey_power('on', usb_state == 'host')
        self._usb_state = usb_state


    def get_usbkey_direction(self):
        """Get which side USB is connected to or 'off' if usb power is off.

        The result is cached in self._usb_state; servod is only queried
        when no cached value is present.

        @return: A string, one of 'dut', 'host', or 'off'.
        """
        if not self._usb_state:
            if self.get('prtctl4_pwren') == 'off':
                self._usb_state = 'off'
            elif self.get('usb_mux_sel1').startswith('dut'):
                self._usb_state = 'dut'
            else:
                self._usb_state = 'host'
        return self._usb_state