1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import pprint 10import re 11import time 12import uuid 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error 16from autotest_lib.client.common_lib.cros import tpm_utils 17from autotest_lib.server import test 18from autotest_lib.server.cros import vboot_constants as vboot 19from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig 20from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 21from autotest_lib.server.cros.faft.utils import mode_switcher 22from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 23from autotest_lib.server.cros.servo import chrome_base_ec 24from autotest_lib.server.cros.servo import chrome_cr50 25from autotest_lib.server.cros.servo import chrome_ec 26from autotest_lib.server.cros.servo import servo 27 28ConnectionError = mode_switcher.ConnectionError 29 30 31class FAFTBase(test.test): 32 """The base class of FAFT classes. 33 34 It launches the FAFTClient on DUT, such that the test can access its 35 firmware functions and interfaces. It also provides some methods to 36 handle the reboot mechanism, in order to ensure FAFTClient is still 37 connected after reboot. 38 @type servo: servo.Servo 39 """ 40 def initialize(self, host): 41 """Create a FAFTClient object and install the dependency.""" 42 43 self.servo = host.servo 44 45 # Rotate old logs out of the way before test starts, to avoid noise. 46 self.servo.rotate_servod_logs(filename=None) 47 self.servo.initialize_dut() 48 49 self._client = host 50 self.faft_client = RPCProxy(host) 51 self.lockfile = '/usr/local/tmp/faft/lock' 52 53 54class FirmwareTest(FAFTBase): 55 """ 56 Base class that sets up helper objects/functions for firmware tests. 57 58 TODO: add documentaion as the FAFT rework progresses. 59 """ 60 version = 1 61 62 # Mapping of partition number of kernel and rootfs. 63 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 64 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 65 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 66 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 67 68 CHROMEOS_MAGIC = "CHROMEOS" 69 CORRUPTED_MAGIC = "CORRUPTD" 70 71 # Delay for waiting client to return before EC suspend 72 EC_SUSPEND_DELAY = 5 73 74 # Delay between EC suspend and wake 75 WAKE_DELAY = 10 76 77 # Delay between closing and opening lid 78 LID_DELAY = 1 79 80 # FWMP space constants 81 FWMP_CLEARED_EXIT_STATUS = 1 82 FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS' 83 '_INVALID') 84 85 # UARTs that may be captured 86 UARTS = ('cpu', 'cr50', 'ec', 'servo_micro', 'servo_v4', 'usbpd') 87 88 _ROOTFS_PARTITION_NUMBER = 3 89 90 _backup_firmware_identity = dict() 91 _backup_kernel_sha = dict() 92 _backup_cgpt_attr = dict() 93 _backup_gbb_flags = None 94 _backup_dev_mode = None 95 96 # Class level variable, keep track the states of one time setup. 97 # This variable is preserved across tests which inherit this class. 98 _global_setup_done = { 99 'gbb_flags': False, 100 'reimage': False, 101 'usb_check': False, 102 } 103 104 @classmethod 105 def check_setup_done(cls, label): 106 """Check if the given setup is done. 107 108 @param label: The label of the setup. 109 """ 110 return cls._global_setup_done[label] 111 112 @classmethod 113 def mark_setup_done(cls, label): 114 """Mark the given setup done. 115 116 @param label: The label of the setup. 117 """ 118 cls._global_setup_done[label] = True 119 120 @classmethod 121 def unmark_setup_done(cls, label): 122 """Mark the given setup not done. 123 124 @param label: The label of the setup. 125 """ 126 cls._global_setup_done[label] = False 127 128 def initialize(self, host, cmdline_args, ec_wp=None): 129 super(FirmwareTest, self).initialize(host) 130 self.run_id = str(uuid.uuid4()) 131 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 132 # Parse arguments from command line 133 args = {} 134 self.power_control = host.POWER_CONTROL_RPM 135 for arg in cmdline_args: 136 match = re.search("^(\w+)=(.+)", arg) 137 if match: 138 args[match.group(1)] = match.group(2) 139 140 self._no_ec_sync = False 141 if 'no_ec_sync' in args: 142 if 'true' in args['no_ec_sync'].lower(): 143 self._no_ec_sync = True 144 145 self.faft_config = FAFTConfig( 146 self.faft_client.system.get_platform_name(), 147 self.faft_client.system.get_model_name()) 148 self.checkers = FAFTCheckers(self) 149 self.switcher = mode_switcher.create_mode_switcher(self) 150 151 if self.faft_config.chrome_ec: 152 self.ec = chrome_ec.ChromeEC(self.servo) 153 # Check for presence of a USBPD console 154 if self.faft_config.chrome_usbpd: 155 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 156 elif self.faft_config.chrome_ec: 157 # If no separate USBPD console, then PD exists on EC console 158 self.usbpd = self.ec 159 # Get pdtester console 160 self.pdtester = host.pdtester 161 self.pdtester_host = host._pdtester_host 162 163 if 'power_control' in args: 164 self.power_control = args['power_control'] 165 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 166 raise error.TestError('Valid values for --args=power_control ' 167 'are %s. But you entered wrong argument ' 168 'as "%s".' 169 % (host.POWER_CONTROL_VALID_ARGS, 170 self.power_control)) 171 172 if not self.faft_client.system.dev_tpm_present(): 173 raise error.TestError('/dev/tpm0 does not exist on the client') 174 175 # Create the BaseEC object. None if not available. 176 self.base_ec = chrome_base_ec.create_base_ec(self.servo) 177 178 self._setup_uart_capture() 179 self._record_system_info() 180 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 181 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 182 if self.fw_vboot2: 183 self.faft_client.system.set_fw_try_next('A') 184 if self.faft_client.system.get_crossystem_value( 185 'mainfw_act') == 'B': 186 logging.info('mainfw_act is B. rebooting to set it A') 187 # TODO(crbug.com/1018322): remove try/catch once that bug is 188 # marked as fixed and verified. In that case the overlay for 189 # the board itself will map warm_reset to cold_reset. 190 try: 191 self.switcher.mode_aware_reboot() 192 except ConnectionError as e: 193 if 'DUT is still up unexpectedly' in str(e): 194 # In this case, try doing a cold_reset instead 195 self.switcher.mode_aware_reboot(reboot_type='cold') 196 else: 197 raise 198 199 # Check flashrom before first use, to avoid xmlrpclib.Fault. 200 if not self.faft_client.bios.is_available(): 201 raise error.TestError( 202 "flashrom is broken; check 'flashrom -p host'" 203 "and rpc server log.") 204 205 self._setup_gbb_flags() 206 self.faft_client.updater.stop_daemon() 207 self._create_faft_lockfile() 208 self._create_old_faft_lockfile() 209 self._setup_ec_write_protect(ec_wp) 210 # See chromium:239034 regarding needing this sync. 211 self.blocking_sync() 212 self.servo.rotate_servod_logs('servod.init', self.resultsdir) 213 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 214 215 def cleanup(self): 216 """Autotest cleanup function.""" 217 # Unset state checker in case it's set by subclass 218 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 219 220 # capture servod logs for body of test 221 self.servo.rotate_servod_logs('servod', self.resultsdir) 222 223 # Capture UART before doing anything else, so we can guarantee we get 224 # some uart results. 225 try: 226 self._record_uart_capture() 227 except: 228 logging.warn('Failed initial uart capture during cleanup') 229 230 # Discard redundant log messages containing the captured uart text: 231 # ... Servod - DEBUG - servo_server.py:765:get - ec_uart_stream = '...' 232 self.servo.rotate_servod_logs(filename=None) 233 234 try: 235 self.faft_client.system.is_available() 236 except: 237 # Remote is not responding. Revive DUT so that subsequent tests 238 # don't fail. 239 self._restore_routine_from_timeout() 240 self.switcher.restore_mode() 241 self._restore_ec_write_protect() 242 self._restore_servo_v4_role() 243 self._restore_gbb_flags() 244 self.faft_client.updater.start_daemon() 245 self.faft_client.updater.cleanup() 246 self._remove_faft_lockfile() 247 self._remove_old_faft_lockfile() 248 self._record_faft_client_log() 249 self.servo.rotate_servod_logs('servod.cleanup', self.resultsdir) 250 251 # Capture any new uart output, then discard log messages again. 252 self._cleanup_uart_capture() 253 self.servo.rotate_servod_logs(filename=None) 254 255 super(FirmwareTest, self).cleanup() 256 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 257 258 def _record_system_info(self): 259 """Record some critical system info to the attr keyval. 260 261 This info is used by generate_test_report later. 262 """ 263 system_info = { 264 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 265 'ec_version': self.faft_client.ec.get_version(), 266 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 267 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 268 'servo_host_os_version' : self.servo.get_os_version(), 269 'servod_version': self.servo.get_servod_version(), 270 'os_version': self._client.get_release_builder_path(), 271 'servo_type': self.servo.get_servo_version() 272 } 273 274 # Record the servo v4 and servo micro versions when possible 275 system_info.update(self.servo.get_servo_fw_versions()) 276 277 if hasattr(self, 'cr50'): 278 system_info['cr50_version'] = self.servo.get('cr50_version') 279 280 logging.info('System info:\n%s', pprint.pformat(system_info)) 281 self.write_attr_keyval(system_info) 282 283 def invalidate_firmware_setup(self): 284 """Invalidate all firmware related setup state. 285 286 This method is called when the firmware is re-flashed. It resets all 287 firmware related setup states so that the next test setup properly 288 again. 289 """ 290 self.unmark_setup_done('gbb_flags') 291 292 def _retrieve_recovery_reason_from_trap(self): 293 """Try to retrieve the recovery reason from a trapped recovery screen. 294 295 @return: The recovery_reason, 0 if any error. 296 """ 297 recovery_reason = 0 298 logging.info('Try to retrieve recovery reason...') 299 if self.servo.get_usbkey_direction() == 'dut': 300 self.switcher.bypass_rec_mode() 301 else: 302 self.servo.switch_usbkey('dut') 303 304 try: 305 self.switcher.wait_for_client() 306 lines = self.faft_client.system.run_shell_command_get_output( 307 'crossystem recovery_reason') 308 recovery_reason = int(lines[0]) 309 logging.info('Got the recovery reason %d.', recovery_reason) 310 except ConnectionError: 311 logging.error('Failed to get the recovery reason due to connection ' 312 'error.') 313 return recovery_reason 314 315 def _reset_client(self): 316 """Reset client to a workable state. 317 318 This method is called when the client is not responsive. It may be 319 caused by the following cases: 320 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 321 - corrupted firmware; 322 - corrutped OS image. 323 """ 324 # DUT may halt on a firmware screen. Try cold reboot. 325 logging.info('Try cold reboot...') 326 self.switcher.mode_aware_reboot(reboot_type='cold', 327 sync_before_boot=False, 328 wait_for_dut_up=False) 329 self.switcher.wait_for_client_offline() 330 self.switcher.bypass_dev_mode() 331 try: 332 self.switcher.wait_for_client() 333 return 334 except ConnectionError: 335 logging.warn('Cold reboot doesn\'t help, still connection error.') 336 337 # DUT may be broken by a corrupted firmware. Restore firmware. 338 # We assume the recovery boot still works fine. Since the recovery 339 # code is in RO region and all FAFT tests don't change the RO region 340 # except GBB. 341 if self.is_firmware_saved(): 342 self._ensure_client_in_recovery() 343 logging.info('Try restore the original firmware...') 344 if self.is_firmware_changed(): 345 try: 346 self.restore_firmware() 347 return 348 except ConnectionError: 349 logging.warn('Restoring firmware doesn\'t help, still ' 350 'connection error.') 351 352 # Perhaps it's kernel that's broken. Let's try restoring it. 353 if self.is_kernel_saved(): 354 self._ensure_client_in_recovery() 355 logging.info('Try restore the original kernel...') 356 if self.is_kernel_changed(): 357 try: 358 self.restore_kernel() 359 return 360 except ConnectionError: 361 logging.warn('Restoring kernel doesn\'t help, still ' 362 'connection error.') 363 364 # DUT may be broken by a corrupted OS image. Restore OS image. 365 self._ensure_client_in_recovery() 366 logging.info('Try restore the OS image...') 367 self.faft_client.system.run_shell_command('chromeos-install --yes') 368 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 369 self.switcher.wait_for_client_offline() 370 self.switcher.bypass_dev_mode() 371 try: 372 self.switcher.wait_for_client() 373 logging.info('Successfully restore OS image.') 374 return 375 except ConnectionError: 376 logging.warn('Restoring OS image doesn\'t help, still connection ' 377 'error.') 378 379 def _ensure_client_in_recovery(self): 380 """Ensure client in recovery boot; reboot into it if necessary. 381 382 @raise TestError: if failed to boot the USB image. 383 """ 384 logging.info('Try boot into USB image...') 385 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 386 wait_for_dut_up=False) 387 self.servo.switch_usbkey('host') 388 self.switcher.bypass_rec_mode() 389 try: 390 self.switcher.wait_for_client() 391 except ConnectionError: 392 raise error.TestError('Failed to boot the USB image.') 393 394 def _restore_routine_from_timeout(self): 395 """A routine to try to restore the system from a timeout error. 396 397 This method is called when FAFT failed to connect DUT after reboot. 398 399 @raise TestFail: This exception is already raised, with a decription 400 why it failed. 401 """ 402 # DUT is disconnected. Capture the UART output for debug. 403 self._record_uart_capture() 404 405 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 406 # identify if it is a network flaky. 407 408 recovery_reason = self._retrieve_recovery_reason_from_trap() 409 410 # Reset client to a workable state. 411 self._reset_client() 412 413 # Raise the proper TestFail exception. 414 if recovery_reason: 415 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 416 'and timed out' % recovery_reason) 417 else: 418 raise error.TestFail('Timed out waiting for DUT reboot') 419 420 def assert_test_image_in_usb_disk(self, usb_dev=None): 421 """Assert an USB disk plugged-in on servo and a test image inside. 422 423 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 424 If None, it is detected automatically. 425 @raise TestError: if USB disk not detected or not a test image. 426 """ 427 if self.check_setup_done('usb_check'): 428 return 429 if usb_dev: 430 assert self.servo.get_usbkey_direction() == 'host' 431 else: 432 self.servo.switch_usbkey('host') 433 usb_dev = self.servo.probe_host_usb_dev() 434 if not usb_dev: 435 raise error.TestError( 436 'An USB disk should be plugged in the servo board.') 437 438 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 439 logging.info('usb dev is %s', usb_dev) 440 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 441 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 442 443 try: 444 usb_lsb = self.servo.system_output('cat %s' % 445 os.path.join(tmpd, 'etc/lsb-release')) 446 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 447 dut_lsb = '\n'.join(self.faft_client.system. 448 run_shell_command_get_output('cat /etc/lsb-release')) 449 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 450 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 451 raise error.TestError('USB stick in servo is no test image') 452 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 453 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 454 if usb_board != dut_board: 455 raise error.TestError('USB stick in servo contains a %s ' 456 'image, but DUT is a %s' % (usb_board, dut_board)) 457 finally: 458 for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd): 459 self.servo.system(cmd) 460 461 self.mark_setup_done('usb_check') 462 463 def setup_pdtester(self, flip_cc=False): 464 """Setup the PDTester to a given state. 465 466 @param flip_cc: True to flip CC polarity; False to not flip it. 467 @raise TestError: If Servo v4 not setup properly. 468 """ 469 # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects 470 # the behaviors of what PD FAFT tests. So we want it disabled. 471 if 'servo_v4' in self.pdtester.servo_type: 472 self.pdtester.set('servo_v4_dts_mode', 'off') 473 474 self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1') 475 # Make it sourcing max voltage. 476 self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE) 477 478 # Servo v4 requires an external charger to source power. Make sure 479 # this setup is correct. 480 if ('servo_v4' in self.pdtester.servo_type and 481 self.pdtester.get('servo_v4_role') != 'src'): 482 raise error.TestError('Servo v4 failed sourcing power! Check ' 483 'the "DUT POWER" port connecting a valid charger.') 484 485 def setup_usbkey(self, usbkey, host=None, used_for_recovery=None): 486 """Setup the USB disk for the test. 487 488 It checks the setup of USB disk and a valid ChromeOS test image inside. 489 It also muxes the USB disk to either the host or DUT by request. 490 491 @param usbkey: True if the USB disk is required for the test, False if 492 not required. 493 @param host: Optional, True to mux the USB disk to host, False to mux it 494 to DUT, default to do nothing. 495 @param used_for_recovery: Optional, True if the USB disk is used for 496 recovery boot; False if the USB disk is not 497 used for recovery boot, like Ctrl-U USB boot. 498 """ 499 if usbkey: 500 self.assert_test_image_in_usb_disk() 501 elif host is None: 502 # USB disk is not required for the test. Better to mux it to host. 503 host = True 504 505 if host is True: 506 self.servo.switch_usbkey('host') 507 elif host is False: 508 self.servo.switch_usbkey('dut') 509 510 if used_for_recovery is None: 511 # Default value is True if usbkey == True. 512 # As the common usecase of USB disk is for recovery boot. Tests 513 # can define it explicitly if not. 514 used_for_recovery = usbkey 515 516 if used_for_recovery: 517 # In recovery boot, the locked EC RO doesn't support PD for most 518 # of the CrOS devices. The default servo v4 power role is a SRC. 519 # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the 520 # data role swap from UFP to DFP; as a result, DUT can't see the 521 # USB disk and the Ethernet dongle on servo v4. 522 # 523 # This is a workaround to set servo v4 as a SNK, for every FAFT 524 # test which boots into the USB disk in the recovery mode. 525 # 526 # TODO(waihong): Add a check to see if the battery level is too 527 # low and sleep for a while for charging. 528 self.set_servo_v4_role_to_snk() 529 530 def set_servo_v4_role_to_snk(self): 531 """Set the servo v4 role to SNK.""" 532 self._needed_restore_servo_v4_role = True 533 self.servo.set_servo_v4_role('snk') 534 535 def _restore_servo_v4_role(self): 536 """Restore the servo v4 role to default SRC.""" 537 if not hasattr(self, '_needed_restore_servo_v4_role'): 538 return 539 if self._needed_restore_servo_v4_role: 540 self.servo.set_servo_v4_role('src') 541 542 def get_usbdisk_path_on_dut(self): 543 """Get the path of the USB disk device plugged-in the servo on DUT. 544 545 Returns: 546 A string representing USB disk path, like '/dev/sdb', or None if 547 no USB disk is found. 548 """ 549 cmd = 'ls -d /dev/s*[a-z]' 550 original_value = self.servo.get_usbkey_direction() 551 552 # Make the dut unable to see the USB disk. 553 self.servo.switch_usbkey('off') 554 no_usb_set = set( 555 self.faft_client.system.run_shell_command_get_output(cmd)) 556 557 # Make the dut able to see the USB disk. 558 self.servo.switch_usbkey('dut') 559 time.sleep(self.faft_config.usb_plug) 560 has_usb_set = set( 561 self.faft_client.system.run_shell_command_get_output(cmd)) 562 563 # Back to its original value. 564 if original_value != self.servo.get_usbkey_direction(): 565 self.servo.switch_usbkey(original_value) 566 567 diff_set = has_usb_set - no_usb_set 568 if len(diff_set) == 1: 569 return diff_set.pop() 570 else: 571 return None 572 573 def _create_faft_lockfile(self): 574 """Creates the FAFT lockfile.""" 575 logging.info('Creating FAFT lockfile...') 576 command = 'touch %s' % (self.lockfile) 577 self.faft_client.system.run_shell_command(command) 578 579 def _create_old_faft_lockfile(self): 580 """ 581 Creates the FAFT lockfile in its legacy location. 582 583 TODO (once M83 is stable, approx. June 9 2020): 584 Delete this function, as platform/installer/chromeos-setgoodkernel 585 will look for the lockfile in the new location 586 (/usr/local/tmp/faft/lock) 587 """ 588 logging.info('Creating legacy FAFT lockfile...') 589 self.faft_client.system.run_shell_command('mkdir -p /var/tmp/faft') 590 self.faft_client.system.run_shell_command('touch /var/tmp/faft/lock') 591 592 def _remove_faft_lockfile(self): 593 """Removes the FAFT lockfile.""" 594 logging.info('Removing FAFT lockfile...') 595 command = 'rm -f %s' % (self.lockfile) 596 self.faft_client.system.run_shell_command(command) 597 598 def _remove_old_faft_lockfile(self): 599 """ 600 Removes the FAFT lockfile from its legacy location. 601 602 TODO (once M83 is stable, approx. June 9 2020): 603 Delete this function, as platform/installer/chromeos-setgoodkernel 604 will look for the lockfile in the new location 605 (/usr/local/tmp/faft/lock) 606 """ 607 logging.info('Removing legacy FAFT lockfile...') 608 self.faft_client.system.run_shell_command('rm -rf /var/tmp/faft') 609 610 def clear_set_gbb_flags(self, clear_mask, set_mask): 611 """Clear and set the GBB flags in the current flashrom. 612 613 @param clear_mask: A mask of flags to be cleared. 614 @param set_mask: A mask of flags to be set. 615 """ 616 gbb_flags = self.faft_client.bios.get_gbb_flags() 617 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 618 self.gbb_flags = new_flags 619 if new_flags != gbb_flags: 620 self._backup_gbb_flags = gbb_flags 621 logging.info('Changing GBB flags from 0x%x to 0x%x.', 622 gbb_flags, new_flags) 623 self.faft_client.bios.set_gbb_flags(new_flags) 624 # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag, 625 # reboot to get a clear state 626 if ((gbb_flags ^ new_flags) & 627 (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 628 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)): 629 self.switcher.mode_aware_reboot() 630 else: 631 logging.info('Current GBB flags look good for test: 0x%x.', 632 gbb_flags) 633 634 635 def _check_capability(self, target, required_cap, suppress_warning): 636 """Check if current platform has required capabilities for the target. 637 638 @param required_cap: A list containing required capabilities. 639 @param suppress_warning: True to suppress any warning messages. 640 @return: True if requirements are met. Otherwise, False. 641 """ 642 if not required_cap: 643 return True 644 645 if target not in ['ec', 'cr50']: 646 raise error.TestError('Invalid capability target %r' % target) 647 648 for cap in required_cap: 649 if cap not in getattr(self.faft_config, target + '_capability'): 650 if not suppress_warning: 651 logging.warn('Requires %s capability "%s" to run this ' 652 'test.', target, cap) 653 return False 654 655 return True 656 657 658 def check_ec_capability(self, required_cap=None, suppress_warning=False): 659 """Check if current platform has required EC capabilities. 660 661 @param required_cap: A list containing required EC capabilities. Pass in 662 None to only check for presence of Chrome EC. 663 @param suppress_warning: True to suppress any warning messages. 664 @return: True if requirements are met. Otherwise, False. 665 """ 666 if not self.faft_config.chrome_ec: 667 if not suppress_warning: 668 logging.warn('Requires Chrome EC to run this test.') 669 return False 670 return self._check_capability('ec', required_cap, suppress_warning) 671 672 673 def check_cr50_capability(self, required_cap=None, suppress_warning=False): 674 """Check if current platform has required Cr50 capabilities. 675 676 @param required_cap: A list containing required Cr50 capabilities. Pass 677 in None to only check for presence of cr50 uart. 678 @param suppress_warning: True to suppress any warning messages. 679 @return: True if requirements are met. Otherwise, False. 680 """ 681 if not hasattr(self, 'cr50'): 682 if not suppress_warning: 683 logging.warn('Requires Chrome Cr50 to run this test.') 684 return False 685 return self._check_capability('cr50', required_cap, suppress_warning) 686 687 688 def check_root_part_on_non_recovery(self, part): 689 """Check the partition number of root device and on normal/dev boot. 690 691 @param part: A string of partition number, e.g.'3'. 692 @return: True if the root device matched and on normal/dev boot; 693 otherwise, False. 694 """ 695 return self.checkers.root_part_checker(part) and \ 696 self.checkers.crossystem_checker({ 697 'mainfw_type': ('normal', 'developer'), 698 }) 699 700 def _join_part(self, dev, part): 701 """Return a concatenated string of device and partition number. 702 703 @param dev: A string of device, e.g.'/dev/sda'. 704 @param part: A string of partition number, e.g.'3'. 705 @return: A concatenated string of device and partition number, 706 e.g.'/dev/sda3'. 707 708 >>> seq = FirmwareTest() 709 >>> seq._join_part('/dev/sda', '3') 710 '/dev/sda3' 711 >>> seq._join_part('/dev/mmcblk0', '2') 712 '/dev/mmcblk0p2' 713 """ 714 if 'mmcblk' in dev: 715 return dev + 'p' + part 716 elif 'nvme' in dev: 717 return dev + 'p' + part 718 else: 719 return dev + part 720 721 def copy_kernel_and_rootfs(self, from_part, to_part): 722 """Copy kernel and rootfs from from_part to to_part. 723 724 @param from_part: A string of partition number to be copied from. 725 @param to_part: A string of partition number to be copied to. 726 """ 727 root_dev = self.faft_client.system.get_root_dev() 728 logging.info('Copying kernel from %s to %s. Please wait...', 729 from_part, to_part) 730 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 731 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 732 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 733 logging.info('Copying rootfs from %s to %s. Please wait...', 734 from_part, to_part) 735 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 736 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 737 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 738 739 def ensure_kernel_boot(self, part): 740 """Ensure the request kernel boot. 741 742 If not, it duplicates the current kernel to the requested kernel 743 and sets the requested higher priority to ensure it boot. 744 745 @param part: A string of kernel partition number or 'a'/'b'. 746 """ 747 if not self.checkers.root_part_checker(part): 748 if self.faft_client.kernel.diff_a_b(): 749 self.copy_kernel_and_rootfs( 750 from_part=self.OTHER_KERNEL_MAP[part], 751 to_part=part) 752 self.reset_and_prioritize_kernel(part) 753 self.switcher.mode_aware_reboot() 754 755 def ensure_dev_internal_boot(self, original_dev_boot_usb): 756 """Ensure internal device boot in developer mode. 757 758 If not internal device boot, it will try to reboot the device and 759 bypass dev mode to boot into internal device. 760 761 @param original_dev_boot_usb: Original dev_boot_usb value. 762 """ 763 logging.info('Checking internal device boot.') 764 if self.faft_client.system.is_removable_device_boot(): 765 logging.info('Reboot into internal disk...') 766 self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb) 767 self.switcher.mode_aware_reboot() 768 self.check_state((self.checkers.dev_boot_usb_checker, False, 769 'Device not booted from internal disk properly.')) 770 771 def set_hardware_write_protect(self, enable): 772 """Set hardware write protect pin. 773 774 @param enable: True if asserting write protect pin. Otherwise, False. 775 """ 776 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 777 778 def set_ec_write_protect_and_reboot(self, enable): 779 """Set EC write protect status and reboot to take effect. 780 781 The write protect state is only activated if both hardware write 782 protect pin is asserted and software write protect flag is set. 783 This method asserts/deasserts hardware write protect pin first, and 784 set corresponding EC software write protect flag. 785 786 If the device uses non-Chrome EC, set the software write protect via 787 flashrom. 788 789 If the device uses Chrome EC, a reboot is required for write protect 790 to take effect. Since the software write protect flag cannot be unset 791 if hardware write protect pin is asserted, we need to deasserted the 792 pin first if we are deactivating write protect. Similarly, a reboot 793 is required before we can modify the software flag. 794 795 @param enable: True if activating EC write protect. Otherwise, False. 796 """ 797 self.set_hardware_write_protect(enable) 798 if self.faft_config.chrome_ec: 799 self.set_chrome_ec_write_protect_and_reboot(enable) 800 else: 801 self.faft_client.ec.set_write_protect(enable) 802 self.switcher.mode_aware_reboot() 803 804 def set_chrome_ec_write_protect_and_reboot(self, enable): 805 """Set Chrome EC write protect status and reboot to take effect. 806 807 @param enable: True if activating EC write protect. Otherwise, False. 808 """ 809 if enable: 810 # Set write protect flag and reboot to take effect. 811 self.ec.set_flash_write_protect(enable) 812 self.sync_and_ec_reboot() 813 else: 814 # Reboot after deasserting hardware write protect pin to deactivate 815 # write protect. And then remove software write protect flag. 816 self.sync_and_ec_reboot() 817 self.ec.set_flash_write_protect(enable) 818 819 def _setup_ec_write_protect(self, ec_wp): 820 """Setup for EC write-protection. 821 822 It makes sure the EC in the requested write-protection state. If not, it 823 flips the state. Flipping the write-protection requires DUT reboot. 824 825 @param ec_wp: True to request EC write-protected; False to request EC 826 not write-protected; None to do nothing. 827 """ 828 if ec_wp is None: 829 self._old_wpsw_boot = None 830 return 831 self._old_wpsw_cur = self.checkers.crossystem_checker( 832 {'wpsw_cur': '1'}, suppress_logging=True) 833 self._old_wpsw_boot = self.checkers.crossystem_checker( 834 {'wpsw_boot': '1'}, suppress_logging=True) 835 if not (ec_wp == self._old_wpsw_cur == self._old_wpsw_boot): 836 if not self.faft_config.ap_access_ec_flash: 837 raise error.TestNAError( 838 "Cannot change EC write-protect for this device") 839 840 logging.info('The test required EC is %swrite-protected. Reboot ' 841 'and flip the state.', '' if ec_wp else 'not ') 842 self.switcher.mode_aware_reboot( 843 'custom', 844 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 845 wpsw_boot = wpsw_cur = '1' if ec_wp == True else '0' 846 self.check_state((self.checkers.crossystem_checker, { 847 'wpsw_boot': wpsw_boot, 'wpsw_cur': wpsw_cur})) 848 849 def _restore_ec_write_protect(self): 850 """Restore the original EC write-protection.""" 851 if (not hasattr(self, '_old_wpsw_boot')) or (self._old_wpsw_boot is 852 None): 853 return 854 if not self.checkers.crossystem_checker({'wpsw_boot': '1' if 855 self._old_wpsw_boot else '0'}, suppress_logging=True): 856 logging.info('Restore original EC write protection and reboot.') 857 self.switcher.mode_aware_reboot( 858 'custom', 859 lambda:self.set_ec_write_protect_and_reboot( 860 self._old_wpsw_boot)) 861 self.check_state((self.checkers.crossystem_checker, { 862 'wpsw_boot': '1' if self._old_wpsw_boot else '0'})) 863 864 def _setup_uart_capture(self): 865 """Set up the CPU/EC/PD UART capture.""" 866 867 # If adding another capture, make sure to update the UARTS constant. 868 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 869 self.servo.set('cpu_uart_capture', 'on') 870 self.cr50_uart_file = None 871 self.ec_uart_file = None 872 self.servo_micro_uart_file = None 873 self.servo_v4_uart_file = None 874 self.usbpd_uart_file = None 875 876 try: 877 # Check that the console works before declaring the cr50 console 878 # connection exists and enabling uart capture. 879 self.servo.get('cr50_version') 880 self.servo.set('cr50_uart_capture', 'on') 881 self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt') 882 self.cr50 = chrome_cr50.ChromeCr50(self.servo, self.faft_config) 883 except servo.ControlUnavailableError: 884 logging.warn('cr50 console not supported.') 885 except error.TestFail as e: 886 logging.warn('Unknown cr50 uart capture error: %s', str(e)) 887 if (self.faft_config.chrome_ec and 888 self.servo.has_control('ec_uart_capture')): 889 self.servo.set('ec_uart_capture', 'on') 890 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 891 # Log separate PD console if supported 892 if (self.check_ec_capability(['usbpd_uart'], suppress_warning=True) 893 and self.servo.has_control('usb_pd_uart_capture')): 894 self.servo.set('usbpd_uart_capture', 'on') 895 self.usbpd_uart_file = os.path.join(self.resultsdir, 896 'usbpd_uart.txt') 897 else: 898 logging.info('Not a Google EC, cannot capture ec console output.') 899 900 for servo_console in ['servo_micro', 'servo_v4']: 901 capture_cmd = '%s_uart_capture' % servo_console 902 uart_file_attr = '%s_uart_file' % servo_console 903 if self.servo.has_control(capture_cmd): 904 self.servo.set(capture_cmd, 'on') 905 outfile = '%s_uart.txt' % servo_console 906 setattr(self, uart_file_attr, os.path.join(self.resultsdir, 907 outfile)) 908 909 def _record_uart_capture(self): 910 """Record the CPU/EC/PD UART output stream to files.""" 911 for uart in self.UARTS: 912 # Attribute will be nonexistent or empty if capture wasn't set up. 913 uart_file = getattr(self, '%s_uart_file' % uart, None) 914 if uart_file: 915 with open(uart_file, 'a') as f: 916 f.write(ast.literal_eval( 917 self.servo.get('%s_uart_stream' % uart))) 918 919 def _cleanup_uart_capture(self): 920 """Cleanup the CPU/EC/PD UART capture.""" 921 # Flush the remaining UART output first. 922 self._record_uart_capture() 923 for uart in self.UARTS: 924 # Attribute will be nonexistent or empty if capture wasn't set up. 925 uart_file = getattr(self, '%s_uart_file' % uart, None) 926 if uart_file: 927 self.servo.set('%s_uart_capture' % uart, 'off') 928 929 def _get_power_state(self, power_state): 930 """ 931 Return the current power state of the AP 932 """ 933 return self.ec.send_command_get_output("powerinfo", [power_state]) 934 935 def wait_power_state(self, power_state, retries): 936 """ 937 Wait for certain power state. 938 939 @param power_state: power state you are expecting 940 @param retries: retries. This is necessary if AP is powering down 941 and transitioning through different states. 942 """ 943 logging.info('Checking power state "%s" maximum %d times.', 944 power_state, retries) 945 while retries > 0: 946 logging.info("try count: %d", retries) 947 try: 948 retries = retries - 1 949 ret = self._get_power_state(power_state) 950 return True 951 except error.TestFail: 952 pass 953 return False 954 955 def suspend(self): 956 """Suspends the DUT.""" 957 cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY 958 self.faft_client.system.run_shell_command(cmd) 959 time.sleep(self.EC_SUSPEND_DELAY) 960 961 def _record_faft_client_log(self): 962 """Record the faft client log to the results directory.""" 963 client_log = self.faft_client.system.dump_log(True) 964 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 965 with open(client_log_file, 'w') as f: 966 f.write(client_log) 967 968 def _setup_gbb_flags(self): 969 """Setup the GBB flags for FAFT test.""" 970 if self.check_setup_done('gbb_flags'): 971 return 972 973 logging.info('Set proper GBB flags for test.') 974 # Ensure that GBB flags are set to 0x140. 975 flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE | 976 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM) 977 # And if the "no_ec_sync" argument is set, then disable EC software 978 # sync. 979 if self._no_ec_sync: 980 flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC 981 982 self.clear_set_gbb_flags(0xffffffff, flags_to_set) 983 self.mark_setup_done('gbb_flags') 984 985 def drop_backup_gbb_flags(self): 986 """Drops the backup GBB flags. 987 988 This can be used when a test intends to permanently change GBB flags. 989 """ 990 self._backup_gbb_flags = None 991 992 def _restore_gbb_flags(self): 993 """Restore GBB flags to their original state.""" 994 if self._backup_gbb_flags is None: 995 return 996 # Setting up and restoring the GBB flags take a lot of time. For 997 # speed-up purpose, don't restore it. 998 logging.info('***') 999 logging.info('*** Please manually restore the original GBB flags to: ' 1000 '0x%x ***', self._backup_gbb_flags) 1001 logging.info('***') 1002 self.unmark_setup_done('gbb_flags') 1003 1004 def setup_tried_fwb(self, tried_fwb): 1005 """Setup for fw B tried state. 1006 1007 It makes sure the system in the requested fw B tried state. If not, it 1008 tries to do so. 1009 1010 @param tried_fwb: True if requested in tried_fwb=1; 1011 False if tried_fwb=0. 1012 """ 1013 if tried_fwb: 1014 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 1015 logging.info( 1016 'Firmware is not booted with tried_fwb. Reboot into it.') 1017 self.faft_client.system.set_try_fw_b() 1018 else: 1019 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 1020 logging.info( 1021 'Firmware is booted with tried_fwb. Reboot to clear.') 1022 1023 def power_on(self): 1024 """Switch DUT AC power on.""" 1025 self._client.power_on(self.power_control) 1026 1027 def power_off(self): 1028 """Switch DUT AC power off.""" 1029 self._client.power_off(self.power_control) 1030 1031 def power_cycle(self): 1032 """Power cycle DUT AC power.""" 1033 self._client.power_cycle(self.power_control) 1034 1035 def setup_rw_boot(self, section='a'): 1036 """Make sure firmware is in RW-boot mode. 1037 1038 If the given firmware section is in RO-boot mode, turn off the RO-boot 1039 flag and reboot DUT into RW-boot mode. 1040 1041 @param section: A firmware section, either 'a' or 'b'. 1042 """ 1043 flags = self.faft_client.bios.get_preamble_flags(section) 1044 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 1045 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 1046 self.faft_client.bios.set_preamble_flags(section, flags) 1047 self.switcher.mode_aware_reboot() 1048 1049 def setup_kernel(self, part): 1050 """Setup for kernel test. 1051 1052 It makes sure both kernel A and B bootable and the current boot is 1053 the requested kernel part. 1054 1055 @param part: A string of kernel partition number or 'a'/'b'. 1056 """ 1057 self.ensure_kernel_boot(part) 1058 logging.info('Checking the integrity of kernel B and rootfs B...') 1059 if (self.faft_client.kernel.diff_a_b() or 1060 not self.faft_client.rootfs.verify_rootfs('B')): 1061 logging.info('Copying kernel and rootfs from A to B...') 1062 self.copy_kernel_and_rootfs(from_part=part, 1063 to_part=self.OTHER_KERNEL_MAP[part]) 1064 self.reset_and_prioritize_kernel(part) 1065 1066 def reset_and_prioritize_kernel(self, part): 1067 """Make the requested partition highest priority. 1068 1069 This function also reset kerenl A and B to bootable. 1070 1071 @param part: A string of partition number to be prioritized. 1072 """ 1073 root_dev = self.faft_client.system.get_root_dev() 1074 # Reset kernel A and B to bootable. 1075 self.faft_client.system.run_shell_command( 1076 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 1077 self.faft_client.system.run_shell_command( 1078 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 1079 # Set kernel part highest priority. 1080 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 1081 (self.KERNEL_MAP[part], root_dev)) 1082 1083 def do_blocking_sync(self, device): 1084 """Run a blocking sync command.""" 1085 logging.info("Blocking sync for %s", device) 1086 1087 if 'mmcblk' in device: 1088 # For mmc devices, use `mmc status get` command to send an 1089 # empty command to wait for the disk to be available again. 1090 self.faft_client.system.run_shell_command('mmc status get %s' % 1091 device) 1092 elif 'nvme' in device: 1093 # For NVMe devices, use `nvme flush` command to commit data 1094 # and metadata to non-volatile media. 1095 1096 # Get a list of NVMe namespaces, and flush them individually. 1097 # The output is assumed to be in the following format: 1098 # [ 0]:0x1 1099 # [ 1]:0x2 1100 list_ns_cmd = "nvme list-ns %s" % device 1101 available_ns = self.faft_client.system.run_shell_command_get_output( 1102 list_ns_cmd) 1103 1104 if not available_ns: 1105 raise error.TestError( 1106 "Listing namespaces failed (empty output): %s" 1107 % list_ns_cmd) 1108 1109 for ns in available_ns: 1110 ns = ns.split(':')[-1] 1111 flush_cmd = 'nvme flush %s -n %s' % (device, ns) 1112 flush_rc = self.faft_client.system.run_shell_command_get_status( 1113 flush_cmd) 1114 if flush_rc != 0: 1115 raise error.TestError( 1116 "Flushing namespace %s failed (rc=%s): %s" 1117 % (ns, flush_rc, flush_cmd)) 1118 else: 1119 # For other devices, hdparm sends TUR to check if 1120 # a device is ready for transfer operation. 1121 self.faft_client.system.run_shell_command('hdparm -f %s' % device) 1122 1123 def blocking_sync(self): 1124 """Sync root device and internal device.""" 1125 # The double calls to sync fakes a blocking call 1126 # since the first call returns before the flush 1127 # is complete, but the second will wait for the 1128 # first to finish. 1129 self.faft_client.system.run_shell_command('sync') 1130 self.faft_client.system.run_shell_command('sync') 1131 1132 # sync only sends SYNCHRONIZE_CACHE but doesn't check the status. 1133 # This function will perform a device-specific sync command. 1134 root_dev = self.faft_client.system.get_root_dev() 1135 self.do_blocking_sync(root_dev) 1136 1137 # Also sync the internal device if booted from removable media. 1138 if self.faft_client.system.is_removable_device_boot(): 1139 internal_dev = self.faft_client.system.get_internal_device() 1140 self.do_blocking_sync(internal_dev) 1141 1142 def sync_and_ec_reboot(self, flags=''): 1143 """Request the client sync and do a EC triggered reboot. 1144 1145 @param flags: Optional, a space-separated string of flags passed to EC 1146 reboot command, including: 1147 default: EC soft reboot; 1148 'hard': EC cold/hard reboot. 1149 """ 1150 self.blocking_sync() 1151 self.ec.reboot(flags) 1152 time.sleep(self.faft_config.ec_boot_to_console) 1153 self.check_lid_and_power_on() 1154 1155 def reboot_and_reset_tpm(self): 1156 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 1157 self.switcher.reboot_to_mode(to_mode='rec') 1158 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 1159 self.switcher.mode_aware_reboot() 1160 1161 def full_power_off_and_on(self): 1162 """Shutdown the device by pressing power button and power on again.""" 1163 boot_id = self.get_bootid() 1164 # Press power button to trigger Chrome OS normal shutdown process. 1165 # We use a customized delay since the normal-press 1.2s is not enough. 1166 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 1167 # device can take 44-51 seconds to restart, 1168 # add buffer from the default timeout of 60 seconds. 1169 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 1170 time.sleep(self.faft_config.shutdown) 1171 if self.check_ec_capability(['x86'], suppress_warning=True): 1172 self.check_shutdown_power_state("G3", pwr_retries=5) 1173 # Short press power button to boot DUT again. 1174 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1175 1176 def check_shutdown_power_state(self, power_state, pwr_retries): 1177 """Check whether the device entered into requested EC power state 1178 after shutdown. 1179 1180 @param power_state: EC power state has to be checked. Either S5 or G3. 1181 @param pwr_retries: Times to check if the DUT in expected power state. 1182 @raise TestFail: If device failed to enter into requested power state. 1183 """ 1184 if not self.wait_power_state(power_state, pwr_retries): 1185 raise error.TestFail('System not shutdown properly and EC fails ' 1186 'to enter into %s state.' % power_state) 1187 logging.info('System entered into %s state..', power_state) 1188 1189 def check_lid_and_power_on(self): 1190 """ 1191 On devices with EC software sync, system powers on after EC reboots if 1192 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1193 This method checks lid switch state and presses power button if 1194 necessary. 1195 """ 1196 if self.servo.get("lid_open") == "no": 1197 time.sleep(self.faft_config.software_sync) 1198 self.servo.power_short_press() 1199 1200 def stop_powerd(self): 1201 """Stop the powerd daemon on the AP. 1202 1203 This will cause the AP to ignore power button presses sent by the EC. 1204 """ 1205 powerd_running = self.faft_client.system.run_shell_command_check_output( 1206 'status powerd', 'start/running') 1207 if powerd_running: 1208 logging.debug('Stopping powerd') 1209 self.faft_client.system.run_shell_command("stop powerd") 1210 1211 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1212 """Modify the kernel header magic in USB stick. 1213 1214 The kernel header magic is the first 8-byte of kernel partition. 1215 We modify it to make it fail on kernel verification check. 1216 1217 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1218 @param from_magic: A string of magic which we change it from. 1219 @param to_magic: A string of magic which we change it to. 1220 @raise TestError: if failed to change magic. 1221 """ 1222 assert len(from_magic) == 8 1223 assert len(to_magic) == 8 1224 # USB image only contains one kernel. 1225 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1226 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1227 current_magic = self.servo.system_output(read_cmd) 1228 if current_magic == to_magic: 1229 logging.info("The kernel magic is already %s.", current_magic) 1230 return 1231 if current_magic != from_magic: 1232 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1233 1234 logging.info('Modify the kernel magic in USB, from %s to %s.', 1235 from_magic, to_magic) 1236 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1237 " 2>/dev/null" % (to_magic, kernel_part)) 1238 self.servo.system(write_cmd) 1239 1240 if self.servo.system_output(read_cmd) != to_magic: 1241 raise error.TestError("Failed to write new magic.") 1242 1243 def corrupt_usb_kernel(self, usb_dev): 1244 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1245 1246 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1247 """ 1248 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1249 self.CORRUPTED_MAGIC) 1250 1251 def restore_usb_kernel(self, usb_dev): 1252 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1253 1254 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1255 """ 1256 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1257 self.CHROMEOS_MAGIC) 1258 1259 def _call_action(self, action_tuple, check_status=False): 1260 """Call the action function with/without arguments. 1261 1262 @param action_tuple: A function, or a tuple (function, args, error_msg), 1263 in which, args and error_msg are optional. args is 1264 either a value or a tuple if multiple arguments. 1265 This can also be a list containing multiple 1266 function or tuple. In this case, these actions are 1267 called in sequence. 1268 @param check_status: Check the return value of action function. If not 1269 succeed, raises a TestFail exception. 1270 @return: The result value of the action function. 1271 @raise TestError: An error when the action function is not callable. 1272 @raise TestFail: When check_status=True, action function not succeed. 1273 """ 1274 if isinstance(action_tuple, list): 1275 return all([self._call_action(action, check_status=check_status) 1276 for action in action_tuple]) 1277 1278 action = action_tuple 1279 args = () 1280 error_msg = 'Not succeed' 1281 if isinstance(action_tuple, tuple): 1282 action = action_tuple[0] 1283 if len(action_tuple) >= 2: 1284 args = action_tuple[1] 1285 if not isinstance(args, tuple): 1286 args = (args,) 1287 if len(action_tuple) >= 3: 1288 error_msg = action_tuple[2] 1289 1290 if action is None: 1291 return 1292 1293 if not callable(action): 1294 raise error.TestError('action is not callable!') 1295 1296 info_msg = 'calling %s' % action.__name__ 1297 if args: 1298 info_msg += ' with args %s' % str(args) 1299 logging.info(info_msg) 1300 ret = action(*args) 1301 1302 if check_status and not ret: 1303 raise error.TestFail('%s: %s returning %s' % 1304 (error_msg, info_msg, str(ret))) 1305 return ret 1306 1307 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1308 run_power_action=True, post_power_action=None, 1309 shutdown_timeout=None): 1310 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1311 1312 @param shutdown_action: function which makes DUT shutdown, like 1313 pressing power key. 1314 @param pre_power_action: function which is called before next power on. 1315 @param run_power_action: power_key press by default, set to None to skip. 1316 @param post_power_action: function which is called after next power on. 1317 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1318 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1319 """ 1320 self._call_action(shutdown_action) 1321 logging.info('Wait to ensure DUT shut down...') 1322 try: 1323 if shutdown_timeout is None: 1324 shutdown_timeout = self.faft_config.shutdown_timeout 1325 self.switcher.wait_for_client(timeout=shutdown_timeout) 1326 raise error.TestFail( 1327 'Should shut the device down after calling %s.' % 1328 shutdown_action.__name__) 1329 except ConnectionError: 1330 if self.check_ec_capability(['x86'], suppress_warning=True): 1331 self.check_shutdown_power_state("G3", pwr_retries=5) 1332 logging.info( 1333 'DUT is surely shutdown. We are going to power it on again...') 1334 1335 if pre_power_action: 1336 self._call_action(pre_power_action) 1337 if run_power_action: 1338 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1339 if post_power_action: 1340 self._call_action(post_power_action) 1341 1342 def get_bootid(self, retry=3): 1343 """ 1344 Return the bootid. 1345 """ 1346 boot_id = None 1347 while retry: 1348 try: 1349 boot_id = self._client.get_boot_id() 1350 break 1351 except error.AutoservRunError: 1352 retry -= 1 1353 if retry: 1354 logging.info('Retry to get boot_id...') 1355 else: 1356 logging.warning('Failed to get boot_id.') 1357 logging.info('boot_id: %s', boot_id) 1358 return boot_id 1359 1360 def check_state(self, func): 1361 """ 1362 Wrapper around _call_action with check_status set to True. This is a 1363 helper function to be used by tests and is currently implemented by 1364 calling _call_action with check_status=True. 1365 1366 TODO: This function's arguments need to be made more stringent. And 1367 its functionality should be moved over to check functions directly in 1368 the future. 1369 1370 @param func: A function, or a tuple (function, args, error_msg), 1371 in which, args and error_msg are optional. args is 1372 either a value or a tuple if multiple arguments. 1373 This can also be a list containing multiple 1374 function or tuple. In this case, these actions are 1375 called in sequence. 1376 @return: The result value of the action function. 1377 @raise TestFail: If the function does notsucceed. 1378 """ 1379 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1380 self._call_action(func, check_status=True) 1381 logging.info("-[FAFT]-[ end state_checker ]----------------") 1382 1383 def get_current_firmware_identity(self): 1384 """Get current firmware sha and fwids of body and vblock. 1385 1386 @return: Current firmware checksums and fwids, as a dict 1387 """ 1388 1389 # TODO(dgoyette): add a way to avoid hardcoding the keys (section names) 1390 current_checksums = { 1391 'VBOOTA': self.faft_client.bios.get_sig_sha('a'), 1392 'FVMAINA': self.faft_client.bios.get_body_sha('a'), 1393 'VBOOTB': self.faft_client.bios.get_sig_sha('b'), 1394 'FVMAINB': self.faft_client.bios.get_body_sha('b'), 1395 } 1396 if not all(current_checksums.values()): 1397 raise error.TestError( 1398 'Failed to get firmware sha: %s', current_checksums) 1399 1400 current_fwids = { 1401 'RO_FRID': self.faft_client.bios.get_section_fwid('ro'), 1402 'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'), 1403 'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'), 1404 } 1405 if not all(current_fwids.values()): 1406 raise error.TestError( 1407 'Failed to get firmware fwid(s): %s', current_fwids) 1408 1409 identifying_info = dict(current_fwids) 1410 identifying_info.update(current_checksums) 1411 return identifying_info 1412 1413 def is_firmware_changed(self): 1414 """Check if the current firmware changed, by comparing its SHA and fwid. 1415 1416 @return: True if it is changed, otherwise False. 1417 """ 1418 # Device may not be rebooted after test. 1419 self.faft_client.bios.reload() 1420 1421 current_info = self.get_current_firmware_identity() 1422 prev_info = self._backup_firmware_identity 1423 1424 if current_info == prev_info: 1425 return False 1426 else: 1427 changed = set() 1428 for section in set(current_info.keys()) | set(prev_info.keys()): 1429 if current_info.get(section) != prev_info.get(section): 1430 changed.add(section) 1431 1432 logging.info('Firmware changed: %s', ', '.join(sorted(changed))) 1433 return True 1434 1435 def backup_firmware(self, suffix='.original'): 1436 """Backup firmware to file, and then send it to host. 1437 1438 @param suffix: a string appended to backup file name 1439 """ 1440 remote_temp_dir = self.faft_client.system.create_temp_dir() 1441 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1442 self.faft_client.bios.dump_whole(remote_bios_path) 1443 self._client.get_file(remote_bios_path, 1444 os.path.join(self.resultsdir, 'bios' + suffix)) 1445 1446 if self.faft_config.chrome_ec: 1447 remote_ec_path = os.path.join(remote_temp_dir, 'ec') 1448 self.faft_client.ec.dump_whole(remote_ec_path) 1449 self._client.get_file(remote_ec_path, 1450 os.path.join(self.resultsdir, 'ec' + suffix)) 1451 1452 self._client.run('rm -rf %s' % remote_temp_dir) 1453 logging.info('Backup firmware stored in %s with suffix %s', 1454 self.resultsdir, suffix) 1455 1456 self._backup_firmware_identity = self.get_current_firmware_identity() 1457 1458 def is_firmware_saved(self): 1459 """Check if a firmware saved (called backup_firmware before). 1460 1461 @return: True if the firmware is backed up; otherwise False. 1462 """ 1463 return bool(self._backup_firmware_identity) 1464 1465 def clear_saved_firmware(self): 1466 """Clear the firmware saved by the method backup_firmware.""" 1467 self._backup_firmware_identity = {} 1468 1469 def restore_firmware(self, suffix='.original', restore_ec=True): 1470 """Restore firmware from host in resultsdir. 1471 1472 @param suffix: a string appended to backup file name 1473 @param restore_ec: True to restore the ec firmware; False not to do. 1474 @return: True if firmware needed to be restored 1475 """ 1476 if not self.is_firmware_changed(): 1477 return False 1478 1479 # Backup current corrupted firmware. 1480 self.backup_firmware(suffix='.corrupt') 1481 1482 # Restore firmware. 1483 remote_temp_dir = self.faft_client.system.create_temp_dir() 1484 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1485 os.path.join(remote_temp_dir, 'bios')) 1486 1487 self.faft_client.bios.write_whole( 1488 os.path.join(remote_temp_dir, 'bios')) 1489 1490 if self.faft_config.chrome_ec and restore_ec: 1491 self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix), 1492 os.path.join(remote_temp_dir, 'ec')) 1493 self.faft_client.ec.write_whole( 1494 os.path.join(remote_temp_dir, 'ec')) 1495 1496 self.switcher.mode_aware_reboot() 1497 logging.info('Successfully restored firmware.') 1498 return True 1499 1500 def setup_firmwareupdate_shellball(self, shellball=None): 1501 """Setup a shellball to use in firmware update test. 1502 1503 Check if there is a given shellball, and it is a shell script. Then, 1504 send it to the remote host. Otherwise, use the 1505 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1506 BIOS and EC images with the active firmware images. 1507 1508 @param shellball: path of a shellball or default to None. 1509 """ 1510 if shellball: 1511 # Determine the firmware file is a shellball or a raw binary. 1512 is_shellball = (utils.system_output("file %s" % shellball).find( 1513 "shell script") != -1) 1514 if is_shellball: 1515 logging.info('Device will update firmware with shellball %s', 1516 shellball) 1517 temp_path = self.faft_client.updater.get_temp_path() 1518 working_shellball = os.path.join(temp_path, 1519 'chromeos-firmwareupdate') 1520 self._client.send_file(shellball, working_shellball) 1521 self.faft_client.updater.extract_shellball() 1522 else: 1523 raise error.TestFail( 1524 'The given shellball is not a shell script.') 1525 else: 1526 logging.info('No shellball given, use the original shellball and ' 1527 'replace its BIOS and EC images.') 1528 work_path = self.faft_client.updater.get_work_path() 1529 bios_in_work_path = os.path.join( 1530 work_path, self.faft_client.updater.get_bios_relative_path()) 1531 ec_in_work_path = os.path.join( 1532 work_path, self.faft_client.updater.get_ec_relative_path()) 1533 logging.info('Writing current BIOS to: %s', bios_in_work_path) 1534 self.faft_client.bios.dump_whole(bios_in_work_path) 1535 if self.faft_config.chrome_ec: 1536 logging.info('Writing current EC to: %s', ec_in_work_path) 1537 self.faft_client.ec.dump_firmware(ec_in_work_path) 1538 self.faft_client.updater.repack_shellball() 1539 1540 def is_kernel_changed(self): 1541 """Check if the current kernel is changed, by comparing its SHA1 hash. 1542 1543 @return: True if it is changed; otherwise, False. 1544 """ 1545 changed = False 1546 for p in ('A', 'B'): 1547 backup_sha = self._backup_kernel_sha.get(p, None) 1548 current_sha = self.faft_client.kernel.get_sha(p) 1549 if backup_sha != current_sha: 1550 changed = True 1551 logging.info('Kernel %s is changed', p) 1552 return changed 1553 1554 def backup_kernel(self, suffix='.original'): 1555 """Backup kernel to files, and the send them to host. 1556 1557 @param suffix: a string appended to backup file name. 1558 """ 1559 remote_temp_dir = self.faft_client.system.create_temp_dir() 1560 for p in ('A', 'B'): 1561 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1562 self.faft_client.kernel.dump(p, remote_path) 1563 self._client.get_file( 1564 remote_path, 1565 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1566 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1567 logging.info('Backup kernel stored in %s with suffix %s', 1568 self.resultsdir, suffix) 1569 1570 def is_kernel_saved(self): 1571 """Check if kernel images are saved (backup_kernel called before). 1572 1573 @return: True if the kernel is saved; otherwise, False. 1574 """ 1575 return len(self._backup_kernel_sha) != 0 1576 1577 def clear_saved_kernel(self): 1578 """Clear the kernel saved by backup_kernel().""" 1579 self._backup_kernel_sha = dict() 1580 1581 def restore_kernel(self, suffix='.original'): 1582 """Restore kernel from host in resultsdir. 1583 1584 @param suffix: a string appended to backup file name. 1585 """ 1586 if not self.is_kernel_changed(): 1587 return 1588 1589 # Backup current corrupted kernel. 1590 self.backup_kernel(suffix='.corrupt') 1591 1592 # Restore kernel. 1593 remote_temp_dir = self.faft_client.system.create_temp_dir() 1594 for p in ('A', 'B'): 1595 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1596 self._client.send_file( 1597 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1598 remote_path) 1599 self.faft_client.kernel.write(p, remote_path) 1600 1601 self.switcher.mode_aware_reboot() 1602 logging.info('Successfully restored kernel.') 1603 1604 def backup_cgpt_attributes(self): 1605 """Backup CGPT partition table attributes.""" 1606 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1607 1608 def restore_cgpt_attributes(self): 1609 """Restore CGPT partition table attributes.""" 1610 current_table = self.faft_client.cgpt.get_attributes() 1611 if current_table == self._backup_cgpt_attr: 1612 return 1613 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1614 self._backup_cgpt_attr, 1615 current_table) 1616 self.faft_client.cgpt.set_attributes( 1617 self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B']) 1618 1619 self.switcher.mode_aware_reboot() 1620 logging.info('Successfully restored CGPT table.') 1621 1622 def try_fwb(self, count=0): 1623 """set to try booting FWB count # times 1624 1625 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 1626 vboot2 1627 1628 @param count: an integer specifying value to program into 1629 fwb_tries(vb1)/fw_try_next(vb2) 1630 """ 1631 if self.fw_vboot2: 1632 self.faft_client.system.set_fw_try_next('B', count) 1633 else: 1634 # vboot1: we need to boot into fwb at least once 1635 if not count: 1636 count = count + 1 1637 self.faft_client.system.set_try_fw_b(count) 1638 1639 def identify_shellball(self, include_ec=None): 1640 """Get the FWIDs of all targets and sections in the shellball 1641 1642 @param include_ec: if True, get EC fwids. 1643 If None (default), assume True if board has an EC 1644 @return: the dict of versions in the shellball 1645 """ 1646 fwids = dict() 1647 fwids['bios'] = self.faft_client.updater.get_all_fwids('bios') 1648 1649 if include_ec is None: 1650 if self.faft_config.platform == 'Samus': 1651 include_ec = False # no ec.bin in shellball 1652 else: 1653 include_ec = self.faft_config.chrome_ec 1654 1655 if include_ec: 1656 fwids['ec'] = self.faft_client.updater.get_all_fwids('ec') 1657 return fwids 1658 1659 def modify_shellball(self, append, modify_ro=True, modify_ec=False): 1660 """Modify the FWIDs of targets and sections in the shellball 1661 1662 @return: the full path of the shellball 1663 """ 1664 1665 if modify_ro: 1666 self.faft_client.updater.modify_fwids('bios', ['ro', 'a', 'b']) 1667 else: 1668 self.faft_client.updater.modify_fwids('bios', ['a', 'b']) 1669 1670 if modify_ec: 1671 if modify_ro: 1672 self.faft_client.updater.modify_fwids('ec', ['ro', 'rw']) 1673 else: 1674 self.faft_client.updater.modify_fwids('ec', ['rw']) 1675 1676 modded_shellball = self.faft_client.updater.repack_shellball(append) 1677 1678 return modded_shellball 1679 1680 @staticmethod 1681 def check_fwids_written(before_fwids, image_fwids, after_fwids, 1682 expected_written): 1683 """Check the dicts of fwids for correctness after an update is applied. 1684 1685 The targets checked come from the keys of expected_written. 1686 The sections checked come from the inner dicts of the fwids parameters. 1687 1688 The fwids should be keyed by target (flash type), then by section: 1689 {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'}, 1690 'ec': {'ro': '<fwid>', 'rw': '<fwid>'} 1691 1692 For expected_written, the dict should be keyed by flash type only: 1693 {'bios': ['ro'], 'ec': ['ro', 'rw']} 1694 1695 @param before_fwids: dict of versions from before the update 1696 @param image_fwids: dict of versions in the update 1697 @param after_fwids: dict of actual versions after the update 1698 @param expected_written: dict indicating which ones should have changed 1699 @return: list of error lines for mismatches 1700 1701 @type before_fwids: dict 1702 @type image_fwids: dict 1703 @type after_fwids: dict 1704 @type expected_written: dict 1705 @rtype: list 1706 """ 1707 errors = [] 1708 1709 for target in sorted(expected_written.keys()): 1710 # target is BIOS or EC 1711 1712 before_missing = (target not in before_fwids) 1713 after_missing = (target not in after_fwids) 1714 if before_missing or after_missing: 1715 if before_missing: 1716 errors.append("...no before_fwids[%s]" % target) 1717 if after_missing: 1718 errors.append("...no after_fwids[%s]" % target) 1719 continue 1720 1721 written_sections = expected_written.get(target) or list() 1722 written_sections = set(written_sections) 1723 1724 before_sections = set(before_fwids.get(target) or dict()) 1725 image_sections = set(image_fwids.get(target) or dict()) 1726 after_sections = set(after_fwids.get(target) or dict()) 1727 1728 for section in before_sections | image_sections | after_sections: 1729 # section is RO, RW, A, or B 1730 1731 before_fwid = before_fwids[target][section] 1732 image_fwid = image_fwids[target][section] 1733 actual_fwid = after_fwids[target][section] 1734 1735 if section in written_sections: 1736 expected_fwid = image_fwid 1737 expected_desc = 'rewritten fwid (%s)' % expected_fwid 1738 if image_fwid == before_fwid: 1739 expected_desc = ('rewritten (no changes) fwid (%s)' % 1740 expected_fwid) 1741 else: 1742 expected_fwid = before_fwid 1743 expected_desc = 'original fwid (%s)' % expected_fwid 1744 1745 if actual_fwid == expected_fwid: 1746 actual_desc = 'correct value' 1747 1748 elif actual_fwid == image_fwid: 1749 actual_desc = 'rewritten fwid (%s)' % actual_fwid 1750 if image_fwid == before_fwid: 1751 # The flash could have been rewritten with the same fwid 1752 actual_desc = 'possibly written fwid (%s)' % actual_fwid 1753 1754 elif actual_fwid == before_fwid: 1755 actual_desc = 'original fwid (%s)' % actual_fwid 1756 1757 else: 1758 actual_desc = 'unknown fwid (%s)' % actual_fwid 1759 1760 msg = ("...FWID (%s %s): expected %s, got %s" % 1761 (target.upper(), section.upper(), 1762 expected_desc, actual_desc)) 1763 1764 if actual_fwid != expected_fwid: 1765 errors.append(msg) 1766 return errors 1767 1768 1769 def fwmp_is_cleared(self): 1770 """Return True if the FWMP has been created""" 1771 res = self.host.run('cryptohome ' 1772 '--action=get_firmware_management_parameters', 1773 ignore_status=True) 1774 if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS: 1775 raise error.TestError('Could not run cryptohome command %r' % res) 1776 return self.FWMP_CLEARED_ERROR_MSG in res.stdout 1777 1778 1779 def _tpm_is_owned(self): 1780 """Returns True if the tpm is owned""" 1781 result = self.host.run('cryptohome --action=tpm_more_status', 1782 ignore_status=True) 1783 logging.debug(result) 1784 return result.exit_status == 0 and 'owned: true' in result.stdout 1785 1786 def clear_fwmp(self): 1787 """Clear the FWMP""" 1788 if self.fwmp_is_cleared(): 1789 return 1790 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 1791 self.host.run('cryptohome --action=tpm_take_ownership') 1792 if not utils.wait_for_value(self._tpm_is_owned, expected_value=True): 1793 raise error.TestError('Unable to own tpm while clearing fwmp.') 1794 self.host.run('cryptohome ' 1795 '--action=remove_firmware_management_parameters') 1796