1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import ast 6import ctypes 7import logging 8import os 9import pprint 10import re 11import time 12import uuid 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error 16from autotest_lib.server import test 17from autotest_lib.server.cros import vboot_constants as vboot 18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig 19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 20from autotest_lib.server.cros.faft.utils import mode_switcher 21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 22from autotest_lib.server.cros.servo import chrome_base_ec 23from autotest_lib.server.cros.servo import chrome_cr50 24from autotest_lib.server.cros.servo import chrome_ec 25 26ConnectionError = mode_switcher.ConnectionError 27 28 29class FAFTBase(test.test): 30 """The base class of FAFT classes. 31 32 It launches the FAFTClient on DUT, such that the test can access its 33 firmware functions and interfaces. It also provides some methods to 34 handle the reboot mechanism, in order to ensure FAFTClient is still 35 connected after reboot. 36 """ 37 def initialize(self, host): 38 """Create a FAFTClient object and install the dependency.""" 39 self.servo = host.servo 40 self.servo.initialize_dut() 41 self._client = host 42 self.faft_client = RPCProxy(host) 43 self.lockfile = '/var/tmp/faft/lock' 44 45 46class FirmwareTest(FAFTBase): 47 """ 48 Base class that sets up helper objects/functions for firmware tests. 49 50 TODO: add documentaion as the FAFT rework progresses. 51 """ 52 version = 1 53 54 # Mapping of partition number of kernel and rootfs. 55 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 56 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 57 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 58 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 59 60 CHROMEOS_MAGIC = "CHROMEOS" 61 CORRUPTED_MAGIC = "CORRUPTD" 62 63 # Delay for waiting client to return before EC suspend 64 EC_SUSPEND_DELAY = 5 65 66 # Delay between EC suspend and wake 67 WAKE_DELAY = 10 68 69 # Delay between closing and opening lid 70 LID_DELAY = 1 71 72 _SERVOD_LOG = '/var/log/servod.log' 73 74 _ROOTFS_PARTITION_NUMBER = 3 75 76 _backup_firmware_sha = () 77 _backup_kernel_sha = dict() 78 _backup_cgpt_attr = dict() 79 _backup_gbb_flags = None 80 _backup_dev_mode = None 81 82 # Class level variable, keep track the states of one time setup. 83 # This variable is preserved across tests which inherit this class. 84 _global_setup_done = { 85 'gbb_flags': False, 86 'reimage': False, 87 'usb_check': False, 88 } 89 90 @classmethod 91 def check_setup_done(cls, label): 92 """Check if the given setup is done. 93 94 @param label: The label of the setup. 95 """ 96 return cls._global_setup_done[label] 97 98 @classmethod 99 def mark_setup_done(cls, label): 100 """Mark the given setup done. 101 102 @param label: The label of the setup. 103 """ 104 cls._global_setup_done[label] = True 105 106 @classmethod 107 def unmark_setup_done(cls, label): 108 """Mark the given setup not done. 109 110 @param label: The label of the setup. 111 """ 112 cls._global_setup_done[label] = False 113 114 def initialize(self, host, cmdline_args, ec_wp=None): 115 super(FirmwareTest, self).initialize(host) 116 self.run_id = str(uuid.uuid4()) 117 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 118 # Parse arguments from command line 119 args = {} 120 self.power_control = host.POWER_CONTROL_RPM 121 for arg in cmdline_args: 122 match = re.search("^(\w+)=(.+)", arg) 123 if match: 124 args[match.group(1)] = match.group(2) 125 if 'power_control' in args: 126 self.power_control = args['power_control'] 127 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 128 raise error.TestError('Valid values for --args=power_control ' 129 'are %s. But you entered wrong argument ' 130 'as "%s".' 131 % (host.POWER_CONTROL_VALID_ARGS, 132 self.power_control)) 133 self._no_ec_sync = False 134 if 'no_ec_sync' in args: 135 if 'true' in args['no_ec_sync'].lower(): 136 self._no_ec_sync = True 137 138 if not self.faft_client.system.dev_tpm_present(): 139 raise error.TestError('/dev/tpm0 does not exist on the client') 140 141 self.faft_config = FAFTConfig( 142 self.faft_client.system.get_platform_name()) 143 self.checkers = FAFTCheckers(self) 144 self.switcher = mode_switcher.create_mode_switcher(self) 145 146 if self.faft_config.chrome_ec: 147 self.ec = chrome_ec.ChromeEC(self.servo) 148 # Check for presence of a USBPD console 149 if self.faft_config.chrome_usbpd: 150 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 151 elif self.faft_config.chrome_ec: 152 # If no separate USBPD console, then PD exists on EC console 153 self.usbpd = self.ec 154 # Get plankton console 155 self.plankton = host.plankton 156 self.plankton_host = host._plankton_host 157 158 # Create the BaseEC object. None if not available. 159 self.base_ec = chrome_base_ec.create_base_ec(self.servo) 160 161 self._setup_uart_capture() 162 self._setup_servo_log() 163 self._record_system_info() 164 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 165 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 166 if self.fw_vboot2: 167 self.faft_client.system.set_fw_try_next('A') 168 if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B': 169 logging.info('mainfw_act is B. rebooting to set it A') 170 self.switcher.mode_aware_reboot() 171 self._setup_gbb_flags() 172 self.faft_client.updater.stop_daemon() 173 self._create_faft_lockfile() 174 self._setup_ec_write_protect(ec_wp) 175 # See chromium:239034 regarding needing this sync. 176 self.blocking_sync() 177 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 178 179 def cleanup(self): 180 """Autotest cleanup function.""" 181 # Unset state checker in case it's set by subclass 182 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 183 try: 184 self.faft_client.system.is_available() 185 except: 186 # Remote is not responding. Revive DUT so that subsequent tests 187 # don't fail. 188 self._restore_routine_from_timeout() 189 self.switcher.restore_mode() 190 self._restore_ec_write_protect() 191 self._restore_servo_v4_role() 192 self._restore_gbb_flags() 193 self.faft_client.updater.start_daemon() 194 self.faft_client.updater.cleanup() 195 self._remove_faft_lockfile() 196 self._record_servo_log() 197 self._record_faft_client_log() 198 self._cleanup_uart_capture() 199 super(FirmwareTest, self).cleanup() 200 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 201 202 def _record_system_info(self): 203 """Record some critical system info to the attr keyval. 204 205 This info is used by generate_test_report later. 206 """ 207 system_info = { 208 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 209 'ec_version': self.faft_client.ec.get_version(), 210 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 211 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 212 'servod_version': self._client._servo_host.run( 213 'servod --version').stdout.strip(), 214 'os_version': self._client.get_release_builder_path(), 215 'servo_type': self.servo.get_servo_version() 216 } 217 218 # Record the servo v4 and servo micro versions when possible 219 if 'servo_micro' in system_info['servo_type']: 220 system_info['servo_micro_version'] = self.servo.get( 221 'servo_micro_version') 222 223 if 'servo_v4' in system_info['servo_type']: 224 system_info['servo_v4_version'] = self.servo.get('servo_v4_version') 225 226 if hasattr(self, 'cr50'): 227 system_info['cr50_version'] = self.servo.get('cr50_version') 228 229 logging.info('System info:\n%s', pprint.pformat(system_info)) 230 self.write_attr_keyval(system_info) 231 232 def invalidate_firmware_setup(self): 233 """Invalidate all firmware related setup state. 234 235 This method is called when the firmware is re-flashed. It resets all 236 firmware related setup states so that the next test setup properly 237 again. 238 """ 239 self.unmark_setup_done('gbb_flags') 240 241 def _retrieve_recovery_reason_from_trap(self): 242 """Try to retrieve the recovery reason from a trapped recovery screen. 243 244 @return: The recovery_reason, 0 if any error. 245 """ 246 recovery_reason = 0 247 logging.info('Try to retrieve recovery reason...') 248 if self.servo.get_usbkey_direction() == 'dut': 249 self.switcher.bypass_rec_mode() 250 else: 251 self.servo.switch_usbkey('dut') 252 253 try: 254 self.switcher.wait_for_client() 255 lines = self.faft_client.system.run_shell_command_get_output( 256 'crossystem recovery_reason') 257 recovery_reason = int(lines[0]) 258 logging.info('Got the recovery reason %d.', recovery_reason) 259 except ConnectionError: 260 logging.error('Failed to get the recovery reason due to connection ' 261 'error.') 262 return recovery_reason 263 264 def _reset_client(self): 265 """Reset client to a workable state. 266 267 This method is called when the client is not responsive. It may be 268 caused by the following cases: 269 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 270 - corrupted firmware; 271 - corrutped OS image. 272 """ 273 # DUT may halt on a firmware screen. Try cold reboot. 274 logging.info('Try cold reboot...') 275 self.switcher.mode_aware_reboot(reboot_type='cold', 276 sync_before_boot=False, 277 wait_for_dut_up=False) 278 self.switcher.wait_for_client_offline() 279 self.switcher.bypass_dev_mode() 280 try: 281 self.switcher.wait_for_client() 282 return 283 except ConnectionError: 284 logging.warn('Cold reboot doesn\'t help, still connection error.') 285 286 # DUT may be broken by a corrupted firmware. Restore firmware. 287 # We assume the recovery boot still works fine. Since the recovery 288 # code is in RO region and all FAFT tests don't change the RO region 289 # except GBB. 290 if self.is_firmware_saved(): 291 self._ensure_client_in_recovery() 292 logging.info('Try restore the original firmware...') 293 if self.is_firmware_changed(): 294 try: 295 self.restore_firmware() 296 return 297 except ConnectionError: 298 logging.warn('Restoring firmware doesn\'t help, still ' 299 'connection error.') 300 301 # Perhaps it's kernel that's broken. Let's try restoring it. 302 if self.is_kernel_saved(): 303 self._ensure_client_in_recovery() 304 logging.info('Try restore the original kernel...') 305 if self.is_kernel_changed(): 306 try: 307 self.restore_kernel() 308 return 309 except ConnectionError: 310 logging.warn('Restoring kernel doesn\'t help, still ' 311 'connection error.') 312 313 # DUT may be broken by a corrupted OS image. Restore OS image. 314 self._ensure_client_in_recovery() 315 logging.info('Try restore the OS image...') 316 self.faft_client.system.run_shell_command('chromeos-install --yes') 317 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 318 self.switcher.wait_for_client_offline() 319 self.switcher.bypass_dev_mode() 320 try: 321 self.switcher.wait_for_client() 322 logging.info('Successfully restore OS image.') 323 return 324 except ConnectionError: 325 logging.warn('Restoring OS image doesn\'t help, still connection ' 326 'error.') 327 328 def _ensure_client_in_recovery(self): 329 """Ensure client in recovery boot; reboot into it if necessary. 330 331 @raise TestError: if failed to boot the USB image. 332 """ 333 logging.info('Try boot into USB image...') 334 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 335 wait_for_dut_up=False) 336 self.servo.switch_usbkey('host') 337 self.switcher.bypass_rec_mode() 338 try: 339 self.switcher.wait_for_client() 340 except ConnectionError: 341 raise error.TestError('Failed to boot the USB image.') 342 343 def _restore_routine_from_timeout(self): 344 """A routine to try to restore the system from a timeout error. 345 346 This method is called when FAFT failed to connect DUT after reboot. 347 348 @raise TestFail: This exception is already raised, with a decription 349 why it failed. 350 """ 351 # DUT is disconnected. Capture the UART output for debug. 352 self._record_uart_capture() 353 354 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 355 # identify if it is a network flaky. 356 357 recovery_reason = self._retrieve_recovery_reason_from_trap() 358 359 # Reset client to a workable state. 360 self._reset_client() 361 362 # Raise the proper TestFail exception. 363 if recovery_reason: 364 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 365 'and timed out' % recovery_reason) 366 else: 367 raise error.TestFail('Timed out waiting for DUT reboot') 368 369 def assert_test_image_in_usb_disk(self, usb_dev=None): 370 """Assert an USB disk plugged-in on servo and a test image inside. 371 372 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 373 If None, it is detected automatically. 374 @raise TestError: if USB disk not detected or not a test image. 375 """ 376 if self.check_setup_done('usb_check'): 377 return 378 if usb_dev: 379 assert self.servo.get_usbkey_direction() == 'host' 380 else: 381 self.servo.switch_usbkey('host') 382 usb_dev = self.servo.probe_host_usb_dev() 383 if not usb_dev: 384 raise error.TestError( 385 'An USB disk should be plugged in the servo board.') 386 387 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 388 logging.info('usb dev is %s', usb_dev) 389 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 390 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 391 392 try: 393 usb_lsb = self.servo.system_output('cat %s' % 394 os.path.join(tmpd, 'etc/lsb-release')) 395 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 396 dut_lsb = '\n'.join(self.faft_client.system. 397 run_shell_command_get_output('cat /etc/lsb-release')) 398 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 399 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 400 raise error.TestError('USB stick in servo is no test image') 401 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 402 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 403 if usb_board != dut_board: 404 raise error.TestError('USB stick in servo contains a %s ' 405 'image, but DUT is a %s' % (usb_board, dut_board)) 406 finally: 407 for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd): 408 self.servo.system(cmd) 409 410 self.mark_setup_done('usb_check') 411 412 def setup_usbkey(self, usbkey, host=None, used_for_recovery=None): 413 """Setup the USB disk for the test. 414 415 It checks the setup of USB disk and a valid ChromeOS test image inside. 416 It also muxes the USB disk to either the host or DUT by request. 417 418 @param usbkey: True if the USB disk is required for the test, False if 419 not required. 420 @param host: Optional, True to mux the USB disk to host, False to mux it 421 to DUT, default to do nothing. 422 @param used_for_recovery: Optional, True if the USB disk is used for 423 recovery boot; False if the USB disk is not 424 used for recovery boot, like Ctrl-U USB boot. 425 """ 426 if usbkey: 427 self.assert_test_image_in_usb_disk() 428 elif host is None: 429 # USB disk is not required for the test. Better to mux it to host. 430 host = True 431 432 if host is True: 433 self.servo.switch_usbkey('host') 434 elif host is False: 435 self.servo.switch_usbkey('dut') 436 437 if used_for_recovery is None: 438 # Default value is True if usbkey == True. 439 # As the common usecase of USB disk is for recovery boot. Tests 440 # can define it explicitly if not. 441 used_for_recovery = usbkey 442 443 if used_for_recovery: 444 # In recovery boot, the locked EC RO doesn't support PD for most 445 # of the CrOS devices. The default servo v4 power role is a SRC. 446 # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the 447 # data role swap from UFP to DFP; as a result, DUT can't see the 448 # USB disk and the Ethernet dongle on servo v4. 449 # 450 # This is a workaround to set servo v4 as a SNK, for every FAFT 451 # test which boots into the USB disk in the recovery mode. 452 # 453 # TODO(waihong): Add a check to see if the battery level is too 454 # low and sleep for a while for charging. 455 self.set_servo_v4_role_to_snk() 456 457 def set_servo_v4_role_to_snk(self): 458 """Set the servo v4 role to SNK.""" 459 self._needed_restore_servo_v4_role = True 460 self.servo.set_servo_v4_role('snk') 461 462 def _restore_servo_v4_role(self): 463 """Restore the servo v4 role to default SRC.""" 464 if not hasattr(self, '_needed_restore_servo_v4_role'): 465 return 466 if self._needed_restore_servo_v4_role: 467 self.servo.set_servo_v4_role('src') 468 469 def get_usbdisk_path_on_dut(self): 470 """Get the path of the USB disk device plugged-in the servo on DUT. 471 472 Returns: 473 A string representing USB disk path, like '/dev/sdb', or None if 474 no USB disk is found. 475 """ 476 cmd = 'ls -d /dev/s*[a-z]' 477 original_value = self.servo.get_usbkey_direction() 478 479 # Make the dut unable to see the USB disk. 480 self.servo.switch_usbkey('off') 481 no_usb_set = set( 482 self.faft_client.system.run_shell_command_get_output(cmd)) 483 484 # Make the dut able to see the USB disk. 485 self.servo.switch_usbkey('dut') 486 time.sleep(self.faft_config.usb_plug) 487 has_usb_set = set( 488 self.faft_client.system.run_shell_command_get_output(cmd)) 489 490 # Back to its original value. 491 if original_value != self.servo.get_usbkey_direction(): 492 self.servo.switch_usbkey(original_value) 493 494 diff_set = has_usb_set - no_usb_set 495 if len(diff_set) == 1: 496 return diff_set.pop() 497 else: 498 return None 499 500 def _create_faft_lockfile(self): 501 """Creates the FAFT lockfile.""" 502 logging.info('Creating FAFT lockfile...') 503 command = 'touch %s' % (self.lockfile) 504 self.faft_client.system.run_shell_command(command) 505 506 def _remove_faft_lockfile(self): 507 """Removes the FAFT lockfile.""" 508 logging.info('Removing FAFT lockfile...') 509 command = 'rm -f %s' % (self.lockfile) 510 self.faft_client.system.run_shell_command(command) 511 512 def clear_set_gbb_flags(self, clear_mask, set_mask): 513 """Clear and set the GBB flags in the current flashrom. 514 515 @param clear_mask: A mask of flags to be cleared. 516 @param set_mask: A mask of flags to be set. 517 """ 518 gbb_flags = self.faft_client.bios.get_gbb_flags() 519 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 520 self.gbb_flags = new_flags 521 if new_flags != gbb_flags: 522 self._backup_gbb_flags = gbb_flags 523 logging.info('Changing GBB flags from 0x%x to 0x%x.', 524 gbb_flags, new_flags) 525 self.faft_client.bios.set_gbb_flags(new_flags) 526 # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag, 527 # reboot to get a clear state 528 if ((gbb_flags ^ new_flags) & 529 (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 530 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)): 531 self.switcher.mode_aware_reboot() 532 else: 533 logging.info('Current GBB flags look good for test: 0x%x.', 534 gbb_flags) 535 536 def check_ec_capability(self, required_cap=None, suppress_warning=False): 537 """Check if current platform has required EC capabilities. 538 539 @param required_cap: A list containing required EC capabilities. Pass in 540 None to only check for presence of Chrome EC. 541 @param suppress_warning: True to suppress any warning messages. 542 @return: True if requirements are met. Otherwise, False. 543 """ 544 if not self.faft_config.chrome_ec: 545 if not suppress_warning: 546 logging.warn('Requires Chrome EC to run this test.') 547 return False 548 549 if not required_cap: 550 return True 551 552 for cap in required_cap: 553 if cap not in self.faft_config.ec_capability: 554 if not suppress_warning: 555 logging.warn('Requires EC capability "%s" to run this ' 556 'test.', cap) 557 return False 558 559 return True 560 561 def check_root_part_on_non_recovery(self, part): 562 """Check the partition number of root device and on normal/dev boot. 563 564 @param part: A string of partition number, e.g.'3'. 565 @return: True if the root device matched and on normal/dev boot; 566 otherwise, False. 567 """ 568 return self.checkers.root_part_checker(part) and \ 569 self.checkers.crossystem_checker({ 570 'mainfw_type': ('normal', 'developer'), 571 }) 572 573 def _join_part(self, dev, part): 574 """Return a concatenated string of device and partition number. 575 576 @param dev: A string of device, e.g.'/dev/sda'. 577 @param part: A string of partition number, e.g.'3'. 578 @return: A concatenated string of device and partition number, 579 e.g.'/dev/sda3'. 580 581 >>> seq = FirmwareTest() 582 >>> seq._join_part('/dev/sda', '3') 583 '/dev/sda3' 584 >>> seq._join_part('/dev/mmcblk0', '2') 585 '/dev/mmcblk0p2' 586 """ 587 if 'mmcblk' in dev: 588 return dev + 'p' + part 589 elif 'nvme' in dev: 590 return dev + 'p' + part 591 else: 592 return dev + part 593 594 def copy_kernel_and_rootfs(self, from_part, to_part): 595 """Copy kernel and rootfs from from_part to to_part. 596 597 @param from_part: A string of partition number to be copied from. 598 @param to_part: A string of partition number to be copied to. 599 """ 600 root_dev = self.faft_client.system.get_root_dev() 601 logging.info('Copying kernel from %s to %s. Please wait...', 602 from_part, to_part) 603 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 604 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 605 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 606 logging.info('Copying rootfs from %s to %s. Please wait...', 607 from_part, to_part) 608 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 609 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 610 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 611 612 def ensure_kernel_boot(self, part): 613 """Ensure the request kernel boot. 614 615 If not, it duplicates the current kernel to the requested kernel 616 and sets the requested higher priority to ensure it boot. 617 618 @param part: A string of kernel partition number or 'a'/'b'. 619 """ 620 if not self.checkers.root_part_checker(part): 621 if self.faft_client.kernel.diff_a_b(): 622 self.copy_kernel_and_rootfs( 623 from_part=self.OTHER_KERNEL_MAP[part], 624 to_part=part) 625 self.reset_and_prioritize_kernel(part) 626 self.switcher.mode_aware_reboot() 627 628 def ensure_dev_internal_boot(self, original_dev_boot_usb): 629 """Ensure internal device boot in developer mode. 630 631 If not internal device boot, it will try to reboot the device and 632 bypass dev mode to boot into internal device. 633 634 @param original_dev_boot_usb: Original dev_boot_usb value. 635 """ 636 logging.info('Checking internal device boot.') 637 if self.faft_client.system.is_removable_device_boot(): 638 logging.info('Reboot into internal disk...') 639 self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb) 640 self.switcher.mode_aware_reboot() 641 self.check_state((self.checkers.dev_boot_usb_checker, False, 642 'Device not booted from internal disk properly.')) 643 644 def set_hardware_write_protect(self, enable): 645 """Set hardware write protect pin. 646 647 @param enable: True if asserting write protect pin. Otherwise, False. 648 """ 649 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 650 651 def set_ec_write_protect_and_reboot(self, enable): 652 """Set EC write protect status and reboot to take effect. 653 654 The write protect state is only activated if both hardware write 655 protect pin is asserted and software write protect flag is set. 656 This method asserts/deasserts hardware write protect pin first, and 657 set corresponding EC software write protect flag. 658 659 If the device uses non-Chrome EC, set the software write protect via 660 flashrom. 661 662 If the device uses Chrome EC, a reboot is required for write protect 663 to take effect. Since the software write protect flag cannot be unset 664 if hardware write protect pin is asserted, we need to deasserted the 665 pin first if we are deactivating write protect. Similarly, a reboot 666 is required before we can modify the software flag. 667 668 @param enable: True if activating EC write protect. Otherwise, False. 669 """ 670 self.set_hardware_write_protect(enable) 671 if self.faft_config.chrome_ec: 672 self.set_chrome_ec_write_protect_and_reboot(enable) 673 else: 674 self.faft_client.ec.set_write_protect(enable) 675 self.switcher.mode_aware_reboot() 676 677 def set_chrome_ec_write_protect_and_reboot(self, enable): 678 """Set Chrome EC write protect status and reboot to take effect. 679 680 @param enable: True if activating EC write protect. Otherwise, False. 681 """ 682 if enable: 683 # Set write protect flag and reboot to take effect. 684 self.ec.set_flash_write_protect(enable) 685 self.sync_and_ec_reboot() 686 else: 687 # Reboot after deasserting hardware write protect pin to deactivate 688 # write protect. And then remove software write protect flag. 689 self.sync_and_ec_reboot() 690 self.ec.set_flash_write_protect(enable) 691 692 def _setup_ec_write_protect(self, ec_wp): 693 """Setup for EC write-protection. 694 695 It makes sure the EC in the requested write-protection state. If not, it 696 flips the state. Flipping the write-protection requires DUT reboot. 697 698 @param ec_wp: True to request EC write-protected; False to request EC 699 not write-protected; None to do nothing. 700 """ 701 if ec_wp is None: 702 self._old_wpsw_boot = None 703 return 704 self._old_wpsw_cur = self.checkers.crossystem_checker( 705 {'wpsw_cur': '1'}, suppress_logging=True) 706 self._old_wpsw_boot = self.checkers.crossystem_checker( 707 {'wpsw_boot': '1'}, suppress_logging=True) 708 if not (ec_wp == self._old_wpsw_cur == self._old_wpsw_boot): 709 if not self.faft_config.ap_access_ec_flash: 710 raise error.TestNAError( 711 "Cannot change EC write-protect for this device") 712 713 logging.info('The test required EC is %swrite-protected. Reboot ' 714 'and flip the state.', '' if ec_wp else 'not ') 715 self.switcher.mode_aware_reboot( 716 'custom', 717 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 718 wpsw_boot = wpsw_cur = '1' if ec_wp == True else '0' 719 self.check_state((self.checkers.crossystem_checker, { 720 'wpsw_boot': wpsw_boot, 'wpsw_cur': wpsw_cur})) 721 722 def _restore_ec_write_protect(self): 723 """Restore the original EC write-protection.""" 724 if (not hasattr(self, '_old_wpsw_boot')) or (self._old_wpsw_boot is 725 None): 726 return 727 if not self.checkers.crossystem_checker({'wpsw_boot': '1' if 728 self._old_wpsw_boot else '0'}, suppress_logging=True): 729 logging.info('Restore original EC write protection and reboot.') 730 self.switcher.mode_aware_reboot( 731 'custom', 732 lambda:self.set_ec_write_protect_and_reboot( 733 self._old_wpsw_boot)) 734 self.check_state((self.checkers.crossystem_checker, { 735 'wpsw_boot': '1' if self._old_wpsw_boot else '0'})) 736 737 def _setup_uart_capture(self): 738 """Setup the CPU/EC/PD UART capture.""" 739 self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt') 740 self.servo.set('cpu_uart_capture', 'on') 741 self.cr50_uart_file = None 742 self.ec_uart_file = None 743 self.servo_micro_uart_file = None 744 self.servo_v4_uart_file = None 745 self.usbpd_uart_file = None 746 try: 747 # Check that the console works before declaring the cr50 console 748 # connection exists and enabling uart capture. 749 self.servo.get('cr50_version') 750 self.servo.set('cr50_uart_capture', 'on') 751 self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt') 752 self.cr50 = chrome_cr50.ChromeCr50(self.servo) 753 except error.TestFail as e: 754 if 'No control named' in str(e): 755 logging.warn('cr50 console not supported.') 756 if self.faft_config.chrome_ec: 757 try: 758 self.servo.set('ec_uart_capture', 'on') 759 self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt') 760 except error.TestFail as e: 761 if 'No control named' in str(e): 762 logging.warn('The servod is too old that ec_uart_capture ' 763 'not supported.') 764 # Log separate PD console if supported 765 if self.check_ec_capability(['usbpd_uart'], suppress_warning=True): 766 try: 767 self.servo.set('usbpd_uart_capture', 'on') 768 self.usbpd_uart_file = os.path.join(self.resultsdir, 769 'usbpd_uart.txt') 770 except error.TestFail as e: 771 if 'No control named' in str(e): 772 logging.warn('The servod is too old that ' 773 'usbpd_uart_capture is not supported.') 774 else: 775 logging.info('Not a Google EC, cannot capture ec console output.') 776 try: 777 self.servo.set('servo_micro_uart_capture', 'on') 778 self.servo_micro_uart_file = os.path.join(self.resultsdir, 779 'servo_micro_uart.txt') 780 except error.TestFail as e: 781 if 'No control named' in str(e): 782 logging.warn('servo micro console not supported.') 783 try: 784 self.servo.set('servo_v4_uart_capture', 'on') 785 self.servo_v4_uart_file = os.path.join(self.resultsdir, 786 'servo_v4_uart.txt') 787 except error.TestFail as e: 788 if 'No control named' in str(e): 789 logging.warn('servo v4 console not supported.') 790 791 def _record_uart_capture(self): 792 """Record the CPU/EC/PD UART output stream to files.""" 793 if self.cpu_uart_file: 794 with open(self.cpu_uart_file, 'a') as f: 795 f.write(ast.literal_eval(self.servo.get('cpu_uart_stream'))) 796 if self.cr50_uart_file: 797 with open(self.cr50_uart_file, 'a') as f: 798 f.write(ast.literal_eval(self.servo.get('cr50_uart_stream'))) 799 if self.ec_uart_file and self.faft_config.chrome_ec: 800 with open(self.ec_uart_file, 'a') as f: 801 f.write(ast.literal_eval(self.servo.get('ec_uart_stream'))) 802 if self.servo_micro_uart_file: 803 with open(self.servo_micro_uart_file, 'a') as f: 804 f.write(ast.literal_eval(self.servo.get( 805 'servo_micro_uart_stream'))) 806 if self.servo_v4_uart_file: 807 with open(self.servo_v4_uart_file, 'a') as f: 808 f.write(ast.literal_eval(self.servo.get( 809 'servo_v4_uart_stream'))) 810 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 811 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 812 with open(self.usbpd_uart_file, 'a') as f: 813 f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream'))) 814 815 def _cleanup_uart_capture(self): 816 """Cleanup the CPU/EC/PD UART capture.""" 817 # Flush the remaining UART output. 818 self._record_uart_capture() 819 self.servo.set('cpu_uart_capture', 'off') 820 if self.cr50_uart_file: 821 self.servo.set('cr50_uart_capture', 'off') 822 if self.ec_uart_file and self.faft_config.chrome_ec: 823 self.servo.set('ec_uart_capture', 'off') 824 if self.servo_micro_uart_file: 825 self.servo.set('servo_micro_uart_capture', 'off') 826 if self.servo_v4_uart_file: 827 self.servo.set('servo_v4_uart_capture', 'off') 828 if (self.usbpd_uart_file and self.faft_config.chrome_ec and 829 self.check_ec_capability(['usbpd_uart'], suppress_warning=True)): 830 self.servo.set('usbpd_uart_capture', 'off') 831 832 def _get_power_state(self, power_state): 833 """ 834 Return the current power state of the AP 835 """ 836 return self.ec.send_command_get_output("powerinfo", [power_state]) 837 838 def wait_power_state(self, power_state, retries): 839 """ 840 Wait for certain power state. 841 842 @param power_state: power state you are expecting 843 @param retries: retries. This is necessary if AP is powering down 844 and transitioning through different states. 845 """ 846 logging.info('Checking power state "%s" maximum %d times.', 847 power_state, retries) 848 while retries > 0: 849 logging.info("try count: %d", retries) 850 try: 851 retries = retries - 1 852 ret = self._get_power_state(power_state) 853 return True 854 except error.TestFail: 855 pass 856 return False 857 858 def suspend(self): 859 """Suspends the DUT.""" 860 cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY 861 self.faft_client.system.run_shell_command(cmd) 862 time.sleep(self.EC_SUSPEND_DELAY) 863 864 def _fetch_servo_log(self): 865 """Fetch the servo log.""" 866 cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2) 867 servo_log = self.servo.system_output(cmd) 868 return None if servo_log == 'NOTFOUND' else servo_log 869 870 def _setup_servo_log(self): 871 """Setup the servo log capturing.""" 872 self.servo_log_original_len = -1 873 if self.servo.is_localhost(): 874 # No servo log recorded when servod runs locally. 875 return 876 877 servo_log = self._fetch_servo_log() 878 if servo_log: 879 self.servo_log_original_len = len(servo_log) 880 else: 881 logging.warn('Servo log file not found.') 882 883 def _record_servo_log(self): 884 """Record the servo log to the results directory.""" 885 if self.servo_log_original_len != -1: 886 servo_log = self._fetch_servo_log() 887 servo_log_file = os.path.join(self.resultsdir, 'servod.log') 888 with open(servo_log_file, 'a') as f: 889 f.write(servo_log[self.servo_log_original_len:]) 890 891 def _record_faft_client_log(self): 892 """Record the faft client log to the results directory.""" 893 client_log = self.faft_client.system.dump_log(True) 894 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 895 with open(client_log_file, 'w') as f: 896 f.write(client_log) 897 898 def _setup_gbb_flags(self): 899 """Setup the GBB flags for FAFT test.""" 900 if self.check_setup_done('gbb_flags'): 901 return 902 903 logging.info('Set proper GBB flags for test.') 904 # Ensure that GBB flags are set to 0x140. 905 flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE | 906 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM) 907 # And if the "no_ec_sync" argument is set, then disable EC software 908 # sync. 909 if self._no_ec_sync: 910 flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC 911 912 self.clear_set_gbb_flags(0xffffffff, flags_to_set) 913 self.mark_setup_done('gbb_flags') 914 915 def drop_backup_gbb_flags(self): 916 """Drops the backup GBB flags. 917 918 This can be used when a test intends to permanently change GBB flags. 919 """ 920 self._backup_gbb_flags = None 921 922 def _restore_gbb_flags(self): 923 """Restore GBB flags to their original state.""" 924 if self._backup_gbb_flags is None: 925 return 926 # Setting up and restoring the GBB flags take a lot of time. For 927 # speed-up purpose, don't restore it. 928 logging.info('***') 929 logging.info('*** Please manually restore the original GBB flags to: ' 930 '0x%x ***', self._backup_gbb_flags) 931 logging.info('***') 932 self.unmark_setup_done('gbb_flags') 933 934 def setup_tried_fwb(self, tried_fwb): 935 """Setup for fw B tried state. 936 937 It makes sure the system in the requested fw B tried state. If not, it 938 tries to do so. 939 940 @param tried_fwb: True if requested in tried_fwb=1; 941 False if tried_fwb=0. 942 """ 943 if tried_fwb: 944 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 945 logging.info( 946 'Firmware is not booted with tried_fwb. Reboot into it.') 947 self.faft_client.system.set_try_fw_b() 948 else: 949 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 950 logging.info( 951 'Firmware is booted with tried_fwb. Reboot to clear.') 952 953 def power_on(self): 954 """Switch DUT AC power on.""" 955 self._client.power_on(self.power_control) 956 957 def power_off(self): 958 """Switch DUT AC power off.""" 959 self._client.power_off(self.power_control) 960 961 def power_cycle(self): 962 """Power cycle DUT AC power.""" 963 self._client.power_cycle(self.power_control) 964 965 def setup_rw_boot(self, section='a'): 966 """Make sure firmware is in RW-boot mode. 967 968 If the given firmware section is in RO-boot mode, turn off the RO-boot 969 flag and reboot DUT into RW-boot mode. 970 971 @param section: A firmware section, either 'a' or 'b'. 972 """ 973 flags = self.faft_client.bios.get_preamble_flags(section) 974 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 975 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 976 self.faft_client.bios.set_preamble_flags(section, flags) 977 self.switcher.mode_aware_reboot() 978 979 def setup_kernel(self, part): 980 """Setup for kernel test. 981 982 It makes sure both kernel A and B bootable and the current boot is 983 the requested kernel part. 984 985 @param part: A string of kernel partition number or 'a'/'b'. 986 """ 987 self.ensure_kernel_boot(part) 988 logging.info('Checking the integrity of kernel B and rootfs B...') 989 if (self.faft_client.kernel.diff_a_b() or 990 not self.faft_client.rootfs.verify_rootfs('B')): 991 logging.info('Copying kernel and rootfs from A to B...') 992 self.copy_kernel_and_rootfs(from_part=part, 993 to_part=self.OTHER_KERNEL_MAP[part]) 994 self.reset_and_prioritize_kernel(part) 995 996 def reset_and_prioritize_kernel(self, part): 997 """Make the requested partition highest priority. 998 999 This function also reset kerenl A and B to bootable. 1000 1001 @param part: A string of partition number to be prioritized. 1002 """ 1003 root_dev = self.faft_client.system.get_root_dev() 1004 # Reset kernel A and B to bootable. 1005 self.faft_client.system.run_shell_command( 1006 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 1007 self.faft_client.system.run_shell_command( 1008 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 1009 # Set kernel part highest priority. 1010 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 1011 (self.KERNEL_MAP[part], root_dev)) 1012 1013 def do_blocking_sync(self, device): 1014 """Run a blocking sync command.""" 1015 logging.info("Blocking sync for %s", device) 1016 if 'mmcblk' in device: 1017 # For mmc devices, use `mmc status get` command to send an 1018 # empty command to wait for the disk to be available again. 1019 self.faft_client.system.run_shell_command('mmc status get %s' % 1020 device) 1021 elif 'nvme' in device: 1022 # Get a list of NVMe namespace and flush them individually 1023 # Assumes the output format from nvme list-ns command will 1024 # be something like follows: 1025 # [ 0]:0x1 1026 # [ 1]:0x2 1027 available_ns = self.faft_client.system.run_shell_command_get_output( 1028 'nvme list-ns %s -a' % device) 1029 for ns in available_ns: 1030 ns = ns.split(':')[-1] 1031 # For NVMe devices, use `nvme flush` command to commit data 1032 # and metadata to non-volatile media. 1033 self.faft_client.system.run_shell_command( 1034 'nvme flush %s -n %s' % (device, ns)) 1035 else: 1036 # For other devices, hdparm sends TUR to check if 1037 # a device is ready for transfer operation. 1038 self.faft_client.system.run_shell_command('hdparm -f %s' % device) 1039 1040 def blocking_sync(self): 1041 """Sync root device and internal device.""" 1042 # The double calls to sync fakes a blocking call 1043 # since the first call returns before the flush 1044 # is complete, but the second will wait for the 1045 # first to finish. 1046 self.faft_client.system.run_shell_command('sync') 1047 self.faft_client.system.run_shell_command('sync') 1048 1049 # sync only sends SYNCHRONIZE_CACHE but doesn't check the status. 1050 # This function will perform a device-specific sync command. 1051 root_dev = self.faft_client.system.get_root_dev() 1052 self.do_blocking_sync(root_dev) 1053 1054 # Also sync the internal device if booted from removable media. 1055 if self.faft_client.system.is_removable_device_boot(): 1056 internal_dev = self.faft_client.system.get_internal_device() 1057 self.do_blocking_sync(internal_dev) 1058 1059 def sync_and_ec_reboot(self, flags=''): 1060 """Request the client sync and do a EC triggered reboot. 1061 1062 @param flags: Optional, a space-separated string of flags passed to EC 1063 reboot command, including: 1064 default: EC soft reboot; 1065 'hard': EC cold/hard reboot. 1066 """ 1067 self.blocking_sync() 1068 self.ec.reboot(flags) 1069 time.sleep(self.faft_config.ec_boot_to_console) 1070 self.check_lid_and_power_on() 1071 1072 def reboot_and_reset_tpm(self): 1073 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 1074 self.switcher.reboot_to_mode(to_mode='rec') 1075 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 1076 self.switcher.mode_aware_reboot() 1077 1078 def full_power_off_and_on(self): 1079 """Shutdown the device by pressing power button and power on again.""" 1080 boot_id = self.get_bootid() 1081 # Press power button to trigger Chrome OS normal shutdown process. 1082 # We use a customized delay since the normal-press 1.2s is not enough. 1083 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 1084 # device can take 44-51 seconds to restart, 1085 # add buffer from the default timeout of 60 seconds. 1086 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 1087 time.sleep(self.faft_config.shutdown) 1088 if self.check_ec_capability(['x86'], suppress_warning=True): 1089 self.check_shutdown_power_state("G3", pwr_retries=5) 1090 # Short press power button to boot DUT again. 1091 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1092 1093 def check_shutdown_power_state(self, power_state, pwr_retries): 1094 """Check whether the device entered into requested EC power state 1095 after shutdown. 1096 1097 @param power_state: EC power state has to be checked. Either S5 or G3. 1098 @param pwr_retries: Times to check if the DUT in expected power state. 1099 @raise TestFail: If device failed to enter into requested power state. 1100 """ 1101 if not self.wait_power_state(power_state, pwr_retries): 1102 raise error.TestFail('System not shutdown properly and EC fails ' 1103 'to enter into %s state.' % power_state) 1104 logging.info('System entered into %s state..', power_state) 1105 1106 def check_lid_and_power_on(self): 1107 """ 1108 On devices with EC software sync, system powers on after EC reboots if 1109 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1110 This method checks lid switch state and presses power button if 1111 necessary. 1112 """ 1113 if self.servo.get("lid_open") == "no": 1114 time.sleep(self.faft_config.software_sync) 1115 self.servo.power_short_press() 1116 1117 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1118 """Modify the kernel header magic in USB stick. 1119 1120 The kernel header magic is the first 8-byte of kernel partition. 1121 We modify it to make it fail on kernel verification check. 1122 1123 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1124 @param from_magic: A string of magic which we change it from. 1125 @param to_magic: A string of magic which we change it to. 1126 @raise TestError: if failed to change magic. 1127 """ 1128 assert len(from_magic) == 8 1129 assert len(to_magic) == 8 1130 # USB image only contains one kernel. 1131 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1132 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1133 current_magic = self.servo.system_output(read_cmd) 1134 if current_magic == to_magic: 1135 logging.info("The kernel magic is already %s.", current_magic) 1136 return 1137 if current_magic != from_magic: 1138 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1139 1140 logging.info('Modify the kernel magic in USB, from %s to %s.', 1141 from_magic, to_magic) 1142 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1143 " 2>/dev/null" % (to_magic, kernel_part)) 1144 self.servo.system(write_cmd) 1145 1146 if self.servo.system_output(read_cmd) != to_magic: 1147 raise error.TestError("Failed to write new magic.") 1148 1149 def corrupt_usb_kernel(self, usb_dev): 1150 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1151 1152 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1153 """ 1154 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1155 self.CORRUPTED_MAGIC) 1156 1157 def restore_usb_kernel(self, usb_dev): 1158 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1159 1160 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1161 """ 1162 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1163 self.CHROMEOS_MAGIC) 1164 1165 def _call_action(self, action_tuple, check_status=False): 1166 """Call the action function with/without arguments. 1167 1168 @param action_tuple: A function, or a tuple (function, args, error_msg), 1169 in which, args and error_msg are optional. args is 1170 either a value or a tuple if multiple arguments. 1171 This can also be a list containing multiple 1172 function or tuple. In this case, these actions are 1173 called in sequence. 1174 @param check_status: Check the return value of action function. If not 1175 succeed, raises a TestFail exception. 1176 @return: The result value of the action function. 1177 @raise TestError: An error when the action function is not callable. 1178 @raise TestFail: When check_status=True, action function not succeed. 1179 """ 1180 if isinstance(action_tuple, list): 1181 return all([self._call_action(action, check_status=check_status) 1182 for action in action_tuple]) 1183 1184 action = action_tuple 1185 args = () 1186 error_msg = 'Not succeed' 1187 if isinstance(action_tuple, tuple): 1188 action = action_tuple[0] 1189 if len(action_tuple) >= 2: 1190 args = action_tuple[1] 1191 if not isinstance(args, tuple): 1192 args = (args,) 1193 if len(action_tuple) >= 3: 1194 error_msg = action_tuple[2] 1195 1196 if action is None: 1197 return 1198 1199 if not callable(action): 1200 raise error.TestError('action is not callable!') 1201 1202 info_msg = 'calling %s' % action.__name__ 1203 if args: 1204 info_msg += ' with args %s' % str(args) 1205 logging.info(info_msg) 1206 ret = action(*args) 1207 1208 if check_status and not ret: 1209 raise error.TestFail('%s: %s returning %s' % 1210 (error_msg, info_msg, str(ret))) 1211 return ret 1212 1213 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1214 run_power_action=True, post_power_action=None, 1215 shutdown_timeout=None): 1216 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1217 1218 @param shutdown_action: function which makes DUT shutdown, like 1219 pressing power key. 1220 @param pre_power_action: function which is called before next power on. 1221 @param run_power_action: power_key press by default, set to None to skip. 1222 @param post_power_action: function which is called after next power on. 1223 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1224 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1225 """ 1226 self._call_action(shutdown_action) 1227 logging.info('Wait to ensure DUT shut down...') 1228 try: 1229 if shutdown_timeout is None: 1230 shutdown_timeout = self.faft_config.shutdown_timeout 1231 self.switcher.wait_for_client(timeout=shutdown_timeout) 1232 raise error.TestFail( 1233 'Should shut the device down after calling %s.' % 1234 shutdown_action.__name__) 1235 except ConnectionError: 1236 if self.check_ec_capability(['x86'], suppress_warning=True): 1237 self.check_shutdown_power_state("G3", pwr_retries=5) 1238 logging.info( 1239 'DUT is surely shutdown. We are going to power it on again...') 1240 1241 if pre_power_action: 1242 self._call_action(pre_power_action) 1243 if run_power_action: 1244 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1245 if post_power_action: 1246 self._call_action(post_power_action) 1247 1248 def get_bootid(self, retry=3): 1249 """ 1250 Return the bootid. 1251 """ 1252 boot_id = None 1253 while retry: 1254 try: 1255 boot_id = self._client.get_boot_id() 1256 break 1257 except error.AutoservRunError: 1258 retry -= 1 1259 if retry: 1260 logging.info('Retry to get boot_id...') 1261 else: 1262 logging.warning('Failed to get boot_id.') 1263 logging.info('boot_id: %s', boot_id) 1264 return boot_id 1265 1266 def check_state(self, func): 1267 """ 1268 Wrapper around _call_action with check_status set to True. This is a 1269 helper function to be used by tests and is currently implemented by 1270 calling _call_action with check_status=True. 1271 1272 TODO: This function's arguments need to be made more stringent. And 1273 its functionality should be moved over to check functions directly in 1274 the future. 1275 1276 @param func: A function, or a tuple (function, args, error_msg), 1277 in which, args and error_msg are optional. args is 1278 either a value or a tuple if multiple arguments. 1279 This can also be a list containing multiple 1280 function or tuple. In this case, these actions are 1281 called in sequence. 1282 @return: The result value of the action function. 1283 @raise TestFail: If the function does notsucceed. 1284 """ 1285 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1286 self._call_action(func, check_status=True) 1287 logging.info("-[FAFT]-[ end state_checker ]----------------") 1288 1289 def get_current_firmware_sha(self): 1290 """Get current firmware sha of body and vblock. 1291 1292 @return: Current firmware sha follows the order ( 1293 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha) 1294 """ 1295 current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'), 1296 self.faft_client.bios.get_body_sha('a'), 1297 self.faft_client.bios.get_sig_sha('b'), 1298 self.faft_client.bios.get_body_sha('b')) 1299 if not all(current_firmware_sha): 1300 raise error.TestError('Failed to get firmware sha.') 1301 return current_firmware_sha 1302 1303 def is_firmware_changed(self): 1304 """Check if the current firmware changed, by comparing its SHA. 1305 1306 @return: True if it is changed, otherwise Flase. 1307 """ 1308 # Device may not be rebooted after test. 1309 self.faft_client.bios.reload() 1310 1311 current_sha = self.get_current_firmware_sha() 1312 1313 if current_sha == self._backup_firmware_sha: 1314 return False 1315 else: 1316 corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0]) 1317 corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1]) 1318 corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2]) 1319 corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3]) 1320 logging.info('Firmware changed:') 1321 logging.info('VBOOTA is changed: %s', corrupt_VBOOTA) 1322 logging.info('VBOOTB is changed: %s', corrupt_VBOOTB) 1323 logging.info('FVMAIN is changed: %s', corrupt_FVMAIN) 1324 logging.info('FVMAINB is changed: %s', corrupt_FVMAINB) 1325 return True 1326 1327 def backup_firmware(self, suffix='.original'): 1328 """Backup firmware to file, and then send it to host. 1329 1330 @param suffix: a string appended to backup file name 1331 """ 1332 remote_temp_dir = self.faft_client.system.create_temp_dir() 1333 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1334 self.faft_client.bios.dump_whole(remote_bios_path) 1335 self._client.get_file(remote_bios_path, 1336 os.path.join(self.resultsdir, 'bios' + suffix)) 1337 1338 if self.faft_config.chrome_ec: 1339 remote_ec_path = os.path.join(remote_temp_dir, 'ec') 1340 self.faft_client.ec.dump_whole(remote_ec_path) 1341 self._client.get_file(remote_ec_path, 1342 os.path.join(self.resultsdir, 'ec' + suffix)) 1343 1344 self._client.run('rm -rf %s' % remote_temp_dir) 1345 logging.info('Backup firmware stored in %s with suffix %s', 1346 self.resultsdir, suffix) 1347 1348 self._backup_firmware_sha = self.get_current_firmware_sha() 1349 1350 def is_firmware_saved(self): 1351 """Check if a firmware saved (called backup_firmware before). 1352 1353 @return: True if the firmware is backuped; otherwise False. 1354 """ 1355 return self._backup_firmware_sha != () 1356 1357 def clear_saved_firmware(self): 1358 """Clear the firmware saved by the method backup_firmware.""" 1359 self._backup_firmware_sha = () 1360 1361 def restore_firmware(self, suffix='.original', restore_ec=True): 1362 """Restore firmware from host in resultsdir. 1363 1364 @param suffix: a string appended to backup file name 1365 @param restore_ec: True to restore the ec firmware; False not to do. 1366 """ 1367 if not self.is_firmware_changed(): 1368 return 1369 1370 # Backup current corrupted firmware. 1371 self.backup_firmware(suffix='.corrupt') 1372 1373 # Restore firmware. 1374 remote_temp_dir = self.faft_client.system.create_temp_dir() 1375 self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix), 1376 os.path.join(remote_temp_dir, 'bios')) 1377 1378 self.faft_client.bios.write_whole( 1379 os.path.join(remote_temp_dir, 'bios')) 1380 1381 if self.faft_config.chrome_ec and restore_ec: 1382 self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix), 1383 os.path.join(remote_temp_dir, 'ec')) 1384 self.faft_client.ec.write_whole( 1385 os.path.join(remote_temp_dir, 'ec')) 1386 1387 self.switcher.mode_aware_reboot() 1388 logging.info('Successfully restore firmware.') 1389 1390 def setup_firmwareupdate_shellball(self, shellball=None): 1391 """Setup a shellball to use in firmware update test. 1392 1393 Check if there is a given shellball, and it is a shell script. Then, 1394 send it to the remote host. Otherwise, use the 1395 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1396 BIOS and EC images with the active firmware images. 1397 1398 @param shellball: path of a shellball or default to None. 1399 """ 1400 if shellball: 1401 # Determine the firmware file is a shellball or a raw binary. 1402 is_shellball = (utils.system_output("file %s" % shellball).find( 1403 "shell script") != -1) 1404 if is_shellball: 1405 logging.info('Device will update firmware with shellball %s', 1406 shellball) 1407 temp_path = self.faft_client.updater.get_temp_path() 1408 working_shellball = os.path.join(temp_path, 1409 'chromeos-firmwareupdate') 1410 self._client.send_file(shellball, working_shellball) 1411 self.faft_client.updater.extract_shellball() 1412 else: 1413 raise error.TestFail( 1414 'The given shellball is not a shell script.') 1415 else: 1416 logging.info('No shellball given, use the original shellball and ' 1417 'replace its BIOS and EC images.') 1418 work_path = self.faft_client.updater.get_work_path() 1419 bios_in_work_path = os.path.join( 1420 work_path, self.faft_client.updater.get_bios_relative_path()) 1421 ec_in_work_path = os.path.join( 1422 work_path, self.faft_client.updater.get_ec_relative_path()) 1423 logging.info('Writing current BIOS to: %s', bios_in_work_path) 1424 self.faft_client.bios.dump_whole(bios_in_work_path) 1425 if self.faft_config.chrome_ec: 1426 logging.info('Writing current EC to: %s', ec_in_work_path) 1427 self.faft_client.ec.dump_firmware(ec_in_work_path) 1428 self.faft_client.updater.repack_shellball() 1429 1430 def is_kernel_changed(self): 1431 """Check if the current kernel is changed, by comparing its SHA1 hash. 1432 1433 @return: True if it is changed; otherwise, False. 1434 """ 1435 changed = False 1436 for p in ('A', 'B'): 1437 backup_sha = self._backup_kernel_sha.get(p, None) 1438 current_sha = self.faft_client.kernel.get_sha(p) 1439 if backup_sha != current_sha: 1440 changed = True 1441 logging.info('Kernel %s is changed', p) 1442 return changed 1443 1444 def backup_kernel(self, suffix='.original'): 1445 """Backup kernel to files, and the send them to host. 1446 1447 @param suffix: a string appended to backup file name. 1448 """ 1449 remote_temp_dir = self.faft_client.system.create_temp_dir() 1450 for p in ('A', 'B'): 1451 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1452 self.faft_client.kernel.dump(p, remote_path) 1453 self._client.get_file( 1454 remote_path, 1455 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1456 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1457 logging.info('Backup kernel stored in %s with suffix %s', 1458 self.resultsdir, suffix) 1459 1460 def is_kernel_saved(self): 1461 """Check if kernel images are saved (backup_kernel called before). 1462 1463 @return: True if the kernel is saved; otherwise, False. 1464 """ 1465 return len(self._backup_kernel_sha) != 0 1466 1467 def clear_saved_kernel(self): 1468 """Clear the kernel saved by backup_kernel().""" 1469 self._backup_kernel_sha = dict() 1470 1471 def restore_kernel(self, suffix='.original'): 1472 """Restore kernel from host in resultsdir. 1473 1474 @param suffix: a string appended to backup file name. 1475 """ 1476 if not self.is_kernel_changed(): 1477 return 1478 1479 # Backup current corrupted kernel. 1480 self.backup_kernel(suffix='.corrupt') 1481 1482 # Restore kernel. 1483 remote_temp_dir = self.faft_client.system.create_temp_dir() 1484 for p in ('A', 'B'): 1485 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1486 self._client.send_file( 1487 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1488 remote_path) 1489 self.faft_client.kernel.write(p, remote_path) 1490 1491 self.switcher.mode_aware_reboot() 1492 logging.info('Successfully restored kernel.') 1493 1494 def backup_cgpt_attributes(self): 1495 """Backup CGPT partition table attributes.""" 1496 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1497 1498 def restore_cgpt_attributes(self): 1499 """Restore CGPT partition table attributes.""" 1500 current_table = self.faft_client.cgpt.get_attributes() 1501 if current_table == self._backup_cgpt_attr: 1502 return 1503 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1504 self._backup_cgpt_attr, 1505 current_table) 1506 self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr) 1507 1508 self.switcher.mode_aware_reboot() 1509 logging.info('Successfully restored CGPT table.') 1510 1511 def try_fwb(self, count=0): 1512 """set to try booting FWB count # times 1513 1514 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 1515 vboot2 1516 1517 @param count: an integer specifying value to program into 1518 fwb_tries(vb1)/fw_try_next(vb2) 1519 """ 1520 if self.fw_vboot2: 1521 self.faft_client.system.set_fw_try_next('B', count) 1522 else: 1523 # vboot1: we need to boot into fwb at least once 1524 if not count: 1525 count = count + 1 1526 self.faft_client.system.set_try_fw_b(count) 1527 1528