# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
#
# Expects to be run in an environment with sudo and no interactive password
# prompt, such as within the Chromium OS development chroot.

import ast
import logging
import os
import re
import socket
import time
import xmlrpclib

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import lsbrelease_utils
from autotest_lib.server import utils as server_utils
from autotest_lib.server.cros.servo import firmware_programmer
from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig

# Time to wait when probing for a usb device, it takes on avg 17 seconds
# to do a full probe.
_USB_PROBE_TIMEOUT = 40


# Regex to match XMLRPC errors due to a servod control not existing.
NO_CONTROL_RE = re.compile(r'No control named (\w*\.?\w*)')


# The minimum voltage on the charger port on servo v4 that is expected. This is
# to query whether a charger is plugged into servo v4 and thus pd control
# capabilities can be used.
V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400


class ControlUnavailableError(error.TestFail):
    """Custom error class to indicate a control is unavailable on servod."""
    pass


def _extract_image_from_tarball(tarball, dest_dir, image_candidates):
    """Try extracting the image_candidates from the tarball.

    @param tarball: The path of the tarball.
    @param dest_dir: The path of the destination directory. It is created
                     (single level, via os.mkdir) if it does not exist.
    @param image_candidates: A tuple of the paths of image candidates.

    @return: The first path from the image candidates, which succeeds, or None
             if all the image candidates fail.
    """

    # Create the firmware_name subdirectory if it doesn't exist
    if not os.path.exists(dest_dir):
        os.mkdir(dest_dir)

    # Generate a list of all tarball files
    tarball_files = server_utils.system_output(
        ('tar tf %s' % tarball), timeout=120, ignore_status=True).splitlines()

    # Check if image candidates are in the list of tarball files
    for image in image_candidates:
        if image in tarball_files:
            # Extract and return the first image candidate found
            status = server_utils.system(
                ('tar xf %s -C %s %s' % (tarball, dest_dir, image)),
                timeout=120, ignore_status=True)
            if status == 0:
                return image
    return None


class _PowerStateController(object):

    """Class to provide board-specific power operations.

    This class is responsible for "power on" and "power off"
    operations that can operate without making assumptions in
    advance about board state.  It offers an interface that
    abstracts out the different sequences required for different
    board types.

    """
    # Constants acceptable to be passed for the `rec_mode` parameter
    # to power_on().
    #
    # REC_ON:  Boot the DUT in recovery mode, i.e. boot from USB or
    #   SD card.
    # REC_OFF:  Boot in normal mode, i.e. boot from internal storage.

    REC_ON = 'rec'
    REC_OFF = 'on'
    REC_ON_FORCE_MRC = 'rec_force_mrc'

    # Delay in seconds needed between asserting and de-asserting
    # warm reset.
    _RESET_HOLD_TIME = 0.5


    def __init__(self, servo):
        """Initialize the power state control.

        @param servo Servo object providing the underlying `set` and `get`
                     methods for the target controls.

        """
        self._servo = servo
        self.supported = self._servo.has_control('power_state')
        if not self.supported:
            logging.info('Servo setup does not support power-state operations. '
                         'All power-state calls will lead to error.TestFail')

    def _check_supported(self):
        """Throw an error if the power_state control is not supported.

        @raise error.TestFail: if servod has no 'power_state' control.
        """
        if not self.supported:
            raise error.TestFail('power_state controls not supported')

    def reset(self):
        """Force the DUT to reset.

        The DUT is guaranteed to be on at the end of this call,
        regardless of its previous state, provided that there is
        working OS software. This also guarantees that the EC has
        been restarted.

        """
        self._check_supported()
        self._servo.set_nocheck('power_state', 'reset')

    def warm_reset(self):
        """Apply warm reset to the DUT.

        This asserts, then de-asserts the 'warm_reset' signal.
        Generally, this causes the board to restart.

        """
        # TODO: warm_reset support has been added to power_state.py. Once it
        # is available to labstation remove fallback method.
        self._check_supported()
        try:
            self._servo.set_nocheck('power_state', 'warm_reset')
        except error.TestFail:
            # Older servod: drive the warm_reset signal by hand instead.
            logging.info("Fallback to warm_reset control method")
            self._servo.set_get_all(['warm_reset:on',
                                     'sleep:%.4f' % self._RESET_HOLD_TIME,
                                     'warm_reset:off'])

    def power_off(self):
        """Force the DUT to power off.

        The DUT is guaranteed to be off at the end of this call,
        regardless of its previous state, provided that there is
        working EC and boot firmware.  There is no requirement for
        working OS software.

        """
        self._check_supported()
        self._servo.set_nocheck('power_state', 'off')

    def power_on(self, rec_mode=REC_OFF):
        """Force the DUT to power on.

        Prior to calling this function, the DUT must be powered off,
        e.g. with a call to `power_off()`.

        At power on, recovery mode is set as specified by the
        corresponding argument.  When booting with recovery mode on, it
        is the caller's responsibility to unplug/plug in a bootable
        external storage device.

        If the DUT requires a delay after powering on but before
        processing inputs such as USB stick insertion, the delay is
        handled by this method; the caller is not responsible for such
        delays.

        @param rec_mode Setting of recovery mode to be applied at
                        power on. default: REC_OFF aka 'on'

        """
        self._check_supported()
        self._servo.set_nocheck('power_state', rec_mode)


class _Uart(object):
    """Class to capture UART streams of CPU, EC, Cr50, etc."""
    _UartToCapture = ('cpu', 'ec', 'cr50', 'servo_v4', 'servo_micro', 'usbpd')

    def __init__(self, servo):
        """Initialize the UART capture helper.

        @param servo: Servo object used to set/get the UART controls.
        """
        self._servo = servo
        # (stream control name, log file name) pairs being captured.
        self._streams = []
        # dump() is a no-op until a caller assigns a directory here.
        self.logs_dir = None

    def _start_stop_capture(self, uart, start):
        """Helper function to start/stop capturing on specified UART.

        @param uart:  The UART name to start/stop capturing.
        @param start:  True to start capturing, otherwise stop.

        @returns True if the operation completes successfully.
                 False if the UART capturing is not supported or failed due to
                 an error.
        """
        logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop',
                      uart)
        uart_cmd = '%s_uart_capture' % uart
        target_level = 'on' if start else 'off'
        level = None
        if self._servo.has_control(uart_cmd):
            # Do our own implementation of set() here as not_applicable
            # should also count as a valid control.
            logging.debug('Trying to set %s to %s.', uart_cmd, target_level)
            try:
                self._servo.set_nocheck(uart_cmd, target_level)
                level = self._servo.get(uart_cmd)
            except error.TestFail as e:
                # Any sort of test failure here should not stop the test. This
                # is just to capture more output. Log and move on.
                logging.warning('Failed to set %s to %s. %s. Ignoring.',
                                uart_cmd, target_level, str(e))
            if level == target_level:
                logging.debug('Managed to set %s to %s.', uart_cmd, level)
            else:
                logging.debug('Failed to set %s to %s. Got %s.', uart_cmd,
                              target_level, level)
        return level == target_level

    def start_capture(self):
        """Start capturing UART streams."""
        for uart in self._UartToCapture:
            if self._start_stop_capture(uart, True):
                self._streams.append(('%s_uart_stream' % uart, '%s_uart.log' %
                                      uart))

    def dump(self):
        """Dump UART streams to log files accordingly."""
        if not self.logs_dir:
            return

        for stream, logfile in self._streams:
            logfile_fullname = os.path.join(self.logs_dir, logfile)
            try:
                content = self._servo.get(stream)
            except Exception as err:
                logging.warning('Failed to get UART log for %s: %s', stream,
                                err)
                continue

            if content == 'not_applicable':
                logging.warning('%s is not applicable', stream)
                continue

            # The UART stream may contain non-printable characters, and servo
            # returns it in string representation. We use `ast.literal_eval`
            # to revert it back.
            with open(logfile_fullname, 'a') as fd:
                try:
                    fd.write(ast.literal_eval(content))
                except (ValueError, SyntaxError):
                    # literal_eval raises SyntaxError (not ValueError) when
                    # the text is not a parsable Python literal at all.
                    logging.exception('Invalid value for %s: %r', stream,
                                      content)

    def stop_capture(self):
        """Stop capturing UART streams."""
        for uart in self._UartToCapture:
            try:
                self._start_stop_capture(uart, False)
            except Exception as err:
                logging.warning('Failed to stop UART logging for %s: %s', uart,
                                err)


class Servo(object):

    """Manages control of a Servo board.

    Servo is a board developed by hardware group to aid in the debug and
    control of various partner devices. Servo's features include the simulation
    of pressing the power button, closing the lid, and pressing Ctrl-d. This
    class manages setting up and communicating with a servo daemon (servod)
    process. It provides both high-level functions for common servo tasks and
    low-level functions for directly setting and reading gpios.

    """

    # Power button press delays in seconds.
    #
    # The EC specification says that 8.0 seconds should be enough
    # for the long power press.  However, some platforms need a bit
    # more time.  Empirical testing has found these requirements:
    #   Alex: 8.2 seconds
    #   ZGB:  8.5 seconds
    # The actual value is set to the largest known necessary value.
    #
    # TODO(jrbarnette) Being generous is the right thing to do for
    # existing platforms, but if this code is to be used for
    # qualification of new hardware, we should be less generous.
    SHORT_DELAY = 0.1

    # Maximum number of times to re-read power button on release.
    GET_RETRY_MAX = 10

    # Delays to deal with DUT state transitions.
    SLEEP_DELAY = 6
    BOOT_DELAY = 10

    # Default minimum time interval between 'press' and 'release'
    # keyboard events.
    SERVO_KEY_PRESS_DELAY = 0.1

    # Time to toggle recovery switch on and off.
    REC_TOGGLE_DELAY = 0.1

    # Time to toggle development switch on and off.
    DEV_TOGGLE_DELAY = 0.1

    # Time between an usb disk plugged-in and detected in the system.
    USB_DETECTION_DELAY = 10
    # Time to keep USB power off before and after USB mux direction is changed
    USB_POWEROFF_DELAY = 2

    # Time to wait before timing out on servo initialization.
    INIT_TIMEOUT_SECS = 10


    def __init__(self, servo_host, servo_serial=None):
        """Sets up the servo communication infrastructure.

        @param servo_host: A ServoHost object representing
                           the host running servod.
        @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost
        @param servo_serial: Serial number of the servo board.
        """
        # TODO(fdeng): crbug.com/298379
        # We should move servo_host object out of servo object
        # to minimize the dependencies on the rest of Autotest.
        self._servo_host = servo_host
        self._servo_serial = servo_serial
        self._server = servo_host.get_servod_server_proxy()
        self._servo_type = self.get_servo_version()
        self._power_state = _PowerStateController(self)
        self._uart = _Uart(self)
        self._usb_state = None
        self._programmer = None
        self._prev_log_inode = None
        self._prev_log_size = 0

    @property
    def servo_serial(self):
        """Returns the serial number of the servo board."""
        return self._servo_serial

    def rotate_servod_logs(self, filename=None, directory=None):
        """Save the latest servod log into a local directory, then rotate logs.

        The files will be <filename>.DEBUG, <filename>.INFO, <filename>.WARNING,
        or just <filename>.log if not using split level logging.

        @param filename: local filename prefix (no file extension) to use.
                         If None, rotate log but don't save it.
        @param directory: local directory to save logs into (if unset, use cwd)
        """
        if self.is_localhost():
            # Local servod usually runs without log-dir, so can't be collected.
            # TODO(crbug.com/1011516): allow localhost when rotation is enabled
            return

        log_dir = '/var/log/servod_%s' % self._servo_host.servo_port

        if filename:
            logging.info("Saving servod logs: %s/%s.*", directory or '.',
                         filename)
            # TODO(crrev.com/c/1793030): remove no-level case once CL is pushed
            for level_name in ('', 'DEBUG', 'INFO', 'WARNING'):

                remote_path = os.path.join(log_dir, 'latest')
                if level_name:
                    remote_path += '.%s' % level_name

                local_path = '%s.%s' % (filename, level_name or 'log')
                if directory:
                    local_path = os.path.join(directory, local_path)

                try:
                    self._servo_host.get_file(
                            remote_path, local_path, try_rsync=False)

                except error.AutoservRunError as e:
                    result = e.result_obj
                    if result.exit_status != 0:
                        stderr = result.stderr.strip()

                        # File not existing is okay, but warn for anything else.
                        if 'no such' not in stderr.lower():
                            logging.warning(
                                    "Couldn't retrieve servod log: %s",
                                    stderr or '\n%s' % result)

                # Don't leave empty log files behind.
                try:
                    if os.stat(local_path).st_size == 0:
                        os.unlink(local_path)
                except EnvironmentError:
                    pass

        else:
            # No filename given, so caller wants to discard the log lines.
            # Remove the symlinks to prevent old log-dir links from being
            # picked up multiple times when using servod without log-dir.
            remote_path = os.path.join(log_dir, 'latest*')
            self._servo_host.run(
                    "rm %s" % remote_path,
                    stderr_tee=None, ignore_status=True)

        # Servod log rotation renames current log, then creates a new file with
        # the old name: log.<date> -> log.<date>.1.tbz2 -> log.<date>.2.tbz2

        # Must rotate after copying, or the copy would be the new, empty file.
        try:
            self.set_nocheck('rotate_servod_logs', 'yes')
        except ControlUnavailableError as e:
            # Missing control (possibly old servod)
            logging.warning("Couldn't rotate servod logs: %s", str(e))
        except error.TestFail:
            # Control exists but gave an error; don't let it fail the test.
            # The error is already logged in set_nocheck().
            pass

    def get_power_state_controller(self):
        """Return the power state controller for this Servo.

        The power state controller provides board-independent
        interfaces for reset, power-on, power-off operations.

        """
        return self._power_state


    def initialize_dut(self, cold_reset=False, enable_main=True):
        """Initializes a dut for testing purposes.

        This sets various servo signals back to default values
        appropriate for the target board.  By default, if the DUT
        is already on, it stays on.  If the DUT is powered off
        before initialization, its state afterward is unspecified.

        Rationale:  Basic initialization of servo sets the lid open,
        when there is a lid.  This operation won't affect powered on
        units; however, setting the lid open may power on a unit
        that's off, depending on the board type and previous state
        of the device.

        If `cold_reset` is a true value, the DUT and its EC will be
        reset, and the DUT rebooted in normal mode.

        @param cold_reset If True, cold reset the device after
                          initialization.
        @param enable_main If True, make sure the main servo device has
                           control of the dut.

        """
        if enable_main:
            self.enable_main_servo_device()

        try:
            self._server.hwinit()
        except socket.error as e:
            # Attach the servod endpoint so the failure is attributable.
            e.filename = '%s:%s' % (self._servo_host.hostname,
                                    self._servo_host.servo_port)
            raise
        self._usb_state = None
        if self.has_control('usb_mux_oe1'):
            self.set('usb_mux_oe1', 'on')
            self.switch_usbkey('off')
        else:
            logging.warning('Servod command \'usb_mux_oe1\' is not available. '
                            'Any USB drive related servo routines will fail.')
        self._uart.start_capture()
        if cold_reset:
            if not self._power_state.supported:
                logging.info('Cold-reset for DUT requested, but servo '
                             'setup does not support power_state. Skipping.')
            else:
                self._power_state.reset()
        logging.debug('Servo initialized, version is %s',
                      self._server.get_version())
        if self.has_control('init_keyboard'):
            # This indicates the servod version does not
            # have explicit keyboard initialization yet.
            # Ignore this.
            # TODO(coconutruben): change this back to set() about a month
            # after crrev.com/c/1586239 has been merged (or whenever that
            # logic is in the labstation images).
            self.set_nocheck('init_keyboard','on')


    def is_localhost(self):
        """Is the servod hosted locally?

        Returns:
          True if local hosted; otherwise, False.
        """
        return self._servo_host.is_localhost()


    def get_os_version(self):
        """Returns the chromeos release version."""
        lsb_release_content = self.system_output('cat /etc/lsb-release',
                                                 ignore_status=True)
        return lsbrelease_utils.get_chromeos_release_builder_path(
                    lsb_release_content=lsb_release_content)


    def get_servod_version(self):
        """Returns the servod version."""
        result = self._servo_host.run('servod --version')
        # TODO: use system_output once servod --version prints to stdout
        stdout = result.stdout.strip()
        return stdout if stdout else result.stderr.strip()


    def power_long_press(self):
        """Simulate a long power button press."""
        # After a long power press, the EC may ignore the next power
        # button press (at least on Alex).  To guarantee that this
        # won't happen, we need to allow the EC one second to
        # collect itself.
        # long_press is defined as 8.5s in servod
        self.set_nocheck('power_key', 'long_press')


    def power_normal_press(self):
        """Simulate a normal power button press."""
        # press is defined as 1.2s in servod
        self.set_nocheck('power_key', 'press')


    def power_short_press(self):
        """Simulate a short power button press."""
        # tab is defined as 0.2s in servod
        self.set_nocheck('power_key', 'tab')


    def power_key(self, press_secs='tab'):
        """Simulate a power button press.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('power_key', press_secs)


    def pwr_button(self, action='press'):
        """Simulate a power button press.

        @param action: str; could be press or could be release.
        """
        self.set_nocheck('pwr_button', action)


    def lid_open(self):
        """Simulate opening the lid and raise exception if all attempts fail"""
        self.set('lid_open', 'yes')


    def lid_close(self):
        """Simulate closing the lid and raise exception if all attempts fail

        Waits 6 seconds to ensure the device is fully asleep before returning.
        """
        self.set('lid_open', 'no')
        time.sleep(Servo.SLEEP_DELAY)


    def vbus_power_get(self):
        """Get current vbus_power."""
        return self.get('vbus_power')


    def volume_up(self, timeout=300):
        """Simulate pushing the volume up button.

        @param timeout: Timeout, in seconds, for the button to read back as
                        released.

        @raise error.TestFail: if 'volume_up' does not read 'no' in time.
        """
        self.set_get_all(['volume_up:yes',
                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
                          'volume_up:no'])
        # we need to wait for commands to take effect before moving on
        time_left = float(timeout)
        while time_left > 0.0:
            value = self.get('volume_up')
            if value == 'no':
                return
            time.sleep(self.SHORT_DELAY)
            time_left = time_left - self.SHORT_DELAY
        raise error.TestFail("Failed setting volume_up to no")

    def volume_down(self, timeout=300):
        """Simulate pushing the volume down button.

        @param timeout: Timeout, in seconds, for the button to read back as
                        released.

        @raise error.TestFail: if 'volume_down' does not read 'no' in time.
        """
        self.set_get_all(['volume_down:yes',
                          'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY,
                          'volume_down:no'])
        # we need to wait for commands to take effect before moving on
        time_left = float(timeout)
        while time_left > 0.0:
            value = self.get('volume_down')
            if value == 'no':
                return
            time.sleep(self.SHORT_DELAY)
            time_left = time_left - self.SHORT_DELAY
        raise error.TestFail("Failed setting volume_down to no")

    def ctrl_d(self, press_secs='tab'):
        """Simulate Ctrl-d simultaneous button presses.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('ctrl_d', press_secs)


    def ctrl_u(self, press_secs='tab'):
        """Simulate Ctrl-u simultaneous button presses.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('ctrl_u', press_secs)


    def ctrl_enter(self, press_secs='tab'):
        """Simulate Ctrl-enter simultaneous button presses.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('ctrl_enter', press_secs)


    def ctrl_key(self, press_secs='tab'):
        """Simulate Ctrl key button press.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('ctrl_key', press_secs)


    def enter_key(self, press_secs='tab'):
        """Simulate Enter key button press.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('enter_key', press_secs)


    def refresh_key(self, press_secs='tab'):
        """Simulate Refresh key (F3) button press.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('refresh_key', press_secs)


    def ctrl_refresh_key(self, press_secs='tab'):
        """Simulate Ctrl and Refresh (F3) simultaneous press.

        This key combination is an alternative of Space key.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('ctrl_refresh_key', press_secs)


    def imaginary_key(self, press_secs='tab'):
        """Simulate imaginary key button press.

        Maps to a key that doesn't physically exist.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('imaginary_key', press_secs)


    def sysrq_x(self, press_secs='tab'):
        """Simulate Alt VolumeUp X simultaneous press.

        This key combination is the kernel system request (sysrq) X.

        @param press_secs: int, float, str; time to press key in seconds or
                           known shorthand: 'tab' 'press' 'long_press'
        """
        self.set_nocheck('sysrq_x', press_secs)


    def toggle_recovery_switch(self):
        """Toggle recovery switch on and off."""
        self.enable_recovery_mode()
        time.sleep(self.REC_TOGGLE_DELAY)
        self.disable_recovery_mode()


    def enable_recovery_mode(self):
        """Enable recovery mode on device."""
        self.set('rec_mode', 'on')


    def disable_recovery_mode(self):
        """Disable recovery mode on device."""
        self.set('rec_mode', 'off')


    def toggle_development_switch(self):
        """Toggle development switch on and off."""
        self.enable_development_mode()
        time.sleep(self.DEV_TOGGLE_DELAY)
        self.disable_development_mode()


    def enable_development_mode(self):
        """Enable development mode on device."""
        self.set('dev_mode', 'on')


    def disable_development_mode(self):
        """Disable development mode on device."""
        self.set('dev_mode', 'off')

    def boot_devmode(self):
        """Boot a dev-mode device that is powered off."""
        self.power_short_press()
        self.pass_devmode()


    def pass_devmode(self):
        """Pass through boot screens in dev-mode."""
        time.sleep(Servo.BOOT_DELAY)
        self.ctrl_d()
        time.sleep(Servo.BOOT_DELAY)


    def get_board(self):
        """Get the board connected to servod."""
        return self._server.get_board()


    def get_base_board(self):
        """Get the board of the base connected to servod."""
        try:
            return self._server.get_base_board()
        except xmlrpclib.Fault as e:
            # TODO(waihong): Remove the following compatibility check when
            # the new versions of hdctools are deployed.
            if 'not supported' in str(e):
                logging.warning('The servod is too old that get_base_board '
                                'not supported.')
                return ''
            raise


    def get_ec_active_copy(self):
        """Get the active copy of the EC image."""
        return self.get('ec_active_copy')


    def _get_xmlrpclib_exception(self, xmlexc):
        """Get meaningful exception string from xmlrpc.

        Args:
            xmlexc: xmlrpclib.Fault object

        xmlrpclib.Fault.faultString has the following format:

        <type 'exception type'>:'actual error message'

        Parse and return the real exception from the servod side instead of the
        less meaningful one like,
           <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no
           attribute 'hw_driver'">

        Returns:
            string of underlying exception raised in servod.
        """
        return re.sub('^.*>:', '', xmlexc.faultString)

    def has_control(self, ctrl_name, prefix=''):
        """Query servod server to determine if |ctrl_name| is a valid control.

        @param ctrl_name Name of the control.
        @param prefix: prefix to route control to correct servo device.

        @returns: true if |ctrl_name| is a known control, false otherwise.

        @raise xmlrpclib.Fault: for any servod fault other than the control
                                being unknown.
        """
        # Query the prefixed name; the unprefixed one would ignore |prefix|.
        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
        try:
            # If the control exists, doc() will work.
            self._server.doc(ctrl_name)
            return True
        except xmlrpclib.Fault as e:
            # Escape the name: a prefixed control contains '.', which would
            # otherwise act as a regex wildcard.
            if re.search('No control %s' % re.escape(ctrl_name),
                         self._get_xmlrpclib_exception(e)):
                return False
            raise

    def _build_ctrl_name(self, ctrl_name, prefix):
        """Helper to build the control name if a prefix is used.

        @param ctrl_name Name of the control.
        @param prefix: prefix to route control to correct servo device.

        @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty.
        """
        assert ctrl_name
        if prefix:
            return '%s.%s' % (prefix, ctrl_name)
        return ctrl_name

    def get(self, ctrl_name, prefix=''):
        """Get the value of a gpio from Servod.

        @param ctrl_name Name of the control.
        @param prefix: prefix to route control to correct servo device.

        @returns: server response to |ctrl_name| request.

        @raise ControlUnavailableError: if |ctrl_name| not a known control.
        @raise error.TestFail: for all other failures doing get().
        """
        # Use the prefixed name so the request reaches the intended device.
        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
        try:
            return self._server.get(ctrl_name)
        except xmlrpclib.Fault as e:
            err_str = self._get_xmlrpclib_exception(e)
            err_msg = "Getting '%s' :: %s" % (ctrl_name, err_str)
            unknown_ctrl = re.findall(NO_CONTROL_RE, err_str)
            if unknown_ctrl:
                raise ControlUnavailableError('No control named %r' %
                                              unknown_ctrl[0])
            else:
                logging.error(err_msg)
                raise error.TestFail(err_msg)


    def set(self, ctrl_name, ctrl_value, prefix=''):
        """Set and check the value of a gpio using Servod.

        @param ctrl_name: Name of the control.
        @param ctrl_value: New setting for the control.
        @param prefix: prefix to route control to correct servo device.
        @raise error.TestFail: if the control value fails to change.
        """
        # Resolve the prefix once; the helpers below then receive the full
        # name and need no prefix of their own.
        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
        self.set_nocheck(ctrl_name, ctrl_value)
        retry_count = Servo.GET_RETRY_MAX
        actual_value = self.get(ctrl_name)
        while ctrl_value != actual_value and retry_count:
            logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value,
                            retry_count)
            retry_count -= 1
            time.sleep(Servo.SHORT_DELAY)
            actual_value = self.get(ctrl_name)

        if ctrl_value != actual_value:
            raise error.TestFail(
                    'Servo failed to set %s to %s. Got %s.'
                    % (ctrl_name, ctrl_value, actual_value))


    def set_nocheck(self, ctrl_name, ctrl_value, prefix=''):
        """Set the value of a gpio using Servod.

        @param ctrl_name Name of the control.
        @param ctrl_value New setting for the control.
        @param prefix: prefix to route control to correct servo device.

        @raise ControlUnavailableError: if |ctrl_name| not a known control.
        @raise error.TestFail: for all other failures doing set().
        """
        # Use the prefixed name so the request reaches the intended device.
        ctrl_name = self._build_ctrl_name(ctrl_name, prefix)
        # The real danger here is to pass a None value through the xmlrpc.
        assert ctrl_value is not None
        logging.debug('Setting %s to %r', ctrl_name, ctrl_value)
        try:
            self._server.set(ctrl_name, ctrl_value)
        except xmlrpclib.Fault as e:
            err_str = self._get_xmlrpclib_exception(e)
            err_msg = "Setting '%s' :: %s" % (ctrl_name, err_str)
            unknown_ctrl = re.findall(NO_CONTROL_RE, err_str)
            if unknown_ctrl:
                raise ControlUnavailableError('No control named %r' %
                                              unknown_ctrl[0])
            else:
                logging.error(err_msg)
                raise error.TestFail(err_msg)


    def set_get_all(self, controls):
        """Set &| get one or more control values.

        @param controls: list of strings, controls to set &| get.
                         'name:value' entries are sets ('sleep:<secs>' is a
                         delay); bare 'name' entries are gets.

        @returns: list of results, one per control.

        @raise: error.TestFail in case error occurs setting/getting values.
        """
        rv = []
        try:
            logging.debug('Set/get all: %s', str(controls))
            rv = self._server.set_get_all(controls)
        except xmlrpclib.Fault as e:
            # TODO(waihong): Remove the following backward compatibility when
            # the new versions of hdctools are deployed.
            if 'not supported' in str(e):
                logging.warning('The servod is too old that set_get_all '
                                'not supported. Use set and get instead.')
                for control in controls:
                    if ':' in control:
                        # Split on the first ':' only so that values which
                        # themselves contain ':' stay intact.
                        (name, value) = control.split(':', 1)
                        if name == 'sleep':
                            time.sleep(float(value))
                        else:
                            self.set_nocheck(name, value)
                        rv.append(True)
                    else:
                        # A bare entry is the control name itself.
                        rv.append(self.get(control))
            else:
                err_msg = "Problem with '%s' :: %s" % \
                    (controls, self._get_xmlrpclib_exception(e))
                raise error.TestFail(err_msg)
        return rv


    # TODO(waihong) It may fail if multiple servo's are connected to the same
    # host. Should look for a better way, like the USB serial name, to identify
    # the USB device.
    # TODO(sbasi) Remove this code from autoserv once firmware tests have been
    # updated.
    def probe_host_usb_dev(self, timeout=_USB_PROBE_TIMEOUT):
        """Probe the USB disk device plugged-in the servo from the host side.

        It uses servod to discover if there is a usb device attached to it.

        @param timeout The timeout period when probing for the usb host device.

        @return: String of USB disk path (e.g. '/dev/sdb') or None.
        """
        # Set up Servo's usb mux.
        self.switch_usbkey('host')
        return self._server.probe_host_usb_dev(timeout) or None


    def image_to_servo_usb(self, image_path=None,
                           make_image_noninteractive=False):
        """Install an image to the USB key plugged into the servo.

        This method may copy any image to the servo USB key including a
        recovery image or a test image.  These images are frequently used
        for test purposes such as restoring a corrupted image or conducting
        an upgrade of ec/fw/kernel as part of a test of a specific image part.

        @param image_path Path on the host to the recovery image.
        @param make_image_noninteractive Make the recovery image
                                   noninteractive, therefore the DUT
                                   will reboot automatically after
                                   installation.

        @raise error.AutotestError: if downloading the image to USB fails.
        """
        # We're about to start plugging/unplugging the USB key.  We
        # don't know the state of the DUT, or what it might choose
        # to do to the device after hotplug.  To avoid surprises,
        # force the DUT to be off.
        self._server.hwinit()
        if self.has_control('init_keyboard'):
            # This indicates the servod version does not
            # have explicit keyboard initialization yet.
            # Ignore this.
            # TODO(coconutruben): change this back to set() about a month
            # after crrev.com/c/1586239 has been merged (or whenever that
            # logic is in the labstation images).
            self.set_nocheck('init_keyboard','on')
        self._power_state.power_off()

        if image_path:
            # Set up Servo's usb mux.
            self.switch_usbkey('host')
            logging.info('Searching for usb device and copying image to it. '
                         'Please wait a few minutes...')
            if not self._server.download_image_to_usb(image_path):
                logging.error('Failed to transfer requested image to USB. '
                              'Please take a look at Servo Logs.')
                raise error.AutotestError('Download image to usb failed.')
            if make_image_noninteractive:
                logging.info('Making image noninteractive')
                if not self._server.make_image_noninteractive():
                    logging.error('Failed to make image noninteractive. '
                                  'Please take a look at Servo Logs.')

    def boot_in_recovery_mode(self):
        """Boot host DUT in recovery mode."""
        self._power_state.power_on(rec_mode=self._power_state.REC_ON)
        self.switch_usbkey('dut')


    def install_recovery_image(self, image_path=None,
                               make_image_noninteractive=False):
        """Install the recovery image specified by the path onto the DUT.

        This method uses google recovery mode to install a recovery image
        onto a DUT through the use of a USB stick that is mounted on a servo
        board specified by the usb_dev.  If no image path is specified
        we use the recovery image already on the usb image.

        @param image_path: Path on the host to the recovery image.
        @param make_image_noninteractive: Make the recovery image
                noninteractive, therefore the DUT will reboot automatically
                after installation.
        """
        self.image_to_servo_usb(image_path, make_image_noninteractive)
        # Give the DUT some time to power_off if we skip
        # download image to usb. (crbug.com/982993)
        if not image_path:
            time.sleep(10)
        self.boot_in_recovery_mode()


    def _scp_image(self, image_path):
        """Copy image to the servo host.

        When programming a firmware image on the DUT, the image must be
        located on the host to which the servo device is connected. Sometimes
        servo is controlled by a remote host, in this case the image needs to
        be transferred to the remote host. This adds the servod port number, to
        make sure tests for different DUTs don't trample on each other's files.

        @param image_path: a string, name of the firmware image file to be
               transferred.
        @return: a string, full path name of the copied file on the remote.
        """
        name = os.path.basename(image_path)
        remote_name = 'dut_%s.%s' % (self._servo_host.servo_port, name)
        dest_path = os.path.join('/tmp', remote_name)
        logging.info('Copying %s to %s', name, dest_path)
        self._servo_host.send_file(image_path, dest_path)
        return dest_path


    def system(self, command, timeout=3600):
        """Execute the passed in command on the servod host.

        @param command Command to be executed.
        @param timeout Maximum number of seconds of runtime allowed. Default to
                       1 hour.
        """
        logging.info('Will execute on servo host: %s', command)
        self._servo_host.run(command, timeout=timeout)


    def system_output(self, command, timeout=3600,
                      ignore_status=False, args=()):
        """Execute the passed in command on the servod host, return stdout.

        @param command a string, the command to execute
        @param timeout an int, max number of seconds to wait til command
               execution completes. Default to 1 hour.
        @param ignore_status a Boolean, if true - ignore command's nonzero exit
               status, otherwise an exception will be thrown
        @param args a tuple of strings, each becoming a separate command line
               parameter for the command
        @return command's stdout as a string.
        """
        return self._servo_host.run(command, timeout=timeout,
                                    ignore_status=ignore_status,
                                    args=args).stdout.strip()


    def get_servo_version(self, active=False):
        """Get the version of the servo, e.g., servo_v2 or servo_v3.

        @param active: Only return the servo type with the active device.
        @return: The version of the servo.

        """
        servo_type = self._server.get_version()
        if '_and_' not in servo_type or not active:
            return servo_type

        # If servo v4 is using ccd and servo micro, modify the servo type to
        # reflect the active device.
1088 active_device = self.get('active_v4_device') 1089 if active_device in servo_type: 1090 logging.info('%s is active', active_device) 1091 return 'servo_v4_with_' + active_device 1092 1093 logging.warn("%s is active even though it's not in servo type", 1094 active_device) 1095 return servo_type 1096 1097 1098 def get_main_servo_device(self): 1099 """Return the main servo device""" 1100 return self._servo_type.split('_with_')[-1].split('_and_')[0] 1101 1102 1103 def enable_main_servo_device(self): 1104 """Make sure the main device has control of the dut.""" 1105 # Cr50 detects servo using the EC uart. It doesn't work well if the 1106 # board doesn't use EC uart. The lab active_v4_device doesn't handle 1107 # this correctly. Check ec_uart_pty before trying to change the active 1108 # device. 1109 # TODO(crbug.com/1016842): reenable the setting the main device when 1110 # active device works on labstations. 1111 return 1112 if not self.has_control('active_v4_device'): 1113 return 1114 self.set('active_v4_device', self.get_main_servo_device()) 1115 1116 1117 def main_device_is_ccd(self): 1118 """Whether the main servo device (no prefixes) is a ccd device.""" 1119 servo = self._server.get_version() 1120 return 'ccd_cr50' in servo and 'servo_micro' not in servo 1121 1122 1123 def main_device_is_flex(self): 1124 """Whether the main servo device (no prefixes) is a legacy device.""" 1125 return not self.main_device_is_ccd() 1126 1127 1128 def main_device_is_active(self): 1129 """Return whether the main device is the active device. 1130 1131 This is only relevant for a dual setup with ccd and legacy on the same 1132 DUT. The main device is the servo that has no prefix on its controls. 1133 This helper answers the question whether that device is also the 1134 active device or not. 1135 """ 1136 # TODO(coconutruben): The current implementation of the dual setup only 1137 # ever has legacy as the main device. 
Therefore, it suffices to ask 1138 # whether the active device is ccd. 1139 if not self.dts_mode_is_valid(): 1140 # Use dts support as a proxy to whether the servo setup could 1141 # support a dual role. Only those setups now support legacy and ccd. 1142 return True 1143 active_device = self.get('active_v4_device') 1144 return 'ccd_cr50' not in active_device 1145 1146 def _initialize_programmer(self, rw_only=False): 1147 """Initialize the firmware programmer. 1148 1149 @param rw_only: True to initialize a programmer which only 1150 programs the RW portions. 1151 """ 1152 if self._programmer: 1153 return 1154 # Initialize firmware programmer 1155 if self._servo_type.startswith('servo_v2'): 1156 self._programmer = firmware_programmer.ProgrammerV2(self) 1157 self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self) 1158 # Both servo v3 and v4 use the same programming methods so just leverage 1159 # ProgrammerV3 for servo v4 as well. 1160 elif (self._servo_type.startswith('servo_v3') or 1161 self._servo_type.startswith('servo_v4')): 1162 self._programmer = firmware_programmer.ProgrammerV3(self) 1163 self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self) 1164 else: 1165 raise error.TestError( 1166 'No firmware programmer for servo version: %s' % 1167 self._servo_type) 1168 1169 1170 def program_bios(self, image, rw_only=False): 1171 """Program bios on DUT with given image. 1172 1173 @param image: a string, file name of the BIOS image to program 1174 on the DUT. 1175 @param rw_only: True to only program the RW portion of BIOS. 1176 1177 """ 1178 self._initialize_programmer() 1179 if not self.is_localhost(): 1180 image = self._scp_image(image) 1181 if rw_only: 1182 self._programmer_rw.program_bios(image) 1183 else: 1184 self._programmer.program_bios(image) 1185 1186 1187 def program_ec(self, image, rw_only=False): 1188 """Program ec on DUT with given image. 1189 1190 @param image: a string, file name of the EC image to program 1191 on the DUT. 
1192 @param rw_only: True to only program the RW portion of EC. 1193 1194 """ 1195 self._initialize_programmer() 1196 if not self.is_localhost(): 1197 image = self._scp_image(image) 1198 if rw_only: 1199 self._programmer_rw.program_ec(image) 1200 else: 1201 self._programmer.program_ec(image) 1202 1203 1204 def extract_ec_image(self, board, model, tarball_path): 1205 """Helper function to extract EC image from downloaded tarball. 1206 1207 @param board: The DUT board name. 1208 @param model: The DUT model name. 1209 @param tarball_path: The path of the downloaded build tarball. 1210 1211 @return: Path to extracted EC image. 1212 """ 1213 1214 # Ignore extracting EC image and re-programming if not a Chrome EC 1215 chrome_ec = FAFTConfig(board).chrome_ec 1216 if not chrome_ec: 1217 logging.info('Not a Chrome EC, ignore re-programming it') 1218 return None 1219 1220 # Best effort; try to retrieve the EC board from the version as 1221 # reported by the EC. 1222 ec_board = None 1223 try: 1224 ec_board = self.get('ec_board') 1225 except Exception as err: 1226 logging.info('Failed to get ec_board value; ignoring') 1227 pass 1228 1229 # Array of candidates for EC image 1230 ec_image_candidates = ['ec.bin', 1231 '%s/ec.bin' % model, 1232 '%s/ec.bin' % board] 1233 if ec_board: 1234 ec_image_candidates.append('%s/ec.bin' % ec_board) 1235 1236 # Extract EC image from tarball 1237 dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC') 1238 ec_image = _extract_image_from_tarball(tarball_path, dest_dir, 1239 ec_image_candidates) 1240 1241 # Check if EC image was found and return path or raise error 1242 if ec_image: 1243 return os.path.join(dest_dir, ec_image) 1244 else: 1245 raise error.TestError('Failed to extract EC image from %s', 1246 tarball_path) 1247 1248 1249 def extract_bios_image(self, board, model, tarball_path): 1250 """Helper function to extract BIOS image from downloaded tarball. 1251 1252 @param board: The DUT board name. 
1253 @param model: The DUT model name. 1254 @param tarball_path: The path of the downloaded build tarball. 1255 1256 @return: Path to extracted BIOS image. 1257 """ 1258 1259 # Array of candidates for BIOS image 1260 bios_image_candidates = ['image.bin', 1261 'image-%s.bin' % model, 1262 'image-%s.bin' % board] 1263 1264 # Extract BIOS image from tarball 1265 dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS') 1266 bios_image = _extract_image_from_tarball(tarball_path, dest_dir, 1267 bios_image_candidates) 1268 1269 # Check if BIOS image was found and return path or raise error 1270 if bios_image: 1271 return os.path.join(dest_dir, bios_image) 1272 else: 1273 raise error.TestError('Failed to extract BIOS image from %s', 1274 tarball_path) 1275 1276 1277 def _switch_usbkey_power(self, power_state, detection_delay=False): 1278 """Switch usbkey power. 1279 1280 This function switches usbkey power by setting the value of 1281 'prtctl4_pwren'. If the power is already in the 1282 requested state, this function simply returns. 1283 1284 @param power_state: A string, 'on' or 'off'. 1285 @param detection_delay: A boolean value, if True, sleep 1286 for |USB_DETECTION_DELAY| after switching 1287 the power on. 1288 """ 1289 # TODO(kevcheng): Forgive me for this terrible hack. This is just to 1290 # handle beaglebones that haven't yet updated and have the 1291 # safe_switch_usbkey_power RPC. I'll remove this once all beaglebones 1292 # have been updated and also think about a better way to handle 1293 # situations like this. 1294 try: 1295 self._server.safe_switch_usbkey_power(power_state) 1296 except Exception: 1297 self.set('prtctl4_pwren', power_state) 1298 if power_state == 'off': 1299 time.sleep(self.USB_POWEROFF_DELAY) 1300 elif detection_delay: 1301 time.sleep(self.USB_DETECTION_DELAY) 1302 1303 1304 def switch_usbkey(self, usb_state): 1305 """Connect USB flash stick to either host or DUT, or turn USB port off. 
1306 1307 This function switches the servo multiplexer to provide electrical 1308 connection between the USB port J3 and either host or DUT side. It 1309 can also be used to turn the USB port off. 1310 1311 Switching to 'dut' or 'host' is accompanied by powercycling 1312 of the USB stick, because it sometimes gets wedged if the mux 1313 is switched while the stick power is on. 1314 1315 @param usb_state: A string, one of 'dut', 'host', or 'off'. 1316 'dut' and 'host' indicate which side the 1317 USB flash device is required to be connected to. 1318 'off' indicates turning the USB port off. 1319 1320 @raise: error.TestError in case the parameter is not 'dut' 1321 'host', or 'off'. 1322 """ 1323 if self.get_usbkey_direction() == usb_state: 1324 return 1325 1326 if usb_state == 'off': 1327 self._switch_usbkey_power('off') 1328 self._usb_state = usb_state 1329 return 1330 elif usb_state == 'host': 1331 mux_direction = 'servo_sees_usbkey' 1332 elif usb_state == 'dut': 1333 mux_direction = 'dut_sees_usbkey' 1334 else: 1335 raise error.TestError('Unknown USB state request: %s' % usb_state) 1336 1337 self._switch_usbkey_power('off') 1338 # TODO(kevcheng): Forgive me for this terrible hack. This is just to 1339 # handle beaglebones that haven't yet updated and have the 1340 # safe_switch_usbkey RPC. I'll remove this once all beaglebones have 1341 # been updated and also think about a better way to handle situations 1342 # like this. 1343 try: 1344 self._server.safe_switch_usbkey(mux_direction) 1345 except Exception: 1346 self.set('usb_mux_sel1', mux_direction) 1347 time.sleep(self.USB_POWEROFF_DELAY) 1348 self._switch_usbkey_power('on', usb_state == 'host') 1349 self._usb_state = usb_state 1350 1351 1352 def get_usbkey_direction(self): 1353 """Get which side USB is connected to or 'off' if usb power is off. 1354 1355 @return: A string, one of 'dut', 'host', or 'off'. 
1356 """ 1357 if not self._usb_state: 1358 if self.get('prtctl4_pwren') == 'off': 1359 self._usb_state = 'off' 1360 elif self.get('usb_mux_sel1').startswith('dut'): 1361 self._usb_state = 'dut' 1362 else: 1363 self._usb_state = 'host' 1364 return self._usb_state 1365 1366 1367 def set_servo_v4_role(self, role): 1368 """Set the power role of servo v4, either 'src' or 'snk'. 1369 1370 It does nothing if not a servo v4. 1371 1372 @param role: Power role for DUT port on servo v4, either 'src' or 'snk'. 1373 """ 1374 if self._servo_type.startswith('servo_v4'): 1375 value = self.get('servo_v4_role') 1376 if value != role: 1377 self.set_nocheck('servo_v4_role', role) 1378 else: 1379 logging.debug('Already in the role: %s.', role) 1380 else: 1381 logging.debug('Not a servo v4, unable to set role to %s.', role) 1382 1383 1384 def supports_built_in_pd_control(self): 1385 """Return whether the servo type supports pd charging and control.""" 1386 if 'servo_v4' not in self._servo_type: 1387 # Only servo v4 supports this feature. 1388 logging.info('%r type does not support pd control.', 1389 self._servo_type) 1390 return False 1391 # On servo v4, it still needs to be the type-c version. 1392 if not self.get('servo_v4_type') == 'type-c': 1393 logging.info('PD controls require a type-c servo v4.') 1394 return False 1395 # Lastly, one cannot really do anything without a plugged in charger. 1396 chg_port_mv = self.get('ppchg5_mv') 1397 if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV: 1398 logging.warn('It appears that no charger is plugged into servo v4. ' 1399 'Charger port voltage: %dmV', chg_port_mv) 1400 return False 1401 logging.info('Charger port voltage: %dmV', chg_port_mv) 1402 return True 1403 1404 def dts_mode_is_valid(self): 1405 """Return whether servo setup supports dts mode control for cr50.""" 1406 if 'servo_v4' not in self._servo_type: 1407 # Only servo v4 supports this feature. 
1408 logging.debug('%r type does not support dts mode control.', 1409 self._servo_type) 1410 return False 1411 # On servo v4, it still needs ot be the type-c version. 1412 if not 'type-c' == self.get('servo_v4_type'): 1413 logging.info('DTS controls require a type-c servo v4.') 1414 return False 1415 return True 1416 1417 def dts_mode_is_safe(self): 1418 """Return whether servo setup supports dts mode without losing access. 1419 1420 DTS mode control exists but the main device might go through ccd. 1421 In that case, it's only safe to control dts mode if the main device 1422 is legacy as otherwise the connection to the main device cuts out. 1423 """ 1424 return self.dts_mode_is_valid() and self.main_device_is_flex() 1425 1426 def get_dts_mode(self): 1427 """Return servo dts mode. 1428 1429 @returns: on/off whether dts is on or off 1430 """ 1431 if not self.dts_mode_is_valid(): 1432 logging.info('Not a valid servo setup. Unable to get dts mode.') 1433 return 1434 return self.get('servo_v4_dts_mode') 1435 1436 def set_dts_mode(self, state): 1437 """Set servo dts mode to off or on. 1438 1439 It does nothing if not a servo v4. Disable the ccd watchdog if we're 1440 disabling dts mode. CCD will disconnect. The watchdog only allows CCD 1441 to disconnect for 10 seconds until it kills servod. Disable the 1442 watchdog, so CCD can stay disconnected indefinitely. 1443 1444 @param state: Set servo v4 dts mode 'off' or 'on'. 1445 """ 1446 if not self.dts_mode_is_valid(): 1447 logging.info('Not a valid servo setup. Unable to set dts mode %s.', 1448 state) 1449 return 1450 1451 # TODO(mruthven): remove watchdog check once the labstation has been 1452 # updated to have support for modifying the watchdog. 
1453 set_watchdog = (self.has_control('watchdog') and 1454 'ccd' in self._servo_type) 1455 enable_watchdog = state == 'on' 1456 1457 if set_watchdog and not enable_watchdog: 1458 self.set_nocheck('watchdog_remove', 'ccd') 1459 1460 self.set_nocheck('servo_v4_dts_mode', state) 1461 1462 if set_watchdog and enable_watchdog: 1463 self.set_nocheck('watchdog_add', 'ccd') 1464 1465 1466 def _get_servo_type_fw_version(self, servo_type, prefix=''): 1467 """Helper to handle fw retrieval for micro/v4 vs ccd. 1468 1469 @param servo_type: one of 'servo_v4', 'servo_micro', 'ccd_cr50' 1470 @param prefix: whether the control has a prefix 1471 1472 @returns: fw version for non-ccd devices, cr50 version for ccd device 1473 """ 1474 if servo_type == 'ccd_cr50': 1475 # ccd_cr50 runs on cr50, so need to query the cr50 fw. 1476 servo_type = 'cr50' 1477 cmd = '%s_version' % servo_type 1478 try: 1479 return self.get(cmd, prefix=prefix) 1480 except error.TestFail: 1481 # Do not fail here, simply report the version as unknown. 1482 logging.warn('Unable to query %r to get servo fw version.', cmd) 1483 return 'unknown' 1484 1485 1486 def get_servo_fw_versions(self): 1487 """Retrieve a summary of attached servos and their firmware. 1488 1489 Note: that only the Google firmware owned servos supports this e.g. 1490 micro, v4, etc. For anything else, the dictionary will have no entry. 1491 If no device is has Google owned firmware (e.g. v3) then the result 1492 is an empty dictionary. 1493 1494 @returns: dict, a collection of each attached servo & their firmware. 
1495 """ 1496 def get_fw_version_tag(tag, dev): 1497 return '%s_version.%s' % (dev, tag) 1498 1499 fw_versions = {} 1500 if 'servo_v4' not in self._servo_type: 1501 return {} 1502 v4_tag = get_fw_version_tag('support', 'servo_v4') 1503 fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_v4') 1504 if 'with' in self._servo_type: 1505 dut_devs = self._servo_type.split('_with_')[1].split('_and_') 1506 main_tag = get_fw_version_tag('main', dut_devs[0]) 1507 fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0]) 1508 if len(dut_devs) == 2: 1509 # Right now, the only way for this to happen is for a dual setup 1510 # to exist where ccd is attached on top of servo micro. Thus, we 1511 # know that the prefix is ccd_cr50 and the type is ccd_cr50. 1512 # TODO(coconutruben): If the new servod is not deployed by 1513 # the time that there are more cases of '_and_' devices, 1514 # this needs to be reworked. 1515 dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1]) 1516 fw = self._get_servo_type_fw_version(dut_devs[1], 'ccd_cr50') 1517 fw_versions[dual_tag] = fw 1518 return fw_versions 1519 1520 @property 1521 def uart_logs_dir(self): 1522 """Return the directory to save UART logs.""" 1523 return self._uart.logs_dir if self._uart else "" 1524 1525 1526 @uart_logs_dir.setter 1527 def uart_logs_dir(self, logs_dir): 1528 """Set directory to save UART logs. 1529 1530 @param logs_dir String of directory name.""" 1531 if self._uart: 1532 self._uart.logs_dir = logs_dir 1533 1534 1535 def close(self): 1536 """Close the servo object.""" 1537 if self._uart: 1538 self._uart.stop_capture() 1539 self._uart.dump() 1540 self._uart = None 1541