1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4# 5# Expects to be run in an environment with sudo and no interactive password 6# prompt, such as within the Chromium OS development chroot. 7 8import ast 9import logging 10import os 11import re 12import time 13import xmlrpclib 14 15from autotest_lib.client.common_lib import error 16from autotest_lib.server import utils as server_utils 17from autotest_lib.server.cros.servo import firmware_programmer 18 19# Time to wait when probing for a usb device, it takes on avg 17 seconds 20# to do a full probe. 21_USB_PROBE_TIMEOUT = 40 22 23 24def _extract_image_from_tarball(tarball, dest_dir, image_candidates): 25 """Try extracting the image_candidates from the tarball. 26 27 @param tarball: The path of the tarball. 28 @param dest_path: The path of the destination. 29 @param image_candidates: A tuple of the paths of image candidates. 30 31 @return: The first path from the image candidates, which succeeds, or None 32 if all the image candidates fail. 33 """ 34 for image in image_candidates: 35 status = server_utils.system( 36 ('tar xf %s -C %s %s' % (tarball, dest_dir, image)), 37 timeout=60, ignore_status=True) 38 if status == 0: 39 return image 40 return None 41 42 43class _PowerStateController(object): 44 45 """Class to provide board-specific power operations. 46 47 This class is responsible for "power on" and "power off" 48 operations that can operate without making assumptions in 49 advance about board state. It offers an interface that 50 abstracts out the different sequences required for different 51 board types. 52 53 """ 54 55 # Constants acceptable to be passed for the `rec_mode` parameter 56 # to power_on(). 57 # 58 # REC_ON: Boot the DUT in recovery mode, i.e. boot from USB or 59 # SD card. 60 # REC_OFF: Boot in normal mode, i.e. boot from internal storage. 61 62 REC_ON = 'rec' 63 REC_OFF = 'on' 64 REC_ON_FORCE_MRC = 'rec_force_mrc' 65 66 # Delay in seconds needed between asserting and de-asserting 67 # warm reset. 68 _RESET_HOLD_TIME = 0.5 69 70 def __init__(self, servo): 71 """Initialize the power state control. 72 73 @param servo Servo object providing the underlying `set` and `get` 74 methods for the target controls. 75 76 """ 77 self._servo = servo 78 79 def reset(self): 80 """Force the DUT to reset. 81 82 The DUT is guaranteed to be on at the end of this call, 83 regardless of its previous state, provided that there is 84 working OS software. This also guarantees that the EC has 85 been restarted. 86 87 """ 88 self._servo.set_nocheck('power_state', 'reset') 89 90 def warm_reset(self): 91 """Apply warm reset to the DUT. 92 93 This asserts, then de-asserts the 'warm_reset' signal. 94 Generally, this causes the board to restart. 95 96 """ 97 self._servo.set_get_all(['warm_reset:on', 98 'sleep:%.4f' % self._RESET_HOLD_TIME, 99 'warm_reset:off']) 100 101 def power_off(self): 102 """Force the DUT to power off. 103 104 The DUT is guaranteed to be off at the end of this call, 105 regardless of its previous state, provided that there is 106 working EC and boot firmware. There is no requirement for 107 working OS software. 108 109 """ 110 self._servo.set_nocheck('power_state', 'off') 111 112 def power_on(self, rec_mode=REC_OFF): 113 """Force the DUT to power on. 114 115 Prior to calling this function, the DUT must be powered off, 116 e.g. with a call to `power_off()`. 117 118 At power on, recovery mode is set as specified by the 119 corresponding argument. When booting with recovery mode on, it 120 is the caller's responsibility to unplug/plug in a bootable 121 external storage device. 122 123 If the DUT requires a delay after powering on but before 124 processing inputs such as USB stick insertion, the delay is 125 handled by this method; the caller is not responsible for such 126 delays. 127 128 @param rec_mode Setting of recovery mode to be applied at 129 power on. default: REC_OFF aka 'off' 130 131 """ 132 self._servo.set_nocheck('power_state', rec_mode) 133 134 135class _Uart(object): 136 """Class to capture CPU/EC UART streams.""" 137 def __init__(self, servo): 138 self._servo = servo 139 self._streams = [] 140 self._logs_dir = None 141 142 def start_capture(self): 143 """Start capturing Uart streams.""" 144 logging.debug('Start capturing CPU/EC UART.') 145 self._servo.set('cpu_uart_capture', 'on') 146 self._streams.append(('cpu_uart_stream', 'cpu_uart.log')) 147 try: 148 self._servo.set('ec_uart_capture', 'on') 149 self._streams.append(('ec_uart_stream', 'ec_uart.log')) 150 except error.TestFail as err: 151 if 'No control named' in str(err): 152 logging.debug('The servod is too old that ec_uart_capture not ' 153 'supported.') 154 155 def dump(self): 156 """Dump UART streams to log files accordingly.""" 157 if not self._logs_dir: 158 return 159 160 for stream, logfile in self._streams: 161 logfile_fullname = os.path.join(self._logs_dir, logfile) 162 try: 163 content = self._servo.get(stream) 164 except Exception as err: 165 logging.warn('Failed to get UART log for %s: %s', stream, err) 166 continue 167 168 # The UART stream may contain non-printable characters, and servo 169 # returns it in string representation. We use `ast.leteral_eval` 170 # to revert it back. 171 with open(logfile_fullname, 'a') as fd: 172 fd.write(ast.literal_eval(content)) 173 174 def stop_capture(self): 175 """Stop capturing UART streams.""" 176 logging.debug('Stop capturing CPU/EC UART.') 177 for uart in ('cpu_uart_capture', 'ec_uart_capture'): 178 try: 179 self._servo.set(uart, 'off') 180 except error.TestFail as err: 181 if 'No control named' in str(err): 182 logging.debug('The servod is too old that %s not ' 183 'supported.', uart) 184 except Exception as err: 185 logging.warn('Failed to stop UART logging for %s: %s', uart, 186 err) 187 188 @property 189 def logs_dir(self): 190 """Return the directory to save UART logs.""" 191 return self._logs_dir 192 193 @logs_dir.setter 194 def logs_dir(self, a_dir): 195 """Set directory to save UART logs. 196 197 @param a_dir String of logs directory name.""" 198 self._logs_dir = a_dir 199 200 201class Servo(object): 202 203 """Manages control of a Servo board. 204 205 Servo is a board developed by hardware group to aide in the debug and 206 control of various partner devices. Servo's features include the simulation 207 of pressing the power button, closing the lid, and pressing Ctrl-d. This 208 class manages setting up and communicating with a servo demon (servod) 209 process. It provides both high-level functions for common servo tasks and 210 low-level functions for directly setting and reading gpios. 211 212 """ 213 214 # Power button press delays in seconds. 215 # 216 # The EC specification says that 8.0 seconds should be enough 217 # for the long power press. However, some platforms need a bit 218 # more time. Empirical testing has found these requirements: 219 # Alex: 8.2 seconds 220 # ZGB: 8.5 seconds 221 # The actual value is set to the largest known necessary value. 222 # 223 # TODO(jrbarnette) Being generous is the right thing to do for 224 # existing platforms, but if this code is to be used for 225 # qualification of new hardware, we should be less generous. 226 SHORT_DELAY = 0.1 227 228 # Maximum number of times to re-read power button on release. 229 GET_RETRY_MAX = 10 230 231 # Delays to deal with DUT state transitions. 232 SLEEP_DELAY = 6 233 BOOT_DELAY = 10 234 235 # Default minimum time interval between 'press' and 'release' 236 # keyboard events. 237 SERVO_KEY_PRESS_DELAY = 0.1 238 239 # Time to toggle recovery switch on and off. 240 REC_TOGGLE_DELAY = 0.1 241 242 # Time to toggle development switch on and off. 243 DEV_TOGGLE_DELAY = 0.1 244 245 # Time between an usb disk plugged-in and detected in the system. 246 USB_DETECTION_DELAY = 10 247 # Time to keep USB power off before and after USB mux direction is changed 248 USB_POWEROFF_DELAY = 2 249 250 # Time to wait before timing out on servo initialization. 251 INIT_TIMEOUT_SECS = 10 252 253 254 def __init__(self, servo_host, servo_serial=None): 255 """Sets up the servo communication infrastructure. 256 257 @param servo_host: A ServoHost object representing 258 the host running servod. 259 @param servo_serial: Serial number of the servo board. 260 """ 261 # TODO(fdeng): crbug.com/298379 262 # We should move servo_host object out of servo object 263 # to minimize the dependencies on the rest of Autotest. 264 self._servo_host = servo_host 265 self._servo_serial = servo_serial 266 self._server = servo_host.get_servod_server_proxy() 267 self._power_state = _PowerStateController(self) 268 self._uart = _Uart(self) 269 self._usb_state = None 270 self._programmer = None 271 272 273 @property 274 def servo_serial(self): 275 """Returns the serial number of the servo board.""" 276 return self._servo_serial 277 278 279 def get_power_state_controller(self): 280 """Return the power state controller for this Servo. 281 282 The power state controller provides board-independent 283 interfaces for reset, power-on, power-off operations. 284 285 """ 286 return self._power_state 287 288 289 def initialize_dut(self, cold_reset=False): 290 """Initializes a dut for testing purposes. 291 292 This sets various servo signals back to default values 293 appropriate for the target board. By default, if the DUT 294 is already on, it stays on. If the DUT is powered off 295 before initialization, its state afterward is unspecified. 296 297 Rationale: Basic initialization of servo sets the lid open, 298 when there is a lid. This operation won't affect powered on 299 units; however, setting the lid open may power on a unit 300 that's off, depending on the board type and previous state 301 of the device. 302 303 If `cold_reset` is a true value, the DUT and its EC will be 304 reset, and the DUT rebooted in normal mode. 305 306 @param cold_reset If True, cold reset the device after 307 initialization. 308 """ 309 self._server.hwinit() 310 self.set('usb_mux_oe1', 'on') 311 self._usb_state = None 312 self.switch_usbkey('off') 313 self._uart.start_capture() 314 if cold_reset: 315 self._power_state.reset() 316 logging.debug('Servo initialized, version is %s', 317 self._server.get_version()) 318 319 320 def is_localhost(self): 321 """Is the servod hosted locally? 322 323 Returns: 324 True if local hosted; otherwise, False. 325 """ 326 return self._servo_host.is_localhost() 327 328 329 def power_long_press(self): 330 """Simulate a long power button press.""" 331 # After a long power press, the EC may ignore the next power 332 # button press (at least on Alex). To guarantee that this 333 # won't happen, we need to allow the EC one second to 334 # collect itself. 335 self._server.power_long_press() 336 337 338 def power_normal_press(self): 339 """Simulate a normal power button press.""" 340 self._server.power_normal_press() 341 342 343 def power_short_press(self): 344 """Simulate a short power button press.""" 345 self._server.power_short_press() 346 347 348 def power_key(self, press_secs=''): 349 """Simulate a power button press. 350 351 @param press_secs : Str. Time to press key. 352 """ 353 self._server.power_key(press_secs) 354 355 356 def lid_open(self): 357 """Simulate opening the lid and raise exception if all attempts fail""" 358 self.set('lid_open', 'yes') 359 360 361 def lid_close(self): 362 """Simulate closing the lid and raise exception if all attempts fail 363 364 Waits 6 seconds to ensure the device is fully asleep before returning. 365 """ 366 self.set('lid_open', 'no') 367 time.sleep(Servo.SLEEP_DELAY) 368 369 def volume_up(self, timeout=300): 370 """Simulate pushing the volume down button. 371 372 @param timeout: Timeout for setting the volume. 373 """ 374 self.set_get_all(['volume_up:yes', 375 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 376 'volume_up:no']) 377 # we need to wait for commands to take effect before moving on 378 time_left = float(timeout) 379 while time_left > 0.0: 380 value = self.get('volume_up') 381 if value == 'no': 382 return 383 time.sleep(self.SHORT_DELAY) 384 time_left = time_left - self.SHORT_DELAY 385 raise error.TestFail("Failed setting volume_up to no") 386 387 def volume_down(self, timeout=300): 388 """Simulate pushing the volume down button. 389 390 @param timeout: Timeout for setting the volume. 391 """ 392 self.set_get_all(['volume_down:yes', 393 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 394 'volume_down:no']) 395 # we need to wait for commands to take effect before moving on 396 time_left = float(timeout) 397 while time_left > 0.0: 398 value = self.get('volume_down') 399 if value == 'no': 400 return 401 time.sleep(self.SHORT_DELAY) 402 time_left = time_left - self.SHORT_DELAY 403 raise error.TestFail("Failed setting volume_down to no") 404 405 def ctrl_d(self, press_secs=''): 406 """Simulate Ctrl-d simultaneous button presses. 407 408 @param press_secs : Str. Time to press key. 409 """ 410 self._server.ctrl_d(press_secs) 411 412 413 def ctrl_u(self): 414 """Simulate Ctrl-u simultaneous button presses. 415 416 @param press_secs : Str. Time to press key. 417 """ 418 self._server.ctrl_u() 419 420 421 def ctrl_enter(self, press_secs=''): 422 """Simulate Ctrl-enter simultaneous button presses. 423 424 @param press_secs : Str. Time to press key. 425 """ 426 self._server.ctrl_enter(press_secs) 427 428 429 def d_key(self, press_secs=''): 430 """Simulate Enter key button press. 431 432 @param press_secs : Str. Time to press key. 433 """ 434 self._server.d_key(press_secs) 435 436 437 def ctrl_key(self, press_secs=''): 438 """Simulate Enter key button press. 439 440 @param press_secs : Str. Time to press key. 441 """ 442 self._server.ctrl_key(press_secs) 443 444 445 def enter_key(self, press_secs=''): 446 """Simulate Enter key button press. 447 448 @param press_secs : Str. Time to press key. 449 """ 450 self._server.enter_key(press_secs) 451 452 453 def refresh_key(self, press_secs=''): 454 """Simulate Refresh key (F3) button press. 455 456 @param press_secs : Str. Time to press key. 457 """ 458 self._server.refresh_key(press_secs) 459 460 461 def ctrl_refresh_key(self, press_secs=''): 462 """Simulate Ctrl and Refresh (F3) simultaneous press. 463 464 This key combination is an alternative of Space key. 465 466 @param press_secs : Str. Time to press key. 467 """ 468 self._server.ctrl_refresh_key(press_secs) 469 470 471 def imaginary_key(self, press_secs=''): 472 """Simulate imaginary key button press. 473 474 Maps to a key that doesn't physically exist. 475 476 @param press_secs : Str. Time to press key. 477 """ 478 self._server.imaginary_key(press_secs) 479 480 481 def sysrq_x(self, press_secs=''): 482 """Simulate Alt VolumeUp X simulataneous press. 483 484 This key combination is the kernel system request (sysrq) X. 485 486 @param press_secs : Str. Time to press key. 487 """ 488 self._server.sysrq_x(press_secs) 489 490 491 def toggle_recovery_switch(self): 492 """Toggle recovery switch on and off.""" 493 self.enable_recovery_mode() 494 time.sleep(self.REC_TOGGLE_DELAY) 495 self.disable_recovery_mode() 496 497 498 def enable_recovery_mode(self): 499 """Enable recovery mode on device.""" 500 self.set('rec_mode', 'on') 501 502 503 def disable_recovery_mode(self): 504 """Disable recovery mode on device.""" 505 self.set('rec_mode', 'off') 506 507 508 def toggle_development_switch(self): 509 """Toggle development switch on and off.""" 510 self.enable_development_mode() 511 time.sleep(self.DEV_TOGGLE_DELAY) 512 self.disable_development_mode() 513 514 515 def enable_development_mode(self): 516 """Enable development mode on device.""" 517 self.set('dev_mode', 'on') 518 519 520 def disable_development_mode(self): 521 """Disable development mode on device.""" 522 self.set('dev_mode', 'off') 523 524 def boot_devmode(self): 525 """Boot a dev-mode device that is powered off.""" 526 self.power_short_press() 527 self.pass_devmode() 528 529 530 def pass_devmode(self): 531 """Pass through boot screens in dev-mode.""" 532 time.sleep(Servo.BOOT_DELAY) 533 self.ctrl_d() 534 time.sleep(Servo.BOOT_DELAY) 535 536 537 def get_board(self): 538 """Get the board connected to servod.""" 539 return self._server.get_board() 540 541 542 def get_base_board(self): 543 """Get the board of the base connected to servod.""" 544 try: 545 return self._server.get_base_board() 546 except xmlrpclib.Fault as e: 547 # TODO(waihong): Remove the following compatibility check when 548 # the new versions of hdctools are deployed. 549 if 'not supported' in str(e): 550 logging.warning('The servod is too old that get_base_board ' 551 'not supported.') 552 return '' 553 raise 554 555 556 def get_ec_active_copy(self): 557 """Get the active copy of the EC image.""" 558 return self.get('ec_active_copy') 559 560 561 def _get_xmlrpclib_exception(self, xmlexc): 562 """Get meaningful exception string from xmlrpc. 563 564 Args: 565 xmlexc: xmlrpclib.Fault object 566 567 xmlrpclib.Fault.faultString has the following format: 568 569 <type 'exception type'>:'actual error message' 570 571 Parse and return the real exception from the servod side instead of the 572 less meaningful one like, 573 <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no 574 attribute 'hw_driver'"> 575 576 Returns: 577 string of underlying exception raised in servod. 578 """ 579 return re.sub('^.*>:', '', xmlexc.faultString) 580 581 582 def get(self, gpio_name): 583 """Get the value of a gpio from Servod. 584 585 @param gpio_name Name of the gpio. 586 """ 587 assert gpio_name 588 try: 589 return self._server.get(gpio_name) 590 except xmlrpclib.Fault as e: 591 err_msg = "Getting '%s' :: %s" % \ 592 (gpio_name, self._get_xmlrpclib_exception(e)) 593 raise error.TestFail(err_msg) 594 595 596 def set(self, gpio_name, gpio_value): 597 """Set and check the value of a gpio using Servod. 598 599 @param gpio_name Name of the gpio. 600 @param gpio_value New setting for the gpio. 601 """ 602 self.set_nocheck(gpio_name, gpio_value) 603 retry_count = Servo.GET_RETRY_MAX 604 while gpio_value != self.get(gpio_name) and retry_count: 605 logging.warning("%s != %s, retry %d", gpio_name, gpio_value, 606 retry_count) 607 retry_count -= 1 608 time.sleep(Servo.SHORT_DELAY) 609 if not retry_count: 610 assert gpio_value == self.get(gpio_name), \ 611 'Servo failed to set %s to %s' % (gpio_name, gpio_value) 612 613 614 def set_nocheck(self, gpio_name, gpio_value): 615 """Set the value of a gpio using Servod. 616 617 @param gpio_name Name of the gpio. 618 @param gpio_value New setting for the gpio. 619 """ 620 assert gpio_name and gpio_value 621 logging.info('Setting %s to %r', gpio_name, gpio_value) 622 try: 623 self._server.set(gpio_name, gpio_value) 624 except xmlrpclib.Fault as e: 625 err_msg = "Setting '%s' to %r :: %s" % \ 626 (gpio_name, gpio_value, self._get_xmlrpclib_exception(e)) 627 raise error.TestFail(err_msg) 628 629 630 def set_get_all(self, controls): 631 """Set &| get one or more control values. 632 633 @param controls: list of strings, controls to set &| get. 634 635 @raise: error.TestError in case error occurs setting/getting values. 636 """ 637 rv = [] 638 try: 639 logging.info('Set/get all: %s', str(controls)) 640 rv = self._server.set_get_all(controls) 641 except xmlrpclib.Fault as e: 642 # TODO(waihong): Remove the following backward compatibility when 643 # the new versions of hdctools are deployed. 644 if 'not supported' in str(e): 645 logging.warning('The servod is too old that set_get_all ' 646 'not supported. Use set and get instead.') 647 for control in controls: 648 if ':' in control: 649 (name, value) = control.split(':') 650 if name == 'sleep': 651 time.sleep(float(value)) 652 else: 653 self.set_nocheck(name, value) 654 rv.append(True) 655 else: 656 rv.append(self.get(name)) 657 else: 658 err_msg = "Problem with '%s' :: %s" % \ 659 (controls, self._get_xmlrpclib_exception(e)) 660 raise error.TestFail(err_msg) 661 return rv 662 663 664 # TODO(waihong) It may fail if multiple servo's are connected to the same 665 # host. Should look for a better way, like the USB serial name, to identify 666 # the USB device. 667 # TODO(sbasi) Remove this code from autoserv once firmware tests have been 668 # updated. 669 def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT): 670 """Probe the USB disk device plugged-in the servo from the host side. 671 672 It uses servod to discover if there is a usb device attached to it. 673 674 @param timeout The timeout period when probing for the usb host device. 675 676 @return: String of USB disk path (e.g. '/dev/sdb') or None. 677 """ 678 # Set up Servo's usb mux. 679 self.switch_usbkey('host') 680 return self._server.probe_host_usb_dev(timeout) or None 681 682 683 def image_to_servo_usb(self, image_path=None, 684 make_image_noninteractive=False): 685 """Install an image to the USB key plugged into the servo. 686 687 This method may copy any image to the servo USB key including a 688 recovery image or a test image. These images are frequently used 689 for test purposes such as restoring a corrupted image or conducting 690 an upgrade of ec/fw/kernel as part of a test of a specific image part. 691 692 @param image_path Path on the host to the recovery image. 693 @param make_image_noninteractive Make the recovery image 694 noninteractive, therefore the DUT 695 will reboot automatically after 696 installation. 697 """ 698 # We're about to start plugging/unplugging the USB key. We 699 # don't know the state of the DUT, or what it might choose 700 # to do to the device after hotplug. To avoid surprises, 701 # force the DUT to be off. 702 self._server.hwinit() 703 self._power_state.power_off() 704 705 # Set up Servo's usb mux. 706 self.switch_usbkey('host') 707 if image_path: 708 logging.info('Searching for usb device and copying image to it. ' 709 'Please wait a few minutes...') 710 if not self._server.download_image_to_usb(image_path): 711 logging.error('Failed to transfer requested image to USB. ' 712 'Please take a look at Servo Logs.') 713 raise error.AutotestError('Download image to usb failed.') 714 if make_image_noninteractive: 715 logging.info('Making image noninteractive') 716 if not self._server.make_image_noninteractive(): 717 logging.error('Failed to make image noninteractive. ' 718 'Please take a look at Servo Logs.') 719 720 def boot_in_recovery_mode(self): 721 """Boot host DUT in recovery mode.""" 722 self._power_state.power_on(rec_mode=self._power_state.REC_ON) 723 self.switch_usbkey('dut') 724 725 726 def install_recovery_image(self, image_path=None, 727 make_image_noninteractive=False): 728 """Install the recovery image specified by the path onto the DUT. 729 730 This method uses google recovery mode to install a recovery image 731 onto a DUT through the use of a USB stick that is mounted on a servo 732 board specified by the usb_dev. If no image path is specified 733 we use the recovery image already on the usb image. 734 735 @param image_path: Path on the host to the recovery image. 736 @param make_image_noninteractive: Make the recovery image 737 noninteractive, therefore the DUT will reboot automatically 738 after installation. 739 """ 740 self.image_to_servo_usb(image_path, make_image_noninteractive) 741 self.boot_in_recovery_mode() 742 743 744 def _scp_image(self, image_path): 745 """Copy image to the servo host. 746 747 When programming a firmware image on the DUT, the image must be 748 located on the host to which the servo device is connected. Sometimes 749 servo is controlled by a remote host, in this case the image needs to 750 be transferred to the remote host. 751 752 @param image_path: a string, name of the firmware image file to be 753 transferred. 754 @return: a string, full path name of the copied file on the remote. 755 """ 756 757 dest_path = os.path.join('/tmp', os.path.basename(image_path)) 758 self._servo_host.send_file(image_path, dest_path) 759 return dest_path 760 761 762 def system(self, command, timeout=3600): 763 """Execute the passed in command on the servod host. 764 765 @param command Command to be executed. 766 @param timeout Maximum number of seconds of runtime allowed. Default to 767 1 hour. 768 """ 769 logging.info('Will execute on servo host: %s', command) 770 self._servo_host.run(command, timeout=timeout) 771 772 773 def system_output(self, command, timeout=3600, 774 ignore_status=False, args=()): 775 """Execute the passed in command on the servod host, return stdout. 776 777 @param command a string, the command to execute 778 @param timeout an int, max number of seconds to wait til command 779 execution completes. Default to 1 hour. 780 @param ignore_status a Boolean, if true - ignore command's nonzero exit 781 status, otherwise an exception will be thrown 782 @param args a tuple of strings, each becoming a separate command line 783 parameter for the command 784 @return command's stdout as a string. 785 """ 786 return self._servo_host.run(command, timeout=timeout, 787 ignore_status=ignore_status, 788 args=args).stdout.strip() 789 790 791 def get_servo_version(self): 792 """Get the version of the servo, e.g., servo_v2 or servo_v3. 793 794 @return: The version of the servo. 795 796 """ 797 return self._server.get_version() 798 799 800 def _initialize_programmer(self, rw_only=False): 801 """Initialize the firmware programmer. 802 803 @param rw_only: True to initialize a programmer which only 804 programs the RW portions. 805 """ 806 if self._programmer: 807 return 808 # Initialize firmware programmer 809 servo_version = self.get_servo_version() 810 if servo_version.startswith('servo_v2'): 811 self._programmer = firmware_programmer.ProgrammerV2(self) 812 self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self) 813 # Both servo v3 and v4 use the same programming methods so just leverage 814 # ProgrammerV3 for servo v4 as well. 815 elif (servo_version.startswith('servo_v3') or 816 servo_version.startswith('servo_v4')): 817 self._programmer = firmware_programmer.ProgrammerV3(self) 818 self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self) 819 else: 820 raise error.TestError( 821 'No firmware programmer for servo version: %s' % 822 servo_version) 823 824 825 def program_bios(self, image, rw_only=False): 826 """Program bios on DUT with given image. 827 828 @param image: a string, file name of the BIOS image to program 829 on the DUT. 830 @param rw_only: True to only program the RW portion of BIOS. 831 832 """ 833 self._initialize_programmer() 834 if not self.is_localhost(): 835 image = self._scp_image(image) 836 if rw_only: 837 self._programmer_rw.program_bios(image) 838 else: 839 self._programmer.program_bios(image) 840 841 842 def program_ec(self, image, rw_only=False): 843 """Program ec on DUT with given image. 844 845 @param image: a string, file name of the EC image to program 846 on the DUT. 847 @param rw_only: True to only program the RW portion of EC. 848 849 """ 850 self._initialize_programmer() 851 if not self.is_localhost(): 852 image = self._scp_image(image) 853 if rw_only: 854 self._programmer_rw.program_ec(image) 855 else: 856 self._programmer.program_ec(image) 857 858 859 def _reprogram(self, tarball_path, firmware_name, image_candidates, 860 rw_only): 861 """Helper function to reprogram firmware for EC or BIOS. 862 863 @param tarball_path: The path of the downloaded build tarball. 864 @param: firmware_name: either 'EC' or 'BIOS'. 865 @param image_candidates: A tuple of the paths of image candidates. 866 @param rw_only: True to only install firmware to its RW portions. Keep 867 the RO portions unchanged. 868 869 @raise: TestError if cannot extract firmware from the tarball. 870 """ 871 dest_dir = os.path.dirname(tarball_path) 872 image = _extract_image_from_tarball(tarball_path, dest_dir, 873 image_candidates) 874 if not image: 875 if firmware_name == 'EC': 876 logging.info('Not a Chrome EC, ignore re-programming it') 877 return 878 else: 879 raise error.TestError('Failed to extract the %s image from ' 880 'tarball' % firmware_name) 881 882 logging.info('Will re-program %s %snow', firmware_name, 883 'RW ' if rw_only else '') 884 885 if firmware_name == 'EC': 886 self.program_ec(os.path.join(dest_dir, image), rw_only) 887 else: 888 self.program_bios(os.path.join(dest_dir, image), rw_only) 889 890 891 def program_firmware(self, model, tarball_path, rw_only=False): 892 """Program firmware (EC, if applied, and BIOS) of the DUT. 893 894 @param model: The DUT model name. 895 @param tarball_path: The path of the downloaded build tarball. 896 @param rw_only: True to only install firmware to its RW portions. Keep 897 the RO portions unchanged. 898 """ 899 ap_image_candidates = ('image.bin', 'image-%s.bin' % model) 900 ec_image_candidates = ('ec.bin', '%s/ec.bin' % model) 901 902 self._reprogram(tarball_path, 'EC', ec_image_candidates, rw_only) 903 self._reprogram(tarball_path, 'BIOS', ap_image_candidates, rw_only) 904 905 self.get_power_state_controller().reset() 906 time.sleep(Servo.BOOT_DELAY) 907 908 909 def _switch_usbkey_power(self, power_state, detection_delay=False): 910 """Switch usbkey power. 911 912 This function switches usbkey power by setting the value of 913 'prtctl4_pwren'. If the power is already in the 914 requested state, this function simply returns. 915 916 @param power_state: A string, 'on' or 'off'. 917 @param detection_delay: A boolean value, if True, sleep 918 for |USB_DETECTION_DELAY| after switching 919 the power on. 920 """ 921 # TODO(kevcheng): Forgive me for this terrible hack. This is just to 922 # handle beaglebones that haven't yet updated and have the 923 # safe_switch_usbkey_power RPC. I'll remove this once all beaglebones 924 # have been updated and also think about a better way to handle 925 # situations like this. 926 try: 927 self._server.safe_switch_usbkey_power(power_state) 928 except Exception: 929 self.set('prtctl4_pwren', power_state) 930 if power_state == 'off': 931 time.sleep(self.USB_POWEROFF_DELAY) 932 elif detection_delay: 933 time.sleep(self.USB_DETECTION_DELAY) 934 935 936 def switch_usbkey(self, usb_state): 937 """Connect USB flash stick to either host or DUT, or turn USB port off. 938 939 This function switches the servo multiplexer to provide electrical 940 connection between the USB port J3 and either host or DUT side. It 941 can also be used to turn the USB port off. 942 943 Switching to 'dut' or 'host' is accompanied by powercycling 944 of the USB stick, because it sometimes gets wedged if the mux 945 is switched while the stick power is on. 946 947 @param usb_state: A string, one of 'dut', 'host', or 'off'. 948 'dut' and 'host' indicate which side the 949 USB flash device is required to be connected to. 950 'off' indicates turning the USB port off. 951 952 @raise: error.TestError in case the parameter is not 'dut' 953 'host', or 'off'. 954 """ 955 if self.get_usbkey_direction() == usb_state: 956 return 957 958 if usb_state == 'off': 959 self._switch_usbkey_power('off') 960 self._usb_state = usb_state 961 return 962 elif usb_state == 'host': 963 mux_direction = 'servo_sees_usbkey' 964 elif usb_state == 'dut': 965 mux_direction = 'dut_sees_usbkey' 966 else: 967 raise error.TestError('Unknown USB state request: %s' % usb_state) 968 969 self._switch_usbkey_power('off') 970 # TODO(kevcheng): Forgive me for this terrible hack. This is just to 971 # handle beaglebones that haven't yet updated and have the 972 # safe_switch_usbkey RPC. I'll remove this once all beaglebones have 973 # been updated and also think about a better way to handle situations 974 # like this. 975 try: 976 self._server.safe_switch_usbkey(mux_direction) 977 except Exception: 978 self.set('usb_mux_sel1', mux_direction) 979 time.sleep(self.USB_POWEROFF_DELAY) 980 self._switch_usbkey_power('on', usb_state == 'host') 981 self._usb_state = usb_state 982 983 984 def get_usbkey_direction(self): 985 """Get which side USB is connected to or 'off' if usb power is off. 986 987 @return: A string, one of 'dut', 'host', or 'off'. 988 """ 989 if not self._usb_state: 990 if self.get('prtctl4_pwren') == 'off': 991 self._usb_state = 'off' 992 elif self.get('usb_mux_sel1').startswith('dut'): 993 self._usb_state = 'dut' 994 else: 995 self._usb_state = 'host' 996 return self._usb_state 997 998 999 def set_servo_v4_role(self, role): 1000 """Set the power role of servo v4, either 'src' or 'snk'. 1001 1002 It does nothing if not a servo v4. 1003 1004 @param role: Power role for DUT port on servo v4, either 'src' or 'snk'. 1005 """ 1006 servo_version = self.get_servo_version() 1007 if servo_version.startswith('servo_v4'): 1008 value = self.get('servo_v4_role') 1009 if value != role: 1010 self.set_nocheck('servo_v4_role', role) 1011 else: 1012 logging.debug('Already in the role: %s.', role) 1013 else: 1014 logging.debug('Not a servo v4, unable to set role to %s.', role) 1015 1016 1017 @property 1018 def uart_logs_dir(self): 1019 """Return the directory to save UART logs.""" 1020 return self._uart.logs_dir if self._uart else "" 1021 1022 1023 @uart_logs_dir.setter 1024 def uart_logs_dir(self, logs_dir): 1025 """Set directory to save UART logs. 1026 1027 @param logs_dir String of directory name.""" 1028 if self._uart: 1029 self._uart.logs_dir = logs_dir 1030 1031 1032 def close(self): 1033 """Close the servo object.""" 1034 if self._uart: 1035 self._uart.stop_capture() 1036 self._uart.dump() 1037 self._uart = None 1038