# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.

import logging
import os
import re
import time
import xmlrpclib

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import firmware_programmer

# Time to wait when probing for a usb device, it takes on avg 17 seconds
# to do a full probe.
_USB_PROBE_TIMEOUT = 40


class _PowerStateController(object):

    """Class to provide board-specific power operations.

    This class is responsible for "power on" and "power off"
    operations that can operate without making assumptions in
    advance about board state.  It offers an interface that
    abstracts out the different sequences required for different
    board types.

    """

    # Constants acceptable to be passed for the `rec_mode` parameter
    # to power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.

    REC_ON = 'rec'
    REC_OFF = 'on'

    # Delay in seconds needed between asserting and de-asserting
    # warm reset.
    _RESET_HOLD_TIME = 0.5

    def __init__(self, servo):
        """Initialize the power state control.

        @param servo Servo object providing the underlying `set` and `get`
                     methods for the target controls.

        """
        self._servo = servo

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted.

        """
        self._servo.set_nocheck('power_state', 'reset')

    def warm_reset(self):
        """Apply warm reset to the DUT.

        This asserts, then de-asserts the 'warm_reset' signal.
        Generally, this causes the board to restart.

        """
        # The hold time is passed to servod as a 'sleep' pseudo-control so
        # that assert / wait / de-assert happen in a single request.
        self._servo.set_get_all(['warm_reset:on',
                                 'sleep:%.4f' % self._RESET_HOLD_TIME,
                                 'warm_reset:off'])

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        self._servo.set_nocheck('power_state', 'off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on; one of REC_ON or REC_OFF.
                        default: REC_OFF, i.e. the value 'on'
                        (power on in normal mode).

        """
        self._servo.set_nocheck('power_state', rec_mode)


class Servo(object):

    """Manages control of a Servo board.

    Servo is a board developed by hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
    process. It provides both high-level functions for common servo tasks and
    low-level functions for directly setting and reading gpios.

    """

    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    #
    # NOTE(review): no long-press constant remains in this class; the
    # power_*_press() methods delegate the timing to servod.  The text
    # above looks historical -- confirm whether it can be dropped.
    #
    # SHORT_DELAY is the polling interval used by set() and the volume
    # button helpers while waiting for a control to reach its new value.
    SHORT_DELAY = 0.1

    # Maximum number of times set() re-reads a control while waiting for
    # it to report the requested value.
    GET_RETRY_MAX = 10

    # Delays to deal with DUT state transitions.
    SLEEP_DELAY = 6
    BOOT_DELAY = 10

    # Default minimum time interval between 'press' and 'release'
    # keyboard events.
    SERVO_KEY_PRESS_DELAY = 0.1

    # Time to toggle recovery switch on and off.
    REC_TOGGLE_DELAY = 0.1

    # Time to toggle development switch on and off.
    DEV_TOGGLE_DELAY = 0.1

    # Time between an usb disk plugged-in and detected in the system.
    USB_DETECTION_DELAY = 10
    # Time to keep USB power off before and after USB mux direction is changed
    USB_POWEROFF_DELAY = 2

    # Time to wait before timing out on servo initialization.
    INIT_TIMEOUT_SECS = 10


    def __init__(self, servo_host, servo_serial=None):
        """Sets up the servo communication infrastructure.

        @param servo_host: A ServoHost object representing
                           the host running servod.
        @param servo_serial: Serial number of the servo board.
        """
        # TODO(fdeng): crbug.com/298379
        # We should move servo_host object out of servo object
        # to minimize the dependencies on the rest of Autotest.
        self._servo_host = servo_host
        self._servo_serial = servo_serial
        self._server = servo_host.get_servod_server_proxy()
        self._power_state = _PowerStateController(self)
        # Cached USB key direction ('dut', 'host' or 'off'); None means
        # "unknown, query servod" (see get_usbkey_direction()).
        self._usb_state = None
        # Firmware programmers are created lazily by
        # _initialize_programmer().
        self._programmer = None
        self._programmer_rw = None


    @property
    def servo_serial(self):
        """Returns the serial number of the servo board."""
        return self._servo_serial


    def get_power_state_controller(self):
        """Return the power state controller for this Servo.

        The power state controller provides board-independent
        interfaces for reset, power-on, power-off operations.

        """
        return self._power_state


    def initialize_dut(self, cold_reset=False):
        """Initializes a dut for testing purposes.

        This sets various servo signals back to default values
        appropriate for the target board.  By default, if the DUT
        is already on, it stays on.  If the DUT is powered off
        before initialization, its state afterward is unspecified.

        Rationale:  Basic initialization of servo sets the lid open,
        when there is a lid.  This operation won't affect powered on
        units; however, setting the lid open may power on a unit
        that's off, depending on the board type and previous state
        of the device.

        If `cold_reset` is a true value, the DUT and its EC will be
        reset, and the DUT rebooted in normal mode.

        @param cold_reset If True, cold reset the device after
                          initialization.
        """
        self._server.hwinit()
        self.set('usb_mux_oe1', 'on')
        # Drop the cached direction so switch_usbkey() re-reads the real
        # state from servod before deciding whether it has work to do.
        self._usb_state = None
        self.switch_usbkey('off')
        if cold_reset:
            self._power_state.reset()
        logging.debug('Servo initialized, version is %s',
                      self._server.get_version())


    def is_localhost(self):
        """Is the servod hosted locally?

        Returns:
          True if local hosted; otherwise, False.
        """
        return self._servo_host.is_localhost()


    def power_long_press(self):
        """Simulate a long power button press."""
        # After a long power press, the EC may ignore the next power
        # button press (at least on Alex).
        # NOTE(review): this method does not itself wait afterwards; any
        # settling time has to come from servod or from the caller --
        # confirm.
        self._server.power_long_press()


    def power_normal_press(self):
        """Simulate a normal power button press."""
        self._server.power_normal_press()


    def power_short_press(self):
        """Simulate a short power button press."""
        self._server.power_short_press()


    def power_key(self, press_secs=''):
        """Simulate a power button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.power_key(press_secs)


    def lid_open(self):
        """Simulate opening the lid and raise exception if all attempts fail"""
        self.set('lid_open', 'yes')


    def lid_close(self):
        """Simulate closing the lid and raise exception if all attempts fail

        Waits 6 seconds to ensure the device is fully asleep before returning.
        """
        self.set('lid_open', 'no')
        time.sleep(Servo.SLEEP_DELAY)


    def _press_volume_button(self, control, timeout):
        """Press and release a volume button, then wait for the release.

        @param control: Name of the servod control, e.g. 'volume_up'.
        @param timeout: Maximum time in seconds to wait for the control
                        to read back as 'no'.

        @raise error.TestFail if the control does not read back as 'no'
               before the timeout expires.
        """
        self.set_get_all(['%s:yes' % control,
                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
                          '%s:no' % control])
        # we need to wait for commands to take effect before moving on
        time_left = float(timeout)
        while time_left > 0.0:
            if self.get(control) == 'no':
                return
            time.sleep(self.SHORT_DELAY)
            time_left = time_left - self.SHORT_DELAY
        raise error.TestFail("Failed setting %s to no" % control)


    def volume_up(self, timeout=300):
        """Simulate pushing the volume up button.

        @param timeout: Timeout for setting the volume.

        @raise error.TestFail if the button is not seen released within
               the timeout.
        """
        self._press_volume_button('volume_up', timeout)


    def volume_down(self, timeout=300):
        """Simulate pushing the volume down button.

        @param timeout: Timeout for setting the volume.

        @raise error.TestFail if the button is not seen released within
               the timeout.
        """
        self._press_volume_button('volume_down', timeout)


    def ctrl_d(self, press_secs=''):
        """Simulate Ctrl-d simultaneous button presses.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_d(press_secs)


    def ctrl_u(self):
        """Simulate Ctrl-u simultaneous button presses.

        Unlike the other key helpers, this one takes no press duration.
        """
        self._server.ctrl_u()


    def ctrl_enter(self, press_secs=''):
        """Simulate Ctrl-enter simultaneous button presses.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_enter(press_secs)


    def d_key(self, press_secs=''):
        """Simulate D key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.d_key(press_secs)


    def ctrl_key(self, press_secs=''):
        """Simulate Ctrl key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_key(press_secs)


    def enter_key(self, press_secs=''):
        """Simulate Enter key button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.enter_key(press_secs)


    def refresh_key(self, press_secs=''):
        """Simulate Refresh key (F3) button press.

        @param press_secs : Str. Time to press key.
        """
        self._server.refresh_key(press_secs)


    def ctrl_refresh_key(self, press_secs=''):
        """Simulate Ctrl and Refresh (F3) simultaneous press.

        This key combination is an alternative of Space key.

        @param press_secs : Str. Time to press key.
        """
        self._server.ctrl_refresh_key(press_secs)


    def imaginary_key(self, press_secs=''):
        """Simulate imaginary key button press.

        Maps to a key that doesn't physically exist.

        @param press_secs : Str. Time to press key.
        """
        self._server.imaginary_key(press_secs)


    def sysrq_x(self, press_secs=''):
        """Simulate Alt VolumeUp X simultaneous press.

        This key combination is the kernel system request (sysrq) X.

        @param press_secs : Str. Time to press key.
        """
        self._server.sysrq_x(press_secs)


    def toggle_recovery_switch(self):
        """Toggle recovery switch on and off."""
        self.enable_recovery_mode()
        time.sleep(self.REC_TOGGLE_DELAY)
        self.disable_recovery_mode()


    def enable_recovery_mode(self):
        """Enable recovery mode on device."""
        self.set('rec_mode', 'on')


    def disable_recovery_mode(self):
        """Disable recovery mode on device."""
        self.set('rec_mode', 'off')


    def toggle_development_switch(self):
        """Toggle development switch on and off."""
        self.enable_development_mode()
        time.sleep(self.DEV_TOGGLE_DELAY)
        self.disable_development_mode()


    def enable_development_mode(self):
        """Enable development mode on device."""
        self.set('dev_mode', 'on')


    def disable_development_mode(self):
        """Disable development mode on device."""
        self.set('dev_mode', 'off')


    def boot_devmode(self):
        """Boot a dev-mode device that is powered off."""
        self.power_short_press()
        self.pass_devmode()


    def pass_devmode(self):
        """Pass through boot screens in dev-mode."""
        time.sleep(Servo.BOOT_DELAY)
        self.ctrl_d()
        time.sleep(Servo.BOOT_DELAY)


    def get_board(self):
        """Get the board connected to servod."""
        return self._server.get_board()


    def get_base_board(self):
        """Get the board of the base connected to servod.

        @return: The base board name as reported by servod, or '' when
                 servod is too old to support the call.
        """
        try:
            return self._server.get_base_board()
        except xmlrpclib.Fault as e:
            # TODO(waihong): Remove the following compatibility check when
            # the new versions of hdctools are deployed.
            if 'not supported' in str(e):
                logging.warning('The servod is too old that get_base_board '
                                'not supported.')
                return ''
            raise


    def get_ec_active_copy(self):
        """Get the active copy of the EC image."""
        return self.get('ec_active_copy')


    def _get_xmlrpclib_exception(self, xmlexc):
        """Get meaningful exception string from xmlrpc.

        Args:
            xmlexc: xmlrpclib.Fault object

        xmlrpclib.Fault.faultString has the following format:

        <type 'exception type'>:'actual error message'

        Parse and return the real exception from the servod side instead of the
        less meaningful one like,
           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
           attribute 'hw_driver'">

        Returns:
            string of underlying exception raised in servod.
        """
        # Greedy match: everything up to and including the last '>:' is
        # stripped, leaving only the message text.
        return re.sub(r'^.*>:', '', xmlexc.faultString)


    def get(self, gpio_name):
        """Get the value of a gpio from Servod.

        @param gpio_name Name of the gpio.

        @return: The value reported by servod for the gpio.
        @raise error.TestFail if servod reports a fault.
        """
        assert gpio_name
        try:
            return self._server.get(gpio_name)
        except xmlrpclib.Fault as e:
            err_msg = "Getting '%s' :: %s" % \
                (gpio_name, self._get_xmlrpclib_exception(e))
            raise error.TestFail(err_msg)


    def set(self, gpio_name, gpio_value):
        """Set and check the value of a gpio using Servod.

        After setting, the gpio is read back up to GET_RETRY_MAX times,
        SHORT_DELAY seconds apart, until it reports the requested value.

        @param gpio_name Name of the gpio.
        @param gpio_value New setting for the gpio.

        @raise AssertionError if the gpio still does not report the
               requested value after all retries.
        @raise error.TestFail if servod reports a fault.
        """
        self.set_nocheck(gpio_name, gpio_value)
        retry_count = Servo.GET_RETRY_MAX
        while gpio_value != self.get(gpio_name) and retry_count:
            logging.warning("%s != %s, retry %d", gpio_name, gpio_value,
                            retry_count)
            retry_count -= 1
            time.sleep(Servo.SHORT_DELAY)
        # Retries exhausted: do one last read.  This is raised explicitly
        # rather than via an `assert` statement so the check still runs
        # when Python is started with -O.
        if not retry_count and gpio_value != self.get(gpio_name):
            raise AssertionError(
                    'Servo failed to set %s to %s' % (gpio_name, gpio_value))


    def set_nocheck(self, gpio_name, gpio_value):
        """Set the value of a gpio using Servod.

        @param gpio_name Name of the gpio.
        @param gpio_value New setting for the gpio.

        @raise error.TestFail if servod reports a fault.
        """
        assert gpio_name and gpio_value
        logging.info('Setting %s to %s', gpio_name, gpio_value)
        try:
            self._server.set(gpio_name, gpio_value)
        except xmlrpclib.Fault as e:
            err_msg = "Setting '%s' to '%s' :: %s" % \
                (gpio_name, gpio_value, self._get_xmlrpclib_exception(e))
            raise error.TestFail(err_msg)


    def set_get_all(self, controls):
        """Set &| get one or more control values.

        Each entry is either 'name:value' (set the control), 'sleep:secs'
        (pause between operations) or a bare 'name' (get the control).

        @param controls: list of strings, controls to set &| get.

        @return: The list returned by servod.  When falling back to
                 individual calls for an old servod, a list with True for
                 every set/sleep entry and the read value for every get
                 entry.

        @raise: error.TestFail in case error occurs setting/getting values.
        """
        rv = []
        try:
            logging.info('Set/get all: %s', str(controls))
            rv = self._server.set_get_all(controls)
        except xmlrpclib.Fault as e:
            # TODO(waihong): Remove the following backward compatibility when
            # the new versions of hdctools are deployed.
            if 'not supported' in str(e):
                logging.warning('The servod is too old that set_get_all '
                                'not supported. Use set and get instead.')
                for control in controls:
                    if ':' in control:
                        # Split on the first ':' only so that a value which
                        # itself contains ':' is passed through intact.
                        name, value = control.split(':', 1)
                        if name == 'sleep':
                            time.sleep(float(value))
                        else:
                            self.set_nocheck(name, value)
                        rv.append(True)
                    else:
                        # A bare control name is a read request.
                        rv.append(self.get(control))
            else:
                err_msg = "Problem with '%s' :: %s" % \
                    (controls, self._get_xmlrpclib_exception(e))
                raise error.TestFail(err_msg)
        return rv


    # TODO(waihong) It may fail if multiple servo's are connected to the same
    # host. Should look for a better way, like the USB serial name, to identify
    # the USB device.
    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    # updated.
    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
        """Probe the USB disk device plugged-in the servo from the host side.

        It uses servod to discover if there is a usb device attached to it.

        @param timeout The timeout period when probing for the usb host device.

        @return: String of USB disk path (e.g. '/dev/sdb') or None.
        """
        # Any falsy answer from servod is normalized to None.
        return self._server.probe_host_usb_dev(timeout) or None


    def image_to_servo_usb(self, image_path=None,
                           make_image_noninteractive=False):
        """Install an image to the USB key plugged into the servo.

        This method may copy any image to the servo USB key including a
        recovery image or a test image.  These images are frequently used
        for test purposes such as restoring a corrupted image or conducting
        an upgrade of ec/fw/kernel as part of a test of a specific image part.

        @param image_path Path on the host to the recovery image.
        @param make_image_noninteractive Make the recovery image
                                   noninteractive, therefore the DUT
                                   will reboot automatically after
                                   installation.

        @raise error.AutotestError if the image cannot be copied to the
               USB key.
        """
        # We're about to start plugging/unplugging the USB key.  We
        # don't know the state of the DUT, or what it might choose
        # to do to the device after hotplug.  To avoid surprises,
        # force the DUT to be off.
        self._server.hwinit()
        self._power_state.power_off()

        # Set up Servo's usb mux.
        self.switch_usbkey('host')
        if image_path:
            logging.info('Searching for usb device and copying image to it. '
                         'Please wait a few minutes...')
            if not self._server.download_image_to_usb(image_path):
                logging.error('Failed to transfer requested image to USB. '
                              'Please take a look at Servo Logs.')
                raise error.AutotestError('Download image to usb failed.')
        if make_image_noninteractive:
            logging.info('Making image noninteractive')
            # A failure here is only logged, not raised.
            if not self._server.make_image_noninteractive():
                logging.error('Failed to make image noninteractive. '
                              'Please take a look at Servo Logs.')


    def install_recovery_image(self, image_path=None,
                               make_image_noninteractive=False):
        """Install the recovery image specified by the path onto the DUT.

        This method uses google recovery mode to install a recovery image
        onto a DUT through the use of a USB stick that is mounted on a servo
        board specified by the usb_dev.  If no image path is specified
        we use the recovery image already on the usb image.

        @param image_path: Path on the host to the recovery image.
        @param make_image_noninteractive: Make the recovery image
                noninteractive, therefore the DUT will reboot automatically
                after installation.
        """
        self.image_to_servo_usb(image_path, make_image_noninteractive)
        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
        self.switch_usbkey('dut')


    def _scp_image(self, image_path):
        """Copy image to the servo host.

        When programming a firmware image on the DUT, the image must be
        located on the host to which the servo device is connected. Sometimes
        servo is controlled by a remote host, in this case the image needs to
        be transferred to the remote host.

        @param image_path: a string, name of the firmware image file to be
               transferred.
        @return: a string, full path name of the copied file on the remote.
        """
        dest_path = os.path.join('/tmp', os.path.basename(image_path))
        self._servo_host.send_file(image_path, dest_path)
        return dest_path


    def system(self, command, timeout=3600):
        """Execute the passed in command on the servod host.

        @param command Command to be executed.
        @param timeout Maximum number of seconds of runtime allowed. Default to
                       1 hour.
        """
        logging.info('Will execute on servo host: %s', command)
        self._servo_host.run(command, timeout=timeout)


    def system_output(self, command, timeout=3600,
                      ignore_status=False, args=()):
        """Execute the passed in command on the servod host, return stdout.

        @param command a string, the command to execute
        @param timeout an int, max number of seconds to wait til command
               execution completes. Default to 1 hour.
        @param ignore_status a Boolean, if true - ignore command's nonzero exit
               status, otherwise an exception will be thrown
        @param args a tuple of strings, each becoming a separate command line
               parameter for the command
        @return command's stdout as a string.
        """
        return self._servo_host.run(command, timeout=timeout,
                                    ignore_status=ignore_status,
                                    args=args).stdout.strip()


    def get_servo_version(self):
        """Get the version of the servo, e.g., servo_v2 or servo_v3.

        @return: The version of the servo.

        """
        return self._server.get_version()


    def _initialize_programmer(self, rw_only=False):
        """Initialize the firmware programmer.

        Creates both the full and the RW-only programmer on first use;
        later calls are no-ops.

        @param rw_only: Currently unused; both programmers are always
                        created.  Kept for interface compatibility.

        @raise error.TestError if the servo version has no known
               firmware programmer.
        """
        if self._programmer:
            return
        # Initialize firmware programmer
        servo_version = self.get_servo_version()
        if servo_version.startswith('servo_v2'):
            self._programmer = firmware_programmer.ProgrammerV2(self)
            self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self)
        # Both servo v3 and v4 use the same programming methods so just leverage
        # ProgrammerV3 for servo v4 as well.
        elif servo_version.startswith(('servo_v3', 'servo_v4')):
            self._programmer = firmware_programmer.ProgrammerV3(self)
            self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self)
        else:
            raise error.TestError(
                    'No firmware programmer for servo version: %s' %
                         servo_version)


    def program_bios(self, image, rw_only=False):
        """Program bios on DUT with given image.

        @param image: a string, file name of the BIOS image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of BIOS.

        """
        self._initialize_programmer()
        # The programmer runs on the servo host, so a remote host needs
        # its own copy of the image first.
        if not self.is_localhost():
            image = self._scp_image(image)
        if rw_only:
            self._programmer_rw.program_bios(image)
        else:
            self._programmer.program_bios(image)


    def program_ec(self, image, rw_only=False):
        """Program ec on DUT with given image.

        @param image: a string, file name of the EC image to program
                      on the DUT.
        @param rw_only: True to only program the RW portion of EC.

        """
        self._initialize_programmer()
        # The programmer runs on the servo host, so a remote host needs
        # its own copy of the image first.
        if not self.is_localhost():
            image = self._scp_image(image)
        if rw_only:
            self._programmer_rw.program_ec(image)
        else:
            self._programmer.program_ec(image)


    def _switch_usbkey_power(self, power_state, detection_delay=False):
        """Switch usbkey power.

        This function switches usbkey power through servod's
        safe_switch_usbkey_power RPC, falling back to setting the value
        of 'prtctl4_pwren' directly when that call fails.  The request is
        always issued; there is no check of the current power state.

        @param power_state: A string, 'on' or 'off'.
        @param detection_delay: A boolean value, if True, sleep
                                for |USB_DETECTION_DELAY| after switching
                                the power on.
        """
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet been updated to have the
        # safe_switch_usbkey_power RPC. I'll remove this once all beaglebones
        # have been updated and also think about a better way to handle
        # situations like this.
        try:
            self._server.safe_switch_usbkey_power(power_state)
        except Exception:
            self.set('prtctl4_pwren', power_state)
        if power_state == 'off':
            time.sleep(self.USB_POWEROFF_DELAY)
        elif detection_delay:
            time.sleep(self.USB_DETECTION_DELAY)


    def switch_usbkey(self, usb_state):
        """Connect USB flash stick to either host or DUT, or turn USB port off.

        This function switches the servo multiplexer to provide electrical
        connection between the USB port J3 and either host or DUT side. It
        can also be used to turn the USB port off.

        Switching to 'dut' or 'host' is accompanied by powercycling
        of the USB stick, because it sometimes gets wedged if the mux
        is switched while the stick power is on.

        @param usb_state: A string, one of 'dut', 'host', or 'off'.
                          'dut' and 'host' indicate which side the
                          USB flash device is required to be connected to.
                          'off' indicates turning the USB port off.

        @raise: error.TestError in case the parameter is not 'dut'
                'host', or 'off'.
        """
        # Nothing to do when the key is already where the caller wants it.
        if self.get_usbkey_direction() == usb_state:
            return

        if usb_state == 'off':
            self._switch_usbkey_power('off')
            self._usb_state = usb_state
            return
        elif usb_state == 'host':
            mux_direction = 'servo_sees_usbkey'
        elif usb_state == 'dut':
            mux_direction = 'dut_sees_usbkey'
        else:
            raise error.TestError('Unknown USB state request: %s' % usb_state)

        self._switch_usbkey_power('off')
        # TODO(kevcheng): Forgive me for this terrible hack. This is just to
        # handle beaglebones that haven't yet been updated to have the
        # safe_switch_usbkey RPC. I'll remove this once all beaglebones have
        # been updated and also think about a better way to handle situations
        # like this.
        try:
            self._server.safe_switch_usbkey(mux_direction)
        except Exception:
            self.set('usb_mux_sel1', mux_direction)
        time.sleep(self.USB_POWEROFF_DELAY)
        # Only wait for device detection when the host is the side that
        # needs to see the key.
        self._switch_usbkey_power('on', usb_state == 'host')
        self._usb_state = usb_state


    def get_usbkey_direction(self):
        """Get which side USB is connected to or 'off' if usb power is off.

        The answer is cached in self._usb_state; servod is only queried
        when no state has been cached yet.

        @return: A string, one of 'dut', 'host', or 'off'.
        """
        if not self._usb_state:
            if self.get('prtctl4_pwren') == 'off':
                self._usb_state = 'off'
            elif self.get('usb_mux_sel1').startswith('dut'):
                self._usb_state = 'dut'
            else:
                self._usb_state = 'host'
        return self._usb_state