1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5# 6# Expects to be run in an environment with sudo and no interactive password 7# prompt, such as within the Chromium OS development chroot. 8 9import ast 10import logging 11import os 12import re 13import six 14import six.moves.xmlrpc_client 15import six.moves.http_client 16import time 17 18from autotest_lib.client.common_lib import error 19from autotest_lib.client.common_lib import lsbrelease_utils 20from autotest_lib.client.common_lib import seven 21from autotest_lib.server import utils as server_utils 22from autotest_lib.server.cros.servo import firmware_programmer 23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig 24 25 26# Regex to match XMLRPC errors due to a servod control not existing. 27# Servod uses both 'No control %s' and 'No control named %s' in exceptions. 28NO_CONTROL_RE = re.compile(r'No control(?: named)? (?P<name>\w*\.?\w*)') 29 30# Please see servo/drv/pty_driver.py for error messages to match. 31 32# This common prefix can apply to all subtypes of console errors. 33# The first portion is an optional qualifier of the type 34# of error that occurred. Each error is or'd. 35CONSOLE_COMMON_RE = (r'((Timeout waiting for response|' 36 r'Known error [\w\'\".\s]+). )?' 37 # The second portion is an optional name for the console 38 # source 39 r'(\w+\: )?') 40 41# Regex to match XMLRPC errors due to a console being unresponsive. 42NO_CONSOLE_OUTPUT_RE = re.compile(r'%sNo data was sent from the pty\.' % 43 CONSOLE_COMMON_RE) 44 45 46# Regex to match XMLRPC errors due to a console control failing, but the 47# underlying Console being responsive. 48CONSOLE_MISMATCH_RE = re.compile(r'%sThere was output:' % CONSOLE_COMMON_RE) 49 50 51# The minimum voltage on the charger port on servo v4 that is expected. This is 52# to query whether a charger is plugged into servo v4 and thus pd control 53# capabilities can be used. 54V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400 55 56 57class ControlUnavailableError(error.TestFail): 58 """Custom error class to indicate a control is unavailable on servod.""" 59 pass 60 61 62class ConsoleError(error.TestFail): 63 """Common error class for servod console-back control failures.""" 64 pass 65 66 67class UnresponsiveConsoleError(ConsoleError): 68 """Error for: A console control fails for lack of console output.""" 69 pass 70 71 72class ResponsiveConsoleError(ConsoleError): 73 """Error for: A console control fails but console is responsive.""" 74 pass 75 76 77class ServodBadResponse(six.moves.http_client.BadStatusLine): 78 """Indicates a bad HTTP response from servod""" 79 80 def __init__(self, when, line): 81 """ 82 83 @param when: Description of the operation being performed (get/set) 84 @param line: The line that came from the server, often an empty string. 85 """ 86 super(ServodBadResponse, self).__init__(line) 87 self.when = when 88 89 def __str__(self): 90 """String representation of the exception""" 91 return '%s -- StatusLine=%s' % (self.when, self.line) 92 93 94class ServodEmptyResponse(ServodBadResponse): 95 """Indicates an empty response from servod, possibly because it exited.""" 96 pass 97 98 99class ServodConnectionError(seven.SOCKET_ERRORS[0]): 100 """Indicates socket errors seen during communication with servod""" 101 102 def __init__(self, when, errno, strerror, filename): 103 """Instance initializer 104 105 The filename is used to add details to the exception message: 106 [Errno 104] Connection reset by peer: "<Servo 'ipaddr:9999'>" 107 108 @param when: Description of the operation being performed at the time 109 @param errno: errno value, such as ECONNRESET 110 @param strerror: OS-provided description ("connection reset by peer") 111 @param filename: Something to report as a path, such as a socket address 112 """ 113 # [Errno 104] [Setting ctrl:val] Connection reset by peer: <Servo... 114 self.when = when 115 super(ServodConnectionError, self).__init__(errno, strerror, filename) 116 117 def __str__(self): 118 """String representation of the exception""" 119 return '%s -- [Errno %d] %s: %r' % (self.when, self.errno, 120 self.strerror, self.filename) 121 122 123# TODO: once in python 3, inherit from AbstractContextManager 124class _WrapServoErrors(object): 125 """ 126 Wrap an operation, replacing BadStatusLine and socket.error with 127 servo-specific versions, and extracting exception info from xmlrplib.Fault. 128 129 @param servo_name: The servo object, used to add the servo name to errors. 130 See the ServodConnectionError docstring. 131 @param description: String to use when describing what was being done 132 @raise ServodBadStatusLine: if exception is a httplib.BadStatusLine 133 @raise ServodSocketError: if exception is a socket.error 134 @raise ControlUnavailableError: if Fault matches NO_CONTROL_RE 135 @raise UnresponsiveConsoleError: if Fault matches NO_CONSOLE_OUTPUT_RE 136 @raise ResponsiveConsoleError: if Fault matches CONSOLE_MISMATCH_RE 137 """ 138 139 def __init__(self, servo, description): 140 self.servo_name = str(servo) 141 self.description = description 142 143 @staticmethod 144 def _get_xmlrpclib_exception(xmlexc): 145 """Get meaningful exception string from xmlrpc. 146 147 Args: 148 xmlexc: xmlrpclib.Fault object 149 150 xmlrpclib.Fault.faultString has the following format: 151 152 <type 'exception type'>:'actual error message' 153 154 Parse and return the real exception from the servod side instead of the 155 less meaningful one like, 156 <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no 157 attribute 'hw_driver'"> 158 159 Returns: 160 string of underlying exception raised in servod. 161 """ 162 return re.sub('^.*>:', '', xmlexc.faultString) 163 164 @staticmethod 165 def _log_exception(exc_type, exc_val, exc_tb): 166 """Log exception information""" 167 if exc_val is not None: 168 logging.debug( 169 'Wrapped exception:', exc_info=(exc_type, exc_val, exc_tb)) 170 171 def __enter__(self): 172 """Enter the context""" 173 return self 174 175 def __exit__(self, exc_type, exc_val, exc_tb): 176 """Exit the context, handling the exception if there was one""" 177 try: 178 if isinstance(exc_val, six.moves.http_client.BadStatusLine): 179 self._log_exception(exc_type, exc_val, exc_tb) 180 if exc_val.line in ('', "''"): 181 err = ServodEmptyResponse(self.description, exc_val.line) 182 else: 183 err = ServodBadResponse(self.description, exc_val.line) 184 six.reraise(err.__class__, err, exc_tb) 185 186 if isinstance(exc_val, seven.SOCKET_ERRORS): 187 self._log_exception(exc_type, exc_val, exc_tb) 188 err = ServodConnectionError(self.description, exc_val.args[0], 189 exc_val.args[1], self.servo_name) 190 six.reraise(err.__class__, err, exc_tb) 191 192 if isinstance(exc_val, six.moves.xmlrpc_client.Fault): 193 err_str = self._get_xmlrpclib_exception(exc_val) 194 err_msg = '%s :: %s' % (self.description, err_str) 195 unknown_ctrl = re.search(NO_CONTROL_RE, err_str) 196 if not unknown_ctrl: 197 # Log the full text for errors, except unavailable controls. 198 self._log_exception(exc_type, exc_val, exc_tb) 199 logging.debug(err_msg) 200 if unknown_ctrl: 201 # The error message for unavailable controls is huge, since 202 # it reports all known controls. Don't log the full text. 203 unknown_ctrl_name = unknown_ctrl.group('name') 204 logging.debug('%s :: No control named %r', 205 self.description, unknown_ctrl_name) 206 err = ControlUnavailableError( 207 'No control named %r' % unknown_ctrl_name) 208 elif re.search(NO_CONSOLE_OUTPUT_RE, err_str): 209 err = UnresponsiveConsoleError( 210 'Console not printing output. %s.' % 211 self.description) 212 elif re.search(CONSOLE_MISMATCH_RE, err_str): 213 err = ResponsiveConsoleError( 214 'Control failed but console alive. %s.' % 215 self.description) 216 else: 217 err = error.TestFail(err_msg) 218 six.reraise(err.__class__, err, exc_tb) 219 finally: 220 del exc_tb 221 222 223def _extract_image_from_tarball(tarball, dest_dir, image_candidates, timeout): 224 """Try extracting the image_candidates from the tarball. 225 226 @param tarball: The path of the tarball. 227 @param dest_path: The path of the destination. 228 @param image_candidates: A tuple of the paths of image candidates. 229 @param timeout: Time to wait in seconds before timing out. 230 231 @return: The first path from the image candidates, which succeeds, or None 232 if all the image candidates fail. 233 """ 234 235 # Create the firmware_name subdirectory if it doesn't exist 236 if not os.path.exists(dest_dir): 237 os.mkdir(dest_dir) 238 239 # Generate a list of all tarball files 240 stdout = server_utils.system_output('tar tf %s' % tarball, 241 timeout=timeout, 242 ignore_status=True) 243 tarball_files = stdout.splitlines() 244 245 # Check if image candidates are in the list of tarball files 246 for image in image_candidates: 247 if image in tarball_files: 248 # Extract and return the first image candidate found 249 tar_cmd = 'tar xf %s -C %s %s' % (tarball, dest_dir, image) 250 status = server_utils.system(tar_cmd, 251 timeout=timeout, 252 ignore_status=True) 253 if status == 0: 254 return image 255 return None 256 257 258class _PowerStateController(object): 259 260 """Class to provide board-specific power operations. 261 262 This class is responsible for "power on" and "power off" 263 operations that can operate without making assumptions in 264 advance about board state. It offers an interface that 265 abstracts out the different sequences required for different 266 board types. 267 268 """ 269 # Constants acceptable to be passed for the `rec_mode` parameter 270 # to power_on(). 271 # 272 # REC_ON: Boot the DUT in recovery mode, i.e. boot from USB or 273 # SD card. 274 # REC_OFF: Boot in normal mode, i.e. boot from internal storage. 275 276 REC_ON = 'rec' 277 REC_OFF = 'on' 278 REC_ON_FORCE_MRC = 'rec_force_mrc' 279 280 # Delay in seconds needed between asserting and de-asserting 281 # warm reset. 282 _RESET_HOLD_TIME = 0.5 283 284 285 def __init__(self, servo): 286 """Initialize the power state control. 287 288 @param servo Servo object providing the underlying `set` and `get` 289 methods for the target controls. 290 291 """ 292 self._servo = servo 293 self.supported = self._servo.has_control('power_state') 294 self.last_rec_mode = self.REC_OFF 295 if not self.supported: 296 logging.info('Servo setup does not support power-state operations. ' 297 'All power-state calls will lead to error.TestFail') 298 299 def _check_supported(self): 300 """Throw an error if dts mode control is not supported.""" 301 if not self.supported: 302 raise error.TestFail('power_state controls not supported') 303 304 def reset(self): 305 """Force the DUT to reset. 306 307 The DUT is guaranteed to be on at the end of this call, 308 regardless of its previous state, provided that there is 309 working OS software. This also guarantees that the EC has 310 been restarted. 311 312 """ 313 self._check_supported() 314 self._servo.set_nocheck('power_state', 'reset') 315 316 def cr50_reset(self): 317 """Force the DUT to reset. 318 319 The DUT is guaranteed to be on at the end of this call, 320 regardless of its previous state, provided that there is 321 working OS software. This also guarantees that the EC has 322 been restarted. Works only for ccd connections. 323 324 """ 325 self._check_supported() 326 self._servo.set_nocheck('power_state', 'cr50_reset') 327 328 def warm_reset(self): 329 """Apply warm reset to the DUT. 330 331 This asserts, then de-asserts the 'warm_reset' signal. 332 Generally, this causes the board to restart. 333 334 """ 335 # TODO: warm_reset support has added to power_state.py. Once it 336 # available to labstation remove fallback method. 337 self._check_supported() 338 try: 339 self._servo.set_nocheck('power_state', 'warm_reset') 340 except error.TestFail as err: 341 logging.info("Fallback to warm_reset control method") 342 self._servo.set_get_all(['warm_reset:on', 343 'sleep:%.4f' % self._RESET_HOLD_TIME, 344 'warm_reset:off']) 345 346 def power_off(self): 347 """Force the DUT to power off. 348 349 The DUT is guaranteed to be off at the end of this call, 350 regardless of its previous state, provided that there is 351 working EC and boot firmware. There is no requirement for 352 working OS software. 353 354 """ 355 self._check_supported() 356 self._servo.set_nocheck('power_state', 'off') 357 358 def power_on(self, rec_mode=REC_OFF): 359 """Force the DUT to power on. 360 361 Prior to calling this function, the DUT must be powered off, 362 e.g. with a call to `power_off()`. 363 364 At power on, recovery mode is set as specified by the 365 corresponding argument. When booting with recovery mode on, it 366 is the caller's responsibility to unplug/plug in a bootable 367 external storage device. 368 369 If the DUT requires a delay after powering on but before 370 processing inputs such as USB stick insertion, the delay is 371 handled by this method; the caller is not responsible for such 372 delays. 373 374 @param rec_mode Setting of recovery mode to be applied at 375 power on. default: REC_OFF aka 'off' 376 377 """ 378 self._check_supported() 379 self._servo.set_nocheck('power_state', rec_mode) 380 self.last_rec_mode = rec_mode 381 382 def retry_power_on(self): 383 """Retry powering on the DUT. 384 385 After power_on(...) the system might not come up reliably, although 386 the reasons aren't known yet. This function retries turning on the 387 system again, trying to bring it in the last state that power_on() 388 attempted to reach. 389 """ 390 self._check_supported() 391 self._servo.set_nocheck('power_state', self.last_rec_mode) 392 393 394class _Uart(object): 395 """Class to capture UART streams of CPU, EC, Cr50, etc.""" 396 _UartToCapture = ( 397 'cpu', 398 'cr50', 399 'ec', 400 'servo_micro', 401 'servo_v4', 402 'usbpd', 403 'ccd_cr50.ec', 404 'ccd_cr50.cpu', 405 'ccd_cr50.cr50' 406 ) 407 408 409 def __init__(self, servo): 410 self._servo = servo 411 self._streams = [] 412 self.logs_dir = None 413 414 def _start_stop_capture(self, uart, start): 415 """Helper function to start/stop capturing on specified UART. 416 417 @param uart: The UART name to start/stop capturing. 418 @param start: True to start capturing, otherwise stop. 419 420 @returns True if the operation completes successfully. 421 False if the UART capturing is not supported or failed due to 422 an error. 423 """ 424 logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop', 425 uart) 426 uart_cmd = '%s_uart_capture' % uart 427 target_level = 'on' if start else 'off' 428 level = None 429 if self._servo.has_control(uart_cmd): 430 # Do our own implementation of set() here as not_applicable 431 # should also count as a valid control. 432 logging.debug('Trying to set %s to %s.', uart_cmd, target_level) 433 try: 434 self._servo.set_nocheck(uart_cmd, target_level) 435 level = self._servo.get(uart_cmd) 436 except error.TestFail as e: 437 # Any sort of test failure here should not stop the test. This 438 # is just to capture more output. Log and move on. 439 logging.warning('Failed to set %s to %s. %s. Ignoring.', 440 uart_cmd, target_level, str(e)) 441 if level == target_level: 442 logging.debug('Managed to set %s to %s.', uart_cmd, level) 443 else: 444 logging.debug('Failed to set %s to %s. Got %s.', uart_cmd, 445 target_level, level) 446 return level == target_level 447 448 def start_capture(self): 449 """Start capturing UART streams.""" 450 for uart in self._UartToCapture: 451 if self._start_stop_capture(uart, True): 452 self._streams.append(uart) 453 454 def get_logfile(self, uart): 455 """Return the path to the uart logfile or none if logs_dir isn't set.""" 456 if not self.logs_dir: 457 return None 458 return os.path.join(self.logs_dir, '%s_uart.txt' % uart) 459 460 def dump(self): 461 """Dump UART streams to log files accordingly.""" 462 if not self.logs_dir: 463 return 464 465 for uart in self._streams: 466 logfile_fullname = self.get_logfile(uart) 467 stream = '%s_uart_stream' % uart 468 try: 469 content = self._servo.get(stream) 470 except Exception as err: 471 logging.warn('Failed to get UART log for %s: %s', stream, err) 472 continue 473 474 if content == 'not_applicable': 475 logging.warn('%s is not applicable', stream) 476 continue 477 478 # The UART stream may contain non-printable characters, and servo 479 # returns it in string representation. We use `ast.leteral_eval` 480 # to revert it back. 481 with open(logfile_fullname, 'a') as fd: 482 try: 483 fd.write(ast.literal_eval(content)) 484 except ValueError: 485 logging.exception('Invalid value for %s: %r', stream, 486 content) 487 488 def stop_capture(self): 489 """Stop capturing UART streams.""" 490 for uart in self._UartToCapture: 491 try: 492 self._start_stop_capture(uart, False) 493 except Exception as err: 494 logging.warn('Failed to stop UART logging for %s: %s', uart, 495 err) 496 497 498class Servo(object): 499 500 """Manages control of a Servo board. 501 502 Servo is a board developed by hardware group to aide in the debug and 503 control of various partner devices. Servo's features include the simulation 504 of pressing the power button, closing the lid, and pressing Ctrl-d. This 505 class manages setting up and communicating with a servo demon (servod) 506 process. It provides both high-level functions for common servo tasks and 507 low-level functions for directly setting and reading gpios. 508 509 """ 510 511 # Power button press delays in seconds. 512 # 513 # The EC specification says that 8.0 seconds should be enough 514 # for the long power press. However, some platforms need a bit 515 # more time. Empirical testing has found these requirements: 516 # Alex: 8.2 seconds 517 # ZGB: 8.5 seconds 518 # The actual value is set to the largest known necessary value. 519 # 520 # TODO(jrbarnette) Being generous is the right thing to do for 521 # existing platforms, but if this code is to be used for 522 # qualification of new hardware, we should be less generous. 523 SHORT_DELAY = 0.1 524 525 # Maximum number of times to re-read power button on release. 526 GET_RETRY_MAX = 10 527 528 # Delays to deal with DUT state transitions. 529 SLEEP_DELAY = 6 530 BOOT_DELAY = 10 531 532 # Default minimum time interval between 'press' and 'release' 533 # keyboard events. 534 SERVO_KEY_PRESS_DELAY = 0.1 535 536 # Time to toggle recovery switch on and off. 537 REC_TOGGLE_DELAY = 0.1 538 539 # Time to toggle development switch on and off. 540 DEV_TOGGLE_DELAY = 0.1 541 542 # Time between an usb disk plugged-in and detected in the system. 543 USB_DETECTION_DELAY = 5 544 545 # Time to wait before timing out on servo initialization. 546 INIT_TIMEOUT_SECS = 10 547 548 # Time to wait before timing out when extracting firmware images. 549 # 550 # This was increased from 60 seconds to support boards with very 551 # large (>500MB) firmware archives taking longer than expected to 552 # extract firmware on the lab host machines (b/149419503). 553 EXTRACT_TIMEOUT_SECS = 180 554 555 # The VBUS voltage threshold used to detect if VBUS is supplied 556 VBUS_THRESHOLD = 3000.0 557 558 def __init__(self, servo_host, servo_serial=None): 559 """Sets up the servo communication infrastructure. 560 561 @param servo_host: A ServoHost object representing 562 the host running servod. 563 @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost 564 @param servo_serial: Serial number of the servo board. 565 """ 566 # TODO(fdeng): crbug.com/298379 567 # We should move servo_host object out of servo object 568 # to minimize the dependencies on the rest of Autotest. 569 self._servo_host = servo_host 570 self._servo_serial = servo_serial 571 self._servo_type = self.get_servo_version() 572 self._power_state = _PowerStateController(self) 573 self._uart = _Uart(self) 574 self._programmer = None 575 self._prev_log_inode = None 576 self._prev_log_size = 0 577 self._ccd_watchdog_disabled = False 578 579 def __str__(self): 580 """Description of this object and address, for use in errors""" 581 return "<%s '%s:%s'>" % ( 582 type(self).__name__, 583 self._servo_host.hostname, 584 self._servo_host.servo_port) 585 586 @property 587 def _server(self): 588 with _WrapServoErrors( 589 servo=self, description='get_servod_server_proxy()'): 590 return self._servo_host.get_servod_server_proxy() 591 592 @property 593 def servo_serial(self): 594 """Returns the serial number of the servo board.""" 595 return self._servo_serial 596 597 def get_power_state_controller(self): 598 """Return the power state controller for this Servo. 599 600 The power state controller provides board-independent 601 interfaces for reset, power-on, power-off operations. 602 603 """ 604 return self._power_state 605 606 607 def initialize_dut(self, cold_reset=False, enable_main=True): 608 """Initializes a dut for testing purposes. 609 610 This sets various servo signals back to default values 611 appropriate for the target board. By default, if the DUT 612 is already on, it stays on. If the DUT is powered off 613 before initialization, its state afterward is unspecified. 614 615 Rationale: Basic initialization of servo sets the lid open, 616 when there is a lid. This operation won't affect powered on 617 units; however, setting the lid open may power on a unit 618 that's off, depending on the board type and previous state 619 of the device. 620 621 If `cold_reset` is a true value, the DUT and its EC will be 622 reset, and the DUT rebooted in normal mode. 623 624 @param cold_reset If True, cold reset the device after 625 initialization. 626 @param enable_main If True, make sure the main servo device has 627 control of the dut. 628 629 """ 630 if enable_main: 631 self.enable_main_servo_device() 632 633 with _WrapServoErrors( 634 servo=self, description='initialize_dut()->hwinit()'): 635 self._server.hwinit() 636 if self.has_control('usb_mux_oe1'): 637 self.set('usb_mux_oe1', 'on') 638 self.switch_usbkey('off') 639 else: 640 logging.warning('Servod command \'usb_mux_oe1\' is not available. ' 641 'Any USB drive related servo routines will fail.') 642 # Create a record of SBU voltages if this is running support servo (v4, 643 # v4p1). 644 # TODO(coconutruben): eventually, replace this with a metric to track 645 # SBU voltages wrt servo-hw/dut-hw 646 if self.has_control('servo_v4_sbu1_mv'): 647 # Attempt to take a reading of sbu1 and sbu2 multiple times to 648 # account for situations where the two lines exchange hi/lo roles 649 # frequently. 650 for i in range(10): 651 try: 652 sbu1 = int(self.get('servo_v4_sbu1_mv')) 653 sbu2 = int(self.get('servo_v4_sbu2_mv')) 654 logging.info('attempt %d sbu1 %d sbu2 %d', i, sbu1, sbu2) 655 except error.TestFail as e: 656 # This is a nice to have but if reading this fails, it 657 # shouldn't interfere with the test. 658 logging.exception(e) 659 self._uart.start_capture() 660 if cold_reset: 661 if not self._power_state.supported: 662 logging.info('Cold-reset for DUT requested, but servo ' 663 'setup does not support power_state. Skipping.') 664 else: 665 self._power_state.reset() 666 with _WrapServoErrors( 667 servo=self, description='initialize_dut()->get_version()'): 668 version = self._server.get_version() 669 logging.debug('Servo initialized, version is %s', version) 670 671 672 def is_localhost(self): 673 """Is the servod hosted locally? 674 675 Returns: 676 True if local hosted; otherwise, False. 677 """ 678 return self._servo_host.is_localhost() 679 680 681 def get_os_version(self): 682 """Returns the chromeos release version.""" 683 lsb_release_content = self.system_output('cat /etc/lsb-release', 684 ignore_status=True) 685 return lsbrelease_utils.get_chromeos_release_builder_path( 686 lsb_release_content=lsb_release_content) 687 688 689 def get_servod_version(self): 690 """Returns the servod version.""" 691 # TODO: use system_output once servod --sversion prints to stdout 692 try: 693 result = self._servo_host.run('servod --sversion 2>&1') 694 except error.AutoservRunError as e: 695 if 'command execution error' in str(e): 696 # Fall back to version if sversion is not supported yet. 697 result = self._servo_host.run('servod --version 2>&1') 698 return result.stdout.strip() or result.stderr.strip() 699 # An actually unexpected error occurred, just raise. 700 raise e 701 sversion = result.stdout or result.stderr 702 # The sversion output contains 3 lines: 703 # servod v1.0.816-ff8e966 // the extended version with git hash 704 # 2020-04-08 01:10:29 // the time of the latest commit 705 # chromeos-ci-legacy-us-central1-b-x32-55-u8zc // builder information 706 # For debugging purposes, we mainly care about the version, and the 707 # timestamp. 708 return ' '.join(sversion.split()[1:4]) 709 710 def power_long_press(self): 711 """Simulate a long power button press.""" 712 # After a long power press, the EC may ignore the next power 713 # button press (at least on Alex). To guarantee that this 714 # won't happen, we need to allow the EC one second to 715 # collect itself. 716 # long_press is defined as 8.5s in servod 717 self.set_nocheck('power_key', 'long_press') 718 719 720 def power_normal_press(self): 721 """Simulate a normal power button press.""" 722 # press is defined as 1.2s in servod 723 self.set_nocheck('power_key', 'press') 724 725 726 def power_short_press(self): 727 """Simulate a short power button press.""" 728 # tab is defined as 0.2s in servod 729 self.set_nocheck('power_key', 'tab') 730 731 732 def power_key(self, press_secs='tab'): 733 """Simulate a power button press. 734 735 @param press_secs: int, float, str; time to press key in seconds or 736 known shorthand: 'tab' 'press' 'long_press' 737 """ 738 self.set_nocheck('power_key', press_secs) 739 740 741 def pwr_button(self, action='press'): 742 """Simulate a power button press. 743 744 @param action: str; could be press or could be release. 745 """ 746 self.set_nocheck('pwr_button', action) 747 748 749 def lid_open(self): 750 """Simulate opening the lid and raise exception if all attempts fail""" 751 self.set('lid_open', 'yes') 752 753 754 def lid_close(self): 755 """Simulate closing the lid and raise exception if all attempts fail 756 757 Waits 6 seconds to ensure the device is fully asleep before returning. 758 """ 759 self.set('lid_open', 'no') 760 time.sleep(Servo.SLEEP_DELAY) 761 762 763 def vbus_power_get(self): 764 """Get current vbus_power.""" 765 return self.get('vbus_power') 766 767 768 def volume_up(self, timeout=300): 769 """Simulate pushing the volume down button. 770 771 @param timeout: Timeout for setting the volume. 772 """ 773 self.set_get_all(['volume_up:yes', 774 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 775 'volume_up:no']) 776 # we need to wait for commands to take effect before moving on 777 time_left = float(timeout) 778 while time_left > 0.0: 779 value = self.get('volume_up') 780 if value == 'no': 781 return 782 time.sleep(self.SHORT_DELAY) 783 time_left = time_left - self.SHORT_DELAY 784 raise error.TestFail("Failed setting volume_up to no") 785 786 def volume_down(self, timeout=300): 787 """Simulate pushing the volume down button. 788 789 @param timeout: Timeout for setting the volume. 790 """ 791 self.set_get_all(['volume_down:yes', 792 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 793 'volume_down:no']) 794 # we need to wait for commands to take effect before moving on 795 time_left = float(timeout) 796 while time_left > 0.0: 797 value = self.get('volume_down') 798 if value == 'no': 799 return 800 time.sleep(self.SHORT_DELAY) 801 time_left = time_left - self.SHORT_DELAY 802 raise error.TestFail("Failed setting volume_down to no") 803 804 def arrow_up(self, press_secs='tab'): 805 """Simulate arrow up key presses. 806 807 @param press_secs: int, float, str; time to press key in seconds or 808 known shorthand: 'tab' 'press' 'long_press'. 809 """ 810 # TODO: Remove this check after a lab update to include CL:1913684 811 if not self.has_control('arrow_up'): 812 logging.warning('Control arrow_up ignored. ' 813 'Please update hdctools') 814 return 815 self.set_nocheck('arrow_up', press_secs) 816 817 def arrow_down(self, press_secs='tab'): 818 """Simulate arrow down key presses. 819 820 @param press_secs: int, float, str; time to press key in seconds or 821 known shorthand: 'tab' 'press' 'long_press'. 822 """ 823 # TODO: Remove this check after a lab update to include CL:1913684 824 if not self.has_control('arrow_down'): 825 logging.warning('Control arrow_down ignored. ' 826 'Please update hdctools') 827 return 828 self.set_nocheck('arrow_down', press_secs) 829 830 def ctrl_d(self, press_secs='tab'): 831 """Simulate Ctrl-d simultaneous button presses. 832 833 @param press_secs: int, float, str; time to press key in seconds or 834 known shorthand: 'tab' 'press' 'long_press' 835 """ 836 self.set_nocheck('ctrl_d', press_secs) 837 838 839 def ctrl_s(self, press_secs='tab'): 840 """Simulate Ctrl-s simultaneous button presses. 841 842 @param press_secs: int, float, str; time to press key in seconds or 843 known shorthand: 'tab' 'press' 'long_press' 844 """ 845 self.set_nocheck('ctrl_s', press_secs) 846 847 848 def ctrl_u(self, press_secs='tab'): 849 """Simulate Ctrl-u simultaneous button presses. 850 851 @param press_secs: int, float, str; time to press key in seconds or 852 known shorthand: 'tab' 'press' 'long_press' 853 """ 854 self.set_nocheck('ctrl_u', press_secs) 855 856 857 def ctrl_enter(self, press_secs='tab'): 858 """Simulate Ctrl-enter simultaneous button presses. 859 860 @param press_secs: int, float, str; time to press key in seconds or 861 known shorthand: 'tab' 'press' 'long_press' 862 """ 863 self.set_nocheck('ctrl_enter', press_secs) 864 865 866 def ctrl_key(self, press_secs='tab'): 867 """Simulate Enter key button press. 868 869 @param press_secs: int, float, str; time to press key in seconds or 870 known shorthand: 'tab' 'press' 'long_press' 871 """ 872 self.set_nocheck('ctrl_key', press_secs) 873 874 875 def enter_key(self, press_secs='tab'): 876 """Simulate Enter key button press. 877 878 @param press_secs: int, float, str; time to press key in seconds or 879 known shorthand: 'tab' 'press' 'long_press' 880 """ 881 self.set_nocheck('enter_key', press_secs) 882 883 884 def refresh_key(self, press_secs='tab'): 885 """Simulate Refresh key (F3) button press. 886 887 @param press_secs: int, float, str; time to press key in seconds or 888 known shorthand: 'tab' 'press' 'long_press' 889 """ 890 self.set_nocheck('refresh_key', press_secs) 891 892 893 def ctrl_refresh_key(self, press_secs='tab'): 894 """Simulate Ctrl and Refresh (F3) simultaneous press. 895 896 This key combination is an alternative of Space key. 897 898 @param press_secs: int, float, str; time to press key in seconds or 899 known shorthand: 'tab' 'press' 'long_press' 900 """ 901 self.set_nocheck('ctrl_refresh_key', press_secs) 902 903 904 def imaginary_key(self, press_secs='tab'): 905 """Simulate imaginary key button press. 906 907 Maps to a key that doesn't physically exist. 908 909 @param press_secs: int, float, str; time to press key in seconds or 910 known shorthand: 'tab' 'press' 'long_press' 911 """ 912 self.set_nocheck('imaginary_key', press_secs) 913 914 915 def sysrq_x(self, press_secs='tab'): 916 """Simulate Alt VolumeUp X simulataneous press. 917 918 This key combination is the kernel system request (sysrq) X. 919 920 @param press_secs: int, float, str; time to press key in seconds or 921 known shorthand: 'tab' 'press' 'long_press' 922 """ 923 self.set_nocheck('sysrq_x', press_secs) 924 925 926 def toggle_recovery_switch(self): 927 """Toggle recovery switch on and off.""" 928 self.enable_recovery_mode() 929 time.sleep(self.REC_TOGGLE_DELAY) 930 self.disable_recovery_mode() 931 932 933 def enable_recovery_mode(self): 934 """Enable recovery mode on device.""" 935 self.set('rec_mode', 'on') 936 937 938 def disable_recovery_mode(self): 939 """Disable recovery mode on device.""" 940 self.set('rec_mode', 'off') 941 942 943 def toggle_development_switch(self): 944 """Toggle development switch on and off.""" 945 self.enable_development_mode() 946 time.sleep(self.DEV_TOGGLE_DELAY) 947 self.disable_development_mode() 948 949 950 def enable_development_mode(self): 951 """Enable development mode on device.""" 952 self.set('dev_mode', 'on') 953 954 955 def disable_development_mode(self): 956 """Disable development mode on device.""" 957 self.set('dev_mode', 'off') 958 959 def boot_devmode(self): 960 """Boot a dev-mode device that is powered off.""" 961 self.power_short_press() 962 self.pass_devmode() 963 964 965 def pass_devmode(self): 966 """Pass through boot screens in dev-mode.""" 967 time.sleep(Servo.BOOT_DELAY) 968 self.ctrl_d() 969 time.sleep(Servo.BOOT_DELAY) 970 971 972 def get_board(self): 973 """Get the board connected to servod.""" 974 with _WrapServoErrors(servo=self, description='get_board()'): 975 return self._server.get_board() 976 977 978 def get_base_board(self): 979 """Get the board of the base connected to servod.""" 980 try: 981 with _WrapServoErrors(servo=self, description='get_base_board()'): 982 return self._server.get_base_board() 983 except six.moves.xmlrpc_client.Fault as e: 984 # TODO(waihong): Remove the following compatibility check when 985 # the new versions of hdctools are deployed. 986 if 'not supported' in str(e): 987 logging.warning('The servod is too old that get_base_board ' 988 'not supported.') 989 return '' 990 raise 991 992 def get_ec_board(self): 993 """Get the board name from EC.""" 994 if self.has_control('active_v4_device'): 995 # If servo v4 is allowing dual_v4 devices, then choose the 996 # active device. 997 active_device = self.get('active_v4_device') 998 if active_device == self.get_main_servo_device(): 999 active_device = '' 1000 else: 1001 active_device = '' 1002 return self.get('ec_board', prefix=active_device) 1003 1004 def get_ec_active_copy(self): 1005 """Get the active copy of the EC image.""" 1006 return self.get('ec_active_copy') 1007 1008 def has_control(self, ctrl_name, prefix=''): 1009 """Query servod server to determine if |ctrl_name| is a valid control. 1010 1011 @param ctrl_name Name of the control. 1012 @param prefix: prefix to route control to correct servo device. 1013 1014 @returns: true if |ctrl_name| is a known control, false otherwise. 1015 """ 1016 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1017 try: 1018 # If the control exists, doc() will work. 1019 with _WrapServoErrors( 1020 servo=self, 1021 description='has_control(%s)->doc()' % ctrl_name): 1022 self._server.doc(ctrl_name) 1023 return True 1024 except ControlUnavailableError: 1025 return False 1026 1027 def _build_ctrl_name(self, ctrl_name, prefix): 1028 """Helper to build the control name if a prefix is used. 1029 1030 @param ctrl_name Name of the control. 1031 @param prefix: prefix to route control to correct servo device. 1032 1033 @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty. 1034 """ 1035 assert ctrl_name 1036 if prefix: 1037 return '%s.%s' % (prefix, ctrl_name) 1038 return ctrl_name 1039 1040 def get(self, ctrl_name, prefix=''): 1041 """Get the value of a gpio from Servod. 1042 1043 @param ctrl_name Name of the control. 1044 @param prefix: prefix to route control to correct servo device. 1045 1046 @returns: server response to |ctrl_name| request. 1047 1048 @raise ControlUnavailableError: if |ctrl_name| not a known control. 1049 @raise error.TestFail: for all other failures doing get(). 1050 """ 1051 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1052 with _WrapServoErrors( 1053 servo=self, description='Getting %s' % ctrl_name): 1054 return self._server.get(ctrl_name) 1055 1056 def set(self, ctrl_name, ctrl_value, prefix=''): 1057 """Set and check the value of a gpio using Servod. 1058 1059 @param ctrl_name: Name of the control. 1060 @param ctrl_value: New setting for the control. 1061 @param prefix: prefix to route control to correct servo device. 1062 @raise error.TestFail: if the control value fails to change. 1063 """ 1064 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1065 self.set_nocheck(ctrl_name, ctrl_value) 1066 retry_count = Servo.GET_RETRY_MAX 1067 actual_value = self.get(ctrl_name) 1068 while ctrl_value != actual_value and retry_count: 1069 logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value, 1070 retry_count) 1071 retry_count -= 1 1072 time.sleep(Servo.SHORT_DELAY) 1073 actual_value = self.get(ctrl_name) 1074 1075 if ctrl_value != actual_value: 1076 raise error.TestFail( 1077 'Servo failed to set %s to %s. Got %s.' 1078 % (ctrl_name, ctrl_value, actual_value)) 1079 1080 def set_nocheck(self, ctrl_name, ctrl_value, prefix=''): 1081 """Set the value of a gpio using Servod. 1082 1083 @param ctrl_name Name of the control. 1084 @param ctrl_value New setting for the control. 1085 @param prefix: prefix to route control to correct servo device. 1086 1087 @raise ControlUnavailableError: if |ctrl_name| not a known control. 1088 @raise error.TestFail: for all other failures doing set(). 1089 """ 1090 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1091 # The real danger here is to pass a None value through the xmlrpc. 1092 assert ctrl_value is not None 1093 description = 'Setting %s to %r' % (ctrl_name, ctrl_value) 1094 logging.debug('%s', description) 1095 with _WrapServoErrors(servo=self, description=description): 1096 self._server.set(ctrl_name, ctrl_value) 1097 1098 def set_get_all(self, controls): 1099 """Set &| get one or more control values. 1100 1101 @param controls: list of strings, controls to set &| get. 1102 1103 @raise: error.TestError in case error occurs setting/getting values. 1104 """ 1105 description = 'Set/get all: %s' % str(controls) 1106 logging.debug('%s', description) 1107 with _WrapServoErrors(servo=self, description=description): 1108 return self._server.set_get_all(controls) 1109 1110 1111 def probe_host_usb_dev(self): 1112 """Probe the USB disk device plugged-in the servo from the host side. 1113 1114 It uses servod to discover if there is a usb device attached to it. 1115 1116 @return: String of USB disk path (e.g. '/dev/sdb') or None. 1117 """ 1118 # Set up Servo's usb mux. 1119 return self.get('image_usbkey_dev') or None 1120 1121 1122 def image_to_servo_usb(self, image_path=None, 1123 make_image_noninteractive=False, 1124 power_off_dut=True): 1125 """Install an image to the USB key plugged into the servo. 1126 1127 This method may copy any image to the servo USB key including a 1128 recovery image or a test image. These images are frequently used 1129 for test purposes such as restoring a corrupted image or conducting 1130 an upgrade of ec/fw/kernel as part of a test of a specific image part. 1131 1132 @param image_path: Path on the host to the recovery image. 1133 @param make_image_noninteractive: Make the recovery image 1134 noninteractive, therefore the DUT 1135 will reboot automatically after 1136 installation. 1137 @param power_off_dut: To put the DUT in power off mode. 1138 """ 1139 # We're about to start plugging/unplugging the USB key. We 1140 # don't know the state of the DUT, or what it might choose 1141 # to do to the device after hotplug. To avoid surprises, 1142 # force the DUT to be off. 1143 if power_off_dut: 1144 self._power_state.power_off() 1145 1146 if image_path: 1147 logging.info('Searching for usb device and copying image to it. ' 1148 'Please wait a few minutes...') 1149 # The servod control automatically sets up the host in the host 1150 # direction. 1151 try: 1152 self.set_nocheck('download_image_to_usb_dev', image_path) 1153 except error.TestFail as e: 1154 logging.error('Failed to transfer requested image to USB. %s.' 1155 'Please take a look at Servo Logs.', str(e)) 1156 raise error.AutotestError('Download image to usb failed.') 1157 if make_image_noninteractive: 1158 logging.info('Making image noninteractive') 1159 try: 1160 dev = self.probe_host_usb_dev() 1161 if not dev: 1162 # This is fine but also should never happen: if we 1163 # successfully download an image but somehow cannot 1164 # find the stick again, it needs to be investigated. 1165 raise error.TestFail('No image usb key detected ' 1166 'after successful download. ' 1167 'Please investigate.') 1168 # The modification has to happen on partition 1. 1169 dev_partition = '%s1' % dev 1170 self.set_nocheck('make_usb_dev_image_noninteractive', 1171 dev_partition) 1172 except error.TestFail as e: 1173 logging.error('Failed to make image noninteractive. %s.' 1174 'Please take a look at Servo Logs.', 1175 str(e)) 1176 1177 def boot_in_recovery_mode(self, snk_mode=False): 1178 """Boot host DUT in recovery mode. 1179 1180 @param snk_mode: If True, switch servo_v4 role to 'snk' mode before 1181 boot DUT into recovery mode. 1182 """ 1183 # This call has a built-in delay to ensure that we wait a timeout 1184 # for the stick to enumerate and settle on the DUT side. 1185 self.switch_usbkey('dut') 1186 # Switch servo_v4 mode to snk as the DUT won't able to see usb drive 1187 # in recovery mode if the servo is in src mode(see crbug.com/1129165). 1188 if snk_mode: 1189 logging.info('Setting servo_v4 role to snk mode in order to make' 1190 ' the DUT can see usb drive while in recovery mode.') 1191 self.set_servo_v4_role('snk') 1192 1193 try: 1194 self._power_state.power_on(rec_mode=self._power_state.REC_ON) 1195 except error.TestFail as e: 1196 self.set_servo_v4_role('src') 1197 logging.error('Failed to boot DUT in recovery mode. %s.', str(e)) 1198 raise error.AutotestError('Failed to boot DUT in recovery mode.') 1199 1200 def install_recovery_image(self, 1201 image_path=None, 1202 make_image_noninteractive=False, 1203 snk_mode=False): 1204 """Install the recovery image specified by the path onto the DUT. 1205 1206 This method uses google recovery mode to install a recovery image 1207 onto a DUT through the use of a USB stick that is mounted on a servo 1208 board specified by the usb_dev. If no image path is specified 1209 we use the recovery image already on the usb image. 1210 1211 This method will switch servo_v4 role to 'snk' mode in order to make 1212 the DUT can see the usb drive plugged on servo, the caller should 1213 set servo_v4 role back to 'src' mode one the DUT exit recovery mode. 1214 1215 @param image_path: Path on the host to the recovery image. 1216 @param make_image_noninteractive: Make the recovery image 1217 noninteractive, therefore the DUT will reboot automatically 1218 after installation. 1219 @param snk_mode: If True, switch servo_v4 role to 'snk' mode before 1220 boot DUT into recovery mode. 1221 """ 1222 self.image_to_servo_usb(image_path, make_image_noninteractive) 1223 # Give the DUT some time to power_off if we skip 1224 # download image to usb. (crbug.com/982993) 1225 if not image_path: 1226 time.sleep(10) 1227 self.boot_in_recovery_mode(snk_mode=snk_mode) 1228 1229 1230 def _scp_image(self, image_path): 1231 """Copy image to the servo host. 1232 1233 When programming a firmware image on the DUT, the image must be 1234 located on the host to which the servo device is connected. Sometimes 1235 servo is controlled by a remote host, in this case the image needs to 1236 be transferred to the remote host. This adds the servod port number, to 1237 make sure tests for different DUTs don't trample on each other's files. 1238 Along with the firmware image, any subsidiary files in the same 1239 directory shall be copied to the host as well. 1240 1241 @param image_path: a string, name of the firmware image file to be 1242 transferred. 1243 @return: a string, full path name of the copied file on the remote. 1244 """ 1245 src_path = os.path.dirname(image_path) 1246 dest_path = os.path.join('/tmp', 'dut_%d' % self._servo_host.servo_port) 1247 logging.info('Copying %s to %s', src_path, dest_path) 1248 # Copy a directory, src_path to dest_path. send_file() will create a 1249 # directory named basename(src_path) under dest_path, and copy all files 1250 # in src_path to the destination. 1251 self._servo_host.send_file(src_path, dest_path, delete_dest=True) 1252 1253 # Make a image path of the destination. 1254 # e.g. /tmp/dut_9999/EC/ec.bin 1255 rv = os.path.join(dest_path, os.path.basename(src_path)) 1256 return os.path.join(rv, os.path.basename(image_path)) 1257 1258 1259 def system(self, command, timeout=3600): 1260 """Execute the passed in command on the servod host. 1261 1262 @param command Command to be executed. 1263 @param timeout Maximum number of seconds of runtime allowed. Default to 1264 1 hour. 1265 """ 1266 logging.info('Will execute on servo host: %s', command) 1267 self._servo_host.run(command, timeout=timeout) 1268 1269 1270 def system_output(self, command, timeout=3600, 1271 ignore_status=False, args=()): 1272 """Execute the passed in command on the servod host, return stdout. 1273 1274 @param command a string, the command to execute 1275 @param timeout an int, max number of seconds to wait til command 1276 execution completes. Default to 1 hour. 1277 @param ignore_status a Boolean, if true - ignore command's nonzero exit 1278 status, otherwise an exception will be thrown 1279 @param args a tuple of strings, each becoming a separate command line 1280 parameter for the command 1281 @return command's stdout as a string. 1282 """ 1283 return self._servo_host.run(command, timeout=timeout, 1284 ignore_status=ignore_status, 1285 args=args).stdout.strip() 1286 1287 1288 def get_servo_version(self, active=False): 1289 """Get the version of the servo, e.g., servo_v2 or servo_v3. 1290 1291 @param active: Only return the servo type with the active device. 1292 @return: The version of the servo. 1293 1294 """ 1295 with _WrapServoErrors( 1296 servo=self, description='get_servo_version()->get_version()'): 1297 servo_type = self._server.get_version() 1298 if '_and_' not in servo_type or not active: 1299 return servo_type 1300 1301 # If servo v4 is using ccd and servo micro, modify the servo type to 1302 # reflect the active device. 1303 active_device = self.get('active_v4_device') 1304 if active_device in servo_type: 1305 logging.info('%s is active', active_device) 1306 return 'servo_v4_with_' + active_device 1307 1308 logging.warn("%s is active even though it's not in servo type", 1309 active_device) 1310 return servo_type 1311 1312 1313 def get_servo_type(self): 1314 return self._servo_type 1315 1316 1317 def get_main_servo_device(self): 1318 """Return the main servo device""" 1319 return self._servo_type.split('_with_')[-1].split('_and_')[0] 1320 1321 1322 def enable_main_servo_device(self): 1323 """Make sure the main device has control of the dut.""" 1324 if not self.has_control('active_v4_device'): 1325 return 1326 self.set('active_v4_device', self.get_main_servo_device()) 1327 1328 1329 def main_device_is_ccd(self): 1330 """Whether the main servo device (no prefixes) is a ccd device.""" 1331 with _WrapServoErrors( 1332 servo=self, description='main_device_is_ccd()->get_version()'): 1333 servo = self._server.get_version() 1334 return 'ccd_cr50' in servo and 'servo_micro' not in servo 1335 1336 1337 def main_device_is_flex(self): 1338 """Whether the main servo device (no prefixes) is a legacy device.""" 1339 return not self.main_device_is_ccd() 1340 1341 1342 def main_device_is_active(self): 1343 """Return whether the main device is the active device. 1344 1345 This is only relevant for a dual setup with ccd and legacy on the same 1346 DUT. The main device is the servo that has no prefix on its controls. 1347 This helper answers the question whether that device is also the 1348 active device or not. 1349 """ 1350 # TODO(coconutruben): The current implementation of the dual setup only 1351 # ever has legacy as the main device. Therefore, it suffices to ask 1352 # whether the active device is ccd. 1353 if not self.dts_mode_is_valid(): 1354 # Use dts support as a proxy to whether the servo setup could 1355 # support a dual role. Only those setups now support legacy and ccd. 1356 return True 1357 active_device = self.get('active_v4_device') 1358 return 'ccd_cr50' not in active_device 1359 1360 def _initialize_programmer(self, rw_only=False): 1361 """Initialize the firmware programmer. 1362 1363 @param rw_only: True to initialize a programmer which only 1364 programs the RW portions. 1365 """ 1366 if self._programmer: 1367 return 1368 # Initialize firmware programmer 1369 if self._servo_type.startswith('servo_v2'): 1370 self._programmer = firmware_programmer.ProgrammerV2(self) 1371 self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self) 1372 # Both servo v3 and v4 use the same programming methods so just leverage 1373 # ProgrammerV3 for servo v4 as well. 1374 elif (self._servo_type.startswith('servo_v3') or 1375 self._servo_type.startswith('servo_v4')): 1376 self._programmer = firmware_programmer.ProgrammerV3(self) 1377 self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self) 1378 else: 1379 raise error.TestError( 1380 'No firmware programmer for servo version: %s' % 1381 self._servo_type) 1382 1383 1384 def program_bios(self, image, rw_only=False, copy_image=True): 1385 """Program bios on DUT with given image. 1386 1387 @param image: a string, file name of the BIOS image to program 1388 on the DUT. 1389 @param rw_only: True to only program the RW portion of BIOS. 1390 @param copy_image: True indicates we need scp the image to servohost 1391 while False means the image file is already on 1392 servohost. 1393 1394 """ 1395 self._initialize_programmer() 1396 # We don't need scp if test runs locally. 1397 if copy_image and not self.is_localhost(): 1398 image = self._scp_image(image) 1399 if rw_only: 1400 self._programmer_rw.program_bios(image) 1401 else: 1402 self._programmer.program_bios(image) 1403 1404 1405 def program_ec(self, image, rw_only=False, copy_image=True): 1406 """Program ec on DUT with given image. 1407 1408 @param image: a string, file name of the EC image to program 1409 on the DUT. 1410 @param rw_only: True to only program the RW portion of EC. 1411 @param copy_image: True indicates we need scp the image to servohost 1412 while False means the image file is already on 1413 servohost. 1414 """ 1415 self._initialize_programmer() 1416 # We don't need scp if test runs locally. 1417 if copy_image and not self.is_localhost(): 1418 image = self._scp_image(image) 1419 if rw_only: 1420 self._programmer_rw.program_ec(image) 1421 else: 1422 self._programmer.program_ec(image) 1423 1424 1425 def extract_ec_image(self, board, model, tarball_path): 1426 """Helper function to extract EC image from downloaded tarball. 1427 1428 @param board: The DUT board name. 1429 @param model: The DUT model name. 1430 @param tarball_path: The path of the downloaded build tarball. 1431 1432 @return: Path to extracted EC image. 1433 """ 1434 1435 # Ignore extracting EC image and re-programming if not a Chrome EC 1436 chrome_ec = FAFTConfig(board).chrome_ec 1437 if not chrome_ec: 1438 logging.warn('Not a Chrome EC, ignore re-programming it') 1439 return None 1440 1441 # Try to retrieve firmware build target from the version reported 1442 # by the EC. If this doesn't work, we assume the firmware build 1443 # target is the same as the model name. 1444 try: 1445 fw_target = self.get_ec_board() 1446 except Exception as err: 1447 logging.warn('Failed to get ec_board value; ignoring') 1448 fw_target = model 1449 pass 1450 1451 # Array of candidates for EC image 1452 ec_image_candidates = [ 1453 'ec.bin', 1454 '%s/ec.bin' % fw_target, 1455 '%s/ec.bin' % board 1456 ] 1457 1458 # Extract EC image from tarball 1459 dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC') 1460 ec_image = _extract_image_from_tarball(tarball_path, 1461 dest_dir, 1462 ec_image_candidates, 1463 self.EXTRACT_TIMEOUT_SECS) 1464 1465 # Check if EC image was found and return path or raise error 1466 if ec_image: 1467 # Extract subsidiary binaries for EC 1468 # Find a monitor binary for NPCX_UUT chip type, if any. 1469 mon_candidates = [candidate.replace('ec.bin', 'npcx_monitor.bin') 1470 for candidate in ec_image_candidates] 1471 _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates, 1472 self.EXTRACT_TIMEOUT_SECS) 1473 1474 return os.path.join(dest_dir, ec_image) 1475 else: 1476 raise error.TestError('Failed to extract EC image from %s' % 1477 tarball_path) 1478 1479 1480 def extract_bios_image(self, board, model, tarball_path): 1481 """Helper function to extract BIOS image from downloaded tarball. 1482 1483 @param board: The DUT board name. 1484 @param model: The DUT model name. 1485 @param tarball_path: The path of the downloaded build tarball. 1486 1487 @return: Path to extracted BIOS image. 1488 """ 1489 1490 # Try to retrieve firmware build target from the version reported 1491 # by the EC. If this doesn't work, we assume the firmware build 1492 # target is the same as the model name. 1493 try: 1494 fw_target = self.get_ec_board() 1495 except Exception as err: 1496 logging.warn('Failed to get ec_board value; ignoring') 1497 fw_target = model 1498 pass 1499 1500 # Array of candidates for BIOS image 1501 bios_image_candidates = [ 1502 'image.bin', 1503 'image-%s.bin' % fw_target, 1504 'image-%s.bin' % board 1505 ] 1506 1507 # Extract BIOS image from tarball 1508 dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS') 1509 bios_image = _extract_image_from_tarball(tarball_path, 1510 dest_dir, 1511 bios_image_candidates, 1512 self.EXTRACT_TIMEOUT_SECS) 1513 1514 # Check if BIOS image was found and return path or raise error 1515 if bios_image: 1516 return os.path.join(dest_dir, bios_image) 1517 else: 1518 raise error.TestError('Failed to extract BIOS image from %s' % 1519 tarball_path) 1520 1521 1522 def switch_usbkey(self, usb_state): 1523 """Connect USB flash stick to either host or DUT, or turn USB port off. 1524 1525 This function switches the servo multiplexer to provide electrical 1526 connection between the USB port J3 and either host or DUT side. It 1527 can also be used to turn the USB port off. 1528 1529 @param usb_state: A string, one of 'dut', 'host', or 'off'. 1530 'dut' and 'host' indicate which side the 1531 USB flash device is required to be connected to. 1532 'off' indicates turning the USB port off. 1533 1534 @raise: error.TestError in case the parameter is not 'dut' 1535 'host', or 'off'. 1536 """ 1537 if self.get_usbkey_state() == usb_state: 1538 return 1539 1540 if usb_state == 'off': 1541 self.set_nocheck('image_usbkey_pwr', 'off') 1542 return 1543 elif usb_state == 'host': 1544 mux_direction = 'servo_sees_usbkey' 1545 elif usb_state == 'dut': 1546 mux_direction = 'dut_sees_usbkey' 1547 else: 1548 raise error.TestError('Unknown USB state request: %s' % usb_state) 1549 # On the servod side, this control will ensure that 1550 # - the port is power cycled if it is changing directions 1551 # - the port ends up in a powered state after this call 1552 # - if facing the host side, the call only returns once a usb block 1553 # device is detected, or after a generous timeout (10s) 1554 self.set('image_usbkey_direction', mux_direction) 1555 # As servod makes no guarantees when switching to the dut side, 1556 # add a detection delay here when facing the dut. 1557 if mux_direction == 'dut_sees_usbkey': 1558 time.sleep(self.USB_DETECTION_DELAY) 1559 1560 def get_usbkey_state(self): 1561 """Get which side USB is connected to or 'off' if usb power is off. 1562 1563 @return: A string, one of 'dut', 'host', or 'off'. 1564 """ 1565 pwr = self.get('image_usbkey_pwr') 1566 if pwr == 'off': 1567 return pwr 1568 direction = self.get('image_usbkey_direction') 1569 if direction == 'servo_sees_usbkey': 1570 return 'host' 1571 if direction == 'dut_sees_usbkey': 1572 return 'dut' 1573 raise error.TestFail('image_usbkey_direction set an unknown mux ' 1574 'direction: %s' % direction) 1575 1576 def set_servo_v4_role(self, role): 1577 """Set the power role of servo v4, either 'src' or 'snk'. 1578 1579 It does nothing if not a servo v4. 1580 1581 @param role: Power role for DUT port on servo v4, either 'src' or 'snk'. 1582 """ 1583 if not self._servo_type.startswith('servo_v4'): 1584 logging.debug('Not a servo v4, unable to set role to %s.', role) 1585 return 1586 1587 if not self.has_control('servo_v4_role'): 1588 logging.debug( 1589 'Servo does not has servo_v4_role control, unable' 1590 ' to set role to %s.', role) 1591 return 1592 1593 value = self.get('servo_v4_role') 1594 if value != role: 1595 self.set_nocheck('servo_v4_role', role) 1596 else: 1597 logging.debug('Already in the role: %s.', role) 1598 1599 def get_servo_v4_role(self): 1600 """Get the power role of servo v4, either 'src' or 'snk'. 1601 1602 It returns None if not a servo v4. 1603 """ 1604 if not self._servo_type.startswith('servo_v4'): 1605 logging.debug('Not a servo v4, unable to get role') 1606 return None 1607 1608 if not self.has_control('servo_v4_role'): 1609 logging.debug( 1610 'Servo does not has servo_v4_role control, unable' 1611 ' to get the role.') 1612 return None 1613 1614 return self.get('servo_v4_role') 1615 1616 def set_servo_v4_pd_comm(self, en): 1617 """Set the PD communication of servo v4, either 'on' or 'off'. 1618 1619 It does nothing if not a servo v4. 1620 1621 @param en: a string of 'on' or 'off' for PD communication. 1622 """ 1623 if self._servo_type.startswith('servo_v4'): 1624 self.set_nocheck('servo_v4_pd_comm', en) 1625 else: 1626 logging.debug('Not a servo v4, unable to set PD comm to %s.', en) 1627 1628 def supports_built_in_pd_control(self): 1629 """Return whether the servo type supports pd charging and control.""" 1630 if 'servo_v4' not in self._servo_type: 1631 # Only servo v4 supports this feature. 1632 logging.info('%r type does not support pd control.', 1633 self._servo_type) 1634 return False 1635 # On servo v4, it still needs to be the type-c version. 1636 if not self.get('servo_v4_type') == 'type-c': 1637 logging.info('PD controls require a type-c servo v4.') 1638 return False 1639 # Lastly, one cannot really do anything without a plugged in charger. 1640 chg_port_mv = self.get('ppchg5_mv') 1641 if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV: 1642 logging.warn('It appears that no charger is plugged into servo v4. ' 1643 'Charger port voltage: %dmV', chg_port_mv) 1644 return False 1645 logging.info('Charger port voltage: %dmV', chg_port_mv) 1646 return True 1647 1648 def dts_mode_is_valid(self): 1649 """Return whether servo setup supports dts mode control for cr50.""" 1650 if 'servo_v4' not in self._servo_type: 1651 # Only servo v4 supports this feature. 1652 logging.debug('%r type does not support dts mode control.', 1653 self._servo_type) 1654 return False 1655 # On servo v4, it still needs ot be the type-c version. 1656 if not 'type-c' == self.get('servo_v4_type'): 1657 logging.info('DTS controls require a type-c servo v4.') 1658 return False 1659 return True 1660 1661 def dts_mode_is_safe(self): 1662 """Return whether servo setup supports dts mode without losing access. 1663 1664 DTS mode control exists but the main device might go through ccd. 1665 In that case, it's only safe to control dts mode if the main device 1666 is legacy as otherwise the connection to the main device cuts out. 1667 """ 1668 return self.dts_mode_is_valid() and self.main_device_is_flex() 1669 1670 def get_dts_mode(self): 1671 """Return servo dts mode. 1672 1673 @returns: on/off whether dts is on or off 1674 """ 1675 if not self.dts_mode_is_valid(): 1676 logging.info('Not a valid servo setup. Unable to get dts mode.') 1677 return 1678 return self.get('servo_v4_dts_mode') 1679 1680 def ccd_watchdog_enable(self, enable): 1681 """Control the ccd watchdog.""" 1682 if 'ccd' not in self._servo_type: 1683 return 1684 if self._ccd_watchdog_disabled and enable: 1685 logging.info('CCD watchdog disabled for test') 1686 return 1687 control = 'watchdog_add' if enable else 'watchdog_remove' 1688 self.set_nocheck(control, 'ccd') 1689 1690 def disable_ccd_watchdog_for_test(self): 1691 """Prevent servo from enabling the watchdog.""" 1692 self._ccd_watchdog_disabled = True 1693 self.ccd_watchdog_enable(False) 1694 1695 def allow_ccd_watchdog_for_test(self): 1696 """Allow servo to enable the ccd watchdog.""" 1697 self._ccd_watchdog_disabled = False 1698 self.ccd_watchdog_enable(True) 1699 1700 def set_dts_mode(self, state): 1701 """Set servo dts mode to off or on. 1702 1703 It does nothing if not a servo v4. Disable the ccd watchdog if we're 1704 disabling dts mode. CCD will disconnect. The watchdog only allows CCD 1705 to disconnect for 10 seconds until it kills servod. Disable the 1706 watchdog, so CCD can stay disconnected indefinitely. 1707 1708 @param state: Set servo v4 dts mode 'off' or 'on'. 1709 """ 1710 if not self.dts_mode_is_valid(): 1711 logging.info('Not a valid servo setup. Unable to set dts mode %s.', 1712 state) 1713 return 1714 1715 enable_watchdog = state == 'on' 1716 1717 if not enable_watchdog: 1718 self.ccd_watchdog_enable(False) 1719 1720 self.set_nocheck('servo_v4_dts_mode', state) 1721 1722 if enable_watchdog: 1723 self.ccd_watchdog_enable(True) 1724 1725 1726 def _get_servo_type_fw_version(self, servo_type, prefix=''): 1727 """Helper to handle fw retrieval for micro/v4 vs ccd. 1728 1729 @param servo_type: one of 'servo_v4', 'servo_micro', 'ccd_cr50' 1730 @param prefix: whether the control has a prefix 1731 1732 @returns: fw version for non-ccd devices, cr50 version for ccd device 1733 """ 1734 if servo_type == 'ccd_cr50': 1735 # ccd_cr50 runs on cr50, so need to query the cr50 fw. 1736 servo_type = 'cr50' 1737 cmd = '%s_version' % servo_type 1738 try: 1739 return self.get(cmd, prefix=prefix) 1740 except error.TestFail: 1741 # Do not fail here, simply report the version as unknown. 1742 logging.warn('Unable to query %r to get servo fw version.', cmd) 1743 return 'unknown' 1744 1745 1746 def get_servo_fw_versions(self): 1747 """Retrieve a summary of attached servos and their firmware. 1748 1749 Note: that only the Google firmware owned servos supports this e.g. 1750 micro, v4, etc. For anything else, the dictionary will have no entry. 1751 If no device is has Google owned firmware (e.g. v3) then the result 1752 is an empty dictionary. 1753 1754 @returns: dict, a collection of each attached servo & their firmware. 1755 """ 1756 def get_fw_version_tag(tag, dev): 1757 return '%s_version.%s' % (dev, tag) 1758 1759 fw_versions = {} 1760 if 'servo_v4' not in self._servo_type: 1761 return {} 1762 v4_tag = get_fw_version_tag('support', 'servo_v4') 1763 fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_v4') 1764 if 'with' in self._servo_type: 1765 dut_devs = self._servo_type.split('_with_')[1].split('_and_') 1766 main_tag = get_fw_version_tag('main', dut_devs[0]) 1767 fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0]) 1768 if len(dut_devs) == 2: 1769 # Right now, the only way for this to happen is for a dual setup 1770 # to exist where ccd is attached on top of servo micro. Thus, we 1771 # know that the prefix is ccd_cr50 and the type is ccd_cr50. 1772 # TODO(coconutruben): If the new servod is not deployed by 1773 # the time that there are more cases of '_and_' devices, 1774 # this needs to be reworked. 1775 dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1]) 1776 fw = self._get_servo_type_fw_version(dut_devs[1], 'ccd_cr50') 1777 fw_versions[dual_tag] = fw 1778 return fw_versions 1779 1780 @property 1781 def uart_logs_dir(self): 1782 """Return the directory to save UART logs.""" 1783 return self._uart.logs_dir if self._uart else "" 1784 1785 1786 @uart_logs_dir.setter 1787 def uart_logs_dir(self, logs_dir): 1788 """Set directory to save UART logs. 1789 1790 @param logs_dir String of directory name.""" 1791 self._uart.logs_dir = logs_dir 1792 1793 def get_uart_logfile(self, uart): 1794 """Return the path to the uart log file.""" 1795 return self._uart.get_logfile(uart) 1796 1797 def record_uart_capture(self, outdir=None): 1798 """Save uart stream output.""" 1799 if outdir and not self.uart_logs_dir: 1800 self.uart_logs_dir = outdir 1801 self._uart.dump() 1802 1803 def close(self, outdir=None): 1804 """Close the servo object.""" 1805 # We want to ensure that servo_v4 is in src mode to avoid DUTs 1806 # left in discharge state after a task. 1807 try: 1808 self.set_servo_v4_role('src') 1809 except Exception as e: 1810 logging.info( 1811 'Unexpected error while setting servo_v4 role' 1812 ' to src; %s', e) 1813 1814 self._uart.stop_capture() 1815 self.record_uart_capture(outdir) 1816 1817 def ec_reboot(self): 1818 """Reboot Just the embedded controller.""" 1819 self.set_nocheck('ec_uart_flush', 'off') 1820 self.set_nocheck('ec_uart_cmd', 'reboot') 1821 self.set_nocheck('ec_uart_flush', 'on') 1822 1823 def get_vbus_voltage(self): 1824 """Get the voltage of VBUS'. 1825 1826 @returns The voltage of VBUS, if vbus_voltage is supported. 1827 None , if vbus_voltage is not supported. 1828 """ 1829 if not self.has_control('vbus_voltage'): 1830 logging.debug('Servo does not have vbus_voltage control,' 1831 'unable to get vbus voltage') 1832 return None 1833 1834 return self.get('vbus_voltage') 1835