1# Lint as: python2, python3 2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5# 6# Expects to be run in an environment with sudo and no interactive password 7# prompt, such as within the Chromium OS development chroot. 8 9import ast 10import logging 11import os 12import re 13import six 14import six.moves.xmlrpc_client 15import six.moves.http_client 16import time 17 18from autotest_lib.client.common_lib import error 19from autotest_lib.client.common_lib import lsbrelease_utils 20from autotest_lib.client.common_lib import seven 21from autotest_lib.server import utils as server_utils 22from autotest_lib.server.cros.servo import firmware_programmer 23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig 24 25 26# Regex to match XMLRPC errors due to a servod control not existing. 27# Servod uses both 'No control %s' and 'No control named %s' in exceptions. 28NO_CONTROL_RE = re.compile(r'No control(?: named)? (?P<name>\w*\.?\w*)') 29 30# Please see servo/drv/pty_driver.py for error messages to match. 31 32# This common prefix can apply to all subtypes of console errors. 33# The first portion is an optional qualifier of the type 34# of error that occurred. Each error is or'd. 35CONSOLE_COMMON_RE = (r'((Timeout waiting for response|' 36 r'Known error [\w\'\".\s]+). )?' 37 # The second portion is an optional name for the console 38 # source 39 r'(\w+\: )?') 40 41# Regex to match XMLRPC errors due to a console being unresponsive. 42NO_CONSOLE_OUTPUT_RE = re.compile(r'%sNo data was sent from the pty\.' % 43 CONSOLE_COMMON_RE) 44 45 46# Regex to match XMLRPC errors due to a console control failing, but the 47# underlying Console being responsive. 48CONSOLE_MISMATCH_RE = re.compile(r'%sThere was output:' % CONSOLE_COMMON_RE) 49 50 51# The minimum voltage on the charger port on servo v4 that is expected. This is 52# to query whether a charger is plugged into servo v4 and thus pd control 53# capabilities can be used. 54V4_CHG_ATTACHED_MIN_VOLTAGE_MV = 4400 55 56 57class ControlUnavailableError(error.TestFail): 58 """Custom error class to indicate a control is unavailable on servod.""" 59 pass 60 61 62class ConsoleError(error.TestFail): 63 """Common error class for servod console-back control failures.""" 64 pass 65 66 67class UnresponsiveConsoleError(ConsoleError): 68 """Error for: A console control fails for lack of console output.""" 69 pass 70 71 72class ResponsiveConsoleError(ConsoleError): 73 """Error for: A console control fails but console is responsive.""" 74 pass 75 76 77class ServodBadResponse(six.moves.http_client.BadStatusLine): 78 """Indicates a bad HTTP response from servod""" 79 80 def __init__(self, when, line): 81 """ 82 83 @param when: Description of the operation being performed (get/set) 84 @param line: The line that came from the server, often an empty string. 85 """ 86 super(ServodBadResponse, self).__init__(line) 87 self.when = when 88 89 def __str__(self): 90 """String representation of the exception""" 91 return '%s -- StatusLine=%s' % (self.when, self.line) 92 93 94class ServodEmptyResponse(ServodBadResponse): 95 """Indicates an empty response from servod, possibly because it exited.""" 96 pass 97 98 99class ServodConnectionError(seven.SOCKET_ERRORS[0]): 100 """Indicates socket errors seen during communication with servod""" 101 102 def __init__(self, when, errno, strerror, filename): 103 """Instance initializer 104 105 The filename is used to add details to the exception message: 106 [Errno 104] Connection reset by peer: "<Servo 'ipaddr:9999'>" 107 108 @param when: Description of the operation being performed at the time 109 @param errno: errno value, such as ECONNRESET 110 @param strerror: OS-provided description ("connection reset by peer") 111 @param filename: Something to report as a path, such as a socket address 112 """ 113 # [Errno 104] [Setting ctrl:val] Connection reset by peer: <Servo... 114 self.when = when 115 super(ServodConnectionError, self).__init__(errno, strerror, filename) 116 117 def __str__(self): 118 """String representation of the exception""" 119 msgv = [self.when] 120 if self.errno is not None or self.strerror is not None: 121 msgv.append('--') 122 if self.errno is not None: 123 msgv.append('[Errno %d]' % self.errno) 124 if self.strerror is not None: 125 msgv.append(self.strerror) 126 return '%s: %r' % (' '.join(msgv), self.filename) 127 128 129# TODO: once in python 3, inherit from AbstractContextManager 130class _WrapServoErrors(object): 131 """ 132 Wrap an operation, replacing BadStatusLine and socket.error with 133 servo-specific versions, and extracting exception info from xmlrplib.Fault. 134 135 @param servo_name: The servo object, used to add the servo name to errors. 136 See the ServodConnectionError docstring. 137 @param description: String to use when describing what was being done 138 @raise ServodBadStatusLine: if exception is a httplib.BadStatusLine 139 @raise ServodSocketError: if exception is a socket.error 140 @raise ControlUnavailableError: if Fault matches NO_CONTROL_RE 141 @raise UnresponsiveConsoleError: if Fault matches NO_CONSOLE_OUTPUT_RE 142 @raise ResponsiveConsoleError: if Fault matches CONSOLE_MISMATCH_RE 143 """ 144 145 def __init__(self, servo, description): 146 self.servo_name = str(servo) 147 self.description = description 148 149 @staticmethod 150 def _get_xmlrpclib_exception(xmlexc): 151 """Get meaningful exception string from xmlrpc. 152 153 Args: 154 xmlexc: xmlrpclib.Fault object 155 156 xmlrpclib.Fault.faultString has the following format: 157 158 <type 'exception type'>:'actual error message' 159 160 Parse and return the real exception from the servod side instead of the 161 less meaningful one like, 162 <Fault 1: "<type 'exceptions.AttributeError'>:'tca6416' object has no 163 attribute 'hw_driver'"> 164 165 Returns: 166 string of underlying exception raised in servod. 167 """ 168 return re.sub('^.*>:', '', xmlexc.faultString) 169 170 @staticmethod 171 def _log_exception(exc_type, exc_val, exc_tb): 172 """Log exception information""" 173 if exc_val is not None: 174 logging.debug( 175 'Wrapped exception:', exc_info=(exc_type, exc_val, exc_tb)) 176 177 def __enter__(self): 178 """Enter the context""" 179 return self 180 181 def __exit__(self, exc_type, exc_val, exc_tb): 182 """Exit the context, handling the exception if there was one""" 183 try: 184 if isinstance(exc_val, six.moves.http_client.BadStatusLine): 185 self._log_exception(exc_type, exc_val, exc_tb) 186 if exc_val.line in ('', "''"): 187 err = ServodEmptyResponse(self.description, exc_val.line) 188 else: 189 err = ServodBadResponse(self.description, exc_val.line) 190 six.reraise(err.__class__, err, exc_tb) 191 192 if isinstance(exc_val, seven.SOCKET_ERRORS): 193 self._log_exception(exc_type, exc_val, exc_tb) 194 if len(exc_val.args) == 0: 195 errno = None 196 strerror = None 197 elif len(exc_val.args) == 1: 198 errno = None 199 strerror = exc_val.args[0] 200 else: 201 errno = exc_val.args[0] 202 strerror = exc_val.args[1] 203 err = ServodConnectionError(self.description, errno, strerror, 204 self.servo_name) 205 six.reraise(err.__class__, err, exc_tb) 206 207 if isinstance(exc_val, six.moves.xmlrpc_client.Fault): 208 err_str = self._get_xmlrpclib_exception(exc_val) 209 err_msg = '%s :: %s' % (self.description, err_str) 210 unknown_ctrl = re.search(NO_CONTROL_RE, err_str) 211 if not unknown_ctrl: 212 # Log the full text for errors, except unavailable controls. 213 self._log_exception(exc_type, exc_val, exc_tb) 214 logging.debug(err_msg) 215 if unknown_ctrl: 216 # The error message for unavailable controls is huge, since 217 # it reports all known controls. Don't log the full text. 218 unknown_ctrl_name = unknown_ctrl.group('name') 219 logging.debug('%s :: No control named %r', 220 self.description, unknown_ctrl_name) 221 err = ControlUnavailableError( 222 'No control named %r' % unknown_ctrl_name) 223 elif re.search(NO_CONSOLE_OUTPUT_RE, err_str): 224 err = UnresponsiveConsoleError( 225 'Console not printing output. %s.' % 226 self.description) 227 elif re.search(CONSOLE_MISMATCH_RE, err_str): 228 err = ResponsiveConsoleError( 229 'Control failed but console alive. %s.' % 230 self.description) 231 else: 232 err = error.TestFail(err_msg) 233 six.reraise(err.__class__, err, exc_tb) 234 finally: 235 del exc_tb 236 237 238def _extract_image_from_tarball(tarball, dest_dir, image_candidates, timeout): 239 """Try extracting the image_candidates from the tarball. 240 241 @param tarball: The path of the tarball. 242 @param dest_path: The path of the destination. 243 @param image_candidates: A tuple of the paths of image candidates. 244 @param timeout: Time to wait in seconds before timing out. 245 246 @return: The first path from the image candidates, which succeeds, or None 247 if all the image candidates fail. 248 """ 249 250 # Create the firmware_name subdirectory if it doesn't exist 251 if not os.path.exists(dest_dir): 252 os.mkdir(dest_dir) 253 254 # Generate a list of all tarball files 255 stdout = server_utils.system_output('tar tf %s' % tarball, 256 timeout=timeout, 257 ignore_status=True, 258 args=image_candidates) 259 tarball_files = stdout.splitlines() 260 261 # Check if image candidates are in the list of tarball files 262 for image in image_candidates: 263 logging.debug("Trying to extract %s (autotest)", image) 264 if image in tarball_files: 265 # Extract and return the first image candidate found 266 tar_cmd = 'tar xf %s -C %s %s' % (tarball, dest_dir, image) 267 status = server_utils.system(tar_cmd, 268 timeout=timeout, 269 ignore_status=True) 270 if status == 0: 271 return image 272 return None 273 274 275class _PowerStateController(object): 276 277 """Class to provide board-specific power operations. 278 279 This class is responsible for "power on" and "power off" 280 operations that can operate without making assumptions in 281 advance about board state. It offers an interface that 282 abstracts out the different sequences required for different 283 board types. 284 285 """ 286 # Constants acceptable to be passed for the `rec_mode` parameter 287 # to power_on(). 288 # 289 # REC_ON: Boot the DUT in recovery mode, i.e. boot from USB or 290 # SD card. 291 # REC_OFF: Boot in normal mode, i.e. boot from internal storage. 292 293 REC_ON = 'rec' 294 REC_OFF = 'on' 295 REC_ON_FORCE_MRC = 'rec_force_mrc' 296 297 # Delay in seconds needed between asserting and de-asserting 298 # warm reset. 299 _RESET_HOLD_TIME = 0.5 300 301 302 def __init__(self, servo): 303 """Initialize the power state control. 304 305 @param servo Servo object providing the underlying `set` and `get` 306 methods for the target controls. 307 308 """ 309 self._servo = servo 310 self.supported = self._servo.has_control('power_state') 311 self.last_rec_mode = self.REC_OFF 312 if not self.supported: 313 logging.info('Servo setup does not support power-state operations. ' 314 'All power-state calls will lead to error.TestFail') 315 316 def _check_supported(self): 317 """Throw an error if dts mode control is not supported.""" 318 if not self.supported: 319 raise error.TestFail('power_state controls not supported') 320 321 def reset(self): 322 """Force the DUT to reset. 323 324 The DUT is guaranteed to be on at the end of this call, 325 regardless of its previous state, provided that there is 326 working OS software. This also guarantees that the EC has 327 been restarted. 328 329 """ 330 self._check_supported() 331 self._servo.set_nocheck('power_state', 'reset') 332 333 def cr50_reset(self): 334 """Force the DUT to reset. 335 336 The DUT is guaranteed to be on at the end of this call, 337 regardless of its previous state, provided that there is 338 working OS software. This also guarantees that the EC has 339 been restarted. Works only for ccd connections. 340 341 """ 342 self._check_supported() 343 self._servo.set_nocheck('power_state', 'cr50_reset') 344 345 def warm_reset(self): 346 """Apply warm reset to the DUT. 347 348 This asserts, then de-asserts the 'warm_reset' signal. 349 Generally, this causes the board to restart. 350 351 """ 352 # TODO: warm_reset support has added to power_state.py. Once it 353 # available to labstation remove fallback method. 354 self._check_supported() 355 try: 356 self._servo.set_nocheck('power_state', 'warm_reset') 357 except error.TestFail as err: 358 logging.info("Fallback to warm_reset control method") 359 self._servo.set_get_all(['warm_reset:on', 360 'sleep:%.4f' % self._RESET_HOLD_TIME, 361 'warm_reset:off']) 362 363 def power_off(self): 364 """Force the DUT to power off. 365 366 The DUT is guaranteed to be off at the end of this call, 367 regardless of its previous state, provided that there is 368 working EC and boot firmware. There is no requirement for 369 working OS software. 370 371 """ 372 self._check_supported() 373 self._servo.set_nocheck('power_state', 'off') 374 375 def power_on(self, rec_mode=REC_OFF): 376 """Force the DUT to power on. 377 378 Prior to calling this function, the DUT must be powered off, 379 e.g. with a call to `power_off()`. 380 381 At power on, recovery mode is set as specified by the 382 corresponding argument. When booting with recovery mode on, it 383 is the caller's responsibility to unplug/plug in a bootable 384 external storage device. 385 386 If the DUT requires a delay after powering on but before 387 processing inputs such as USB stick insertion, the delay is 388 handled by this method; the caller is not responsible for such 389 delays. 390 391 @param rec_mode Setting of recovery mode to be applied at 392 power on. default: REC_OFF aka 'off' 393 394 """ 395 self._check_supported() 396 self._servo.set_nocheck('power_state', rec_mode) 397 self.last_rec_mode = rec_mode 398 399 def retry_power_on(self): 400 """Retry powering on the DUT. 401 402 After power_on(...) the system might not come up reliably, although 403 the reasons aren't known yet. This function retries turning on the 404 system again, trying to bring it in the last state that power_on() 405 attempted to reach. 406 """ 407 self._check_supported() 408 self._servo.set_nocheck('power_state', self.last_rec_mode) 409 410 411class _Uart(object): 412 """Class to capture UART streams of CPU, EC, Cr50, etc.""" 413 _UartToCapture = ('cpu', 'cr50', 'ec', 'servo_micro', 'servo_v4', 'usbpd', 414 'ccd_cr50.ec', 'ccd_cr50.cpu', 'ccd_cr50.cr50' 415 'ccd_gsc.ec', 'ccd_gsc.cpu', 'ccd_gsc.cr50') 416 417 418 def __init__(self, servo): 419 self._servo = servo 420 self._streams = [] 421 self.logs_dir = None 422 423 def _start_stop_capture(self, uart, start): 424 """Helper function to start/stop capturing on specified UART. 425 426 @param uart: The UART name to start/stop capturing. 427 @param start: True to start capturing, otherwise stop. 428 429 @returns True if the operation completes successfully. 430 False if the UART capturing is not supported or failed due to 431 an error. 432 """ 433 logging.debug('%s capturing %s UART.', 'Start' if start else 'Stop', 434 uart) 435 uart_cmd = '%s_uart_capture' % uart 436 target_level = 'on' if start else 'off' 437 level = None 438 if self._servo.has_control(uart_cmd): 439 # Do our own implementation of set() here as not_applicable 440 # should also count as a valid control. 441 logging.debug('Trying to set %s to %s.', uart_cmd, target_level) 442 try: 443 self._servo.set_nocheck(uart_cmd, target_level) 444 level = self._servo.get(uart_cmd) 445 except error.TestFail as e: 446 # Any sort of test failure here should not stop the test. This 447 # is just to capture more output. Log and move on. 448 logging.warning('Failed to set %s to %s. %s. Ignoring.', 449 uart_cmd, target_level, str(e)) 450 if level == target_level: 451 logging.debug('Managed to set %s to %s.', uart_cmd, level) 452 else: 453 logging.debug('Failed to set %s to %s. Got %s.', uart_cmd, 454 target_level, level) 455 return level == target_level 456 457 def start_capture(self): 458 """Start capturing UART streams.""" 459 for uart in self._UartToCapture: 460 # Always try to start the uart. Only add it to _streams if it's not 461 # in the list. 462 if (self._start_stop_capture(uart, True) 463 and uart not in self._streams): 464 self._streams.append(uart) 465 466 def get_logfile(self, uart): 467 """Return the path to the uart logfile or none if logs_dir isn't set.""" 468 if not self.logs_dir: 469 return None 470 return os.path.join(self.logs_dir, '%s_uart.txt' % uart) 471 472 def dump(self): 473 """Dump UART streams to log files accordingly.""" 474 if not self.logs_dir: 475 return 476 477 for uart in self._streams: 478 logfile_fullname = self.get_logfile(uart) 479 stream = '%s_uart_stream' % uart 480 try: 481 content = self._servo.get(stream) 482 except Exception as err: 483 logging.warning('Failed to get UART log for %s: %s', stream, err) 484 continue 485 486 if content == 'not_applicable': 487 logging.warning('%s is not applicable', stream) 488 continue 489 490 # The UART stream may contain non-printable characters, and servo 491 # returns it in string representation. We use `ast.leteral_eval` 492 # to revert it back. 493 with open(logfile_fullname, 'a') as fd: 494 try: 495 fd.write(ast.literal_eval(content)) 496 except ValueError: 497 logging.exception('Invalid value for %s: %r', stream, 498 content) 499 500 def stop_capture(self): 501 """Stop capturing UART streams.""" 502 for uart in self._UartToCapture: 503 try: 504 self._start_stop_capture(uart, False) 505 except Exception as err: 506 logging.warning('Failed to stop UART logging for %s: %s', uart, 507 err) 508 509 510class Servo(object): 511 512 """Manages control of a Servo board. 513 514 Servo is a board developed by hardware group to aide in the debug and 515 control of various partner devices. Servo's features include the simulation 516 of pressing the power button, closing the lid, and pressing Ctrl-d. This 517 class manages setting up and communicating with a servo demon (servod) 518 process. It provides both high-level functions for common servo tasks and 519 low-level functions for directly setting and reading gpios. 520 521 """ 522 523 # Power button press delays in seconds. 524 # 525 # The EC specification says that 8.0 seconds should be enough 526 # for the long power press. However, some platforms need a bit 527 # more time. Empirical testing has found these requirements: 528 # Alex: 8.2 seconds 529 # ZGB: 8.5 seconds 530 # The actual value is set to the largest known necessary value. 531 # 532 # TODO(jrbarnette) Being generous is the right thing to do for 533 # existing platforms, but if this code is to be used for 534 # qualification of new hardware, we should be less generous. 535 SHORT_DELAY = 0.1 536 537 # Maximum number of times to re-read power button on release. 538 GET_RETRY_MAX = 10 539 540 # Delays to deal with DUT state transitions. 541 SLEEP_DELAY = 6 542 BOOT_DELAY = 10 543 544 # Default minimum time interval between 'press' and 'release' 545 # keyboard events. 546 SERVO_KEY_PRESS_DELAY = 0.1 547 548 # Time to toggle recovery switch on and off. 549 REC_TOGGLE_DELAY = 0.1 550 551 # Time to toggle development switch on and off. 552 DEV_TOGGLE_DELAY = 0.1 553 554 # Time between an usb disk plugged-in and detected in the system. 555 USB_DETECTION_DELAY = 5 556 557 # Time to wait before timing out on servo initialization. 558 INIT_TIMEOUT_SECS = 10 559 560 # Time to wait before timing out when extracting firmware images. 561 # 562 # This was increased from 60 seconds to support boards with very 563 # large (>500MB) firmware archives taking longer than expected to 564 # extract firmware on the lab host machines (b/149419503). 565 EXTRACT_TIMEOUT_SECS = 900 566 567 # The VBUS voltage threshold used to detect if VBUS is supplied 568 VBUS_THRESHOLD = 3000.0 569 570 # List of servos that connect to a debug header on the board. 571 FLEX_SERVOS = ['c2d2', 'servo_micro', 'servo_v3'] 572 573 # List of servos that rely on gsc commands for some part of dut control. 574 GSC_DRV_SERVOS = ['c2d2', 'ccd_gsc', 'ccd_cr50'] 575 576 CCD_PREFIX = 'ccd_' 577 578 def __init__(self, servo_host, servo_serial=None, delay_init=False): 579 """Sets up the servo communication infrastructure. 580 581 @param servo_host: A ServoHost object representing 582 the host running servod. 583 @type servo_host: autotest_lib.server.hosts.servo_host.ServoHost 584 @param servo_serial: Serial number of the servo board. 585 @param delay_init: Delay cache servo_type and power_state to prevent 586 attempt to connect to the servod. 587 """ 588 # TODO(fdeng): crbug.com/298379 589 # We should move servo_host object out of servo object 590 # to minimize the dependencies on the rest of Autotest. 591 self._servo_host = servo_host 592 self._servo_serial = servo_serial 593 self._servo_type = None 594 self._power_state = None 595 self._programmer = None 596 self._prev_log_inode = None 597 self._prev_log_size = 0 598 self._ccd_watchdog_disabled = False 599 if not delay_init: 600 self._servo_type = self.get_servo_version() 601 self._power_state = _PowerStateController(self) 602 self._uart = _Uart(self) 603 604 def __str__(self): 605 """Description of this object and address, for use in errors""" 606 return "<%s '%s:%s'>" % ( 607 type(self).__name__, 608 self._servo_host.hostname, 609 self._servo_host.servo_port) 610 611 @property 612 def _server(self): 613 with _WrapServoErrors( 614 servo=self, description='get_servod_server_proxy()'): 615 return self._servo_host.get_servod_server_proxy() 616 617 @property 618 def servo_serial(self): 619 """Returns the serial number of the servo board.""" 620 return self._servo_serial 621 622 def get_power_state_controller(self): 623 """Return the power state controller for this Servo. 624 625 The power state controller provides board-independent 626 interfaces for reset, power-on, power-off operations. 627 628 """ 629 if self._power_state is None: 630 self._power_state = _PowerStateController(self) 631 return self._power_state 632 633 634 def initialize_dut(self, cold_reset=False, enable_main=True): 635 """Initializes a dut for testing purposes. 636 637 This sets various servo signals back to default values 638 appropriate for the target board. By default, if the DUT 639 is already on, it stays on. If the DUT is powered off 640 before initialization, its state afterward is unspecified. 641 642 Rationale: Basic initialization of servo sets the lid open, 643 when there is a lid. This operation won't affect powered on 644 units; however, setting the lid open may power on a unit 645 that's off, depending on the board type and previous state 646 of the device. 647 648 If `cold_reset` is a true value, the DUT and its EC will be 649 reset, and the DUT rebooted in normal mode. 650 651 @param cold_reset If True, cold reset the device after 652 initialization. 653 @param enable_main If True, make sure the main servo device has 654 control of the dut. 655 656 """ 657 if enable_main: 658 self.enable_main_servo_device() 659 660 with _WrapServoErrors( 661 servo=self, description='initialize_dut()->hwinit()'): 662 self._server.hwinit() 663 if self.has_control('usb_mux_oe1'): 664 self.set('usb_mux_oe1', 'on') 665 self.switch_usbkey('off') 666 else: 667 logging.warning('Servod command \'usb_mux_oe1\' is not available. ' 668 'Any USB drive related servo routines will fail.') 669 # Create a record of SBU voltages if this is running support servo (v4, 670 # v4p1). 671 # TODO(coconutruben): eventually, replace this with a metric to track 672 # SBU voltages wrt servo-hw/dut-hw 673 if self.has_control('servo_dut_sbu1_mv'): 674 # Attempt to take a reading of sbu1 and sbu2 multiple times to 675 # account for situations where the two lines exchange hi/lo roles 676 # frequently. 677 for i in range(10): 678 try: 679 sbu1 = int(self.get('servo_dut_sbu1_mv')) 680 sbu2 = int(self.get('servo_dut_sbu2_mv')) 681 logging.info('attempt %d sbu1 %d sbu2 %d', i, sbu1, sbu2) 682 except error.TestFail as e: 683 # This is a nice to have but if reading this fails, it 684 # shouldn't interfere with the test. 685 logging.exception(e) 686 self._uart.start_capture() 687 # Run testlab open if servo relies on ccd to control the dut. 688 if self.main_device_uses_gsc_drv(): 689 self.set_nocheck('cr50_testlab', 'open') 690 if cold_reset: 691 if not self.get_power_state_controller().supported: 692 logging.info('Cold-reset for DUT requested, but servo ' 693 'setup does not support power_state. Skipping.') 694 else: 695 self.get_power_state_controller().reset() 696 with _WrapServoErrors( 697 servo=self, description='initialize_dut()->get_version()'): 698 version = self._server.get_version() 699 logging.debug('Servo initialized, version is %s', version) 700 701 702 def is_localhost(self): 703 """Is the servod hosted locally? 704 705 Returns: 706 True if local hosted; otherwise, False. 707 """ 708 return self._servo_host.is_localhost() 709 710 711 def get_os_version(self): 712 """Returns the chromeos release version.""" 713 lsb_release_content = self.system_output('cat /etc/lsb-release', 714 ignore_status=True) 715 return lsbrelease_utils.get_chromeos_release_builder_path( 716 lsb_release_content=lsb_release_content) 717 718 719 def get_servod_version(self): 720 """Returns the servod version.""" 721 # TODO: use system_output once servod --sversion prints to stdout 722 try: 723 result = self._servo_host.run('servod --sversion 2>&1') 724 except error.AutoservRunError as e: 725 if 'command execution error' in str(e): 726 # Fall back to version if sversion is not supported yet. 727 result = self._servo_host.run('servod --version 2>&1') 728 return result.stdout.strip() or result.stderr.strip() 729 # An actually unexpected error occurred, just raise. 730 raise e 731 sversion = result.stdout or result.stderr 732 # The sversion output contains 3 lines: 733 # servod v1.0.816-ff8e966 // the extended version with git hash 734 # 2020-04-08 01:10:29 // the time of the latest commit 735 # chromeos-ci-legacy-us-central1-b-x32-55-u8zc // builder information 736 # For debugging purposes, we mainly care about the version, and the 737 # timestamp. 738 if type(sversion) == type(b' '): 739 sversion = sversion.decode("utf-8") 740 return ' '.join(sversion.split()[1:4]) 741 742 743 def power_long_press(self): 744 """Simulate a long power button press.""" 745 # After a long power press, the EC may ignore the next power 746 # button press (at least on Alex). To guarantee that this 747 # won't happen, we need to allow the EC one second to 748 # collect itself. 749 # long_press is defined as 8.5s in servod 750 self.power_key('long_press') 751 752 753 def power_normal_press(self): 754 """Simulate a normal power button press.""" 755 # press is defined as 1.2s in servod 756 self.power_key('press') 757 758 759 def power_short_press(self): 760 """Simulate a short power button press.""" 761 # tab is defined as 0.2s in servod 762 self.power_key('tab') 763 764 765 def power_key(self, press_secs='tab'): 766 """Simulate a power button press. 767 768 @param press_secs: int, float, str; time to press key in seconds or 769 known shorthand: 'tab' 'press' 'long_press' 770 """ 771 # TODO(b/224804060): use the power_key control for all servo types when 772 # c2d2 has a defined power_key driver. 773 if 'c2d2' not in self.get_servo_type(): 774 self.set_nocheck('power_key', press_secs) 775 return 776 if isinstance(press_secs, str): 777 if press_secs == 'tab': 778 press_secs = 0.2 779 elif press_secs == 'press': 780 press_secs = 1.2 781 elif press_secs == 'long_press': 782 press_secs = 8.5 783 else: 784 raise error.TestError('Invalid press %r' % press_secs) 785 logging.info('Manual power button press for %ds', press_secs) 786 self.set_nocheck('pwr_button', 'press') 787 time.sleep(press_secs) 788 self.set_nocheck('pwr_button', 'release') 789 790 791 def pwr_button(self, action='press'): 792 """Simulate a power button press. 793 794 @param action: str; could be press or could be release. 795 """ 796 self.set_nocheck('pwr_button', action) 797 798 799 def lid_open(self): 800 """Simulate opening the lid and raise exception if all attempts fail""" 801 self.set('lid_open', 'yes') 802 803 804 def lid_close(self): 805 """Simulate closing the lid and raise exception if all attempts fail 806 807 Waits 6 seconds to ensure the device is fully asleep before returning. 808 """ 809 self.set('lid_open', 'no') 810 time.sleep(Servo.SLEEP_DELAY) 811 812 813 def vbus_power_get(self): 814 """Get current vbus_power.""" 815 return self.get('vbus_power') 816 817 818 def volume_up(self, timeout=300): 819 """Simulate pushing the volume down button. 820 821 @param timeout: Timeout for setting the volume. 822 """ 823 self.set_get_all(['volume_up:yes', 824 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 825 'volume_up:no']) 826 # we need to wait for commands to take effect before moving on 827 time_left = float(timeout) 828 while time_left > 0.0: 829 value = self.get('volume_up') 830 if value == 'no': 831 return 832 time.sleep(self.SHORT_DELAY) 833 time_left = time_left - self.SHORT_DELAY 834 raise error.TestFail("Failed setting volume_up to no") 835 836 def volume_down(self, timeout=300): 837 """Simulate pushing the volume down button. 838 839 @param timeout: Timeout for setting the volume. 840 """ 841 self.set_get_all(['volume_down:yes', 842 'sleep:%.4f' % self.SERVO_KEY_PRESS_DELAY, 843 'volume_down:no']) 844 # we need to wait for commands to take effect before moving on 845 time_left = float(timeout) 846 while time_left > 0.0: 847 value = self.get('volume_down') 848 if value == 'no': 849 return 850 time.sleep(self.SHORT_DELAY) 851 time_left = time_left - self.SHORT_DELAY 852 raise error.TestFail("Failed setting volume_down to no") 853 854 def arrow_up(self, press_secs='tab'): 855 """Simulate arrow up key presses. 856 857 @param press_secs: int, float, str; time to press key in seconds or 858 known shorthand: 'tab' 'press' 'long_press'. 859 """ 860 # TODO: Remove this check after a lab update to include CL:1913684 861 if not self.has_control('arrow_up'): 862 logging.warning('Control arrow_up ignored. ' 863 'Please update hdctools') 864 return 865 self.set_nocheck('arrow_up', press_secs) 866 867 def arrow_down(self, press_secs='tab'): 868 """Simulate arrow down key presses. 869 870 @param press_secs: int, float, str; time to press key in seconds or 871 known shorthand: 'tab' 'press' 'long_press'. 872 """ 873 # TODO: Remove this check after a lab update to include CL:1913684 874 if not self.has_control('arrow_down'): 875 logging.warning('Control arrow_down ignored. ' 876 'Please update hdctools') 877 return 878 self.set_nocheck('arrow_down', press_secs) 879 880 def ctrl_d(self, press_secs='tab'): 881 """Simulate Ctrl-d simultaneous button presses. 882 883 @param press_secs: int, float, str; time to press key in seconds or 884 known shorthand: 'tab' 'press' 'long_press' 885 """ 886 self.set_nocheck('ctrl_d', press_secs) 887 888 889 def ctrl_r(self, press_secs='tab'): 890 """Simulate Ctrl-r simultaneous button presses. 891 892 @param press_secs: int, float, str; time to press key in seconds or 893 known shorthand: 'tab' 'press' 'long_press' 894 """ 895 self.set_nocheck('ctrl_r', press_secs) 896 897 def ctrl_s(self, press_secs='tab'): 898 """Simulate Ctrl-s simultaneous button presses. 899 900 @param press_secs: int, float, str; time to press key in seconds or 901 known shorthand: 'tab' 'press' 'long_press' 902 """ 903 self.set_nocheck('ctrl_s', press_secs) 904 905 906 def ctrl_u(self, press_secs='tab'): 907 """Simulate Ctrl-u simultaneous button presses. 908 909 @param press_secs: int, float, str; time to press key in seconds or 910 known shorthand: 'tab' 'press' 'long_press' 911 """ 912 self.set_nocheck('ctrl_u', press_secs) 913 914 915 def ctrl_enter(self, press_secs='tab'): 916 """Simulate Ctrl-enter simultaneous button presses. 917 918 @param press_secs: int, float, str; time to press key in seconds or 919 known shorthand: 'tab' 'press' 'long_press' 920 """ 921 self.set_nocheck('ctrl_enter', press_secs) 922 923 924 def ctrl_key(self, press_secs='tab'): 925 """Simulate Enter key button press. 926 927 @param press_secs: int, float, str; time to press key in seconds or 928 known shorthand: 'tab' 'press' 'long_press' 929 """ 930 self.set_nocheck('ctrl_key', press_secs) 931 932 933 def enter_key(self, press_secs='tab'): 934 """Simulate Enter key button press. 935 936 @param press_secs: int, float, str; time to press key in seconds or 937 known shorthand: 'tab' 'press' 'long_press' 938 """ 939 self.set_nocheck('enter_key', press_secs) 940 941 942 def refresh_key(self, press_secs='tab'): 943 """Simulate Refresh key (F3) button press. 944 945 @param press_secs: int, float, str; time to press key in seconds or 946 known shorthand: 'tab' 'press' 'long_press' 947 """ 948 self.set_nocheck('refresh_key', press_secs) 949 950 951 def ctrl_refresh_key(self, press_secs='tab'): 952 """Simulate Ctrl and Refresh (F3) simultaneous press. 953 954 This key combination is an alternative of Space key. 955 956 @param press_secs: int, float, str; time to press key in seconds or 957 known shorthand: 'tab' 'press' 'long_press' 958 """ 959 self.set_nocheck('ctrl_refresh_key', press_secs) 960 961 962 def imaginary_key(self, press_secs='tab'): 963 """Simulate imaginary key button press. 964 965 Maps to a key that doesn't physically exist. 966 967 @param press_secs: int, float, str; time to press key in seconds or 968 known shorthand: 'tab' 'press' 'long_press' 969 """ 970 self.set_nocheck('imaginary_key', press_secs) 971 972 973 def sysrq_x(self, press_secs='tab'): 974 """Simulate Alt VolumeUp X simulataneous press. 975 976 This key combination is the kernel system request (sysrq) X. 977 978 @param press_secs: int, float, str; time to press key in seconds or 979 known shorthand: 'tab' 'press' 'long_press' 980 """ 981 self.set_nocheck('sysrq_x', press_secs) 982 983 984 def toggle_recovery_switch(self): 985 """Toggle recovery switch on and off.""" 986 self.enable_recovery_mode() 987 time.sleep(self.REC_TOGGLE_DELAY) 988 self.disable_recovery_mode() 989 990 991 def enable_recovery_mode(self): 992 """Enable recovery mode on device.""" 993 self.set('rec_mode', 'on') 994 995 996 def disable_recovery_mode(self): 997 """Disable recovery mode on device.""" 998 self.set('rec_mode', 'off') 999 1000 1001 def toggle_development_switch(self): 1002 """Toggle development switch on and off.""" 1003 self.enable_development_mode() 1004 time.sleep(self.DEV_TOGGLE_DELAY) 1005 self.disable_development_mode() 1006 1007 1008 def enable_development_mode(self): 1009 """Enable development mode on device.""" 1010 self.set('dev_mode', 'on') 1011 1012 1013 def disable_development_mode(self): 1014 """Disable development mode on device.""" 1015 self.set('dev_mode', 'off') 1016 1017 def boot_devmode(self): 1018 """Boot a dev-mode device that is powered off.""" 1019 self.power_short_press() 1020 self.pass_devmode() 1021 1022 1023 def pass_devmode(self): 1024 """Pass through boot screens in dev-mode.""" 1025 time.sleep(Servo.BOOT_DELAY) 1026 self.ctrl_d() 1027 time.sleep(Servo.BOOT_DELAY) 1028 1029 1030 def get_board(self): 1031 """Get the board connected to servod.""" 1032 with _WrapServoErrors(servo=self, description='get_board()'): 1033 return self._server.get_board() 1034 1035 1036 def get_base_board(self): 1037 """Get the board of the base connected to servod.""" 1038 try: 1039 with _WrapServoErrors(servo=self, description='get_base_board()'): 1040 return self._server.get_base_board() 1041 except six.moves.xmlrpc_client.Fault as e: 1042 # TODO(waihong): Remove the following compatibility check when 1043 # the new versions of hdctools are deployed. 1044 if 'not supported' in str(e): 1045 logging.warning('The servod is too old that get_base_board ' 1046 'not supported.') 1047 return '' 1048 raise 1049 1050 def can_set_active_device(self): 1051 """Returns True if the servo setup supports setting the active device 1052 1053 Servo can only change the active device if there are multiple devices 1054 and servo has the active_dut_controller control. 1055 """ 1056 return ('_and_' in self.get_servo_type() 1057 and self.has_control('active_dut_controller')) 1058 1059 def get_active_device_prefix(self): 1060 """Return ccd_(gsc|cr50) or '' if the main device is active.""" 1061 active_device = '' 1062 if self.can_set_active_device(): 1063 # If servo v4 is allowing dual_v4 devices, then choose the 1064 # active device. 1065 active_device = self.get('active_dut_controller') 1066 if active_device == self.get_main_servo_device(): 1067 active_device = '' 1068 return active_device 1069 1070 def get_ec_board(self): 1071 """Get the board name from EC.""" 1072 1073 return self.get('ec_board', prefix=self.get_active_device_prefix()) 1074 1075 def get_ec_active_copy(self): 1076 """Get the active copy of the EC image.""" 1077 return self.get('ec_active_copy') 1078 1079 def has_control(self, ctrl_name, prefix=''): 1080 """Query servod server to determine if |ctrl_name| is a valid control. 1081 1082 @param ctrl_name Name of the control. 1083 @param prefix: prefix to route control to correct servo device. 1084 1085 @returns: true if |ctrl_name| is a known control, false otherwise. 1086 """ 1087 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1088 try: 1089 # If the control exists, doc() will work. 1090 with _WrapServoErrors( 1091 servo=self, 1092 description='has_control(%s)->doc()' % ctrl_name): 1093 self._server.doc(ctrl_name) 1094 return True 1095 except ControlUnavailableError: 1096 return False 1097 1098 def _build_ctrl_name(self, ctrl_name, prefix): 1099 """Helper to build the control name if a prefix is used. 1100 1101 @param ctrl_name Name of the control. 1102 @param prefix: prefix to route control to correct servo device. 1103 1104 @returns: [|prefix|.]ctrl_name depending on whether prefix is non-empty. 1105 """ 1106 assert ctrl_name 1107 if prefix: 1108 return '%s.%s' % (prefix, ctrl_name) 1109 return ctrl_name 1110 1111 def get(self, ctrl_name, prefix=''): 1112 """Get the value of a gpio from Servod. 1113 1114 @param ctrl_name Name of the control. 1115 @param prefix: prefix to route control to correct servo device. 1116 1117 @returns: server response to |ctrl_name| request. 1118 1119 @raise ControlUnavailableError: if |ctrl_name| not a known control. 1120 @raise error.TestFail: for all other failures doing get(). 1121 """ 1122 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1123 with _WrapServoErrors( 1124 servo=self, description='Getting %s' % ctrl_name): 1125 return self._server.get(ctrl_name) 1126 1127 def set(self, ctrl_name, ctrl_value, prefix=''): 1128 """Set and check the value of a gpio using Servod. 1129 1130 @param ctrl_name: Name of the control. 1131 @param ctrl_value: New setting for the control. 1132 @param prefix: prefix to route control to correct servo device. 1133 @raise error.TestFail: if the control value fails to change. 1134 """ 1135 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1136 self.set_nocheck(ctrl_name, ctrl_value) 1137 retry_count = Servo.GET_RETRY_MAX 1138 actual_value = self.get(ctrl_name) 1139 while ctrl_value != actual_value and retry_count: 1140 logging.warning("%s != %s, retry %d", ctrl_name, ctrl_value, 1141 retry_count) 1142 retry_count -= 1 1143 time.sleep(Servo.SHORT_DELAY) 1144 actual_value = self.get(ctrl_name) 1145 1146 if ctrl_value != actual_value: 1147 raise error.TestFail( 1148 'Servo failed to set %s to %s. Got %s.' 1149 % (ctrl_name, ctrl_value, actual_value)) 1150 1151 def set_nocheck(self, ctrl_name, ctrl_value, prefix=''): 1152 """Set the value of a gpio using Servod. 1153 1154 @param ctrl_name Name of the control. 1155 @param ctrl_value New setting for the control. 1156 @param prefix: prefix to route control to correct servo device. 1157 1158 @raise ControlUnavailableError: if |ctrl_name| not a known control. 1159 @raise error.TestFail: for all other failures doing set(). 1160 """ 1161 ctrl_name = self._build_ctrl_name(ctrl_name, prefix) 1162 # The real danger here is to pass a None value through the xmlrpc. 1163 assert ctrl_value is not None 1164 description = 'Setting %s to %r' % (ctrl_name, ctrl_value) 1165 logging.debug('%s', description) 1166 with _WrapServoErrors(servo=self, description=description): 1167 self._server.set(ctrl_name, ctrl_value) 1168 1169 def set_get_all(self, controls): 1170 """Set &| get one or more control values. 1171 1172 @param controls: list of strings, controls to set &| get. 1173 1174 @raise: error.TestError in case error occurs setting/getting values. 1175 """ 1176 description = 'Set/get all: %s' % str(controls) 1177 logging.debug('%s', description) 1178 with _WrapServoErrors(servo=self, description=description): 1179 return self._server.set_get_all(controls) 1180 1181 1182 def probe_host_usb_dev(self): 1183 """Probe the USB disk device plugged-in the servo from the host side. 1184 1185 It uses servod to discover if there is a usb device attached to it. 1186 1187 @return: String of USB disk path (e.g. '/dev/sdb') or None. 1188 """ 1189 # Set up Servo's usb mux. 1190 return self.get('image_usbkey_dev') or None 1191 1192 1193 def image_to_servo_usb(self, image_path=None, 1194 make_image_noninteractive=False, 1195 power_off_dut=True): 1196 """Install an image to the USB key plugged into the servo. 1197 1198 This method may copy any image to the servo USB key including a 1199 recovery image or a test image. These images are frequently used 1200 for test purposes such as restoring a corrupted image or conducting 1201 an upgrade of ec/fw/kernel as part of a test of a specific image part. 1202 1203 @param image_path: Path on the host to the recovery image. 1204 @param make_image_noninteractive: Make the recovery image 1205 noninteractive, therefore the DUT 1206 will reboot automatically after 1207 installation. 1208 @param power_off_dut: To put the DUT in power off mode. 1209 """ 1210 # We're about to start plugging/unplugging the USB key. We 1211 # don't know the state of the DUT, or what it might choose 1212 # to do to the device after hotplug. To avoid surprises, 1213 # force the DUT to be off. 1214 if power_off_dut: 1215 self.get_power_state_controller().power_off() 1216 1217 if image_path: 1218 logging.info('Searching for usb device and copying image to it. ' 1219 'Please wait a few minutes...') 1220 # The servod control automatically sets up the host in the host 1221 # direction. 1222 try: 1223 self.set_nocheck('download_image_to_usb_dev', image_path) 1224 except error.TestFail as e: 1225 logging.error('Failed to transfer requested image to USB. %s.' 1226 'Please take a look at Servo Logs.', str(e)) 1227 raise error.AutotestError('Download image to usb failed.') 1228 if make_image_noninteractive: 1229 logging.info('Making image noninteractive') 1230 try: 1231 dev = self.probe_host_usb_dev() 1232 if not dev: 1233 # This is fine but also should never happen: if we 1234 # successfully download an image but somehow cannot 1235 # find the stick again, it needs to be investigated. 1236 raise error.TestFail('No image usb key detected ' 1237 'after successful download. ' 1238 'Please investigate.') 1239 # The modification has to happen on partition 1. 1240 dev_partition = '%s1' % dev 1241 self.set_nocheck('make_usb_dev_image_noninteractive', 1242 dev_partition) 1243 except error.TestFail as e: 1244 logging.error('Failed to make image noninteractive. %s.' 1245 'Please take a look at Servo Logs.', 1246 str(e)) 1247 1248 def boot_in_recovery_mode(self, snk_mode=False): 1249 """Boot host DUT in recovery mode. 1250 1251 @param snk_mode: If True, switch servo_v4 role to 'snk' mode before 1252 boot DUT into recovery mode. 1253 """ 1254 # This call has a built-in delay to ensure that we wait a timeout 1255 # for the stick to enumerate and settle on the DUT side. 1256 self.switch_usbkey('dut') 1257 # Switch servo_v4 mode to snk as the DUT won't able to see usb drive 1258 # in recovery mode if the servo is in src mode(see crbug.com/1129165). 1259 if snk_mode: 1260 logging.info('Setting servo_v4 role to snk mode in order to make' 1261 ' the DUT can see usb drive while in recovery mode.') 1262 self.set_servo_v4_role('snk') 1263 1264 try: 1265 power_state = self.get_power_state_controller() 1266 power_state.power_on(rec_mode=power_state.REC_ON) 1267 except error.TestFail as e: 1268 self.set_servo_v4_role('src') 1269 logging.error('Failed to boot DUT in recovery mode. %s.', str(e)) 1270 raise error.AutotestError('Failed to boot DUT in recovery mode.') 1271 1272 def install_recovery_image(self, 1273 image_path=None, 1274 make_image_noninteractive=False, 1275 snk_mode=False): 1276 """Install the recovery image specified by the path onto the DUT. 1277 1278 This method uses google recovery mode to install a recovery image 1279 onto a DUT through the use of a USB stick that is mounted on a servo 1280 board specified by the usb_dev. If no image path is specified 1281 we use the recovery image already on the usb image. 1282 1283 This method will switch servo_v4 role to 'snk' mode in order to make 1284 the DUT can see the usb drive plugged on servo, the caller should 1285 set servo_v4 role back to 'src' mode one the DUT exit recovery mode. 1286 1287 @param image_path: Path on the host to the recovery image. 1288 @param make_image_noninteractive: Make the recovery image 1289 noninteractive, therefore the DUT will reboot automatically 1290 after installation. 1291 @param snk_mode: If True, switch servo_v4 role to 'snk' mode before 1292 boot DUT into recovery mode. 1293 """ 1294 self.image_to_servo_usb(image_path, make_image_noninteractive) 1295 # Give the DUT some time to power_off if we skip 1296 # download image to usb. (crbug.com/982993) 1297 if not image_path: 1298 time.sleep(10) 1299 self.boot_in_recovery_mode(snk_mode=snk_mode) 1300 1301 1302 def _scp_image(self, image_path): 1303 """Copy image to the servo host. 1304 1305 When programming a firmware image on the DUT, the image must be 1306 located on the host to which the servo device is connected. Sometimes 1307 servo is controlled by a remote host, in this case the image needs to 1308 be transferred to the remote host. This adds the servod port number, to 1309 make sure tests for different DUTs don't trample on each other's files. 1310 Along with the firmware image, any subsidiary files in the same 1311 directory shall be copied to the host as well. 1312 1313 @param image_path: a string, name of the firmware image file to be 1314 transferred. 1315 @return: a string, full path name of the copied file on the remote. 1316 """ 1317 src_path = os.path.dirname(image_path) 1318 dest_path = os.path.join('/tmp', 'dut_%d' % self._servo_host.servo_port) 1319 logging.info('Copying %s to %s', src_path, dest_path) 1320 # Copy a directory, src_path to dest_path. send_file() will create a 1321 # directory named basename(src_path) under dest_path, and copy all files 1322 # in src_path to the destination. 1323 self._servo_host.send_file(src_path, dest_path, delete_dest=True) 1324 1325 # Make a image path of the destination. 1326 # e.g. /tmp/dut_9999/EC/ec.bin 1327 rv = os.path.join(dest_path, os.path.basename(src_path)) 1328 return os.path.join(rv, os.path.basename(image_path)) 1329 1330 1331 def system(self, command, timeout=3600): 1332 """Execute the passed in command on the servod host. 1333 1334 @param command Command to be executed. 1335 @param timeout Maximum number of seconds of runtime allowed. Default to 1336 1 hour. 1337 """ 1338 logging.info('Will execute on servo host: %s', command) 1339 self._servo_host.run(command, timeout=timeout) 1340 1341 1342 def system_output(self, command, timeout=3600, 1343 ignore_status=False, args=()): 1344 """Execute the passed in command on the servod host, return stdout. 1345 1346 @param command a string, the command to execute 1347 @param timeout an int, max number of seconds to wait til command 1348 execution completes. Default to 1 hour. 1349 @param ignore_status a Boolean, if true - ignore command's nonzero exit 1350 status, otherwise an exception will be thrown 1351 @param args a tuple of strings, each becoming a separate command line 1352 parameter for the command 1353 @return command's stdout as a string. 1354 """ 1355 return self._servo_host.run(command, timeout=timeout, 1356 ignore_status=ignore_status, 1357 args=args).stdout.strip() 1358 1359 1360 def get_servo_version(self, active=False): 1361 """Get the version of the servo, e.g., servo_v2 or servo_v3. 1362 1363 @param active: Only return the servo type with the active device. 1364 @return: The version of the servo. 1365 1366 """ 1367 with _WrapServoErrors( 1368 servo=self, description='get_servo_version()->get_version()'): 1369 servo_type = self._server.get_version() 1370 if '_and_' not in servo_type or not active: 1371 return servo_type 1372 1373 # If servo v4 is using ccd and servo micro, modify the servo type to 1374 # reflect the active device. 1375 active_device = self.get('active_dut_controller') 1376 if active_device in servo_type: 1377 logging.info('%s is active', active_device) 1378 return 'servo_v4_with_' + active_device 1379 1380 logging.warning("%s is active even though it's not in servo type", 1381 active_device) 1382 return servo_type 1383 1384 1385 def get_servo_type(self): 1386 if self._servo_type is None: 1387 self._servo_type = self.get_servo_version() 1388 return self._servo_type 1389 1390 def get_servo_v4_type(self): 1391 """Return the servo_v4_type (such as 'type-c'), or None if not v4.""" 1392 if not hasattr(self, '_servo_v4_type'): 1393 if 'servo_v4' in self.get_servo_type(): 1394 self._servo_v4_type = self.get('root.dut_connection_type') 1395 else: 1396 self._servo_v4_type = None 1397 return self._servo_v4_type 1398 1399 def is_servo_v4_type_a(self): 1400 """True if the servo is v4 and type-a, else False.""" 1401 return self.get_servo_v4_type() == 'type-a' 1402 1403 def is_servo_v4_type_c(self): 1404 """True if the servo is v4 and type-c, else False.""" 1405 return self.get_servo_v4_type() == 'type-c' 1406 1407 def get_main_servo_device(self): 1408 """Return the main servo device""" 1409 return self.get_servo_type().split('_with_')[-1].split('_and_')[0] 1410 1411 1412 def enable_main_servo_device(self): 1413 """Make sure the main device has control of the dut.""" 1414 if not self.can_set_active_device(): 1415 return 1416 self.set('active_dut_controller', self.get_main_servo_device()) 1417 1418 def get_ccd_servo_device(self): 1419 """Return the ccd servo device or '' if no ccd devices are connected.""" 1420 servo_type = self.get_servo_type() 1421 if 'ccd' not in servo_type: 1422 return '' 1423 return servo_type.split('_with_')[-1].split('_and_')[-1] 1424 1425 def active_device_is_ccd(self): 1426 """Returns True if a ccd device is active.""" 1427 return 'ccd' in self.get_servo_version(active=True) 1428 1429 def enable_ccd_servo_device(self): 1430 """Make sure the ccd device has control of the dut. 1431 1432 Returns True if the ccd device is in control of the dut. 1433 """ 1434 if self.active_device_is_ccd(): 1435 return True 1436 ccd_device = self.get_ccd_servo_device() 1437 if not self.can_set_active_device() or not ccd_device: 1438 return False 1439 self.set('active_dut_controller', ccd_device) 1440 return True 1441 1442 def main_device_is_ccd(self): 1443 """Whether the main servo device (no prefixes) is a ccd device.""" 1444 servo = self.get_servo_type() 1445 return 'ccd' in servo and not self.main_device_is_flex() 1446 1447 def main_device_is_flex(self): 1448 """Whether the main servo device (no prefixes) is a legacy device.""" 1449 servo = self.get_servo_type() 1450 return any([flex in servo for flex in self.FLEX_SERVOS]) 1451 1452 def main_device_uses_gsc_drv(self): 1453 """Whether the main servo device uses gsc drivers. 1454 1455 Servo may use gsc wp or console commands to control the dut. These 1456 get restricted with ccd capabilities. This returns true if some of 1457 the servo functionality will be disabled if ccd is restricted. 1458 """ 1459 return self.get_main_servo_device() in self.GSC_DRV_SERVOS 1460 1461 def _initialize_programmer(self, rw_only=False): 1462 """Initialize the firmware programmer. 1463 1464 @param rw_only: True to initialize a programmer which only 1465 programs the RW portions. 1466 """ 1467 if self._programmer: 1468 return 1469 # Initialize firmware programmer 1470 servo_type = self.get_servo_type() 1471 if servo_type.startswith('servo_v2'): 1472 self._programmer = firmware_programmer.ProgrammerV2(self) 1473 self._programmer_rw = firmware_programmer.ProgrammerV2RwOnly(self) 1474 # Both servo v3 and v4 use the same programming methods so just leverage 1475 # ProgrammerV3 for servo v4 as well. 1476 elif (servo_type.startswith('servo_v3') 1477 or servo_type.startswith('servo_v4')): 1478 self._programmer = firmware_programmer.ProgrammerV3(self) 1479 self._programmer_rw = firmware_programmer.ProgrammerV3RwOnly(self) 1480 else: 1481 raise error.TestError( 1482 'No firmware programmer for servo version: %s' % 1483 self.get_servo_type()) 1484 1485 1486 def program_bios(self, image, rw_only=False, copy_image=True): 1487 """Program bios on DUT with given image. 1488 1489 @param image: a string, file name of the BIOS image to program 1490 on the DUT. 1491 @param rw_only: True to only program the RW portion of BIOS. 1492 @param copy_image: True indicates we need scp the image to servohost 1493 while False means the image file is already on 1494 servohost. 1495 @return: a string, full path name of the copied file on the remote. 1496 """ 1497 self._initialize_programmer() 1498 # We don't need scp if test runs locally. 1499 if copy_image and not self.is_localhost(): 1500 image = self._scp_image(image) 1501 if rw_only: 1502 self._programmer_rw.program_bios(image) 1503 else: 1504 self._programmer.program_bios(image) 1505 return image 1506 1507 1508 def program_ec(self, image, rw_only=False, copy_image=True): 1509 """Program ec on DUT with given image. 1510 1511 @param image: a string, file name of the EC image to program 1512 on the DUT. 1513 @param rw_only: True to only program the RW portion of EC. 1514 @param copy_image: True indicates we need scp the image to servohost 1515 while False means the image file is already on 1516 servohost. 1517 @return: a string, full path name of the copied file on the remote. 1518 """ 1519 self._initialize_programmer() 1520 # We don't need scp if test runs locally. 1521 if copy_image and not self.is_localhost(): 1522 image = self._scp_image(image) 1523 if rw_only: 1524 self._programmer_rw.program_ec(image) 1525 else: 1526 self._programmer.program_ec(image) 1527 return image 1528 1529 1530 def extract_ec_image(self, board, model, tarball_path, fake_image=False): 1531 """Helper function to extract EC image from downloaded tarball. 1532 1533 @param board: The DUT board name. 1534 @param model: The DUT model name. 1535 @param tarball_path: The path of the downloaded build tarball. 1536 @param fake_image: True to return a fake zero-filled image instead. 1537 1538 @return: Path to extracted EC image. 1539 """ 1540 1541 # Ignore extracting EC image and re-programming if not a Chrome EC 1542 chrome_ec = FAFTConfig(board).chrome_ec 1543 if not chrome_ec: 1544 logging.warning('Not a Chrome EC, ignore re-programming it') 1545 return None 1546 1547 # Most boards use the model name as the ec directory. 1548 ec_image_candidates = ['%s/ec.bin' % model] 1549 1550 if model == "dragonair": 1551 ec_image_candidates.append('dratini/ec.bin') 1552 1553 # If that isn't found try the name from the EC RO version. 1554 try: 1555 fw_target = self.get_ec_board().lower() 1556 ec_image_candidates.append('%s/ec.bin' % fw_target) 1557 except Exception as err: 1558 logging.warning('Failed to get ec_board value; ignoring') 1559 1560 # Fallback to the name of the board, and then a bare ec.bin. 1561 ec_image_candidates.append('%s/ec.bin' % board) 1562 ec_image_candidates.append('ec.bin') 1563 1564 # Extract EC image from tarball 1565 dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC') 1566 ec_image = _extract_image_from_tarball(tarball_path, 1567 dest_dir, 1568 ec_image_candidates, 1569 self.EXTRACT_TIMEOUT_SECS) 1570 1571 # Check if EC image was found and return path or raise error 1572 if ec_image: 1573 # Extract subsidiary binaries for EC 1574 # Find a monitor binary for NPCX_UUT chip type, if any. 1575 mon_candidates = [candidate.replace('ec.bin', 'npcx_monitor.bin') 1576 for candidate in ec_image_candidates] 1577 _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates, 1578 self.EXTRACT_TIMEOUT_SECS) 1579 1580 if fake_image: 1581 # Create a small (25% of original size) zero-filled binary to 1582 # replace the real ec_image 1583 file_size = os.path.getsize(ec_image) / 4 1584 ec_image = os.path.join(os.path.dirname(ec_image), 1585 "zero_ec.bin") 1586 dump_cmd = 'dd if=/dev/zero of=%s bs=4096 count=%d' % ( 1587 os.path.join(dest_dir, ec_image), file_size / 4096) 1588 if server_utils.system(dump_cmd, ignore_status=True) != 0: 1589 return None 1590 1591 return os.path.join(dest_dir, ec_image) 1592 else: 1593 raise error.TestError('Failed to extract EC image from %s' % 1594 tarball_path) 1595 1596 1597 def extract_bios_image(self, board, model, tarball_path): 1598 """Helper function to extract BIOS image from downloaded tarball. 1599 1600 @param board: The DUT board name. 1601 @param model: The DUT model name. 1602 @param tarball_path: The path of the downloaded build tarball. 1603 1604 @return: Path to extracted BIOS image. 1605 """ 1606 1607 # Most boards use the model name as the image filename. 1608 bios_image_candidates = [ 1609 'image-%s.bin' % model, 1610 ] 1611 1612 if model == "dragonair": 1613 bios_image_candidates.append('image-dratini.bin') 1614 1615 # If that isn't found try the name from the EC RO version. 1616 try: 1617 fw_target = self.get_ec_board().lower() 1618 bios_image_candidates.append('image-%s.bin' % fw_target) 1619 except Exception as err: 1620 logging.warning('Failed to get ec_board value; ignoring') 1621 1622 # Fallback to the name of the board, and then a bare image.bin. 1623 bios_image_candidates.append('image-%s.bin' % board) 1624 bios_image_candidates.append('image.bin') 1625 1626 # Extract BIOS image from tarball 1627 dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS') 1628 bios_image = _extract_image_from_tarball(tarball_path, 1629 dest_dir, 1630 bios_image_candidates, 1631 self.EXTRACT_TIMEOUT_SECS) 1632 1633 # Check if BIOS image was found and return path or raise error 1634 if bios_image: 1635 return os.path.join(dest_dir, bios_image) 1636 else: 1637 raise error.TestError('Failed to extract BIOS image from %s' % 1638 tarball_path) 1639 1640 1641 def switch_usbkey(self, usb_state): 1642 """Connect USB flash stick to either host or DUT, or turn USB port off. 1643 1644 This function switches the servo multiplexer to provide electrical 1645 connection between the USB port J3 and either host or DUT side. It 1646 can also be used to turn the USB port off. 1647 1648 @param usb_state: A string, one of 'dut', 'host', or 'off'. 1649 'dut' and 'host' indicate which side the 1650 USB flash device is required to be connected to. 1651 'off' indicates turning the USB port off. 1652 1653 @raise: error.TestError in case the parameter is not 'dut' 1654 'host', or 'off'. 1655 """ 1656 if self.get_usbkey_state() == usb_state: 1657 return 1658 1659 if usb_state == 'off': 1660 self.set_nocheck('image_usbkey_pwr', 'off') 1661 return 1662 elif usb_state == 'host': 1663 mux_direction = 'servo_sees_usbkey' 1664 elif usb_state == 'dut': 1665 mux_direction = 'dut_sees_usbkey' 1666 else: 1667 raise error.TestError('Unknown USB state request: %s' % usb_state) 1668 # On the servod side, this control will ensure that 1669 # - the port is power cycled if it is changing directions 1670 # - the port ends up in a powered state after this call 1671 # - if facing the host side, the call only returns once a usb block 1672 # device is detected, or after a generous timeout (10s) 1673 self.set('image_usbkey_direction', mux_direction) 1674 # As servod makes no guarantees when switching to the dut side, 1675 # add a detection delay here when facing the dut. 1676 if mux_direction == 'dut_sees_usbkey': 1677 time.sleep(self.USB_DETECTION_DELAY) 1678 1679 def get_usbkey_state(self): 1680 """Get which side USB is connected to or 'off' if usb power is off. 1681 1682 @return: A string, one of 'dut', 'host', or 'off'. 1683 """ 1684 pwr = self.get('image_usbkey_pwr') 1685 if pwr == 'off': 1686 return pwr 1687 direction = self.get('image_usbkey_direction') 1688 if direction == 'servo_sees_usbkey': 1689 return 'host' 1690 if direction == 'dut_sees_usbkey': 1691 return 'dut' 1692 raise error.TestFail('image_usbkey_direction set an unknown mux ' 1693 'direction: %s' % direction) 1694 1695 def set_servo_v4_role(self, role): 1696 """Set the power role of servo v4, either 'src' or 'snk'. 1697 1698 It does nothing if not a servo v4. 1699 1700 @param role: Power role for DUT port on servo v4, either 'src' or 'snk'. 1701 """ 1702 if not self.get_servo_type().startswith('servo_v4'): 1703 logging.debug('Not a servo v4, unable to set role to %s.', role) 1704 return 1705 1706 if not self.has_control('servo_pd_role'): 1707 logging.debug( 1708 'Servo does not has servo_v4_role control, unable' 1709 ' to set role to %s.', role) 1710 return 1711 1712 value = self.get('servo_pd_role') 1713 if value != role: 1714 self.set_nocheck('servo_pd_role', role) 1715 else: 1716 logging.debug('Already in the role: %s.', role) 1717 1718 def get_servo_v4_role(self): 1719 """Get the power role of servo v4, either 'src' or 'snk'. 1720 1721 It returns None if not a servo v4. 1722 """ 1723 if not self.get_servo_type().startswith('servo_v4'): 1724 logging.debug('Not a servo v4, unable to get role') 1725 return None 1726 1727 if not self.has_control('servo_pd_role'): 1728 logging.debug( 1729 'Servo does not has servo_v4_role control, unable' 1730 ' to get the role.') 1731 return None 1732 1733 return self.get('servo_pd_role') 1734 1735 def set_servo_v4_pd_comm(self, en): 1736 """Set the PD communication of servo v4, either 'on' or 'off'. 1737 1738 It does nothing if not a servo v4. 1739 1740 @param en: a string of 'on' or 'off' for PD communication. 1741 """ 1742 if self.get_servo_type().startswith('servo_v4'): 1743 self.set_nocheck('servo_pd_comm', en) 1744 else: 1745 logging.debug('Not a servo v4, unable to set PD comm to %s.', en) 1746 1747 def supports_built_in_pd_control(self): 1748 """Return whether the servo type supports pd charging and control.""" 1749 # Only servo v4 type-c supports this feature. 1750 if not self.is_servo_v4_type_c(): 1751 logging.info('PD controls require a servo v4 type-c.') 1752 return False 1753 # Lastly, one cannot really do anything without a plugged in charger. 1754 chg_port_mv = self.get('ppchg5_mv') 1755 if chg_port_mv < V4_CHG_ATTACHED_MIN_VOLTAGE_MV: 1756 logging.info( 1757 'It appears that no charger is plugged into servo v4. ' 1758 'Charger port voltage: %dmV', chg_port_mv) 1759 return False 1760 logging.info('Charger port voltage: %dmV', chg_port_mv) 1761 return True 1762 1763 def dts_mode_is_valid(self): 1764 """Return whether servo setup supports dts mode control for cr50.""" 1765 # Only servo v4 type-c supports this feature. 1766 return self.is_servo_v4_type_c() 1767 1768 def dts_mode_is_safe(self): 1769 """Return whether servo setup supports dts mode without losing access. 1770 1771 DTS mode control exists but the main device might go through ccd. 1772 In that case, it's only safe to control dts mode if the main device 1773 is legacy as otherwise the connection to the main device cuts out. 1774 """ 1775 return self.dts_mode_is_valid() and self.main_device_is_flex() 1776 1777 def get_dts_mode(self): 1778 """Return servo dts mode. 1779 1780 @returns: on/off whether dts is on or off 1781 """ 1782 if not self.dts_mode_is_valid(): 1783 logging.info('Not a valid servo setup. Unable to get dts mode.') 1784 return 1785 return self.get('servo_dts_mode') 1786 1787 def ccd_watchdog_enable(self, enable): 1788 """Control the ccd watchdog.""" 1789 if 'ccd' not in self.get_servo_type(): 1790 return 1791 if self._ccd_watchdog_disabled and enable: 1792 logging.info('CCD watchdog disabled for test') 1793 return 1794 control = 'watchdog_add' if enable else 'watchdog_remove' 1795 self.set_nocheck(control, 'ccd') 1796 1797 def disable_ccd_watchdog_for_test(self): 1798 """Prevent servo from enabling the watchdog.""" 1799 self._ccd_watchdog_disabled = True 1800 self.ccd_watchdog_enable(False) 1801 1802 def allow_ccd_watchdog_for_test(self): 1803 """Allow servo to enable the ccd watchdog.""" 1804 self._ccd_watchdog_disabled = False 1805 self.ccd_watchdog_enable(True) 1806 1807 def set_dts_mode(self, state): 1808 """Set servo dts mode to off or on. 1809 1810 It does nothing if not a servo v4. Disable the ccd watchdog if we're 1811 disabling dts mode. CCD will disconnect. The watchdog only allows CCD 1812 to disconnect for 10 seconds until it kills servod. Disable the 1813 watchdog, so CCD can stay disconnected indefinitely. 1814 1815 @param state: Set servo v4 dts mode 'off' or 'on'. 1816 """ 1817 if not self.dts_mode_is_valid(): 1818 logging.info('Not a valid servo setup. Unable to set dts mode %s.', 1819 state) 1820 return 1821 1822 enable_watchdog = state == 'on' 1823 1824 if not enable_watchdog: 1825 self.ccd_watchdog_enable(False) 1826 1827 self.set_nocheck('servo_dts_mode', state) 1828 1829 if enable_watchdog: 1830 self.ccd_watchdog_enable(True) 1831 1832 1833 def _get_servo_type_fw_version(self, servo_type, prefix=''): 1834 """Helper to handle fw retrieval for micro/v4 vs ccd. 1835 1836 @param servo_type: one of 'servo_v4', 'servo_micro', 'c2d2', 1837 'ccd_cr50', or 'ccd_gsc' 1838 @param prefix: whether the control has a prefix 1839 1840 @returns: fw version for non-ccd devices, cr50 version for ccd device 1841 """ 1842 # If it's a ccd device, remove the 'ccd_' prefix to find the firmware 1843 # name. 1844 if servo_type.startswith(self.CCD_PREFIX): 1845 servo_type = servo_type[len(self.CCD_PREFIX)::] 1846 cmd = '%s_version' % servo_type 1847 try: 1848 return self.get(cmd, prefix=prefix) 1849 except error.TestFail: 1850 # Do not fail here, simply report the version as unknown. 1851 logging.warning('Unable to query %r to get servo fw version.', cmd) 1852 return 'unknown' 1853 1854 1855 def get_servo_fw_versions(self): 1856 """Retrieve a summary of attached servos and their firmware. 1857 1858 Note: that only the Google firmware owned servos supports this e.g. 1859 micro, v4, etc. For anything else, the dictionary will have no entry. 1860 If no device is has Google owned firmware (e.g. v3) then the result 1861 is an empty dictionary. 1862 1863 @returns: dict, a collection of each attached servo & their firmware. 1864 """ 1865 def get_fw_version_tag(tag, dev): 1866 return '%s_version.%s' % (dev, tag) 1867 1868 fw_versions = {} 1869 # Note, this works because v4p1 starts with v4 as well. 1870 # TODO(coconutruben): make this more robust so that it can work on 1871 # a future v-whatever as well. 1872 if 'servo_v4' not in self.get_servo_type(): 1873 return {} 1874 # v4 or v4p1 1875 v4_flavor = self.get_servo_type().split('_with_')[0] 1876 v4_tag = get_fw_version_tag('root', v4_flavor) 1877 fw_versions[v4_tag] = self._get_servo_type_fw_version('servo_fw', 1878 prefix='root') 1879 if 'with' in self.get_servo_type(): 1880 dut_devs = self.get_servo_type().split('_with_')[1].split('_and_') 1881 main_tag = get_fw_version_tag('main', dut_devs[0]) 1882 fw_versions[main_tag] = self._get_servo_type_fw_version(dut_devs[0]) 1883 if len(dut_devs) == 2: 1884 # Right now, the only way for this to happen is for a dual setup 1885 # to exist where ccd is attached on top of servo micro. Thus, we 1886 # know that the prefix is ccd_cr50 and the type is ccd_cr50. 1887 # TODO(coconutruben): If the new servod is not deployed by 1888 # the time that there are more cases of '_and_' devices, 1889 # this needs to be reworked. 1890 dual_tag = get_fw_version_tag('ccd_flex_secondary', dut_devs[1]) 1891 fw = self._get_servo_type_fw_version(dut_devs[1], dut_devs[1]) 1892 fw_versions[dual_tag] = fw 1893 return fw_versions 1894 1895 @property 1896 def uart_logs_dir(self): 1897 """Return the directory to save UART logs.""" 1898 return self._uart.logs_dir if self._uart else "" 1899 1900 1901 @uart_logs_dir.setter 1902 def uart_logs_dir(self, logs_dir): 1903 """Set directory to save UART logs. 1904 1905 @param logs_dir String of directory name.""" 1906 self._uart.logs_dir = logs_dir 1907 1908 def get_uart_logfile(self, uart): 1909 """Return the path to the uart log file.""" 1910 return self._uart.get_logfile(uart) 1911 1912 def record_uart_capture(self, outdir=None): 1913 """Save uart stream output.""" 1914 if outdir and not self.uart_logs_dir: 1915 self.uart_logs_dir = outdir 1916 self._uart.dump() 1917 1918 def close(self, outdir=None): 1919 """Close the servo object.""" 1920 # We want to ensure that servo_v4 is in src mode to avoid DUTs 1921 # left in discharge state after a task. 1922 try: 1923 self.set_servo_v4_role('src') 1924 except Exception as e: 1925 logging.info( 1926 'Unexpected error while setting servo_v4 role' 1927 ' to src; %s', e) 1928 1929 self._uart.stop_capture() 1930 self.record_uart_capture(outdir) 1931 1932 def ec_reboot(self): 1933 """Reboot Just the embedded controller.""" 1934 self.set_nocheck('ec_uart_flush', 'off') 1935 self.set_nocheck('ec_uart_cmd', 'reboot') 1936 self.set_nocheck('ec_uart_flush', 'on') 1937 1938 def get_vbus_voltage(self): 1939 """Get the voltage of VBUS'. 1940 1941 @returns The voltage of VBUS, if vbus_voltage is supported. 1942 None , if vbus_voltage is not supported. 1943 """ 1944 if not self.has_control('vbus_voltage'): 1945 logging.debug('Servo does not have vbus_voltage control,' 1946 'unable to get vbus voltage') 1947 return None 1948 1949 return self.get('vbus_voltage') 1950 1951 def supports_eth_power_control(self): 1952 """True if servo supports power management for ethernet dongle.""" 1953 return self.has_control('dut_eth_pwr_en') 1954 1955 def set_eth_power(self, state): 1956 """Set ethernet dongle power state, either 'on' or 'off'. 1957 1958 Note: this functionality is supported only on servo v4p1. 1959 1960 @param state: a string of 'on' or 'off'. 1961 """ 1962 if state != 'off' and state != 'on': 1963 raise error.TestError('Unknown ethernet power state request: %s' % 1964 state) 1965 1966 if not self.supports_eth_power_control(): 1967 logging.info('Not a supported servo setup. Unable to set ethernet' 1968 'dongle power state %s.', state) 1969 return 1970 1971 self.set_nocheck('dut_eth_pwr_en', state) 1972 1973 def eth_power_reset(self): 1974 """Reset ethernet dongle power state if supported'. 1975 1976 It does nothing if servo setup does not support power management for 1977 the etherent dongle, only log information about this. 1978 """ 1979 if self.supports_eth_power_control(): 1980 logging.info("Resetting servo's Ethernet controller...") 1981 self.set_eth_power('off') 1982 time.sleep(1) 1983 self.set_eth_power('on') 1984 else: 1985 logging.info("Trying to reset servo's Ethernet controller, but" 1986 "this feature is not supported on used servo setup.") 1987