1# Lint as: python2, python3 2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import print_function 7 8import ctypes 9import logging 10import os 11import pprint 12import re 13import time 14import uuid 15from xml.parsers import expat 16 17import six 18from autotest_lib.client.bin import utils 19from autotest_lib.client.common_lib import error, global_config 20from autotest_lib.client.common_lib.cros import dev_server, retry, tpm_utils 21from autotest_lib.server import test 22from autotest_lib.server.cros import vboot_constants as vboot 23from autotest_lib.server.cros.faft import telemetry 24from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 25from autotest_lib.server.cros.faft.utils import (menu_mode_switcher, 26 menu_navigator, mode_switcher) 27from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig 28from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 29from autotest_lib.server.cros.power import utils as PowerUtils 30from autotest_lib.server.cros.servo import (chrome_base_ec, chrome_cr50, 31 chrome_ec, chrome_ti50, servo) 32from autotest_lib.site_utils import test_runner_utils 33 34# Experimentally tuned time in minutes to wait for partition device nodes on a 35# USB stick to be ready after plugging in the stick. 36PARTITION_TABLE_READINESS_TIMEOUT = 0.1 # minutes 37# Experimentally tuned time in seconds to wait for the first retry of reading 38# the sysfs node of a USB stick's partition device node. 39PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY = 1 # seconds 40 41ConnectionError = mode_switcher.ConnectionError 42 43 44class FirmwareTest(test.test): 45 """ 46 Base class that sets up helper objects/functions for firmware tests. 47 48 It launches the FAFTClient on DUT, such that the test can access its 49 firmware functions and interfaces. It also provides some methods to 50 handle the reboot mechanism, in order to ensure FAFTClient is still 51 connected after reboot. 52 @type servo: servo.Servo 53 @type _client: autotest_lib.server.hosts.ssh_host.SSHHost | 54 autotest_lib.server.hosts.cros_host.CrosHost 55 56 TODO: add documentaion as the FAFT rework progresses. 57 """ 58 version = 1 59 60 # Set this to True in test classes that need to boot from the USB stick. 61 # When True, initialize() will raise TestWarn if USB stick is marked bad. 62 NEEDS_SERVO_USB = False 63 64 # Mapping of partition number of kernel and rootfs. 65 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 66 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 67 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 68 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 69 70 # Mapping of kernel type and name. 71 KERNEL_TYPE_NAME_MAP = {'KERN': 'kernel', 'MINIOS': 'minios'} 72 73 CHROMEOS_MAGIC = "CHROMEOS" 74 CORRUPTED_MAGIC = "CORRUPTD" 75 76 # System power states 77 POWER_STATE_S0 = 'S0' 78 POWER_STATE_S0IX = 'S0ix' 79 POWER_STATE_S3 = 'S3' 80 POWER_STATE_S5 = 'S5' 81 POWER_STATE_G3 = 'G3' 82 POWER_STATE_SUSPEND = '|'.join([POWER_STATE_S0IX, POWER_STATE_S3]) 83 84 # Delay for waiting client to return before EC suspend 85 EC_SUSPEND_DELAY = 5 86 87 # Delay between EC suspend and wake 88 WAKE_DELAY = 10 89 90 # Delay between closing and opening lid 91 LID_DELAY = 1 92 93 # Delay for establishing state after changing PD settings 94 PD_RESYNC_DELAY = 2 95 96 # Delay to wait for servo USB to work after power role swap: 97 # tPSSourceOff (920ms) + tPSSourceOn (480ms) + buffer 98 POWER_ROLE_SWAP_DELAY = 2 99 100 # The default number of power state check retries (each try takes 3 secs) 101 DEFAULT_PWR_RETRIES = 5 102 103 # FWMP space constants 104 FWMP_CLEARED_EXIT_STATUS = 1 105 FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS' 106 '_INVALID') 107 108 _ROOTFS_PARTITION_NUMBER = 3 109 110 # Class level variable, keep track the states of one time setup. 111 # This variable is preserved across tests which inherit this class. 112 _global_setup_done = { 113 'gbb_flags': False, 114 'reimage': False, 115 'usb_check': False, 116 } 117 118 # CCD password used by tests. 119 CCD_PASSWORD = 'Password' 120 121 RESPONSE_TIMEOUT = 180 122 123 @classmethod 124 def check_setup_done(cls, label): 125 """Check if the given setup is done. 126 127 @param label: The label of the setup. 128 """ 129 return cls._global_setup_done[label] 130 131 @classmethod 132 def mark_setup_done(cls, label): 133 """Mark the given setup done. 134 135 @param label: The label of the setup. 136 """ 137 cls._global_setup_done[label] = True 138 139 @classmethod 140 def unmark_setup_done(cls, label): 141 """Mark the given setup not done. 142 143 @param label: The label of the setup. 144 """ 145 cls._global_setup_done[label] = False 146 147 def initialize(self, host, cmdline_args, ec_wp=None): 148 """Initialize the FirmwareTest. 149 150 This method interacts with the Servo, FAFT RPC client, FAFT Config, 151 Mode Switcher, EC consoles, write-protection, GBB flags, and a lockfile. 152 153 @type host: autotest_lib.server.hosts.CrosHost 154 """ 155 self.run_id = str(uuid.uuid4()) 156 self._client = host 157 self.servo = host.servo 158 if self.servo is None: 159 raise error.TestError('FirmwareTest failed to set up servo') 160 161 self.lockfile = '/usr/local/tmp/faft/lock' 162 self._backup_gbb_flags = None 163 self._backup_firmware_identity = dict() 164 self._backup_kernel_sha = dict() 165 for kernel_type in self.KERNEL_TYPE_NAME_MAP: 166 self._backup_kernel_sha[kernel_type] = dict() 167 self._backup_cgpt_attr = dict() 168 self._backup_dev_mode = None 169 self._restore_power_mode = None 170 self._uart_file_dict = {} 171 172 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 173 174 # Parse arguments from command line 175 args = {} 176 self.power_control = host.POWER_CONTROL_RPM 177 for arg in cmdline_args: 178 match = re.search("^(\w+)=(.+)", arg) 179 if match: 180 args[match.group(1)] = match.group(2) 181 182 self._no_fw_rollback_check = False 183 if 'no_fw_rollback_check' in args: 184 if 'true' in args['no_fw_rollback_check'].lower(): 185 self._no_fw_rollback_check = True 186 187 self._no_ec_sync = False 188 if 'no_ec_sync' in args: 189 if 'true' in args['no_ec_sync'].lower(): 190 self._no_ec_sync = True 191 192 self._use_sync_script = global_config.global_config.get_config_value( 193 'CROS', 'enable_fs_sync_script', type=bool, default=False) 194 195 self.servo.initialize_dut() 196 self.faft_client = RPCProxy(host) 197 self.faft_config = FAFTConfig( 198 self.faft_client.system.get_platform_name(), 199 self.faft_client.system.get_model_name()) 200 self.checkers = FAFTCheckers(self) 201 202 # Mapping of kernel type and kernel servicer class 203 self.kernel_servicer = { 204 'KERN': self.faft_client.kernel, 205 'MINIOS': self.faft_client.minios, 206 } 207 208 if self.faft_config.chrome_ec: 209 self.ec = chrome_ec.ChromeEC(self.servo) 210 self.menu_navigator = menu_navigator.create_menu_navigator(self) 211 self.switcher = mode_switcher.create_mode_switcher( 212 self, self.menu_navigator) 213 # This will be None for menuless UI 214 self.menu_switcher = menu_mode_switcher.create_menu_mode_switcher( 215 self, self.menu_navigator) 216 # Check for presence of a USBPD console 217 if self.faft_config.chrome_usbpd: 218 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 219 elif self.faft_config.chrome_ec: 220 # If no separate USBPD console, then PD exists on EC console 221 self.usbpd = self.ec 222 # Get pdtester console 223 self.pdtester = host.pdtester 224 self.pdtester_host = host._pdtester_host 225 gsc = None 226 if self.servo.has_control('ti50_version') or \ 227 self.servo.has_control('ti50_version', 'ccd_gsc'): 228 gsc = chrome_ti50.ChromeTi50(self.servo, self.faft_config) 229 elif self.servo.has_control('cr50_version'): 230 gsc = chrome_cr50.ChromeCr50(self.servo, self.faft_config) 231 if gsc: 232 try: 233 # Check that the gsc console works before declaring the 234 # connection exists and enabling uart capture. 235 gsc.get_version() 236 self.cr50 = gsc 237 except servo.ControlUnavailableError: 238 logging.warning('gsc console not supported.') 239 except Exception as e: 240 logging.warning('Ignored unknown gsc version error: %s', 241 str(e)) 242 243 if 'power_control' in args: 244 self.power_control = args['power_control'] 245 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 246 raise error.TestError('Valid values for --args=power_control ' 247 'are %s. But you entered wrong argument ' 248 'as "%s".' 249 % (host.POWER_CONTROL_VALID_ARGS, 250 self.power_control)) 251 252 if self.NEEDS_SERVO_USB and not host.is_servo_usb_usable(): 253 usb_state = host.get_servo_usb_state() 254 raise error.TestWarn( 255 "Servo USB disk unusable (%s); canceling test." % 256 usb_state) 257 258 if not self.faft_client.system.dev_tpm_present(): 259 raise error.TestError('/dev/tpm0 does not exist on the client') 260 261 # Initialize servo role to src 262 self.servo.set_servo_v4_role('src') 263 264 # Create the BaseEC object. None if not available. 265 self.base_ec = chrome_base_ec.create_base_ec(self.servo) 266 267 self._record_uart_capture() 268 self._record_system_info() 269 self.faft_client.system.set_dev_default_boot() 270 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 271 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 272 if self.fw_vboot2: 273 self.faft_client.system.set_fw_try_next('A') 274 if self.faft_client.system.get_crossystem_value( 275 'mainfw_act') == 'B': 276 logging.info('mainfw_act is B. rebooting to set it A') 277 # TODO(crbug.com/1018322): remove try/catch once that bug is 278 # marked as fixed and verified. In that case the overlay for 279 # the board itself will map warm_reset to cold_reset. 280 try: 281 self.switcher.mode_aware_reboot() 282 except ConnectionError as e: 283 if 'DUT is still up unexpectedly' in str(e): 284 # In this case, try doing a cold_reset instead 285 self.switcher.mode_aware_reboot(reboot_type='cold') 286 else: 287 raise 288 289 # Check flashrom before first use, to avoid xmlrpclib.Fault. 290 if not self.faft_client.bios.is_available(): 291 raise error.TestError( 292 "flashrom is broken; check 'flashrom -p host'" 293 "and rpc server log.") 294 295 self._setup_gbb_flags() 296 self.faft_client.updater.stop_daemon() 297 self._create_faft_lockfile() 298 self._setup_ec_write_protect(ec_wp) 299 # See chromium:239034 regarding needing this sync. 300 self.blocking_sync() 301 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 302 303 def stage_build_to_usbkey(self): 304 """Downloads host's build to the USB key attached to servo. 305 306 @return: True if build is verified to be on USB key, False otherwise. 307 """ 308 info = self._client.host_info_store.get() 309 if info.build and info.build != test_runner_utils.NO_BUILD: 310 current_build = self._client._servo_host.validate_image_usbkey() 311 if current_build != info.build: 312 logging.debug('Current build on USB: %s differs from test' 313 ' build: %s, proceed with download.', 314 current_build, info.build) 315 try: 316 self._client.stage_build_to_usb(info.build) 317 return True 318 except (error.AutotestError, 319 dev_server.DevServerException) as e: 320 logging.warning( 321 'Stage build to USB failed, tests that require' 322 ' test image on Servo USB may fail: {}'.format(e)) 323 return False 324 else: 325 logging.debug('Current build on USB: %s is same as test' 326 ' build, skip download.', current_build) 327 return True 328 else: 329 logging.warning('Failed to get build label from the DUT, will use' 330 ' existing image in Servo USB.') 331 return False 332 333 def run_once(self, *args, **dargs): 334 """Delegates testing to a test method. 335 336 test_name is either the 1st positional argument or a named argument. 337 338 test_name will be mapped to a test method as follows: 339 test_name method 340 -------------- ----------- 341 <TestClass> test 342 <TestClass>.<Case> test_<Case> 343 <TestClass>.<Case>.<SubCase> test_<Case>_<SubCase> 344 345 Any arguments not consumed by FirmwareTest are passed to the test method. 346 347 @param test_name: Should be set to NAME in the control file. 348 349 @raise TestError: If test_name wasn't found in args, does not start 350 with test class, or if the method is not found. 351 """ 352 self_name = type(self).__name__ 353 354 # Parse and remove test name from args. 355 if 'test_name' in dargs: 356 test_name = dargs.pop('test_name') 357 elif len(args) >= 1: 358 test_name = args[0] 359 args = args[1:] 360 else: 361 raise error.TestError('"%s" class must define run_once, or the' 362 ' control file must specify "test_name".' % 363 self_name) 364 365 # Check that test_name starts with the test class name. 366 name_parts = test_name.split('.') 367 368 test_class = name_parts.pop(0) 369 if test_class != self_name: 370 raise error.TestError('Class "%s" does not match that found in test' 371 ' name "%s"' % (self_name, test_class)) 372 373 # Construct and call the test method. 374 method_name = '_'.join(['test'] + name_parts) 375 if not hasattr(self, method_name): 376 raise error.TestError('Method "%s" for testing "%s" not found in' 377 ' "%s"' % (method_name, test_name, self_name)) 378 379 logging.info('Starting test: "%s"', test_name) 380 utils.cherry_pick_call(getattr(self, method_name), *args, **dargs) 381 382 def cleanup(self): 383 """Autotest cleanup function.""" 384 # Unset state checker in case it's set by subclass 385 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 386 387 # Capture UART before doing anything else, so we can guarantee we get 388 # some uart results. 389 try: 390 self._record_uart_capture() 391 except: 392 logging.warning('Failed initial uart capture during cleanup') 393 394 try: 395 self.faft_client.system.is_available() 396 except: 397 # Remote is not responding. Revive DUT so that subsequent tests 398 # don't fail. 399 self._restore_routine_from_timeout() 400 401 if hasattr(self, 'switcher'): 402 self.switcher.restore_mode() 403 404 self._restore_ec_write_protect() 405 self._restore_servo_v4_role() 406 407 if hasattr(self, 'faft_client'): 408 self._restore_gbb_flags() 409 self.faft_client.updater.start_daemon() 410 self.faft_client.updater.cleanup() 411 self._remove_faft_lockfile() 412 self.faft_client.quit() 413 self.faft_client.collect_logfiles(self.resultsdir) 414 415 # Capture any new uart output, then discard log messages again. 416 self._cleanup_uart_capture() 417 418 super(FirmwareTest, self).cleanup() 419 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 420 421 def _record_system_info(self): 422 """Record some critical system info to the attr keyval. 423 424 This info is used by generate_test_report later. 425 """ 426 system_info = { 427 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 428 'ec_version': self.faft_client.ec.get_version(), 429 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 430 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 431 'servo_host_os_version' : self.servo.get_os_version(), 432 'servod_version': self.servo.get_servod_version(), 433 'os_version': self._client.get_release_builder_path(), 434 'servo_type': self.servo.get_servo_version() 435 } 436 437 # Record the servo v4 and servo micro versions when possible 438 system_info.update(self.servo.get_servo_fw_versions()) 439 440 if hasattr(self, 'cr50'): 441 system_info['cr50_version'] = self.cr50.get_full_version() 442 443 logging.info('System info:\n%s', pprint.pformat(system_info)) 444 self.write_attr_keyval(system_info) 445 446 def invalidate_firmware_setup(self): 447 """Invalidate all firmware related setup state. 448 449 This method is called when the firmware is re-flashed. It resets all 450 firmware related setup states so that the next test setup properly 451 again. 452 """ 453 self.unmark_setup_done('gbb_flags') 454 455 def _retrieve_recovery_reason_from_trap(self): 456 """Try to retrieve the recovery reason from a trapped recovery screen. 457 458 @return: The recovery_reason, 0 if any error. 459 """ 460 recovery_reason = 0 461 logging.info('Try to retrieve recovery reason...') 462 if self.servo.get_usbkey_state() == 'dut': 463 self.switcher.bypass_rec_mode() 464 else: 465 self.servo.switch_usbkey('dut') 466 467 try: 468 self.switcher.wait_for_client() 469 lines = self.faft_client.system.run_shell_command_get_output( 470 'crossystem recovery_reason') 471 recovery_reason = int(lines[0]) 472 logging.info('Got the recovery reason %d.', recovery_reason) 473 except ConnectionError: 474 logging.error('Failed to get the recovery reason due to connection ' 475 'error.') 476 return recovery_reason 477 478 def _reset_client(self): 479 """Reset client to a workable state. 480 481 This method is called when the client is not responsive. It may be 482 caused by the following cases: 483 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 484 - corrupted firmware; 485 - corrutped OS image. 486 """ 487 # DUT may halt on a firmware screen. Try cold reboot. 488 logging.info('Try cold reboot...') 489 self.switcher.mode_aware_reboot(reboot_type='cold', 490 sync_before_boot=False, 491 wait_for_dut_up=False) 492 self.switcher.wait_for_client_offline() 493 self.switcher.bypass_dev_mode() 494 try: 495 self.switcher.wait_for_client() 496 return 497 except ConnectionError: 498 logging.warning("Cold reboot didn't help, still connection error.") 499 500 # DUT may be broken by a corrupted firmware. Restore firmware. 501 # We assume the recovery boot still works fine. Since the recovery 502 # code is in RO region and all FAFT tests don't change the RO region 503 # except GBB. 504 if self.is_firmware_saved(): 505 self._ensure_client_in_recovery() 506 logging.info('Try restoring the original firmware...') 507 try: 508 if self.restore_firmware(): 509 return 510 except ConnectionError: 511 logging.warning("Restoring firmware didn't help, still " 512 "connection error.") 513 514 # Perhaps it's kernel that's broken. Let's try restoring it. 515 for kernel_type, kernel_name in self.KERNEL_TYPE_NAME_MAP.items(): 516 if self.is_kernel_saved(kernel_type): 517 self._ensure_client_in_recovery() 518 logging.info('Try restoring the original %s...', kernel_name) 519 try: 520 if self.restore_kernel(kernel_type=kernel_type): 521 return 522 except ConnectionError: 523 logging.warning( 524 "Restoring %s didn't help, still " 525 "connection error.", kernel_name) 526 527 # DUT may be broken by a corrupted OS image. Restore OS image. 528 self._ensure_client_in_recovery() 529 logging.info('Try restoring the OS image...') 530 self.faft_client.system.run_shell_command('chromeos-install --yes') 531 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 532 self.switcher.wait_for_client_offline() 533 self.switcher.bypass_dev_mode() 534 try: 535 self.switcher.wait_for_client() 536 logging.info('Successfully restored OS image.') 537 return 538 except ConnectionError: 539 logging.warning("Restoring OS image didn't help, still connection " 540 "error.") 541 542 def _ensure_client_in_recovery(self): 543 """Ensure client in recovery boot; reboot into it if necessary. 544 545 @raise TestError: if failed to boot the USB image. 546 """ 547 logging.info('Try booting into USB image...') 548 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 549 wait_for_dut_up=False) 550 self.servo.switch_usbkey('host') 551 self.switcher.bypass_rec_mode() 552 try: 553 self.switcher.wait_for_client() 554 except ConnectionError: 555 raise error.TestError('Failed to boot the USB image.') 556 557 def _restore_routine_from_timeout(self): 558 """A routine to try to restore the system from a timeout error. 559 560 This method is called when FAFT failed to connect DUT after reboot. 561 562 @raise TestFail: This exception is already raised, with a decription 563 why it failed. 564 """ 565 # DUT is disconnected. Capture the UART output for debug. 566 self._record_uart_capture() 567 568 # Replug the Ethernet to identify if it is a network flaky. 569 self.servo.eth_power_reset() 570 571 recovery_reason = self._retrieve_recovery_reason_from_trap() 572 573 # Reset client to a workable state. 574 self._reset_client() 575 576 # Raise the proper TestFail exception. 577 if recovery_reason: 578 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 579 'and timed out' % recovery_reason) 580 else: 581 raise error.TestFail('Timed out waiting for DUT reboot') 582 583 def assert_test_image_in_usb_disk(self, usb_dev=None): 584 """Assert an USB disk plugged-in on servo and a test image inside. 585 586 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 587 If None, it is detected automatically. 588 @raise TestError: if USB disk not detected or not a test image. 589 """ 590 if self.check_setup_done('usb_check'): 591 return 592 if usb_dev: 593 assert self.servo.get_usbkey_state() == 'host' 594 else: 595 self.servo.switch_usbkey('host') 596 usb_dev = self.servo.probe_host_usb_dev() 597 if not usb_dev: 598 raise error.TestError( 599 'An USB disk should be plugged in the servo board. %s' % 600 telemetry.collect_usb_state(self.servo)) 601 602 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 603 logging.info('usb dev is %s', usb_dev) 604 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 605 # After the USB key is muxed from the DUT to the servo host, there 606 # appears to be a delay between when servod can confirm that a sysfs 607 # entry exists for the disk (as done by probe_host_usb_dev) and when 608 # sysfs entries get populated for the disk's partitions. 609 @retry.retry(error.AutoservRunError, 610 timeout_min=PARTITION_TABLE_READINESS_TIMEOUT, 611 delay_sec=PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY) 612 def confirm_rootfs_partition_device_node_readable(): 613 """Repeatedly poll for the RootFS partition sysfs node.""" 614 self.servo.system('ls {}'.format(rootfs)) 615 616 try: 617 confirm_rootfs_partition_device_node_readable() 618 except error.AutoservRunError as e: 619 usb_info = telemetry.collect_usb_state(self.servo) 620 raise error.TestError( 621 ('Could not ls the device node for the RootFS on the USB ' 622 'device. %s: %s\nMore telemetry: %s') % 623 (type(e).__name__, e, usb_info)) 624 try: 625 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 626 except error.AutoservRunError as e: 627 usb_info = telemetry.collect_usb_state(self.servo) 628 raise error.TestError( 629 ('Could not mount the partition on USB device. %s: %s\n' 630 'More telemetry: %s') % (type(e).__name__, e, usb_info)) 631 632 try: 633 usb_lsb = self.servo.system_output('cat %s' % 634 os.path.join(tmpd, 'etc/lsb-release')) 635 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 636 dut_lsb = '\n'.join(self.faft_client.system. 637 run_shell_command_get_output('cat /etc/lsb-release')) 638 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 639 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 640 raise error.TestError('USB stick in servo is no test image') 641 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 642 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 643 if usb_board != dut_board: 644 raise error.TestError('USB stick in servo contains a %s ' 645 'image, but DUT is a %s' % (usb_board, dut_board)) 646 finally: 647 for cmd in ('umount -l %s' % tmpd, 'sync %s' % usb_dev, 648 'rm -rf %s' % tmpd): 649 self.servo.system(cmd) 650 651 self.mark_setup_done('usb_check') 652 653 def setup_pdtester(self, flip_cc=False, dts_mode=False, pd_faft=True, 654 min_batt_level=None): 655 """Setup the PDTester to a given state. 656 657 @param flip_cc: True to flip CC polarity; False to not flip it. 658 @param dts_mode: True to config PDTester to DTS mode; False to not. 659 @param pd_faft: True to config PD FAFT setup. 660 @param min_batt_level: An int for minimum battery level, or None for 661 skip. 662 @raise TestError: If Servo v4 not setup properly. 663 """ 664 665 # PD FAFT is only tested with a combination of servo_v4 or servo_v4p1 666 # with servo micro or C2D2. 667 pd_setup = [] 668 for first in self.pdtester.FIRST_PD_SETUP_ELEMENT: 669 for second in self.pdtester.SECOND_PD_SETUP_ELEMENT: 670 pd_setup.append(first + '_with_' + second) 671 672 if pd_faft and self.pdtester.servo_type not in pd_setup: 673 raise error.TestError(', '.join(pd_setup) + 674 ' is a mandatory setup ' 675 'for PD FAFT. Got %s.' % 676 self.pdtester.servo_type) 677 678 # Ensure the battery is enough for testing, this should be done before 679 # all the following setup. 680 if (min_batt_level is not None) and self._client.has_battery(): 681 logging.info('Start charging if batt level < %d', min_batt_level) 682 PowerUtils.put_host_battery_in_range(self._client, min_batt_level, 683 100, 600) 684 685 # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects 686 # the behaviors of what PD FAFT tests. So we want it disabled. 687 pd_tester_device = self.pdtester.servo_type.split('_with_')[0] 688 if pd_tester_device in self.pdtester.FIRST_PD_SETUP_ELEMENT: 689 self.servo.set_dts_mode('on' if dts_mode else 'off') 690 else: 691 logging.warning('Configuring DTS mode only supported on %s', 692 pd_tester_device) 693 694 self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1') 695 # Make it sourcing max voltage. 696 self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE) 697 698 time.sleep(self.PD_RESYNC_DELAY) 699 700 # Servo v4 requires an external charger to source power. Make sure 701 # this setup is correct. 702 if pd_tester_device in self.pdtester.FIRST_PD_SETUP_ELEMENT: 703 role = self.pdtester.get('servo_pd_role') 704 if role != 'src': 705 raise error.TestError( 706 '%s is not sourcing power! Make sure the servo ' 707 '"DUT POWER" port is connected to a working charger. ' 708 'servo_pd_role:%s' % (pd_tester_device, role)) 709 710 def setup_usbkey(self, usbkey, host=None, used_for_recovery=None): 711 """Setup the USB disk for the test. 712 713 It checks the setup of USB disk and a valid ChromeOS test image inside. 714 It also muxes the USB disk to either the host or DUT by request. 715 716 @param usbkey: True if the USB disk is required for the test, False if 717 not required. 718 @param host: Optional, True to mux the USB disk to host, False to mux it 719 to DUT, default to do nothing. 720 @param used_for_recovery: Optional, True if the USB disk is used for 721 recovery boot; False if the USB disk is not 722 used for recovery boot, like Ctrl-U USB boot. 723 """ 724 if usbkey: 725 self.stage_build_to_usbkey() 726 self.assert_test_image_in_usb_disk() 727 elif host is None: 728 # USB disk is not required for the test. Better to mux it to host. 729 host = True 730 731 if host is True: 732 self.servo.switch_usbkey('host') 733 elif host is False: 734 self.servo.switch_usbkey('dut') 735 736 if used_for_recovery is None: 737 # Default value is True if usbkey == True. 738 # As the common usecase of USB disk is for recovery boot. Tests 739 # can define it explicitly if not. 740 used_for_recovery = usbkey 741 742 if used_for_recovery: 743 # In recovery boot, the locked EC RO doesn't support PD for most 744 # of the CrOS devices. The default servo v4 power role is a SRC. 745 # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the 746 # data role swap from UFP to DFP; as a result, DUT can't see the 747 # USB disk and the Ethernet dongle on servo v4. 748 # 749 # This is a workaround to set servo v4 as a SNK, for every FAFT 750 # test which boots into the USB disk in the recovery mode. 751 # 752 # TODO(waihong): Add a check to see if the battery level is too 753 # low and sleep for a while for charging. 754 self.set_servo_v4_role_to_snk() 755 756 # Force reconnection; otherwise, the next RPC call will timeout 757 logging.info('Waiting for reconnection after power role swap...') 758 time.sleep(self.POWER_ROLE_SWAP_DELAY) 759 self.faft_client.disconnect() 760 self.faft_client.connect() 761 762 def set_servo_v4_role_to_snk(self, pd_comm=False): 763 """Set the servo v4 role to SNK. 764 765 @param pd_comm: a bool. Enable PD communication if True, else otherwise 766 """ 767 self._needed_restore_servo_v4_role = True 768 self.servo.set_servo_v4_role('snk') 769 if pd_comm: 770 self.servo.set_servo_v4_pd_comm('on') 771 772 def _restore_servo_v4_role(self): 773 """Restore the servo v4 role to default SRC.""" 774 if not hasattr(self, '_needed_restore_servo_v4_role'): 775 return 776 if self._needed_restore_servo_v4_role: 777 self.servo.set_servo_v4_role('src') 778 779 def set_dut_low_power_idle_delay(self, delay): 780 """Set EC low power idle delay 781 782 @param delay: Delay in seconds 783 """ 784 if not self.ec.has_command('dsleep'): 785 logging.info("Can't set low power idle delay.") 786 return 787 self._previous_ec_low_power_delay = int( 788 self.ec.send_command_get_output("dsleep", 789 ["timeout:\s+(\d+)\ssec"])[0][1]) 790 self.ec.send_command("dsleep " + str(delay)) 791 792 def restore_dut_low_power_idle_delay(self): 793 """Restore EC low power idle delay""" 794 if getattr(self, '_previous_ec_low_power_delay', None): 795 self.ec.send_command("dsleep " + str( 796 self._previous_ec_low_power_delay)) 797 798 def get_usbdisk_path_on_dut(self): 799 """Get the path of the USB disk device plugged-in the servo on DUT. 800 801 Returns: 802 A string representing USB disk path, like '/dev/sdb', or None if 803 no USB disk is found. 804 """ 805 cmd = 'ls -d /dev/s*[a-z]' 806 original_value = self.servo.get_usbkey_state() 807 808 # Make the dut unable to see the USB disk. 809 self.servo.switch_usbkey('off') 810 time.sleep(self.faft_config.usb_unplug) 811 no_usb_set = set( 812 self.faft_client.system.run_shell_command_get_output(cmd)) 813 814 # Make the dut able to see the USB disk. 815 self.servo.switch_usbkey('dut') 816 time.sleep(self.faft_config.usb_plug) 817 has_usb_set = set( 818 self.faft_client.system.run_shell_command_get_output(cmd)) 819 820 # Back to its original value. 821 if original_value != self.servo.get_usbkey_state(): 822 self.servo.switch_usbkey(original_value) 823 824 diff_set = has_usb_set - no_usb_set 825 if len(diff_set) == 1: 826 return diff_set.pop() 827 else: 828 return None 829 830 def _create_faft_lockfile(self): 831 """Creates the FAFT lockfile.""" 832 logging.info('Creating FAFT lockfile...') 833 command = 'touch %s' % (self.lockfile) 834 self.faft_client.system.run_shell_command(command) 835 836 def _remove_faft_lockfile(self): 837 """Removes the FAFT lockfile.""" 838 logging.info('Removing FAFT lockfile...') 839 command = 'rm -f %s' % (self.lockfile) 840 self.faft_client.system.run_shell_command(command) 841 842 def clear_set_gbb_flags(self, clear_mask, set_mask, reboot=True): 843 """Clear and set the GBB flags in the current flashrom. 844 845 @param clear_mask: A mask of flags to be cleared. 846 @param set_mask: A mask of flags to be set. 847 @param reboot: If true, then this method will reboot the DUT if certain 848 flags are modified. 849 """ 850 gbb_flags = self.faft_client.bios.get_gbb_flags() 851 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 852 if new_flags == gbb_flags: 853 logging.info( 854 'Current GBB flags (0x%x) match desired clear mask ' 855 '(0x%x) and desired set mask (0x%x)', gbb_flags, 856 clear_mask, set_mask) 857 return 858 859 logging.info('Changing GBB flags from 0x%x to 0x%x.', gbb_flags, 860 new_flags) 861 if self._backup_gbb_flags is None: 862 self._backup_gbb_flags = gbb_flags 863 self.faft_client.bios.set_gbb_flags(new_flags) 864 865 # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag, 866 # reboot to get a clear state 867 if reboot and bool((gbb_flags ^ new_flags) 868 & (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON 869 | vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)): 870 self.switcher.mode_aware_reboot() 871 872 873 def _check_capability(self, target, required_cap, suppress_warning): 874 """Check if current platform has required capabilities for the target. 875 876 @param required_cap: A list containing required capabilities. 877 @param suppress_warning: True to suppress any warning messages. 878 @return: True if requirements are met. Otherwise, False. 879 """ 880 if not required_cap: 881 return True 882 883 if target not in ['ec', 'cr50']: 884 raise error.TestError('Invalid capability target %r' % target) 885 886 for cap in required_cap: 887 if cap not in getattr(self.faft_config, target + '_capability'): 888 if not suppress_warning: 889 logging.warning( 890 'Requires %s capability "%s" to run this ' 891 'test.', target, cap) 892 return False 893 894 return True 895 896 897 def check_ec_capability(self, required_cap=None, suppress_warning=False): 898 """Check if current platform has required EC capabilities. 899 900 @param required_cap: A list containing required EC capabilities. Pass in 901 None to only check for presence of Chrome EC. 902 @param suppress_warning: True to suppress any warning messages. 903 @return: True if requirements are met. Otherwise, False. 904 """ 905 if not self.faft_config.chrome_ec: 906 if not suppress_warning: 907 logging.warning('Requires Chrome EC to run this test.') 908 return False 909 return self._check_capability('ec', required_cap, suppress_warning) 910 911 912 def check_cr50_capability(self, required_cap=None, suppress_warning=False): 913 """Check if current platform has required Cr50 capabilities. 914 915 @param required_cap: A list containing required Cr50 capabilities. Pass 916 in None to only check for presence of cr50 uart. 917 @param suppress_warning: True to suppress any warning messages. 918 @return: True if requirements are met. Otherwise, False. 919 """ 920 if not hasattr(self, 'cr50'): 921 if not suppress_warning: 922 logging.warning('Requires Chrome Cr50 to run this test.') 923 return False 924 return self._check_capability('cr50', required_cap, suppress_warning) 925 926 927 def check_root_part_on_non_recovery(self, part): 928 """Check the partition number of root device and on normal/dev boot. 929 930 @param part: A string of partition number, e.g.'3'. 931 @return: True if the root device matched and on normal/dev boot; 932 otherwise, False. 933 """ 934 return self.checkers.root_part_checker(part) and \ 935 self.checkers.crossystem_checker({ 936 'mainfw_type': ('normal', 'developer'), 937 }) 938 939 def _join_part(self, dev, part): 940 """Return a concatenated string of device and partition number. 941 942 @param dev: A string of device, e.g.'/dev/sda'. 943 @param part: A string of partition number, e.g.'3'. 944 @return: A concatenated string of device and partition number, 945 e.g.'/dev/sda3'. 946 947 >>> seq = FirmwareTest() 948 >>> seq._join_part('/dev/sda', '3') 949 '/dev/sda3' 950 >>> seq._join_part('/dev/mmcblk0', '2') 951 '/dev/mmcblk0p2' 952 """ 953 if 'mmcblk' in dev: 954 return dev + 'p' + part 955 elif 'nvme' in dev: 956 return dev + 'p' + part 957 else: 958 return dev + part 959 960 def copy_kernel_and_rootfs(self, from_part, to_part): 961 """Copy kernel and rootfs from from_part to to_part. 962 963 @param from_part: A string of partition number to be copied from. 964 @param to_part: A string of partition number to be copied to. 965 """ 966 root_dev = self.faft_client.system.get_root_dev() 967 logging.info('Copying kernel from %s to %s. Please wait...', 968 from_part, to_part) 969 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 970 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 971 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 972 logging.info('Copying rootfs from %s to %s. Please wait...', 973 from_part, to_part) 974 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 975 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 976 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 977 978 def ensure_kernel_boot(self, part): 979 """Ensure the request kernel boot. 980 981 If not, it duplicates the current kernel to the requested kernel 982 and sets the requested higher priority to ensure it boot. 983 984 @param part: A string of kernel partition number or 'a'/'b'. 985 """ 986 if not self.checkers.root_part_checker(part): 987 if self.faft_client.kernel.diff_a_b(): 988 self.copy_kernel_and_rootfs( 989 from_part=self.OTHER_KERNEL_MAP[part], 990 to_part=part) 991 self.reset_and_prioritize_kernel(part) 992 self.switcher.mode_aware_reboot() 993 994 def ensure_dev_internal_boot(self, original_dev_boot_usb): 995 """Ensure internal device boot in developer mode. 996 997 If not internal device boot, it will try to reboot the device and 998 bypass dev mode to boot into internal device. 999 1000 @param original_dev_boot_usb: Original dev_boot_usb value. 1001 """ 1002 self.faft_client.system.set_dev_default_boot() 1003 if self.faft_client.system.is_removable_device_boot(): 1004 logging.info('Reboot into internal disk...') 1005 self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb) 1006 self.switcher.mode_aware_reboot() 1007 self.check_state((self.checkers.dev_boot_usb_checker, False, 1008 'Device not booted from internal disk properly.')) 1009 1010 def set_hardware_write_protect(self, enable): 1011 """Set hardware write protect pin. 1012 1013 @param enable: True if asserting write protect pin. Otherwise, False. 1014 """ 1015 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 1016 1017 def set_ap_write_protect_and_reboot(self, enable): 1018 """Set AP write protect status and reboot to take effect. 1019 1020 @param enable: True if asserting write protect. Otherwise, False. 1021 """ 1022 self.set_hardware_write_protect(enable) 1023 if hasattr(self, 'ec'): 1024 self.sync_and_ec_reboot() 1025 self.switcher.wait_for_client() 1026 1027 def run_chromeos_firmwareupdate(self, mode, append=None, options=(), 1028 ignore_status=False): 1029 """Use RPC to get the command to run, but do the actual run via ssh. 1030 1031 Running the command via SSH improves the reliability in cases where the 1032 USB network connection gets interrupted. SSH will still return the 1033 output, and won't hang like RPC would. 1034 """ 1035 update_cmd = self.faft_client.updater.get_firmwareupdate_command( 1036 mode, append, options) 1037 try: 1038 result = self._client.run( 1039 update_cmd, timeout=300, ignore_status=ignore_status) 1040 if result.exit_status == 255: 1041 self.faft_client.disconnect() 1042 return result 1043 except error.AutoservRunError as e: 1044 if e.result_obj.exit_status == 255: 1045 self.faft_client.disconnect() 1046 if ignore_status: 1047 return e.result_obj 1048 raise 1049 1050 def set_ec_write_protect_and_reboot(self, enable): 1051 """Set EC write protect status and reboot to take effect. 1052 1053 The write protect state is only activated if both hardware write 1054 protect pin is asserted and software write protect flag is set. 1055 This method asserts/deasserts hardware write protect pin first, and 1056 set corresponding EC software write protect flag. 1057 1058 If the device uses non-Chrome EC, set the software write protect via 1059 flashrom. 1060 1061 If the device uses Chrome EC, a reboot is required for write protect 1062 to take effect. Since the software write protect flag cannot be unset 1063 if hardware write protect pin is asserted, we need to deasserted the 1064 pin first if we are deactivating write protect. Similarly, a reboot 1065 is required before we can modify the software flag. 1066 1067 @param enable: True if activating EC write protect. Otherwise, False. 1068 """ 1069 self.set_hardware_write_protect(enable) 1070 if self.faft_config.chrome_ec: 1071 self.set_chrome_ec_write_protect_and_reboot(enable) 1072 else: 1073 self.faft_client.ec.set_write_protect(enable) 1074 self.switcher.mode_aware_reboot() 1075 1076 def set_chrome_ec_write_protect_and_reboot(self, enable): 1077 """Set Chrome EC write protect status and reboot to take effect. 1078 1079 @param enable: True if activating EC write protect. Otherwise, False. 1080 """ 1081 if enable: 1082 # Set write protect flag and reboot to take effect. 1083 self.ec.set_flash_write_protect(enable) 1084 self.sync_and_ec_reboot( 1085 flags='hard', 1086 extra_sleep=self.faft_config.ec_boot_to_wp_en) 1087 else: 1088 # Reboot after deasserting hardware write protect pin to deactivate 1089 # write protect. And then remove software write protect flag. 1090 # Some ITE ECs can only clear their WP status on a power-on reset, 1091 # no software-initiated reset will do. 1092 self.sync_and_ec_reboot(flags='cold') 1093 self.ec.set_flash_write_protect(enable) 1094 1095 def _setup_ec_write_protect(self, ec_wp): 1096 """Setup for EC write-protection. 1097 1098 It makes sure the EC in the requested write-protection state. If not, it 1099 flips the state. Flipping the write-protection requires DUT reboot. 1100 1101 @param ec_wp: True to request EC write-protected; False to request EC 1102 not write-protected; None to do nothing. 1103 """ 1104 if ec_wp is None: 1105 return 1106 self._old_wpsw_cur = self.checkers.crossystem_checker( 1107 {'wpsw_cur': '1'}, suppress_logging=True) 1108 if ec_wp != self._old_wpsw_cur: 1109 if not self.faft_config.ap_access_ec_flash: 1110 raise error.TestNAError( 1111 "Cannot change EC write-protect for this device") 1112 1113 logging.info('The test required EC is %swrite-protected. Reboot ' 1114 'and flip the state.', '' if ec_wp else 'not ') 1115 self.switcher.mode_aware_reboot( 1116 'custom', 1117 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 1118 wpsw_cur = '1' if ec_wp else '0' 1119 self.check_state((self.checkers.crossystem_checker, { 1120 'wpsw_cur': wpsw_cur})) 1121 1122 def _restore_ec_write_protect(self): 1123 """Restore the original EC write-protection.""" 1124 if (not hasattr(self, '_old_wpsw_cur')) or (self._old_wpsw_cur is 1125 None): 1126 return 1127 if not self.checkers.crossystem_checker({'wpsw_cur': '1' if 1128 self._old_wpsw_cur else '0'}, suppress_logging=True): 1129 logging.info('Restore original EC write protection and reboot.') 1130 self.switcher.mode_aware_reboot( 1131 'custom', 1132 lambda:self.set_ec_write_protect_and_reboot( 1133 self._old_wpsw_cur)) 1134 self.check_state((self.checkers.crossystem_checker, { 1135 'wpsw_cur': '1' if self._old_wpsw_cur else '0'})) 1136 1137 def _record_uart_capture(self): 1138 """Record the CPU/EC/PD UART output stream to files.""" 1139 self.servo.record_uart_capture(self.resultsdir) 1140 1141 def _cleanup_uart_capture(self): 1142 """Cleanup the CPU/EC/PD UART capture.""" 1143 self.servo.close(self.resultsdir) 1144 1145 def set_ap_off_power_mode(self, power_mode): 1146 """ 1147 Set the DUT power mode to suspend (S0ix/S3) or shutdown (G3/S5). 1148 The DUT must be in S0 when calling this method. 1149 1150 @param power_mode: a string for the expected power mode, either 1151 'suspend' or 'shutdown'. 1152 """ 1153 if power_mode == 'suspend': 1154 target_power_state = self.POWER_STATE_SUSPEND 1155 elif power_mode == 'shutdown': 1156 target_power_state = self.POWER_STATE_G3 1157 else: 1158 raise error.TestError('%s is not a valid ap-off power mode.' % 1159 power_mode) 1160 1161 if self.get_power_state() != self.POWER_STATE_S0: 1162 raise error.TestError('The DUT is not in S0.') 1163 1164 self._restore_power_mode = True 1165 1166 if target_power_state == self.POWER_STATE_G3: 1167 self.run_shutdown_cmd() 1168 time.sleep(self.faft_config.shutdown) 1169 elif target_power_state == self.POWER_STATE_SUSPEND: 1170 self.suspend() 1171 1172 if self.wait_power_state(target_power_state, self.DEFAULT_PWR_RETRIES): 1173 logging.info('System entered %s state.', target_power_state) 1174 else: 1175 self._restore_power_mode = False 1176 raise error.TestFail('System fail to enter %s state. ' 1177 'Current state: %s' % 1178 (target_power_state, self.get_power_state())) 1179 1180 def restore_ap_on_power_mode(self): 1181 """ 1182 Wake up the DUT to S0. If the DUT was not set to suspend or 1183 shutdown mode by set_ap_off_power_mode(), raise an error. 1184 """ 1185 if self.get_power_state() != self.POWER_STATE_S0: 1186 logging.info('Wake up the DUT to S0.') 1187 self.servo.power_normal_press() 1188 # If the DUT is ping-able, it must be in S0. 1189 self.switcher.wait_for_client() 1190 if self._restore_power_mode != True: 1191 raise error.TestFail('The DUT was not set to suspend/shutdown ' 1192 'mode by set_ap_off_power_mode().') 1193 self._restore_power_mode = False 1194 1195 def get_power_state(self): 1196 """ 1197 Return the current power state of the AP (via EC 'powerinfo' command) 1198 1199 @return the name of the power state, or None if a problem occurred 1200 """ 1201 if not hasattr(self, 'ec'): 1202 # Don't fail when EC not present or not fully initialized 1203 return None 1204 1205 pattern = r'power state (\w+) = (\w+),' 1206 1207 try: 1208 match = self.ec.send_command_get_output("powerinfo", [pattern], 1209 retries=3) 1210 except (error.TestFail, expat.ExpatError) as err: 1211 logging.warning("powerinfo command encountered an error: %s", err) 1212 return None 1213 if not match: 1214 logging.warning("powerinfo output did not match pattern: %r", 1215 pattern) 1216 return None 1217 (line, state_num, state_name) = match[0] 1218 logging.debug("power state info %r", match) 1219 return state_name 1220 1221 def _check_power_state(self, expected_power_state, actual_power_state): 1222 """ 1223 Check for correct power state of the AP (via EC 'powerinfo' command) 1224 1225 @param expected_power_state: full-string regex of power state you are 1226 expecting 1227 @param actual_power_state: the power state returned from get_power_state 1228 @return: the line and the match, if the output matched. 1229 @raise error.TestFail: if output didn't match after the delay. 1230 """ 1231 if not isinstance(expected_power_state, six.string_types): 1232 raise error.TestError('%s is not a string while it should be.' % 1233 expected_power_state) 1234 if not isinstance(actual_power_state, six.string_types): 1235 raise error.TestError('%s is not a string while it should be.' % 1236 actual_power_state) 1237 if re.match('^' + expected_power_state + '$', actual_power_state): 1238 return True 1239 return False 1240 1241 def wait_power_state(self, power_state, retries, retry_delay=3): 1242 """ 1243 Wait for certain power state. 1244 1245 @param power_state: full-string regex of power state you are expecting 1246 @param retries: retries. This is necessary if AP is powering down 1247 and transitioning through different states. 1248 @param retry_delay: delay between retries in seconds 1249 """ 1250 logging.info('Checking power state "%s" maximum %d times.', 1251 power_state, retries) 1252 1253 last_power_state = '' 1254 while retries > 0: 1255 logging.debug("try count: %d", retries) 1256 start_time = time.time() 1257 try: 1258 retries = retries - 1 1259 actual_power_state = self.get_power_state() 1260 if last_power_state != actual_power_state: 1261 logging.info("power state: %s", actual_power_state) 1262 if actual_power_state is None: 1263 continue 1264 if self._check_power_state(power_state, actual_power_state): 1265 return True 1266 last_power_state = actual_power_state 1267 except (error.TestFail, expat.ExpatError): 1268 pass 1269 delay_time = retry_delay - time.time() + start_time 1270 if delay_time > 0: 1271 time.sleep(delay_time) 1272 return False 1273 1274 def run_shutdown_cmd(self, wait_for_offline=True): 1275 """Shut down the DUT by running '/sbin/shutdown -P now'.""" 1276 self.faft_client.disconnect() 1277 # Shut down in the background after sleeping so the call gets a reply. 1278 try: 1279 self._client.run_background('sleep 0.5; /sbin/shutdown -P now') 1280 except error.AutoservRunError as e: 1281 # From the ssh man page, error code 255 indicates ssh errors. 1282 if e.result_obj.exit_status == 255: 1283 logging.warning("Ignoring error from ssh: %s", e) 1284 else: 1285 raise 1286 if wait_for_offline: 1287 self.switcher.wait_for_client_offline() 1288 self._client.close_main_ssh() 1289 1290 def suspend(self): 1291 """Suspends the DUT.""" 1292 cmd = 'sleep %d; powerd_dbus_suspend' % self.EC_SUSPEND_DELAY 1293 block = False 1294 self.faft_client.system.run_shell_command(cmd, block) 1295 time.sleep(self.EC_SUSPEND_DELAY) 1296 1297 def _setup_gbb_flags(self): 1298 """Setup the GBB flags for FAFT test.""" 1299 if self.check_setup_done('gbb_flags'): 1300 return 1301 1302 logging.info('Setting proper GBB flags for test.') 1303 # Ensure that GBB flags are set to 0x140. 1304 flags_to_set = (vboot.GBB_FLAG_RUNNING_FAFT | 1305 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM) 1306 # And if the "no_ec_sync" argument is set, then disable EC software 1307 # sync. 1308 if self._no_ec_sync: 1309 logging.info( 1310 'User selected to disable EC software sync') 1311 flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC 1312 1313 # And if the "no_fw_rollback_check" argument is set, then disable fw 1314 # rollback check. 1315 if self._no_fw_rollback_check: 1316 logging.info( 1317 'User selected to disable FW rollback check') 1318 flags_to_set |= vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK 1319 1320 self.clear_set_gbb_flags(0xffffffff, flags_to_set) 1321 self.mark_setup_done('gbb_flags') 1322 1323 def _restore_gbb_flags(self): 1324 """Restore GBB flags to their original state.""" 1325 if self._backup_gbb_flags is None: 1326 return 1327 # Setting up and restoring the GBB flags take a lot of time. For 1328 # speed-up purpose, don't restore it. 1329 logging.info('***') 1330 logging.info('*** Please manually restore the original GBB flags to: ' 1331 '0x%x ***', self._backup_gbb_flags) 1332 logging.info('***') 1333 self.unmark_setup_done('gbb_flags') 1334 1335 def setup_tried_fwb(self, tried_fwb): 1336 """Setup for fw B tried state. 1337 1338 It makes sure the system in the requested fw B tried state. If not, it 1339 tries to do so. 1340 1341 @param tried_fwb: True if requested in tried_fwb=1; 1342 False if tried_fwb=0. 1343 """ 1344 if tried_fwb: 1345 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 1346 logging.info( 1347 'Firmware is not booted with tried_fwb. Reboot into it.') 1348 self.faft_client.system.set_try_fw_b() 1349 else: 1350 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 1351 logging.info( 1352 'Firmware is booted with tried_fwb. Reboot to clear.') 1353 1354 def power_on(self): 1355 """Switch DUT AC power on.""" 1356 self._client.power_on(self.power_control) 1357 1358 def power_off(self): 1359 """Switch DUT AC power off.""" 1360 self._client.power_off(self.power_control) 1361 1362 def power_cycle(self): 1363 """Power cycle DUT AC power.""" 1364 self._client.power_cycle(self.power_control) 1365 1366 def setup_rw_boot(self, section='a'): 1367 """Make sure firmware is in RW-boot mode. 1368 1369 If the given firmware section is in RO-boot mode, turn off the RO-boot 1370 flag and reboot DUT into RW-boot mode. 1371 1372 @param section: A firmware section, either 'a' or 'b'. 1373 """ 1374 flags = self.faft_client.bios.get_preamble_flags(section) 1375 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 1376 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 1377 self.faft_client.bios.set_preamble_flags(section, flags) 1378 self.switcher.mode_aware_reboot() 1379 1380 def setup_kernel(self, part): 1381 """Setup for kernel test. 1382 1383 It makes sure both kernel A and B bootable and the current boot is 1384 the requested kernel part. 1385 1386 @param part: A string of kernel partition number or 'a'/'b'. 1387 """ 1388 self.ensure_kernel_boot(part) 1389 logging.info('Checking the integrity of kernel B and rootfs B...') 1390 if (self.faft_client.kernel.diff_a_b() or 1391 not self.faft_client.rootfs.verify_rootfs('B')): 1392 logging.info('Copying kernel and rootfs from A to B...') 1393 self.copy_kernel_and_rootfs(from_part=part, 1394 to_part=self.OTHER_KERNEL_MAP[part]) 1395 self.reset_and_prioritize_kernel(part) 1396 1397 def reset_and_prioritize_kernel(self, part): 1398 """Make the requested partition highest priority. 1399 1400 This function also reset kerenl A and B to bootable. 1401 1402 @param part: A string of partition number to be prioritized. 1403 """ 1404 root_dev = self.faft_client.system.get_root_dev() 1405 # Reset kernel A and B to bootable. 1406 self.faft_client.system.run_shell_command( 1407 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 1408 self.faft_client.system.run_shell_command( 1409 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 1410 # Set kernel part highest priority. 1411 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 1412 (self.KERNEL_MAP[part], root_dev)) 1413 1414 def do_blocking_sync(self, device): 1415 """Run a blocking sync command.""" 1416 logging.debug("Blocking sync for %s", device) 1417 1418 if 'mmcblk' in device: 1419 # For mmc devices, use `mmc status get` command to send an 1420 # empty command to wait for the disk to be available again. 1421 self.faft_client.system.run_shell_command('mmc status get %s' % 1422 device) 1423 elif 'nvme' in device: 1424 # For NVMe devices, use `nvme flush` command to commit data 1425 # and metadata to non-volatile media. 1426 1427 # Get a list of NVMe namespaces, and flush them individually. 1428 # The output is assumed to be in the following format: 1429 # [ 0]:0x1 1430 # [ 1]:0x2 1431 list_ns_cmd = "nvme list-ns %s" % device 1432 available_ns = self.faft_client.system.run_shell_command_get_output( 1433 list_ns_cmd) 1434 1435 if not available_ns: 1436 raise error.TestError( 1437 "Listing namespaces failed (empty output): %s" 1438 % list_ns_cmd) 1439 1440 for ns in available_ns: 1441 ns = ns.split(':')[-1] 1442 flush_cmd = 'nvme flush %s -n %s' % (device, ns) 1443 flush_rc = self.faft_client.system.run_shell_command_get_status( 1444 flush_cmd) 1445 if flush_rc != 0: 1446 raise error.TestError( 1447 "Flushing namespace %s failed (rc=%s): %s" 1448 % (ns, flush_rc, flush_cmd)) 1449 else: 1450 # For other devices, hdparm sends TUR to check if 1451 # a device is ready for transfer operation. 1452 self.faft_client.system.run_shell_command('hdparm -f %s' % device) 1453 1454 def blocking_sync(self, freeze_for_reset=False): 1455 """Sync root device and internal device, via script if possible. 1456 1457 The actual calls end up logged by the run() call, since they're printed 1458 to stdout/stderr in the script. 1459 1460 @param freeze_for_reset: if True, prepare for reset by blocking writes 1461 (only if enable_fs_sync_fsfreeze=True) 1462 """ 1463 1464 if self._use_sync_script: 1465 if freeze_for_reset: 1466 self.faft_client.quit() 1467 try: 1468 return self._client.blocking_sync(freeze_for_reset) 1469 except (AttributeError, ImportError, error.AutoservRunError) as e: 1470 logging.warning( 1471 'Falling back to old sync method due to error: %s', e) 1472 1473 # The double calls to sync fakes a blocking call 1474 # since the first call returns before the flush 1475 # is complete, but the second will wait for the 1476 # first to finish. 1477 self.faft_client.system.run_shell_command('sync') 1478 self.faft_client.system.run_shell_command('sync') 1479 1480 # sync only sends SYNCHRONIZE_CACHE but doesn't check the status. 1481 # This function will perform a device-specific sync command. 1482 root_dev = self.faft_client.system.get_root_dev() 1483 self.do_blocking_sync(root_dev) 1484 1485 # Also sync the internal device if booted from removable media. 1486 if self.faft_client.system.is_removable_device_boot(): 1487 internal_dev = self.faft_client.system.get_internal_device() 1488 self.do_blocking_sync(internal_dev) 1489 1490 def sync_and_ec_reboot(self, flags='', extra_sleep=0): 1491 """Request the client sync and do a EC triggered reboot. 1492 1493 @param flags: Optional, a space-separated string of flags passed to EC 1494 reboot command, including: 1495 default: EC soft reboot; 1496 'hard': EC hard reboot. 1497 'cold': Cold reboot via servo. 1498 @param extra_sleep: Optional, int or float for extra wait time for EC 1499 reboot in seconds. 1500 """ 1501 self.blocking_sync(freeze_for_reset=True) 1502 if flags == 'cold': 1503 self.servo.get_power_state_controller().reset() 1504 else: 1505 self.ec.reboot(flags) 1506 time.sleep(self.faft_config.ec_boot_to_console + extra_sleep) 1507 self.check_lid_and_power_on() 1508 1509 def reboot_and_reset_tpm(self): 1510 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 1511 self.switcher.reboot_to_mode(to_mode='rec') 1512 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 1513 self.switcher.mode_aware_reboot() 1514 1515 def full_power_off_and_on(self): 1516 """Shutdown the device by pressing power button and power on again.""" 1517 boot_id = self.get_bootid() 1518 self.faft_client.disconnect() 1519 1520 # Press power button to trigger ChromeOS normal shutdown process. 1521 # We use a customized delay since the normal-press 1.2s is not enough. 1522 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 1523 # device can take 44-51 seconds to restart, 1524 # add buffer from the default timeout of 60 seconds. 1525 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 1526 time.sleep(self.faft_config.shutdown) 1527 if self.faft_config.chrome_ec: 1528 self.check_shutdown_power_state(self.POWER_STATE_G3, 1529 orig_boot_id=boot_id) 1530 # Short press power button to boot DUT again. 1531 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1532 1533 def check_shutdown_power_state(self, power_state, 1534 pwr_retries=DEFAULT_PWR_RETRIES, 1535 orig_boot_id=None): 1536 """Check whether the device shut down and entered the given power state. 1537 1538 If orig_boot_id is specified, it will check whether the DUT responds to 1539 ssh requests, then use orig_boot_id to check if it rebooted. 1540 1541 @param power_state: EC power state has to be checked. Either S5 or G3. 1542 @param pwr_retries: Times to check if the DUT in expected power state. 1543 @param orig_boot_id: Old boot_id, to check for unexpected reboots. 1544 @raise TestFail: If device failed to enter into requested power state. 1545 """ 1546 if not self.wait_power_state(power_state, pwr_retries): 1547 current_state = self.get_power_state() 1548 if current_state == self.POWER_STATE_S0 and self._client.wait_up(): 1549 # DUT is unexpectedly up, so check whether it rebooted instead. 1550 new_boot_id = self.get_bootid() 1551 logging.debug('orig_boot_id=%s, new_boot_id=%s', 1552 orig_boot_id, new_boot_id) 1553 if orig_boot_id is None or new_boot_id is None: 1554 # Can't say anything more specific without values to compare 1555 raise error.TestFail( 1556 "Expected state %s, but the system is unexpectedly" 1557 " still up. Current state: %s" 1558 % (power_state, current_state)) 1559 if new_boot_id == orig_boot_id: 1560 raise error.TestFail( 1561 "Expected state %s, but the system didn't shut" 1562 " down. Current state: %s" 1563 % (power_state, current_state)) 1564 else: 1565 raise error.TestFail( 1566 "Expected state %s, but the system rebooted instead" 1567 " of shutting down. Current state: %s" 1568 % (power_state, current_state)) 1569 1570 if current_state is None: 1571 current_state = '(unknown)' 1572 1573 if current_state == power_state: 1574 raise error.TestFail( 1575 "Expected state %s, but the system didn't reach it" 1576 " until after the limit of %s tries." 1577 % (power_state, pwr_retries)) 1578 1579 raise error.TestFail('System not shutdown properly and EC fails' 1580 ' to enter into %s state. Current state: %s' 1581 % (power_state, current_state)) 1582 logging.info('System entered into %s state..', power_state) 1583 1584 def check_lid_and_power_on(self): 1585 """ 1586 On devices with EC software sync, system powers on after EC reboots if 1587 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1588 This method checks lid switch state and presses power button if 1589 necessary. 1590 """ 1591 if self.servo.get("lid_open") == "no": 1592 time.sleep(self.faft_config.software_sync) 1593 self.servo.power_short_press() 1594 1595 def stop_powerd(self): 1596 """Stop the powerd daemon on the AP. 1597 1598 This will cause the AP to ignore power button presses sent by the EC. 1599 """ 1600 powerd_running = self.faft_client.system.run_shell_command_check_output( 1601 'status powerd', 'start/running') 1602 if powerd_running: 1603 logging.debug('Stopping powerd') 1604 self.faft_client.system.run_shell_command("stop powerd") 1605 1606 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1607 """Modify the kernel header magic in USB stick. 1608 1609 The kernel header magic is the first 8-byte of kernel partition. 1610 We modify it to make it fail on kernel verification check. 1611 1612 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1613 @param from_magic: A string of magic which we change it from. 1614 @param to_magic: A string of magic which we change it to. 1615 @raise TestError: if failed to change magic. 1616 """ 1617 assert len(from_magic) == 8 1618 assert len(to_magic) == 8 1619 # USB image only contains one kernel. 1620 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1621 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1622 current_magic = self.servo.system_output(read_cmd) 1623 if current_magic == to_magic: 1624 logging.info("The kernel magic is already %s.", current_magic) 1625 return 1626 if current_magic != from_magic: 1627 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1628 1629 logging.info('Modify the kernel magic in USB, from %s to %s.', 1630 from_magic, to_magic) 1631 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1632 " 2>/dev/null" % (to_magic, kernel_part)) 1633 self.servo.system(write_cmd) 1634 1635 if self.servo.system_output(read_cmd) != to_magic: 1636 raise error.TestError("Failed to write new magic.") 1637 1638 def corrupt_usb_kernel(self, usb_dev): 1639 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1640 1641 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1642 """ 1643 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1644 self.CORRUPTED_MAGIC) 1645 1646 def restore_usb_kernel(self, usb_dev): 1647 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1648 1649 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1650 """ 1651 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1652 self.CHROMEOS_MAGIC) 1653 1654 def _call_action(self, action_tuple, check_status=False): 1655 """Call the action function with/without arguments. 1656 1657 @param action_tuple: A function, or a tuple (function, args, error_msg), 1658 in which, args and error_msg are optional. args is 1659 either a value or a tuple if multiple arguments. 1660 This can also be a list containing multiple 1661 function or tuple. In this case, these actions are 1662 called in sequence. 1663 @param check_status: Check the return value of action function. If not 1664 succeed, raises a TestFail exception. 1665 @return: The result value of the action function. 1666 @raise TestError: An error when the action function is not callable. 1667 @raise TestFail: When check_status=True, action function not succeed. 1668 """ 1669 if isinstance(action_tuple, list): 1670 return all([self._call_action(action, check_status=check_status) 1671 for action in action_tuple]) 1672 1673 action = action_tuple 1674 args = () 1675 error_msg = 'Not succeed' 1676 if isinstance(action_tuple, tuple): 1677 action = action_tuple[0] 1678 if len(action_tuple) >= 2: 1679 args = action_tuple[1] 1680 if not isinstance(args, tuple): 1681 args = (args,) 1682 if len(action_tuple) >= 3: 1683 error_msg = action_tuple[2] 1684 1685 if action is None: 1686 return 1687 1688 if not callable(action): 1689 raise error.TestError('action is not callable!') 1690 1691 info_msg = 'calling %s' % action.__name__ 1692 if args: 1693 info_msg += ' with args %s' % str(args) 1694 logging.info(info_msg) 1695 ret = action(*args) 1696 1697 if check_status and not ret: 1698 raise error.TestFail('%s: %s returning %s' % 1699 (error_msg, info_msg, str(ret))) 1700 return ret 1701 1702 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1703 run_power_action=True, post_power_action=None, 1704 shutdown_timeout=None): 1705 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1706 1707 @param shutdown_action: function which makes DUT shutdown, like 1708 pressing power key. 1709 @param pre_power_action: function which is called before next power on. 1710 @param run_power_action: power_key press by default, set to None to skip. 1711 @param post_power_action: function which is called after next power on. 1712 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1713 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1714 """ 1715 self._call_action(shutdown_action) 1716 logging.info('Wait to ensure DUT shut down...') 1717 try: 1718 if shutdown_timeout is None: 1719 shutdown_timeout = self.faft_config.shutdown_timeout 1720 self.switcher.wait_for_client(timeout=shutdown_timeout) 1721 raise error.TestFail( 1722 'Should shut the device down after calling %s.' % 1723 shutdown_action.__name__) 1724 except ConnectionError: 1725 if self.faft_config.chrome_ec: 1726 self.check_shutdown_power_state(self.POWER_STATE_G3) 1727 logging.info( 1728 'DUT is surely shutdown. We are going to power it on again...') 1729 1730 if pre_power_action: 1731 self._call_action(pre_power_action) 1732 if run_power_action: 1733 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1734 if post_power_action: 1735 self._call_action(post_power_action) 1736 1737 def get_bootid(self, retry=3): 1738 """ 1739 Return the bootid. 1740 """ 1741 boot_id = None 1742 while retry: 1743 try: 1744 boot_id = self._client.get_boot_id() 1745 break 1746 except error.AutoservRunError: 1747 retry -= 1 1748 if retry: 1749 logging.debug('Retry to get boot_id...') 1750 else: 1751 logging.warning('Failed to get boot_id.') 1752 logging.debug('boot_id: %s', boot_id) 1753 return boot_id 1754 1755 def check_state(self, func): 1756 """ 1757 Wrapper around _call_action with check_status set to True. This is a 1758 helper function to be used by tests and is currently implemented by 1759 calling _call_action with check_status=True. 1760 1761 TODO: This function's arguments need to be made more stringent. And 1762 its functionality should be moved over to check functions directly in 1763 the future. 1764 1765 @param func: A function, or a tuple (function, args, error_msg), 1766 in which, args and error_msg are optional. args is 1767 either a value or a tuple if multiple arguments. 1768 This can also be a list containing multiple 1769 function or tuple. In this case, these actions are 1770 called in sequence. 1771 @return: The result value of the action function. 1772 @raise TestFail: If the function does notsucceed. 1773 """ 1774 self._call_action(func, check_status=True) 1775 1776 def get_current_firmware_identity(self): 1777 """Get current firmware sha and fwids of body and vblock. 1778 1779 @return: Current firmware checksums and fwids, as a dict 1780 """ 1781 1782 current_checksums = { 1783 'VBOOTA': self.faft_client.bios.get_sig_sha('a'), 1784 'FVMAINA': self.faft_client.bios.get_body_sha('a'), 1785 'VBOOTB': self.faft_client.bios.get_sig_sha('b'), 1786 'FVMAINB': self.faft_client.bios.get_body_sha('b'), 1787 } 1788 if not all(current_checksums.values()): 1789 raise error.TestError( 1790 'Failed to get firmware sha: %s', current_checksums) 1791 1792 current_fwids = { 1793 'RO_FRID': self.faft_client.bios.get_section_fwid('ro'), 1794 'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'), 1795 'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'), 1796 } 1797 if not all(current_fwids.values()): 1798 raise error.TestError( 1799 'Failed to get firmware fwid(s): %s', current_fwids) 1800 1801 identifying_info = dict(current_fwids) 1802 identifying_info.update(current_checksums) 1803 return identifying_info 1804 1805 def is_firmware_changed(self): 1806 """Check if the current firmware changed, by comparing its SHA and fwid. 1807 1808 @return: True if it is changed, otherwise False. 1809 """ 1810 # Device may not be rebooted after test. 1811 self.faft_client.bios.reload() 1812 1813 current_info = self.get_current_firmware_identity() 1814 prev_info = self._backup_firmware_identity 1815 1816 if current_info == prev_info: 1817 return False 1818 else: 1819 changed = set() 1820 for section in set(current_info.keys()) | set(prev_info.keys()): 1821 if current_info.get(section) != prev_info.get(section): 1822 changed.add(section) 1823 1824 logging.info('Firmware changed: %s', ', '.join(sorted(changed))) 1825 return True 1826 1827 def backup_firmware(self, suffix='.original'): 1828 """Backup firmware to file, and then send it to host. 1829 1830 @param suffix: a string appended to backup file name 1831 """ 1832 remote_temp_dir = self.faft_client.system.create_temp_dir() 1833 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1834 self.faft_client.bios.dump_whole(remote_bios_path) 1835 self._client.get_file(remote_bios_path, 1836 os.path.join(self.resultsdir, 'bios' + suffix)) 1837 1838 if self.faft_config.chrome_ec: 1839 remote_ec_path = os.path.join(remote_temp_dir, 'ec') 1840 self.faft_client.ec.dump_whole(remote_ec_path) 1841 self._client.get_file(remote_ec_path, 1842 os.path.join(self.resultsdir, 'ec' + suffix)) 1843 1844 self._client.run('rm -rf %s' % remote_temp_dir) 1845 logging.info('Backup firmware stored in %s with suffix %s', 1846 self.resultsdir, suffix) 1847 1848 self._backup_firmware_identity = self.get_current_firmware_identity() 1849 1850 def is_firmware_saved(self): 1851 """Check if a firmware saved (called backup_firmware before). 1852 1853 @return: True if the firmware is backed up; otherwise False. 1854 """ 1855 return bool(self._backup_firmware_identity) 1856 1857 def restore_firmware(self, suffix='.original', restore_ec=True, 1858 reboot_ec=False): 1859 """Restore firmware from host in resultsdir. 1860 1861 @param suffix: a string appended to backup file name 1862 @param restore_ec: True to restore the ec firmware; False not to do. 1863 @param reboot_ec: True to reboot EC after restore (if it was restored) 1864 @return: True if firmware needed to be restored 1865 """ 1866 if not self.is_firmware_changed(): 1867 return False 1868 1869 # Backup current corrupted firmware. 1870 self.backup_firmware(suffix='.corrupt') 1871 1872 # Restore firmware. 1873 remote_temp_dir = self.faft_client.system.create_temp_dir() 1874 1875 bios_local = os.path.join(self.resultsdir, 'bios%s' % suffix) 1876 bios_remote = os.path.join(remote_temp_dir, 'bios%s' % suffix) 1877 self._client.send_file(bios_local, bios_remote) 1878 self.faft_client.bios.write_whole(bios_remote) 1879 1880 if self.faft_config.chrome_ec and restore_ec: 1881 ec_local = os.path.join(self.resultsdir, 'ec%s' % suffix) 1882 ec_remote = os.path.join(remote_temp_dir, 'ec%s' % suffix) 1883 self._client.send_file(ec_local, ec_remote) 1884 ec_cmd = self.faft_client.ec.get_write_cmd(ec_remote) 1885 try: 1886 self._client.run(ec_cmd, timeout=300) 1887 except error.AutoservSSHTimeout: 1888 logging.warning("DUT connection died during EC restore") 1889 self.faft_client.disconnect() 1890 1891 except error.GenericHostRunError: 1892 logging.warning("DUT command failed during EC restore") 1893 logging.debug("Full exception:", exc_info=True) 1894 if reboot_ec: 1895 self.switcher.mode_aware_reboot( 1896 'custom', lambda: self.sync_and_ec_reboot('hard')) 1897 else: 1898 self.switcher.mode_aware_reboot() 1899 else: 1900 self.switcher.mode_aware_reboot() 1901 logging.info('Successfully restored firmware.') 1902 return True 1903 1904 def setup_firmwareupdate_shellball(self, shellball=None): 1905 """Setup a shellball to use in firmware update test. 1906 1907 Check if there is a given shellball, and it is a shell script. Then, 1908 send it to the remote host. Otherwise, use the 1909 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1910 BIOS and EC images with the active firmware images. 1911 1912 @param shellball: path of a shellball or default to None. 1913 """ 1914 if shellball: 1915 # Determine the firmware file is a shellball or a raw binary. 1916 is_shellball = (utils.system_output("file %s" % shellball).find( 1917 "shell script") != -1) 1918 if is_shellball: 1919 logging.info('Device will update firmware with shellball %s', 1920 shellball) 1921 temp_path = self.faft_client.updater.get_temp_path() 1922 working_shellball = os.path.join(temp_path, 1923 'chromeos-firmwareupdate') 1924 self._client.send_file(shellball, working_shellball) 1925 self.faft_client.updater.extract_shellball() 1926 else: 1927 raise error.TestFail( 1928 'The given shellball is not a shell script.') 1929 else: 1930 logging.info('No shellball given, use the original shellball and ' 1931 'replace its BIOS and EC images.') 1932 work_path = self.faft_client.updater.get_work_path() 1933 bios_in_work_path = os.path.join( 1934 work_path, self.faft_client.updater.get_bios_relative_path()) 1935 ec_in_work_path = os.path.join( 1936 work_path, self.faft_client.updater.get_ec_relative_path()) 1937 logging.info('Writing current BIOS to: %s', bios_in_work_path) 1938 self.faft_client.bios.dump_whole(bios_in_work_path) 1939 if self.faft_config.chrome_ec: 1940 logging.info('Writing current EC to: %s', ec_in_work_path) 1941 self.faft_client.ec.dump_firmware(ec_in_work_path) 1942 self.faft_client.updater.repack_shellball() 1943 1944 def is_kernel_changed(self, kernel_type='KERN'): 1945 """Check if the current kernel is changed, by comparing its SHA1 hash. 1946 1947 @param kernel_type: The type name of kernel ('KERN' or 'MINIOS'). 1948 @return: True if it is changed; otherwise, False. 1949 """ 1950 changed = False 1951 for p in ('A', 'B'): 1952 backup_sha = self._backup_kernel_sha[kernel_type].get(p, None) 1953 current_sha = self.kernel_servicer[kernel_type].get_sha(p) 1954 if backup_sha != current_sha: 1955 changed = True 1956 logging.info('Kernel %s-%s is changed', kernel_type, p) 1957 return changed 1958 1959 def backup_kernel(self, suffix='.original', kernel_type='KERN'): 1960 """Backup kernel to files, and the send them to host. 1961 1962 @param kernel_type: The type name of kernel ('KERN' or 'MINIOS'). 1963 @param suffix: a string appended to backup file name. 1964 """ 1965 kernel_name = self.KERNEL_TYPE_NAME_MAP[kernel_type] 1966 servicer = self.kernel_servicer[kernel_type] 1967 remote_temp_dir = self.faft_client.system.create_temp_dir() 1968 for p in ('A', 'B'): 1969 remote_path = os.path.join(remote_temp_dir, 1970 '%s_%s' % (kernel_name, p)) 1971 servicer.dump(p, remote_path) 1972 self._client.get_file( 1973 remote_path, 1974 os.path.join(self.resultsdir, 1975 '%s_%s%s' % (kernel_name, p, suffix))) 1976 self._backup_kernel_sha[kernel_type][p] = servicer.get_sha(p) 1977 logging.info('Backup %s stored in %s with suffix %s', kernel_name, 1978 self.resultsdir, suffix) 1979 1980 def is_kernel_saved(self, kernel_type='KERN'): 1981 """Check if kernel images are saved (backup_kernel called before). 1982 1983 @param kernel_type: The type name of kernel ('KERN' or 'MINIOS'). 1984 @return: True if the kernel is saved; otherwise, False. 1985 """ 1986 return len(self._backup_kernel_sha[kernel_type]) != 0 1987 1988 def restore_kernel(self, suffix='.original', kernel_type='KERN'): 1989 """Restore kernel from host in resultsdir. 1990 1991 @param kernel_type: The type name of kernel ('KERN' or 'MINIOS'). 1992 @param suffix: a string appended to backup file name. 1993 @return: True if kernel needed to be restored 1994 """ 1995 if not self.is_kernel_changed(kernel_type): 1996 return False 1997 1998 # Backup current corrupted kernel. 1999 self.backup_kernel(suffix='.corrupt', kernel_type=kernel_type) 2000 2001 # Restore kernel. 2002 kernel_name = self.KERNEL_TYPE_NAME_MAP[kernel_type] 2003 servicer = self.kernel_servicer[kernel_type] 2004 remote_temp_dir = self.faft_client.system.create_temp_dir() 2005 for p in ('A', 'B'): 2006 remote_path = os.path.join(remote_temp_dir, 2007 '%s_%s' % (kernel_name, p)) 2008 servicer.dump(p, remote_path) 2009 self._client.send_file( 2010 os.path.join(self.resultsdir, 2011 '%s_%s%s' % (kernel_name, p, suffix)), 2012 remote_path) 2013 servicer.write(p, remote_path) 2014 2015 self.switcher.mode_aware_reboot() 2016 logging.info('Successfully restored %s.', kernel_type) 2017 return True 2018 2019 def backup_cgpt_attributes(self): 2020 """Backup CGPT partition table attributes.""" 2021 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 2022 2023 def restore_cgpt_attributes(self): 2024 """Restore CGPT partition table attributes.""" 2025 current_table = self.faft_client.cgpt.get_attributes() 2026 if current_table == self._backup_cgpt_attr: 2027 return 2028 logging.info('CGPT table is changed. Original: %r. Current: %r.', 2029 self._backup_cgpt_attr, 2030 current_table) 2031 self.faft_client.cgpt.set_attributes( 2032 self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B']) 2033 2034 self.switcher.mode_aware_reboot() 2035 logging.info('Successfully restored CGPT table.') 2036 2037 def try_fwb(self, count=0): 2038 """set to try booting FWB count # times 2039 2040 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 2041 vboot2 2042 2043 @param count: an integer specifying value to program into 2044 fwb_tries(vb1)/fw_try_next(vb2) 2045 """ 2046 if self.fw_vboot2: 2047 self.faft_client.system.set_fw_try_next('B', count) 2048 else: 2049 # vboot1: we need to boot into fwb at least once 2050 if not count: 2051 count = count + 1 2052 self.faft_client.system.set_try_fw_b(count) 2053 2054 def identify_shellball(self, include_ec=None): 2055 """Get the FWIDs of all targets and sections in the shellball 2056 2057 @param include_ec: if True, get EC fwids. 2058 If None (default), assume True if board has an EC 2059 @return: the dict of versions in the shellball 2060 """ 2061 fwids = dict() 2062 fwids['bios'] = self.faft_client.updater.get_image_fwids('bios') 2063 2064 if include_ec is None: 2065 if self.faft_config.platform.lower() == 'samus': 2066 include_ec = False # no ec.bin in shellball 2067 else: 2068 include_ec = self.faft_config.chrome_ec 2069 2070 if include_ec: 2071 fwids['ec'] = self.faft_client.updater.get_image_fwids('ec') 2072 return fwids 2073 2074 def modify_shellball(self, append, modify_ro=True, modify_ec=False): 2075 """Modify the FWIDs of targets and sections in the shellball 2076 2077 @return: the full path of the shellball 2078 """ 2079 2080 if modify_ro: 2081 self.faft_client.updater.modify_image_fwids( 2082 'bios', ['ro', 'a', 'b']) 2083 else: 2084 self.faft_client.updater.modify_image_fwids( 2085 'bios', ['a', 'b']) 2086 2087 if modify_ec: 2088 if modify_ro: 2089 self.faft_client.updater.modify_image_fwids( 2090 'ec', ['ro', 'rw']) 2091 else: 2092 self.faft_client.updater.modify_image_fwids( 2093 'ec', ['rw']) 2094 2095 modded_shellball = self.faft_client.updater.repack_shellball(append) 2096 2097 return modded_shellball 2098 2099 @staticmethod 2100 def check_fwids_written(before_fwids, image_fwids, after_fwids, 2101 expected_written): 2102 """Check the dicts of fwids for correctness after an update is applied. 2103 2104 The targets checked come from the keys of expected_written. 2105 The sections checked come from the inner dicts of the fwids parameters. 2106 2107 The fwids should be keyed by target (flash type), then by section: 2108 {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'}, 2109 'ec': {'ro': '<fwid>', 'rw': '<fwid>'} 2110 2111 For expected_written, the dict should be keyed by flash type only: 2112 {'bios': ['ro'], 'ec': ['ro', 'rw']} 2113 To expect the contents completely unchanged, give only the keys: 2114 {'bios': [], 'ec': []} or {'bios': None, 'ec': None} 2115 2116 @param before_fwids: dict of versions from before the update 2117 @param image_fwids: dict of versions in the update 2118 @param after_fwids: dict of actual versions after the update 2119 @param expected_written: dict indicating which ones should have changed 2120 @return: list of error lines for mismatches 2121 2122 @type before_fwids: dict 2123 @type image_fwids: dict | None 2124 @type after_fwids: dict 2125 @type expected_written: dict 2126 @rtype: list 2127 """ 2128 errors = [] 2129 2130 if image_fwids is None: 2131 image_fwids = {} 2132 2133 for target in sorted(expected_written.keys()): 2134 # target is BIOS or EC 2135 2136 before_missing = (target not in before_fwids) 2137 after_missing = (target not in after_fwids) 2138 if before_missing or after_missing: 2139 if before_missing: 2140 errors.append("...no before_fwids[%s]" % target) 2141 if after_missing: 2142 errors.append("...no after_fwids[%s]" % target) 2143 continue 2144 2145 written_sections = expected_written.get(target) or list() 2146 written_sections = set(written_sections) 2147 2148 before_sections = set(before_fwids.get(target) or dict()) 2149 image_sections = set(image_fwids.get(target) or dict()) 2150 after_sections = set(after_fwids.get(target) or dict()) 2151 2152 for section in before_sections | image_sections | after_sections: 2153 # section is RO, RW, A, or B 2154 2155 before_fwid = before_fwids[target][section] 2156 image_fwid = image_fwids.get(target, {}).get(section, None) 2157 actual_fwid = after_fwids[target][section] 2158 2159 if section in written_sections: 2160 expected_fwid = image_fwid 2161 expected_desc = 'rewritten fwid (%s)' % expected_fwid 2162 if image_fwid == before_fwid: 2163 expected_desc = ('rewritten (no changes) fwid (%s)' % 2164 expected_fwid) 2165 else: 2166 expected_fwid = before_fwid 2167 expected_desc = 'original fwid (%s)' % expected_fwid 2168 2169 if actual_fwid == expected_fwid: 2170 actual_desc = 'correct value' 2171 2172 elif actual_fwid == image_fwid: 2173 actual_desc = 'rewritten fwid (%s)' % actual_fwid 2174 if image_fwid == before_fwid: 2175 # The flash could have been rewritten with the same fwid 2176 actual_desc = 'possibly written fwid (%s)' % actual_fwid 2177 2178 elif actual_fwid == before_fwid: 2179 actual_desc = 'original fwid (%s)' % actual_fwid 2180 2181 else: 2182 actual_desc = 'unknown fwid (%s)' % actual_fwid 2183 2184 msg = ("...FWID (%s %s): expected %s, got %s" % 2185 (target.upper(), section.upper(), 2186 expected_desc, actual_desc)) 2187 2188 if actual_fwid != expected_fwid: 2189 errors.append(msg) 2190 return errors 2191 2192 2193 def fwmp_is_cleared(self): 2194 """Return True if the FWMP has been created""" 2195 res = self.host.run('cryptohome ' 2196 '--action=get_firmware_management_parameters', 2197 ignore_status=True) 2198 if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS: 2199 raise error.TestError('Could not run cryptohome command %r' % res) 2200 return (self.FWMP_CLEARED_ERROR_MSG in res.stdout 2201 or tpm_utils.FwmpIsAllZero(res.stdout)) 2202 2203 2204 def _tpm_is_owned(self): 2205 """Returns True if the tpm is owned""" 2206 result = self.host.run('tpm_manager_client status --nonsensitive', 2207 ignore_status=True) 2208 logging.debug(result) 2209 return result.exit_status == 0 and 'is_owned: true' in result.stdout 2210 2211 def clear_fwmp(self): 2212 """Clear the FWMP""" 2213 if self.fwmp_is_cleared(): 2214 return 2215 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 2216 self.host.run('tpm_manager_client take_ownership') 2217 if not utils.wait_for_value(self._tpm_is_owned, expected_value=True): 2218 raise error.TestError('Unable to own tpm while clearing fwmp.') 2219 self.host.run('cryptohome ' 2220 '--action=remove_firmware_management_parameters') 2221 2222 def wait_for(self, cfg_field, action_msg=None, extra_time=0): 2223 """Waits for time specified in a config. 2224 2225 @ivar cfg_field: The name of the config field that specifies the 2226 time to wait. 2227 @ivar action_msg: Optional log message describing the action that 2228 will occur after the wait. 2229 @ivar extra_time: Additional time to be added to time from config. 2230 """ 2231 wait_time = self.faft_config.__getattr__(cfg_field) + extra_time 2232 if extra_time: 2233 wait_src = "%s + %s" % (cfg_field, extra_time) 2234 else: 2235 wait_src = cfg_field 2236 2237 units = 'second' if wait_time==1 else 'seconds' 2238 start_msg = "Waiting %s(%s) %s" % (wait_time, wait_src, units) 2239 if action_msg: 2240 start_msg += ", before '%s'" % action_msg 2241 start_msg += "." 2242 2243 logging.info(start_msg) 2244 time.sleep(wait_time) 2245 logging.info("Done waiting.") 2246 2247 def _try_to_bring_dut_up(self): 2248 """Try to quickly get the dut in a pingable state""" 2249 if not hasattr(self, 'cr50'): 2250 raise error.TestNAError('Test can only be run on devices with ' 2251 'access to the Cr50 console') 2252 logging.info('Bringing DUT up') 2253 2254 self.servo.set_nocheck('cold_reset', 'off') 2255 self.servo.set_nocheck('warm_reset', 'off') 2256 2257 time.sleep(self.cr50.SHORT_WAIT) 2258 if not self.cr50.ap_is_on(): 2259 logging.info('Pressing power button to turn on AP') 2260 self.servo.power_short_press() 2261 2262 end_time = time.time() + self.RESPONSE_TIMEOUT 2263 while not self.host.ping_wait_up( 2264 self.faft_config.delay_reboot_to_ping * 2): 2265 if time.time() > end_time: 2266 logging.warning( 2267 'DUT is unresponsive after trying to bring it up') 2268 return 2269 self.servo.get_power_state_controller().reset() 2270 logging.info('DUT did not respond. Resetting it.') 2271 2272 def _check_open_and_press_power_button(self): 2273 """Check stdout and press the power button if prompted. 2274 2275 Returns: 2276 True if the process is still running. 2277 """ 2278 if not hasattr(self, 'cr50'): 2279 raise error.TestNAError('Test can only be run on devices with ' 2280 'access to the Cr50 console') 2281 2282 logging.info(self._get_ccd_open_output()) 2283 self.servo.power_short_press() 2284 logging.info('long int power button press') 2285 # power button press cr50 erases nvmem and resets the dut before setting 2286 # the state to open. Wait a bit so we don't check the ccd state in the 2287 # middle of this reset process. Power button requests happen once a 2288 # minute, so waiting 10 seconds isn't a big deal. 2289 time.sleep(10) 2290 return (self.cr50.OPEN == self.cr50.get_ccd_level() or 2291 self._ccd_open_job.sp.poll() is not None) 2292 2293 def _get_ccd_open_output(self): 2294 """Read the new output.""" 2295 if not hasattr(self, 'cr50'): 2296 raise error.TestNAError('Test can only be run on devices with ' 2297 'access to the Cr50 console') 2298 2299 self._ccd_open_job.process_output() 2300 output = self._ccd_open_stdout.getvalue() 2301 self._ccd_open_stdout.seek(self._ccd_open_last_len) 2302 self._ccd_open_last_len = len(output) 2303 return self._ccd_open_stdout.read().strip() 2304 2305 def _close_ccd_open_job(self): 2306 """Terminate the process and check the results.""" 2307 if not hasattr(self, 'cr50'): 2308 raise error.TestNAError('Test can only be run on devices with ' 2309 'access to the Cr50 console') 2310 2311 exit_status = utils.nuke_subprocess(self._ccd_open_job.sp) 2312 stdout = self._ccd_open_stdout.getvalue().strip() 2313 delattr(self, '_ccd_open_job') 2314 if stdout: 2315 logging.info('stdout of ccd open:\n%s', stdout) 2316 if exit_status: 2317 logging.info('exit status: %d', exit_status) 2318 if 'Error' in stdout: 2319 raise error.TestFail('ccd open Error %s' % 2320 stdout.split('Error')[-1]) 2321 if self.cr50.OPEN != self.cr50.get_ccd_level(): 2322 raise error.TestFail('unable to open cr50: %s' % stdout) 2323 else: 2324 logging.info('Opened Cr50') 2325 2326 def ccd_open_from_ap(self): 2327 """Start the open process and press the power button.""" 2328 if not hasattr(self, 'cr50'): 2329 raise error.TestNAError('Test can only be run on devices with ' 2330 'access to the Cr50 console') 2331 2332 # Opening CCD requires power button presses. If those presses would 2333 # power off the AP and prevent CCD open from completing, ignore them. 2334 if self.faft_config.ec_forwards_short_pp_press: 2335 self.stop_powerd() 2336 2337 # Make sure the test waits long enough to avoid ccd rate limiting. 2338 time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT) 2339 2340 self._ccd_open_last_len = 0 2341 2342 self._ccd_open_stdout = six.StringIO() 2343 2344 ccd_open_cmd = utils.sh_escape('gsctool -a -o') 2345 full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'), 2346 ccd_open_cmd) 2347 # Start running the Cr50 Open process in the background. 2348 self._ccd_open_job = utils.BgJob(full_ssh_cmd, 2349 nickname='ccd_open', 2350 stdout_tee=self._ccd_open_stdout, 2351 stderr_tee=utils.TEE_TO_LOGS) 2352 if self._ccd_open_job == None: 2353 raise error.TestFail('could not start ccd open') 2354 2355 try: 2356 # Wait for the first gsctool power button prompt before starting the 2357 # open process. 2358 logging.info(self._get_ccd_open_output()) 2359 # Cr50 starts out by requesting 5 quick presses then 4 longer 2360 # power button presses. Run the quick presses without looking at the 2361 # command output, because getting the output can take some time. For 2362 # the presses that require a 1 minute wait check the output between 2363 # presses, so we can catch errors 2364 # 2365 # run quick presses for 30 seconds. It may take a couple of seconds 2366 # for open to start. 10 seconds should be enough. 30 is just used 2367 # because it will definitely be enough, and this process takes 300 2368 # seconds, so doing quick presses for 30 seconds won't matter. 2369 end_time = time.time() + 30 2370 while time.time() < end_time: 2371 self.servo.power_short_press() 2372 logging.info('short int power button press') 2373 time.sleep(self.PP_SHORT_INTERVAL) 2374 # Poll the output and press the power button for the longer presses. 2375 utils.wait_for_value(self._check_open_and_press_power_button, 2376 expected_value=True, 2377 timeout_sec=self.cr50.PP_LONG) 2378 except Exception as e: 2379 logging.info(e) 2380 raise 2381 finally: 2382 self._close_ccd_open_job() 2383 self._try_to_bring_dut_up() 2384 logging.info(self.cr50.get_ccd_info()) 2385 2386 def enter_mode_after_checking_cr50_state(self, mode): 2387 """Reboot to mode if cr50 doesn't already match the state""" 2388 if not hasattr(self, 'cr50'): 2389 raise error.TestNAError('Test can only be run on devices with ' 2390 'access to the Cr50 console') 2391 2392 # If the device is already in the correct mode, don't do anything 2393 if (mode == 'dev') == self.cr50.in_dev_mode(): 2394 logging.info('already in %r mode', mode) 2395 return 2396 2397 self.switcher.reboot_to_mode(to_mode=mode) 2398 2399 if (mode == 'dev') != self.cr50.in_dev_mode(): 2400 raise error.TestError('Unable to enter %r mode' % mode) 2401 2402 def fast_ccd_open(self, enable_testlab=False, reset_ccd=True, 2403 dev_mode=False): 2404 """Try to use ccd testlab open. If that fails, do regular ap open. 2405 2406 Args: 2407 enable_testlab: If True, enable testlab mode after cr50 is open. 2408 reset_ccd: If True, reset ccd after open. 2409 dev_mode: True if the device should be in dev mode after ccd is 2410 is opened. 2411 """ 2412 if not hasattr(self, 'cr50'): 2413 raise error.TestNAError('Test can only be run on devices with ' 2414 'access to the Cr50 console') 2415 2416 if self.servo.main_device_is_ccd() and not self.cr50.testlab_is_on(): 2417 error_txt = 'because the main servo device is CCD.' 2418 if enable_testlab: 2419 raise error.TestNAError('Cannot enable testlab: %s' % error_txt) 2420 elif reset_ccd: 2421 raise error.TestNAError('CCD reset not allowed: %s' % error_txt) 2422 2423 if not self.faft_config.has_powerbutton: 2424 logging.warning('No power button', exc_info=True) 2425 enable_testlab = False 2426 2427 # Try to use testlab open first, so we don't have to wait for the 2428 # physical presence check. 2429 self.cr50.send_command('ccd testlab open') 2430 if self.cr50.OPEN != self.cr50.get_ccd_level(): 2431 if self.servo.has_control('chassis_open'): 2432 self.servo.set('chassis_open', 'yes') 2433 pw = '' if self.cr50.password_is_reset() else self.CCD_PASSWORD 2434 # Use the console to open cr50 without entering dev mode if 2435 # possible. Ittakes longer and relies on more systems to enter dev 2436 # mode and ssh into the AP. Skip the steps that aren't required. 2437 if not (pw or self.cr50.get_cap( 2438 'OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]): 2439 self.enter_mode_after_checking_cr50_state('dev') 2440 2441 if pw or self.cr50.get_cap( 2442 'OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]: 2443 self.cr50.set_ccd_level(self.cr50.OPEN, pw) 2444 else: 2445 self.ccd_open_from_ap() 2446 2447 if self.servo.has_control('chassis_open'): 2448 self.servo.set('chassis_open', 'no') 2449 2450 if enable_testlab: 2451 self.cr50.set_ccd_testlab('on') 2452 2453 if reset_ccd: 2454 self.cr50.ccd_reset() 2455 2456 # In default, the device should be in normal mode. After opening cr50, 2457 # the TPM should be cleared and the device should automatically reset to 2458 # normal mode. However, some tests might want the device in 'dev' mode. 2459 self.enter_mode_after_checking_cr50_state('dev' if dev_mode else 2460 'normal') 2461