1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import pprint 10import re 11import time 12import uuid 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error 16from autotest_lib.server import test 17from autotest_lib.server.cros import vboot_constants as vboot 18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig 19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 20from autotest_lib.server.cros.faft.utils import mode_switcher 21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 22from autotest_lib.server.cros.servo import chrome_base_ec 23from autotest_lib.server.cros.servo import chrome_cr50 24from autotest_lib.server.cros.servo import chrome_ec 25 26ConnectionError = mode_switcher.ConnectionError 27 28 29class FAFTBase(test.test): 30 """The base class of FAFT classes. 31 32 It launches the FAFTClient on DUT, such that the test can access its 33 firmware functions and interfaces. It also provides some methods to 34 handle the reboot mechanism, in order to ensure FAFTClient is still 35 connected after reboot. 36 """ 37 def initialize(self, host): 38 """Create a FAFTClient object and install the dependency.""" 39 self.servo = host.servo 40 self.servo.initialize_dut() 41 self._client = host 42 self.faft_client = RPCProxy(host) 43 self.lockfile = '/var/tmp/faft/lock' 44 45 46class FirmwareTest(FAFTBase): 47 """ 48 Base class that sets up helper objects/functions for firmware tests. 49 50 TODO: add documentaion as the FAFT rework progresses. 51 """ 52 version = 1 53 54 # Mapping of partition number of kernel and rootfs. 55 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 56 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 57 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 58 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 59 60 CHROMEOS_MAGIC = "CHROMEOS" 61 CORRUPTED_MAGIC = "CORRUPTD" 62 63 # Delay for waiting client to return before EC suspend 64 EC_SUSPEND_DELAY = 5 65 66 # Delay between EC suspend and wake 67 WAKE_DELAY = 10 68 69 # Delay between closing and opening lid 70 LID_DELAY = 1 71 72 _SERVOD_LOG = '/var/log/servod.log' 73 74 _ROOTFS_PARTITION_NUMBER = 3 75 76 _backup_firmware_sha = () 77 _backup_kernel_sha = dict() 78 _backup_cgpt_attr = dict() 79 _backup_gbb_flags = None 80 _backup_dev_mode = None 81 82 # Class level variable, keep track the states of one time setup. 83 # This variable is preserved across tests which inherit this class. 84 _global_setup_done = { 85 'gbb_flags': False, 86 'reimage': False, 87 'usb_check': False, 88 } 89 90 @classmethod 91 def check_setup_done(cls, label): 92 """Check if the given setup is done. 93 94 @param label: The label of the setup. 95 """ 96 return cls._global_setup_done[label] 97 98 @classmethod 99 def mark_setup_done(cls, label): 100 """Mark the given setup done. 101 102 @param label: The label of the setup. 103 """ 104 cls._global_setup_done[label] = True 105 106 @classmethod 107 def unmark_setup_done(cls, label): 108 """Mark the given setup not done. 109 110 @param label: The label of the setup. 111 """ 112 cls._global_setup_done[label] = False 113 114 def initialize(self, host, cmdline_args, ec_wp=None): 115 super(FirmwareTest, self).initialize(host) 116 self.run_id = str(uuid.uuid4()) 117 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 118 # Parse arguments from command line 119 args = {} 120 self.power_control = host.POWER_CONTROL_RPM 121 for arg in cmdline_args: 122 match = re.search("^(\w+)=(.+)", arg) 123 if match: 124 args[match.group(1)] = match.group(2) 125 if 'power_control' in args: 126 self.power_control = args['power_control'] 127 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 128 raise error.TestError('Valid values for --args=power_control ' 129 'are %s. But you entered wrong argument ' 130 'as "%s".' 131 % (host.POWER_CONTROL_VALID_ARGS, 132 self.power_control)) 133 134 if not self.faft_client.system.dev_tpm_present(): 135 raise error.TestError('/dev/tpm0 does not exist on the client') 136 137 self.faft_config = FAFTConfig( 138 self.faft_client.system.get_platform_name()) 139 self.checkers = FAFTCheckers(self) 140 self.switcher = mode_switcher.create_mode_switcher(self) 141 142 if self.faft_config.chrome_ec: 143 self.ec = chrome_ec.ChromeEC(self.servo) 144 # Check for presence of a USBPD console 145 if self.faft_config.chrome_usbpd: 146 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 147 elif self.faft_config.chrome_ec: 148 # If no separate USBPD console, then PD exists on EC console 149 self.usbpd = self.ec 150 # Get plankton console 151 self.plankton = host.plankton 152 self.plankton_host = host._plankton_host 153 154 # Create the BaseEC object. None if not available. 155 self.base_ec = chrome_base_ec.create_base_ec(self.servo) 156 157 self._setup_uart_capture() 158 self._setup_servo_log() 159 self._record_system_info() 160 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 161 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 162 if self.fw_vboot2: 163 self.faft_client.system.set_fw_try_next('A') 164 if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B': 165 logging.info('mainfw_act is B. rebooting to set it A') 166 self.switcher.mode_aware_reboot() 167 self._setup_gbb_flags() 168 self.faft_client.updater.stop_daemon() 169 self._create_faft_lockfile() 170 self._setup_ec_write_protect(ec_wp) 171 # See chromium:239034 regarding needing this sync. 172 self.blocking_sync() 173 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 174 175 def cleanup(self): 176 """Autotest cleanup function.""" 177 # Unset state checker in case it's set by subclass 178 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 179 try: 180 self.faft_client.system.is_available() 181 except: 182 # Remote is not responding. Revive DUT so that subsequent tests 183 # don't fail. 184 self._restore_routine_from_timeout() 185 self.switcher.restore_mode() 186 self._restore_ec_write_protect() 187 self._restore_gbb_flags() 188 self.faft_client.updater.start_daemon() 189 self._remove_faft_lockfile() 190 self._record_servo_log() 191 self._record_faft_client_log() 192 self._cleanup_uart_capture() 193 super(FirmwareTest, self).cleanup() 194 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 195 196 def _record_system_info(self): 197 """Record some critical system info to the attr keyval. 198 199 This info is used by generate_test_report later. 200 """ 201 system_info = { 202 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 203 'ec_version': self.faft_client.ec.get_version(), 204 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 205 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 206 'servod_version': self._client._servo_host.run( 207 'servod --version').stdout.strip(), 208 } 209 210 if hasattr(self, 'cr50'): 211 system_info['cr50_version'] = self.servo.get('cr50_version') 212 213 logging.info('System info:\n' + pprint.pformat(system_info)) 214 self.write_attr_keyval(system_info) 215 216 def invalidate_firmware_setup(self): 217 """Invalidate all firmware related setup state. 218 219 This method is called when the firmware is re-flashed. It resets all 220 firmware related setup states so that the next test setup properly 221 again. 222 """ 223 self.unmark_setup_done('gbb_flags') 224 225 def _retrieve_recovery_reason_from_trap(self): 226 """Try to retrieve the recovery reason from a trapped recovery screen. 227 228 @return: The recovery_reason, 0 if any error. 229 """ 230 recovery_reason = 0 231 logging.info('Try to retrieve recovery reason...') 232 if self.servo.get_usbkey_direction() == 'dut': 233 self.switcher.bypass_rec_mode() 234 else: 235 self.servo.switch_usbkey('dut') 236 237 try: 238 self.switcher.wait_for_client() 239 lines = self.faft_client.system.run_shell_command_get_output( 240 'crossystem recovery_reason') 241 recovery_reason = int(lines[0]) 242 logging.info('Got the recovery reason %d.', recovery_reason) 243 except ConnectionError: 244 logging.error('Failed to get the recovery reason due to connection ' 245 'error.') 246 return recovery_reason 247 248 def _reset_client(self): 249 """Reset client to a workable state. 250 251 This method is called when the client is not responsive. It may be 252 caused by the following cases: 253 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 254 - corrupted firmware; 255 - corrutped OS image. 256 """ 257 # DUT may halt on a firmware screen. Try cold reboot. 258 logging.info('Try cold reboot...') 259 self.switcher.mode_aware_reboot(reboot_type='cold', 260 sync_before_boot=False, 261 wait_for_dut_up=False) 262 self.switcher.wait_for_client_offline() 263 self.switcher.bypass_dev_mode() 264 try: 265 self.switcher.wait_for_client() 266 return 267 except ConnectionError: 268 logging.warn('Cold reboot doesn\'t help, still connection error.') 269 270 # DUT may be broken by a corrupted firmware. Restore firmware. 271 # We assume the recovery boot still works fine. Since the recovery 272 # code is in RO region and all FAFT tests don't change the RO region 273 # except GBB. 274 if self.is_firmware_saved(): 275 self._ensure_client_in_recovery() 276 logging.info('Try restore the original firmware...') 277 if self.is_firmware_changed(): 278 try: 279 self.restore_firmware() 280 return 281 except ConnectionError: 282 logging.warn('Restoring firmware doesn\'t help, still ' 283 'connection error.') 284 285 # Perhaps it's kernel that's broken. Let's try restoring it. 286 if self.is_kernel_saved(): 287 self._ensure_client_in_recovery() 288 logging.info('Try restore the original kernel...') 289 if self.is_kernel_changed(): 290 try: 291 self.restore_kernel() 292 return 293 except ConnectionError: 294 logging.warn('Restoring kernel doesn\'t help, still ' 295 'connection error.') 296 297 # DUT may be broken by a corrupted OS image. Restore OS image. 298 self._ensure_client_in_recovery() 299 logging.info('Try restore the OS image...') 300 self.faft_client.system.run_shell_command('chromeos-install --yes') 301 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 302 self.switcher.wait_for_client_offline() 303 self.switcher.bypass_dev_mode() 304 try: 305 self.switcher.wait_for_client() 306 logging.info('Successfully restore OS image.') 307 return 308 except ConnectionError: 309 logging.warn('Restoring OS image doesn\'t help, still connection ' 310 'error.') 311 312 def _ensure_client_in_recovery(self): 313 """Ensure client in recovery boot; reboot into it if necessary. 314 315 @raise TestError: if failed to boot the USB image. 316 """ 317 logging.info('Try boot into USB image...') 318 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 319 wait_for_dut_up=False) 320 self.servo.switch_usbkey('host') 321 self.switcher.bypass_rec_mode() 322 try: 323 self.switcher.wait_for_client() 324 except ConnectionError: 325 raise error.TestError('Failed to boot the USB image.') 326 327 def _restore_routine_from_timeout(self): 328 """A routine to try to restore the system from a timeout error. 329 330 This method is called when FAFT failed to connect DUT after reboot. 331 332 @raise TestFail: This exception is already raised, with a decription 333 why it failed. 334 """ 335 # DUT is disconnected. Capture the UART output for debug. 336 self._record_uart_capture() 337 338 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 339 # identify if it is a network flaky. 340 341 recovery_reason = self._retrieve_recovery_reason_from_trap() 342 343 # Reset client to a workable state. 344 self._reset_client() 345 346 # Raise the proper TestFail exception. 347 if recovery_reason: 348 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 349 'and timed out' % recovery_reason) 350 else: 351 raise error.TestFail('Timed out waiting for DUT reboot') 352 353 def assert_test_image_in_usb_disk(self, usb_dev=None): 354 """Assert an USB disk plugged-in on servo and a test image inside. 355 356 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 357 If None, it is detected automatically. 358 @raise TestError: if USB disk not detected or not a test image. 359 """ 360 if self.check_setup_done('usb_check'): 361 return 362 if usb_dev: 363 assert self.servo.get_usbkey_direction() == 'host' 364 else: 365 self.servo.switch_usbkey('host') 366 usb_dev = self.servo.probe_host_usb_dev() 367 if not usb_dev: 368 raise error.TestError( 369 'An USB disk should be plugged in the servo board.') 370 371 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 372 logging.info('usb dev is %s', usb_dev) 373 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 374 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 375 376 try: 377 usb_lsb = self.servo.system_output('cat %s' % 378 os.path.join(tmpd, 'etc/lsb-release')) 379 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 380 dut_lsb = '\n'.join(self.faft_client.system. 381 run_shell_command_get_output('cat /etc/lsb-release')) 382 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 383 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 384 raise error.TestError('USB stick in servo is no test image') 385 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 386 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 387 if usb_board != dut_board: 388 raise error.TestError('USB stick in servo contains a %s ' 389 'image, but DUT is a %s' % (usb_board, dut_board)) 390 finally: 391 for cmd in ('umount -l %s' % rootfs, 'sync', 'rm -rf %s' % tmpd): 392 self.servo.system(cmd) 393 394 self.mark_setup_done('usb_check') 395 396 def setup_usbkey(self, usbkey, host=None): 397 """Setup the USB disk for the test. 398 399 It checks the setup of USB disk and a valid ChromeOS test image inside. 400 It also muxes the USB disk to either the host or DUT by request. 401 402 @param usbkey: True if the USB disk is required for the test, False if 403 not required. 404 @param host: Optional, True to mux the USB disk to host, False to mux it 405 to DUT, default to do nothing. 406 """ 407 if usbkey: 408 self.assert_test_image_in_usb_disk() 409 elif host is None: 410 # USB disk is not required for the test. Better to mux it to host. 411 host = True 412 413 if host is True: 414 self.servo.switch_usbkey('host') 415 elif host is False: 416 self.servo.switch_usbkey('dut') 417 418 def get_usbdisk_path_on_dut(self): 419 """Get the path of the USB disk device plugged-in the servo on DUT. 420 421 Returns: 422 A string representing USB disk path, like '/dev/sdb', or None if 423 no USB disk is found. 424 """ 425 cmd = 'ls -d /dev/s*[a-z]' 426 original_value = self.servo.get_usbkey_direction() 427 428 # Make the dut unable to see the USB disk. 429 self.servo.switch_usbkey('off') 430 no_usb_set = set( 431 self.faft_client.system.run_shell_command_get_output(cmd)) 432 433 # Make the dut able to see the USB disk. 434 self.servo.switch_usbkey('dut') 435 time.sleep(self.faft_config.usb_plug) 436 has_usb_set = set( 437 self.faft_client.system.run_shell_command_get_output(cmd)) 438 439 # Back to its original value. 440 if original_value != self.servo.get_usbkey_direction(): 441 self.servo.switch_usbkey(original_value) 442 443 diff_set = has_usb_set - no_usb_set 444 if len(diff_set) == 1: 445 return diff_set.pop() 446 else: 447 return None 448 449 def _create_faft_lockfile(self): 450 """Creates the FAFT lockfile.""" 451 logging.info('Creating FAFT lockfile...') 452 command = 'touch %s' % (self.lockfile) 453 self.faft_client.system.run_shell_command(command) 454 455 def _remove_faft_lockfile(self): 456 """Removes the FAFT lockfile.""" 457 logging.info('Removing FAFT lockfile...') 458 command = 'rm -f %s' % (self.lockfile) 459 self.faft_client.system.run_shell_command(command) 460 461 def clear_set_gbb_flags(self, clear_mask, set_mask): 462 """Clear and set the GBB flags in the current flashrom. 463 464 @param clear_mask: A mask of flags to be cleared. 465 @param set_mask: A mask of flags to be set. 466 """ 467 gbb_flags = self.faft_client.bios.get_gbb_flags() 468 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 469 if new_flags != gbb_flags: 470 self._backup_gbb_flags = gbb_flags 471 logging.info('Changing GBB flags from 0x%x to 0x%x.', 472 gbb_flags, new_flags) 473 self.faft_client.bios.set_gbb_flags(new_flags) 474 # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag, 475 # reboot to get a clear state 476 if ((gbb_flags ^ new_flags) & 477 (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 478 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)): 479 self.switcher.mode_aware_reboot() 480 else: 481 logging.info('Current GBB flags look good for test: 0x%x.', 482 gbb_flags) 483 484 def check_ec_capability(self, required_cap=None, suppress_warning=False): 485 """Check if current platform has required EC capabilities. 486 487 @param required_cap: A list containing required EC capabilities. Pass in 488 None to only check for presence of Chrome EC. 489 @param suppress_warning: True to suppress any warning messages. 490 @return: True if requirements are met. Otherwise, False. 491 """ 492 if not self.faft_config.chrome_ec: 493 if not suppress_warning: 494 logging.warn('Requires Chrome EC to run this test.') 495 return False 496 497 if not required_cap: 498 return True 499 500 for cap in required_cap: 501 if cap not in self.faft_config.ec_capability: 502 if not suppress_warning: 503 logging.warn('Requires EC capability "%s" to run this ' 504 'test.', cap) 505 return False 506 507 return True 508 509 def check_root_part_on_non_recovery(self, part): 510 """Check the partition number of root device and on normal/dev boot. 511 512 @param part: A string of partition number, e.g.'3'. 513 @return: True if the root device matched and on normal/dev boot; 514 otherwise, False. 515 """ 516 return self.checkers.root_part_checker(part) and \ 517 self.checkers.crossystem_checker({ 518 'mainfw_type': ('normal', 'developer'), 519 }) 520 521 def _join_part(self, dev, part): 522 """Return a concatenated string of device and partition number. 523 524 @param dev: A string of device, e.g.'/dev/sda'. 525 @param part: A string of partition number, e.g.'3'. 526 @return: A concatenated string of device and partition number, 527 e.g.'/dev/sda3'. 528 529 >>> seq = FirmwareTest() 530 >>> seq._join_part('/dev/sda', '3') 531 '/dev/sda3' 532 >>> seq._join_part('/dev/mmcblk0', '2') 533 '/dev/mmcblk0p2' 534 """ 535 if 'mmcblk' in dev: 536 return dev + 'p' + part 537 elif 'nvme' in dev: 538 return dev + 'p' + part 539 else: 540 return dev + part 541 542 def copy_kernel_and_rootfs(self, from_part, to_part): 543 """Copy kernel and rootfs from from_part to to_part. 544 545 @param from_part: A string of partition number to be copied from. 546 @param to_part: A string of partition number to be copied to. 547 """ 548 root_dev = self.faft_client.system.get_root_dev() 549 logging.info('Copying kernel from %s to %s. Please wait...', 550 from_part, to_part) 551 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 552 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 553 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 554 logging.info('Copying rootfs from %s to %s. Please wait...', 555 from_part, to_part) 556 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 557 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 558 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 559 560 def ensure_kernel_boot(self, part): 561 """Ensure the request kernel boot. 562 563 If not, it duplicates the current kernel to the requested kernel 564 and sets the requested higher priority to ensure it boot. 565 566 @param part: A string of kernel partition number or 'a'/'b'. 567 """ 568 if not self.checkers.root_part_checker(part): 569 if self.faft_client.kernel.diff_a_b(): 570 self.copy_kernel_and_rootfs( 571 from_part=self.OTHER_KERNEL_MAP[part], 572 to_part=part) 573 self.reset_and_prioritize_kernel(part) 574 self.switcher.mode_aware_reboot() 575 576 def ensure_dev_internal_boot(self, original_dev_boot_usb): 577 """Ensure internal device boot in developer mode. 578 579 If not internal device boot, it will try to reboot the device and 580 bypass dev mode to boot into internal device. 581 582 @param original_dev_boot_usb: Original dev_boot_usb value. 583 """ 584 logging.info('Checking internal device boot.') 585 if self.faft_client.system.is_removable_device_boot(): 586 logging.info('Reboot into internal disk...') 587 self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb) 588 self.switcher.mode_aware_reboot() 589 self.check_state((self.checkers.dev_boot_usb_checker, False, 590 'Device not booted from internal disk properly.')) 591 592 def set_hardware_write_protect(self, enable): 593 """Set hardware write protect pin. 594 595 @param enable: True if asserting write protect pin. Otherwise, False. 596 """ 597 try: 598 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 599 except: 600 # TODO(waihong): Remove this fallback when all servos have the 601 # above new fw_wp_state control. 602 self.servo.set('fw_wp_vref', self.faft_config.wp_voltage) 603 self.servo.set('fw_wp_en', 'on') 604 self.servo.set('fw_wp', 'on' if enable else 'off') 605 606 def set_ec_write_protect_and_reboot(self, enable): 607 """Set EC write protect status and reboot to take effect. 608 609 The write protect state is only activated if both hardware write 610 protect pin is asserted and software write protect flag is set. 611 This method asserts/deasserts hardware write protect pin first, and 612 set corresponding EC software write protect flag. 613 614 If the device uses non-Chrome EC, set the software write protect via 615 flashrom. 616 617 If the device uses Chrome EC, a reboot is required for write protect 618 to take effect. Since the software write protect flag cannot be unset 619 if hardware write protect pin is asserted, we need to deasserted the 620 pin first if we are deactivating write protect. Similarly, a reboot 621 is required before we can modify the software flag. 622 623 @param enable: True if activating EC write protect. Otherwise, False. 624 """ 625 self.set_hardware_write_protect(enable) 626 if self.faft_config.chrome_ec: 627 self.set_chrome_ec_write_protect_and_reboot(enable) 628 else: 629 self.faft_client.ec.set_write_protect(enable) 630 self.switcher.mode_aware_reboot() 631 632 def set_chrome_ec_write_protect_and_reboot(self, enable): 633 """Set Chrome EC write protect status and reboot to take effect. 634 635 @param enable: True if activating EC write protect. Otherwise, False. 636 """ 637 if enable: 638 # Set write protect flag and reboot to take effect. 639 self.ec.set_flash_write_protect(enable) 640 self.sync_and_ec_reboot() 641 else: 642 # Reboot after deasserting hardware write protect pin to deactivate 643 # write protect. And then remove software write protect flag. 644 self.sync_and_ec_reboot() 645 self.ec.set_flash_write_protect(enable) 646 647 def _setup_ec_write_protect(self, ec_wp): 648 """Setup for EC write-protection. 649 650 It makes sure the EC in the requested write-protection state. If not, it 651 flips the state. Flipping the write-protection requires DUT reboot. 652 653 @param ec_wp: True to request EC write-protected; False to request EC 654 not write-protected; None to do nothing. 655 """ 656 if ec_wp is None: 657 self._old_ec_wp = None 658 return 659 self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'}) 660 if ec_wp != self._old_ec_wp: 661 logging.info('The test required EC is %swrite-protected. Reboot ' 662 'and flip the state.', '' if ec_wp else 'not ') 663 self.switcher.mode_aware_reboot( 664 'custom', 665 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 666 667 def _restore_ec_write_protect(self): 668 """Restore the original EC write-protection.""" 669 if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None): 670 return 671 if not self.checkers.crossystem_checker( 672 {'wpsw_boot': '1' if self._old_ec_wp else '0'}): 673 logging.info('Restore original EC write protection and reboot.') 674 self.switcher.mode_aware_reboot( 675 'custom', 676 lambda:self.set_ec_write_protect_and_reboot( 677 self._old_ec_wp)) 678 679 def _setup_uart_capture(self): 680 """Setup the CPU/EC/PD UART capture.""" 681 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 682 self.servo.set('cpu_uart_capture', 'on') 683 self.cr50_uart_file = None 684 self.ec_uart_file = None 685 self.usbpd_uart_file = None 686 try: 687 # Check that the console works before declaring the cr50 console 688 # connection exists and enabling uart capture. 689 self.servo.get('cr50_version') 690 self.servo.set('cr50_uart_capture', 'on') 691 self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt') 692 self.cr50 = chrome_cr50.ChromeCr50(self.servo) 693 except error.TestFail as e: 694 if 'No control named' in str(e): 695 logging.warn('cr50 console not supported.') 696 if self.faft_config.chrome_ec: 697 try: 698 self.servo.set('ec_uart_capture', 'on') 699 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 700 except error.TestFail as e: 701 if 'No control named' in str(e): 702 logging.warn('The servod is too old that ec_uart_capture ' 703 'not supported.') 704 # Log separate PD console if supported 705 if self.check_ec_capability(['usbpd_uart'], suppress_warning=True): 706 try: 707 self.servo.set('usbpd_uart_capture', 'on') 708 self.usbpd_uart_file = os.path.join(self.resultsdir, 709 'usbpd_uart.txt') 710 except error.TestFail as e: 711 if 'No control named' in str(e): 712 logging.warn('The servod is too old that ' 713 'usbpd_uart_capture is not supported.') 714 else: 715 logging.info('Not a Google EC, cannot capture ec console output.') 716 717 def _record_uart_capture(self): 718 """Record the CPU/EC/PD UART output stream to files.""" 719 if self.cpu_uart_file: 720 with open(self.cpu_uart_file, 'a') as f: 721 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream'))) 722 if self.cr50_uart_file: 723 with open(self.cr50_uart_file, 'a') as f: 724 f.write(ast.literal_eval(self.servo.get('cr50_uart_stream'))) 725 if self.ec_uart_file and self.faft_config.chrome_ec: 726 with open(self.ec_uart_file, 'a') as f: 727 f.write(ast.literal_eval(self.servo.get('ec_uart_stream'))) 728 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 729 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 730 with open(self.usbpd_uart_file, 'a') as f: 731 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream'))) 732 733 def _cleanup_uart_capture(self): 734 """Cleanup the CPU/EC/PD UART capture.""" 735 # Flush the remaining UART output. 736 self._record_uart_capture() 737 self.servo.set('cpu_uart_capture', 'off') 738 if self.cr50_uart_file: 739 self.servo.set('cr50_uart_capture', 'off') 740 if self.ec_uart_file and self.faft_config.chrome_ec: 741 self.servo.set('ec_uart_capture', 'off') 742 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 743 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 744 self.servo.set('usbpd_uart_capture', 'off') 745 746 def _get_power_state(self, power_state): 747 """ 748 Return the current power state of the AP 749 """ 750 return self.ec.send_command_get_output("powerinfo", [power_state]) 751 752 def wait_power_state(self, power_state, retries): 753 """ 754 Wait for certain power state. 755 756 @param power_state: power state you are expecting 757 @param retries: retries. This is necessary if AP is powering down 758 and transitioning through different states. 759 """ 760 logging.info('Checking power state "%s" maximum %d times.', 761 power_state, retries) 762 while retries > 0: 763 logging.info("try count: %d", retries) 764 try: 765 retries = retries - 1 766 ret = self._get_power_state(power_state) 767 return True 768 except error.TestFail: 769 pass 770 return False 771 772 def suspend(self): 773 """Suspends the DUT.""" 774 cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY 775 self.faft_client.system.run_shell_command(cmd) 776 time.sleep(self.EC_SUSPEND_DELAY) 777 778 def _fetch_servo_log(self): 779 """Fetch the servo log.""" 780 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2) 781 servo_log = self.servo.system_output(cmd) 782 return None if servo_log == 'NOTFOUND' else servo_log 783 784 def _setup_servo_log(self): 785 """Setup the servo log capturing.""" 786 self.servo_log_original_len = -1 787 if self.servo.is_localhost(): 788 # No servo log recorded when servod runs locally. 789 return 790 791 servo_log = self._fetch_servo_log() 792 if servo_log: 793 self.servo_log_original_len = len(servo_log) 794 else: 795 logging.warn('Servo log file not found.') 796 797 def _record_servo_log(self): 798 """Record the servo log to the results directory.""" 799 if self.servo_log_original_len != -1: 800 servo_log = self._fetch_servo_log() 801 servo_log_file = os.path.join(self.resultsdir, 'servod.log') 802 with open(servo_log_file, 'a') as f: 803 f.write(servo_log[self.servo_log_original_len:]) 804 805 def _record_faft_client_log(self): 806 """Record the faft client log to the results directory.""" 807 client_log = self.faft_client.system.dump_log(True) 808 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 809 with open(client_log_file, 'w') as f: 810 f.write(client_log) 811 812 def _setup_gbb_flags(self): 813 """Setup the GBB flags for FAFT test.""" 814 if self.faft_config.gbb_version < 1.1: 815 logging.info('Skip modifying GBB on versions older than 1.1.') 816 return 817 818 if self.check_setup_done('gbb_flags'): 819 return 820 821 logging.info('Set proper GBB flags for test.') 822 self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY | 823 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 824 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB | 825 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK | 826 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP, 827 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM | 828 vboot.GBB_FLAG_FAFT_KEY_OVERIDE) 829 self.mark_setup_done('gbb_flags') 830 831 def drop_backup_gbb_flags(self): 832 """Drops the backup GBB flags. 833 834 This can be used when a test intends to permanently change GBB flags. 835 """ 836 self._backup_gbb_flags = None 837 838 def _restore_gbb_flags(self): 839 """Restore GBB flags to their original state.""" 840 if self._backup_gbb_flags is None: 841 return 842 # Setting up and restoring the GBB flags take a lot of time. For 843 # speed-up purpose, don't restore it. 844 logging.info('***') 845 logging.info('*** Please manually restore the original GBB flags to: ' 846 '0x%x ***', self._backup_gbb_flags) 847 logging.info('***') 848 self.unmark_setup_done('gbb_flags') 849 850 def setup_tried_fwb(self, tried_fwb): 851 """Setup for fw B tried state. 852 853 It makes sure the system in the requested fw B tried state. If not, it 854 tries to do so. 855 856 @param tried_fwb: True if requested in tried_fwb=1; 857 False if tried_fwb=0. 858 """ 859 if tried_fwb: 860 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 861 logging.info( 862 'Firmware is not booted with tried_fwb. Reboot into it.') 863 self.faft_client.system.set_try_fw_b() 864 else: 865 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 866 logging.info( 867 'Firmware is booted with tried_fwb. Reboot to clear.') 868 869 def power_on(self): 870 """Switch DUT AC power on.""" 871 self._client.power_on(self.power_control) 872 873 def power_off(self): 874 """Switch DUT AC power off.""" 875 self._client.power_off(self.power_control) 876 877 def power_cycle(self): 878 """Power cycle DUT AC power.""" 879 self._client.power_cycle(self.power_control) 880 881 def setup_rw_boot(self, section='a'): 882 """Make sure firmware is in RW-boot mode. 883 884 If the given firmware section is in RO-boot mode, turn off the RO-boot 885 flag and reboot DUT into RW-boot mode. 886 887 @param section: A firmware section, either 'a' or 'b'. 888 """ 889 flags = self.faft_client.bios.get_preamble_flags(section) 890 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 891 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 892 self.faft_client.bios.set_preamble_flags(section, flags) 893 self.switcher.mode_aware_reboot() 894 895 def setup_kernel(self, part): 896 """Setup for kernel test. 897 898 It makes sure both kernel A and B bootable and the current boot is 899 the requested kernel part. 900 901 @param part: A string of kernel partition number or 'a'/'b'. 902 """ 903 self.ensure_kernel_boot(part) 904 logging.info('Checking the integrity of kernel B and rootfs B...') 905 if (self.faft_client.kernel.diff_a_b() or 906 not self.faft_client.rootfs.verify_rootfs('B')): 907 logging.info('Copying kernel and rootfs from A to B...') 908 self.copy_kernel_and_rootfs(from_part=part, 909 to_part=self.OTHER_KERNEL_MAP[part]) 910 self.reset_and_prioritize_kernel(part) 911 912 def reset_and_prioritize_kernel(self, part): 913 """Make the requested partition highest priority. 914 915 This function also reset kerenl A and B to bootable. 916 917 @param part: A string of partition number to be prioritized. 918 """ 919 root_dev = self.faft_client.system.get_root_dev() 920 # Reset kernel A and B to bootable. 921 self.faft_client.system.run_shell_command( 922 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 923 self.faft_client.system.run_shell_command( 924 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 925 # Set kernel part highest priority. 926 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 927 (self.KERNEL_MAP[part], root_dev)) 928 929 def do_blocking_sync(self, device): 930 """Run a blocking sync command.""" 931 logging.info("Blocking sync for %s", device) 932 if 'mmcblk' in device: 933 # For mmc devices, use `mmc status get` command to send an 934 # empty command to wait for the disk to be available again. 935 self.faft_client.system.run_shell_command('mmc status get %s' % 936 device) 937 elif 'nvme' in device: 938 # Get a list of NVMe namespace and flush them individually 939 # Assumes the output format from nvme list-ns command will 940 # be something like follows: 941 # [ 0]:0x1 942 # [ 1]:0x2 943 available_ns = self.faft_client.system.run_shell_command_get_output( 944 'nvme list-ns %s -a' % device) 945 for ns in available_ns: 946 ns = ns.split(':')[-1] 947 # For NVMe devices, use `nvme flush` command to commit data 948 # and metadata to non-volatile media. 949 self.faft_client.system.run_shell_command( 950 'nvme flush %s -n %s' % (device, ns)) 951 else: 952 # For other devices, hdparm sends TUR to check if 953 # a device is ready for transfer operation. 954 self.faft_client.system.run_shell_command('hdparm -f %s' % device) 955 956 def blocking_sync(self): 957 """Sync root device and internal device.""" 958 # The double calls to sync fakes a blocking call 959 # since the first call returns before the flush 960 # is complete, but the second will wait for the 961 # first to finish. 962 self.faft_client.system.run_shell_command('sync') 963 self.faft_client.system.run_shell_command('sync') 964 965 # sync only sends SYNCHRONIZE_CACHE but doesn't check the status. 966 # This function will perform a device-specific sync command. 967 root_dev = self.faft_client.system.get_root_dev() 968 self.do_blocking_sync(root_dev) 969 970 # Also sync the internal device if booted from removable media. 971 if self.faft_client.system.is_removable_device_boot(): 972 internal_dev = self.faft_client.system.get_internal_device() 973 self.do_blocking_sync(internal_dev) 974 975 def sync_and_ec_reboot(self, flags=''): 976 """Request the client sync and do a EC triggered reboot. 977 978 @param flags: Optional, a space-separated string of flags passed to EC 979 reboot command, including: 980 default: EC soft reboot; 981 'hard': EC cold/hard reboot. 982 """ 983 self.blocking_sync() 984 self.ec.reboot(flags) 985 time.sleep(self.faft_config.ec_boot_to_console) 986 self.check_lid_and_power_on() 987 988 def reboot_and_reset_tpm(self): 989 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 990 self.switcher.reboot_to_mode(to_mode='rec') 991 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 992 self.switcher.mode_aware_reboot() 993 994 def full_power_off_and_on(self): 995 """Shutdown the device by pressing power button and power on again.""" 996 boot_id = self.get_bootid() 997 # Press power button to trigger Chrome OS normal shutdown process. 998 # We use a customized delay since the normal-press 1.2s is not enough. 999 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 1000 # device can take 44-51 seconds to restart, 1001 # add buffer from the default timeout of 60 seconds. 1002 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 1003 time.sleep(self.faft_config.shutdown) 1004 # Short press power button to boot DUT again. 1005 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1006 1007 def check_lid_and_power_on(self): 1008 """ 1009 On devices with EC software sync, system powers on after EC reboots if 1010 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1011 This method checks lid switch state and presses power button if 1012 necessary. 1013 """ 1014 if self.servo.get("lid_open") == "no": 1015 time.sleep(self.faft_config.software_sync) 1016 self.servo.power_short_press() 1017 1018 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1019 """Modify the kernel header magic in USB stick. 1020 1021 The kernel header magic is the first 8-byte of kernel partition. 1022 We modify it to make it fail on kernel verification check. 1023 1024 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1025 @param from_magic: A string of magic which we change it from. 1026 @param to_magic: A string of magic which we change it to. 1027 @raise TestError: if failed to change magic. 1028 """ 1029 assert len(from_magic) == 8 1030 assert len(to_magic) == 8 1031 # USB image only contains one kernel. 1032 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1033 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1034 current_magic = self.servo.system_output(read_cmd) 1035 if current_magic == to_magic: 1036 logging.info("The kernel magic is already %s.", current_magic) 1037 return 1038 if current_magic != from_magic: 1039 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1040 1041 logging.info('Modify the kernel magic in USB, from %s to %s.', 1042 from_magic, to_magic) 1043 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1044 " 2>/dev/null" % (to_magic, kernel_part)) 1045 self.servo.system(write_cmd) 1046 1047 if self.servo.system_output(read_cmd) != to_magic: 1048 raise error.TestError("Failed to write new magic.") 1049 1050 def corrupt_usb_kernel(self, usb_dev): 1051 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1052 1053 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1054 """ 1055 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1056 self.CORRUPTED_MAGIC) 1057 1058 def restore_usb_kernel(self, usb_dev): 1059 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1060 1061 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1062 """ 1063 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1064 self.CHROMEOS_MAGIC) 1065 1066 def _call_action(self, action_tuple, check_status=False): 1067 """Call the action function with/without arguments. 1068 1069 @param action_tuple: A function, or a tuple (function, args, error_msg), 1070 in which, args and error_msg are optional. args is 1071 either a value or a tuple if multiple arguments. 1072 This can also be a list containing multiple 1073 function or tuple. In this case, these actions are 1074 called in sequence. 1075 @param check_status: Check the return value of action function. If not 1076 succeed, raises a TestFail exception. 1077 @return: The result value of the action function. 1078 @raise TestError: An error when the action function is not callable. 1079 @raise TestFail: When check_status=True, action function not succeed. 1080 """ 1081 if isinstance(action_tuple, list): 1082 return all([self._call_action(action, check_status=check_status) 1083 for action in action_tuple]) 1084 1085 action = action_tuple 1086 args = () 1087 error_msg = 'Not succeed' 1088 if isinstance(action_tuple, tuple): 1089 action = action_tuple[0] 1090 if len(action_tuple) >= 2: 1091 args = action_tuple[1] 1092 if not isinstance(args, tuple): 1093 args = (args,) 1094 if len(action_tuple) >= 3: 1095 error_msg = action_tuple[2] 1096 1097 if action is None: 1098 return 1099 1100 if not callable(action): 1101 raise error.TestError('action is not callable!') 1102 1103 info_msg = 'calling %s' % action.__name__ 1104 if args: 1105 info_msg += ' with args %s' % str(args) 1106 logging.info(info_msg) 1107 ret = action(*args) 1108 1109 if check_status and not ret: 1110 raise error.TestFail('%s: %s returning %s' % 1111 (error_msg, info_msg, str(ret))) 1112 return ret 1113 1114 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1115 run_power_action=True, post_power_action=None, 1116 shutdown_timeout=None): 1117 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1118 1119 @param shutdown_action: function which makes DUT shutdown, like 1120 pressing power key. 1121 @param pre_power_action: function which is called before next power on. 1122 @param run_power_action: power_key press by default, set to None to skip. 1123 @param post_power_action: function which is called after next power on. 1124 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1125 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1126 """ 1127 self._call_action(shutdown_action) 1128 logging.info('Wait to ensure DUT shut down...') 1129 try: 1130 if shutdown_timeout is None: 1131 shutdown_timeout = self.faft_config.shutdown_timeout 1132 self.switcher.wait_for_client(timeout=shutdown_timeout) 1133 raise error.TestFail( 1134 'Should shut the device down after calling %s.' % 1135 shutdown_action.__name__) 1136 except ConnectionError: 1137 if self.check_ec_capability(['x86'], suppress_warning=True): 1138 PWR_RETRIES=5 1139 if not self.wait_power_state("G3", PWR_RETRIES): 1140 raise error.TestFail("System not shutdown properly and EC" 1141 "fails to enter into G3 state.") 1142 logging.info('System entered into G3 state..') 1143 logging.info( 1144 'DUT is surely shutdown. We are going to power it on again...') 1145 1146 if pre_power_action: 1147 self._call_action(pre_power_action) 1148 if run_power_action: 1149 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1150 if post_power_action: 1151 self._call_action(post_power_action) 1152 1153 def get_bootid(self, retry=3): 1154 """ 1155 Return the bootid. 1156 """ 1157 boot_id = None 1158 while retry: 1159 try: 1160 boot_id = self._client.get_boot_id() 1161 break 1162 except error.AutoservRunError: 1163 retry -= 1 1164 if retry: 1165 logging.info('Retry to get boot_id...') 1166 else: 1167 logging.warning('Failed to get boot_id.') 1168 logging.info('boot_id: %s', boot_id) 1169 return boot_id 1170 1171 def check_state(self, func): 1172 """ 1173 Wrapper around _call_action with check_status set to True. This is a 1174 helper function to be used by tests and is currently implemented by 1175 calling _call_action with check_status=True. 1176 1177 TODO: This function's arguments need to be made more stringent. And 1178 its functionality should be moved over to check functions directly in 1179 the future. 1180 1181 @param func: A function, or a tuple (function, args, error_msg), 1182 in which, args and error_msg are optional. args is 1183 either a value or a tuple if multiple arguments. 1184 This can also be a list containing multiple 1185 function or tuple. In this case, these actions are 1186 called in sequence. 1187 @return: The result value of the action function. 1188 @raise TestFail: If the function does notsucceed. 1189 """ 1190 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1191 self._call_action(func, check_status=True) 1192 logging.info("-[FAFT]-[ end state_checker ]----------------") 1193 1194 def get_current_firmware_sha(self): 1195 """Get current firmware sha of body and vblock. 1196 1197 @return: Current firmware sha follows the order ( 1198 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha) 1199 """ 1200 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'), 1201 self.faft_client.bios.get_body_sha('a'), 1202 self.faft_client.bios.get_sig_sha('b'), 1203 self.faft_client.bios.get_body_sha('b')) 1204 if not all(current_firmware_sha): 1205 raise error.TestError('Failed to get firmware sha.') 1206 return current_firmware_sha 1207 1208 def is_firmware_changed(self): 1209 """Check if the current firmware changed, by comparing its SHA. 1210 1211 @return: True if it is changed, otherwise Flase. 1212 """ 1213 # Device may not be rebooted after test. 1214 self.faft_client.bios.reload() 1215 1216 current_sha = self.get_current_firmware_sha() 1217 1218 if current_sha == self._backup_firmware_sha: 1219 return False 1220 else: 1221 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0]) 1222 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1]) 1223 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2]) 1224 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3]) 1225 logging.info('Firmware changed:') 1226 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA) 1227 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB) 1228 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN) 1229 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB) 1230 return True 1231 1232 def backup_firmware(self, suffix='.original'): 1233 """Backup firmware to file, and then send it to host. 1234 1235 @param suffix: a string appended to backup file name 1236 """ 1237 remote_temp_dir = self.faft_client.system.create_temp_dir() 1238 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1239 self.faft_client.bios.dump_whole(remote_bios_path) 1240 self._client.get_file(remote_bios_path, 1241 os.path.join(self.resultsdir, 'bios' + suffix)) 1242 1243 if self.faft_config.chrome_ec: 1244 remote_ec_path = os.path.join(remote_temp_dir, 'ec') 1245 self.faft_client.ec.dump_whole(remote_ec_path) 1246 self._client.get_file(remote_ec_path, 1247 os.path.join(self.resultsdir, 'ec' + suffix)) 1248 1249 self._client.run('rm -rf %s' % remote_temp_dir) 1250 logging.info('Backup firmware stored in %s with suffix %s', 1251 self.resultsdir, suffix) 1252 1253 self._backup_firmware_sha = self.get_current_firmware_sha() 1254 1255 def is_firmware_saved(self): 1256 """Check if a firmware saved (called backup_firmware before). 1257 1258 @return: True if the firmware is backuped; otherwise False. 1259 """ 1260 return self._backup_firmware_sha != () 1261 1262 def clear_saved_firmware(self): 1263 """Clear the firmware saved by the method backup_firmware.""" 1264 self._backup_firmware_sha = () 1265 1266 def restore_firmware(self, suffix='.original', restore_ec=True): 1267 """Restore firmware from host in resultsdir. 1268 1269 @param suffix: a string appended to backup file name 1270 @param restore_ec: True to restore the ec firmware; False not to do. 1271 """ 1272 if not self.is_firmware_changed(): 1273 return 1274 1275 # Backup current corrupted firmware. 1276 self.backup_firmware(suffix='.corrupt') 1277 1278 # Restore firmware. 1279 remote_temp_dir = self.faft_client.system.create_temp_dir() 1280 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1281 os.path.join(remote_temp_dir, 'bios')) 1282 1283 self.faft_client.bios.write_whole( 1284 os.path.join(remote_temp_dir, 'bios')) 1285 1286 if self.faft_config.chrome_ec and restore_ec: 1287 self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix), 1288 os.path.join(remote_temp_dir, 'ec')) 1289 self.faft_client.ec.write_whole( 1290 os.path.join(remote_temp_dir, 'ec')) 1291 1292 self.switcher.mode_aware_reboot() 1293 logging.info('Successfully restore firmware.') 1294 1295 def setup_firmwareupdate_shellball(self, shellball=None): 1296 """Setup a shellball to use in firmware update test. 1297 1298 Check if there is a given shellball, and it is a shell script. Then, 1299 send it to the remote host. Otherwise, use the 1300 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1301 BIOS and EC images with the active firmware images. 1302 1303 @param shellball: path of a shellball or default to None. 1304 """ 1305 if shellball: 1306 # Determine the firmware file is a shellball or a raw binary. 1307 is_shellball = (utils.system_output("file %s" % shellball).find( 1308 "shell script") != -1) 1309 if is_shellball: 1310 logging.info('Device will update firmware with shellball %s', 1311 shellball) 1312 temp_path = self.faft_client.updater.get_temp_path() 1313 working_shellball = os.path.join(temp_path, 1314 'chromeos-firmwareupdate') 1315 self._client.send_file(shellball, working_shellball) 1316 self.faft_client.updater.extract_shellball() 1317 else: 1318 raise error.TestFail( 1319 'The given shellball is not a shell script.') 1320 else: 1321 logging.info('No shellball given, use the original shellball and ' 1322 'replace its BIOS and EC images.') 1323 work_path = self.faft_client.updater.get_work_path() 1324 bios_in_work_path = os.path.join( 1325 work_path, self.faft_client.updater.get_bios_relative_path()) 1326 ec_in_work_path = os.path.join( 1327 work_path, self.faft_client.updater.get_ec_relative_path()) 1328 logging.info('Writing current BIOS to: %s', bios_in_work_path) 1329 self.faft_client.bios.dump_whole(bios_in_work_path) 1330 if self.faft_config.chrome_ec: 1331 logging.info('Writing current EC to: %s', ec_in_work_path) 1332 self.faft_client.ec.dump_firmware(ec_in_work_path) 1333 self.faft_client.updater.repack_shellball() 1334 1335 def is_kernel_changed(self): 1336 """Check if the current kernel is changed, by comparing its SHA1 hash. 1337 1338 @return: True if it is changed; otherwise, False. 1339 """ 1340 changed = False 1341 for p in ('A', 'B'): 1342 backup_sha = self._backup_kernel_sha.get(p, None) 1343 current_sha = self.faft_client.kernel.get_sha(p) 1344 if backup_sha != current_sha: 1345 changed = True 1346 logging.info('Kernel %s is changed', p) 1347 return changed 1348 1349 def backup_kernel(self, suffix='.original'): 1350 """Backup kernel to files, and the send them to host. 1351 1352 @param suffix: a string appended to backup file name. 1353 """ 1354 remote_temp_dir = self.faft_client.system.create_temp_dir() 1355 for p in ('A', 'B'): 1356 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1357 self.faft_client.kernel.dump(p, remote_path) 1358 self._client.get_file( 1359 remote_path, 1360 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1361 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1362 logging.info('Backup kernel stored in %s with suffix %s', 1363 self.resultsdir, suffix) 1364 1365 def is_kernel_saved(self): 1366 """Check if kernel images are saved (backup_kernel called before). 1367 1368 @return: True if the kernel is saved; otherwise, False. 1369 """ 1370 return len(self._backup_kernel_sha) != 0 1371 1372 def clear_saved_kernel(self): 1373 """Clear the kernel saved by backup_kernel().""" 1374 self._backup_kernel_sha = dict() 1375 1376 def restore_kernel(self, suffix='.original'): 1377 """Restore kernel from host in resultsdir. 1378 1379 @param suffix: a string appended to backup file name. 1380 """ 1381 if not self.is_kernel_changed(): 1382 return 1383 1384 # Backup current corrupted kernel. 1385 self.backup_kernel(suffix='.corrupt') 1386 1387 # Restore kernel. 1388 remote_temp_dir = self.faft_client.system.create_temp_dir() 1389 for p in ('A', 'B'): 1390 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1391 self._client.send_file( 1392 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1393 remote_path) 1394 self.faft_client.kernel.write(p, remote_path) 1395 1396 self.switcher.mode_aware_reboot() 1397 logging.info('Successfully restored kernel.') 1398 1399 def backup_cgpt_attributes(self): 1400 """Backup CGPT partition table attributes.""" 1401 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1402 1403 def restore_cgpt_attributes(self): 1404 """Restore CGPT partition table attributes.""" 1405 current_table = self.faft_client.cgpt.get_attributes() 1406 if current_table == self._backup_cgpt_attr: 1407 return 1408 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1409 self._backup_cgpt_attr, 1410 current_table) 1411 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr) 1412 1413 self.switcher.mode_aware_reboot() 1414 logging.info('Successfully restored CGPT table.') 1415 1416 def try_fwb(self, count=0): 1417 """set to try booting FWB count # times 1418 1419 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 1420 vboot2 1421 1422 @param count: an integer specifying value to program into 1423 fwb_tries(vb1)/fw_try_next(vb2) 1424 """ 1425 if self.fw_vboot2: 1426 self.faft_client.system.set_fw_try_next('B', count) 1427 else: 1428 # vboot1: we need to boot into fwb at least once 1429 if not count: 1430 count = count + 1 1431 self.faft_client.system.set_try_fw_b(count) 1432 1433