1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import print_function 6 7import ctypes 8import logging 9import os 10import pprint 11import re 12import StringIO 13import time 14import uuid 15 16from autotest_lib.client.bin import utils 17from autotest_lib.client.common_lib import error 18from autotest_lib.client.common_lib import global_config 19from autotest_lib.client.common_lib.cros import retry 20from autotest_lib.client.common_lib.cros import tpm_utils 21from autotest_lib.server import test 22from autotest_lib.server.cros import vboot_constants as vboot 23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig 24from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy 25from autotest_lib.server.cros.faft.utils import mode_switcher 26from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers 27from autotest_lib.server.cros.power import utils as PowerUtils 28from autotest_lib.server.cros.servo import chrome_base_ec 29from autotest_lib.server.cros.servo import chrome_cr50 30from autotest_lib.server.cros.servo import chrome_ec 31from autotest_lib.server.cros.servo import servo 32from autotest_lib.server.cros.faft import telemetry 33 34# Experimentally tuned time in minutes to wait for partition device nodes on a 35# USB stick to be ready after plugging in the stick. 36PARTITION_TABLE_READINESS_TIMEOUT = 0.1 # minutes 37# Experimentally tuned time in seconds to wait for the first retry of reading 38# the sysfs node of a USB stick's partition device node. 39PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY = 1 # seconds 40 41ConnectionError = mode_switcher.ConnectionError 42 43 44class FirmwareTest(test.test): 45 """ 46 Base class that sets up helper objects/functions for firmware tests. 47 48 It launches the FAFTClient on DUT, such that the test can access its 49 firmware functions and interfaces. It also provides some methods to 50 handle the reboot mechanism, in order to ensure FAFTClient is still 51 connected after reboot. 52 @type servo: servo.Servo 53 @type _client: autotest_lib.server.hosts.ssh_host.SSHHost | 54 autotest_lib.server.hosts.cros_host.CrosHost 55 56 TODO: add documentaion as the FAFT rework progresses. 57 """ 58 version = 1 59 60 # Set this to True in test classes that need to boot from the USB stick. 61 # When True, initialize() will raise TestWarn if USB stick is marked bad. 62 NEEDS_SERVO_USB = False 63 64 # Mapping of partition number of kernel and rootfs. 65 KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'} 66 ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'} 67 OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'} 68 OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'} 69 70 CHROMEOS_MAGIC = "CHROMEOS" 71 CORRUPTED_MAGIC = "CORRUPTD" 72 73 # System power states 74 POWER_STATE_S0 = 'S0' 75 POWER_STATE_S0IX = 'S0ix' 76 POWER_STATE_S3 = 'S3' 77 POWER_STATE_S5 = 'S5' 78 POWER_STATE_G3 = 'G3' 79 POWER_STATE_SUSPEND = '|'.join([POWER_STATE_S0IX, POWER_STATE_S3]) 80 81 # Delay for waiting client to return before EC suspend 82 EC_SUSPEND_DELAY = 5 83 84 # Delay between EC suspend and wake 85 WAKE_DELAY = 10 86 87 # Delay between closing and opening lid 88 LID_DELAY = 1 89 90 # Delay for establishing state after changing PD settings 91 PD_RESYNC_DELAY = 2 92 93 # The default number of power state check retries (each try takes 3 secs) 94 DEFAULT_PWR_RETRIES = 5 95 96 # FWMP space constants 97 FWMP_CLEARED_EXIT_STATUS = 1 98 FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS' 99 '_INVALID') 100 101 _ROOTFS_PARTITION_NUMBER = 3 102 103 # Class level variable, keep track the states of one time setup. 104 # This variable is preserved across tests which inherit this class. 105 _global_setup_done = { 106 'gbb_flags': False, 107 'reimage': False, 108 'usb_check': False, 109 } 110 111 # CCD password used by tests. 112 CCD_PASSWORD = 'Password' 113 114 RESPONSE_TIMEOUT = 180 115 116 @classmethod 117 def check_setup_done(cls, label): 118 """Check if the given setup is done. 119 120 @param label: The label of the setup. 121 """ 122 return cls._global_setup_done[label] 123 124 @classmethod 125 def mark_setup_done(cls, label): 126 """Mark the given setup done. 127 128 @param label: The label of the setup. 129 """ 130 cls._global_setup_done[label] = True 131 132 @classmethod 133 def unmark_setup_done(cls, label): 134 """Mark the given setup not done. 135 136 @param label: The label of the setup. 137 """ 138 cls._global_setup_done[label] = False 139 140 def initialize(self, host, cmdline_args, ec_wp=None): 141 """Initialize the FirmwareTest. 142 143 This method interacts with the Servo, FAFT RPC client, FAFT Config, 144 Mode Switcher, EC consoles, write-protection, GBB flags, and a lockfile. 145 146 @type host: autotest_lib.server.hosts.CrosHost 147 """ 148 self.run_id = str(uuid.uuid4()) 149 self._client = host 150 self.servo = host.servo 151 152 self.lockfile = '/usr/local/tmp/faft/lock' 153 self._backup_gbb_flags = None 154 self._backup_firmware_identity = dict() 155 self._backup_kernel_sha = dict() 156 self._backup_cgpt_attr = dict() 157 self._backup_dev_mode = None 158 self._restore_power_mode = None 159 self._uart_file_dict = {} 160 161 logging.info('FirmwareTest initialize begin (id=%s)', self.run_id) 162 163 # Parse arguments from command line 164 args = {} 165 self.power_control = host.POWER_CONTROL_RPM 166 for arg in cmdline_args: 167 match = re.search("^(\w+)=(.+)", arg) 168 if match: 169 args[match.group(1)] = match.group(2) 170 171 self._no_fw_rollback_check = False 172 if 'no_fw_rollback_check' in args: 173 if 'true' in args['no_fw_rollback_check'].lower(): 174 self._no_fw_rollback_check = True 175 176 self._no_ec_sync = False 177 if 'no_ec_sync' in args: 178 if 'true' in args['no_ec_sync'].lower(): 179 self._no_ec_sync = True 180 181 self._use_sync_script = global_config.global_config.get_config_value( 182 'CROS', 'enable_fs_sync_script', type=bool, default=False) 183 184 self.servo.initialize_dut() 185 self.faft_client = RPCProxy(host) 186 self.faft_config = FAFTConfig( 187 self.faft_client.system.get_platform_name(), 188 self.faft_client.system.get_model_name()) 189 self.checkers = FAFTCheckers(self) 190 191 if self.faft_config.chrome_ec: 192 self.ec = chrome_ec.ChromeEC(self.servo) 193 self.switcher = mode_switcher.create_mode_switcher(self) 194 # Check for presence of a USBPD console 195 if self.faft_config.chrome_usbpd: 196 self.usbpd = chrome_ec.ChromeUSBPD(self.servo) 197 elif self.faft_config.chrome_ec: 198 # If no separate USBPD console, then PD exists on EC console 199 self.usbpd = self.ec 200 # Get pdtester console 201 self.pdtester = host.pdtester 202 self.pdtester_host = host._pdtester_host 203 # Check for presence of a working Cr50 console 204 if self.servo.has_control('cr50_version'): 205 try: 206 # Check that the console works before declaring the cr50 console 207 # connection exists and enabling uart capture. 208 cr50 = chrome_cr50.ChromeCr50(self.servo, self.faft_config) 209 cr50.get_version() 210 self.cr50 = cr50 211 except servo.ControlUnavailableError: 212 logging.warn('cr50 console not supported.') 213 except Exception as e: 214 logging.warn('Ignored unknown cr50 version error: %s', str(e)) 215 216 if 'power_control' in args: 217 self.power_control = args['power_control'] 218 if self.power_control not in host.POWER_CONTROL_VALID_ARGS: 219 raise error.TestError('Valid values for --args=power_control ' 220 'are %s. But you entered wrong argument ' 221 'as "%s".' 222 % (host.POWER_CONTROL_VALID_ARGS, 223 self.power_control)) 224 225 if self.NEEDS_SERVO_USB and not host.is_servo_usb_usable(): 226 usb_state = host.get_servo_usb_state() 227 raise error.TestWarn( 228 "Servo USB disk unusable (%s); canceling test." % 229 usb_state) 230 231 if not self.faft_client.system.dev_tpm_present(): 232 raise error.TestError('/dev/tpm0 does not exist on the client') 233 234 # Initialize servo role to src 235 self.servo.set_servo_v4_role('src') 236 237 # Create the BaseEC object. None if not available. 238 self.base_ec = chrome_base_ec.create_base_ec(self.servo) 239 240 self._record_uart_capture() 241 self._record_system_info() 242 self.faft_client.system.set_dev_default_boot() 243 self.fw_vboot2 = self.faft_client.system.get_fw_vboot2() 244 logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1) 245 if self.fw_vboot2: 246 self.faft_client.system.set_fw_try_next('A') 247 if self.faft_client.system.get_crossystem_value( 248 'mainfw_act') == 'B': 249 logging.info('mainfw_act is B. rebooting to set it A') 250 # TODO(crbug.com/1018322): remove try/catch once that bug is 251 # marked as fixed and verified. In that case the overlay for 252 # the board itself will map warm_reset to cold_reset. 253 try: 254 self.switcher.mode_aware_reboot() 255 except ConnectionError as e: 256 if 'DUT is still up unexpectedly' in str(e): 257 # In this case, try doing a cold_reset instead 258 self.switcher.mode_aware_reboot(reboot_type='cold') 259 else: 260 raise 261 262 # Check flashrom before first use, to avoid xmlrpclib.Fault. 263 if not self.faft_client.bios.is_available(): 264 raise error.TestError( 265 "flashrom is broken; check 'flashrom -p host'" 266 "and rpc server log.") 267 268 self._setup_gbb_flags() 269 self.faft_client.updater.stop_daemon() 270 self._create_faft_lockfile() 271 self._create_old_faft_lockfile() 272 self._setup_ec_write_protect(ec_wp) 273 # See chromium:239034 regarding needing this sync. 274 self.blocking_sync() 275 logging.info('FirmwareTest initialize done (id=%s)', self.run_id) 276 277 def stage_build_to_usbkey(self): 278 """Downloads host's build to the USB key attached to servo. 279 280 @return: True if build is verified to be on USB key, False otherwise. 281 """ 282 info = self._client.host_info_store.get() 283 if info.build: 284 current_build = self._client._servo_host.validate_image_usbkey() 285 if current_build != info.build: 286 logging.debug('Current build on USB: %s differs from test' 287 ' build: %s, proceed with download.', 288 current_build, info.build) 289 try: 290 self._client.stage_build_to_usb(info.build) 291 return True 292 except error.AutotestError as e: 293 logging.warn('Stage build to USB failed, tests that require' 294 ' test image on Servo USB may fail: {}'.format(e)) 295 return False 296 else: 297 logging.debug('Current build on USB: %s is same as test' 298 ' build, skip download.', current_build) 299 return True 300 else: 301 logging.warn('Failed to get build label from the DUT, will use' 302 ' existing image in Servo USB.') 303 return False 304 305 def run_once(self, *args, **dargs): 306 """Delegates testing to a test method. 307 308 test_name is either the 1st positional argument or a named argument. 309 310 test_name will be mapped to a test method as follows: 311 test_name method 312 -------------- ----------- 313 <TestClass> test 314 <TestClass>.<Case> test_<Case> 315 <TestClass>.<Case>.<SubCase> test_<Case>_<SubCase> 316 317 Any arguments not consumed by FirmwareTest are passed to the test method. 318 319 @param test_name: Should be set to NAME in the control file. 320 321 @raise TestError: If test_name wasn't found in args, does not start 322 with test class, or if the method is not found. 323 """ 324 self_name = type(self).__name__ 325 326 # Parse and remove test name from args. 327 if 'test_name' in dargs: 328 test_name = dargs.pop('test_name') 329 elif len(args) >= 1: 330 test_name = args[0] 331 args = args[1:] 332 else: 333 raise error.TestError('"%s" class must define run_once, or the' 334 ' control file must specify "test_name".' % 335 self_name) 336 337 # Check that test_name starts with the test class name. 338 name_parts = test_name.split('.') 339 340 test_class = name_parts.pop(0) 341 if test_class != self_name: 342 raise error.TestError('Class "%s" does not match that found in test' 343 ' name "%s"' % (self_name, test_class)) 344 345 # Construct and call the test method. 346 method_name = '_'.join(['test'] + name_parts) 347 if not hasattr(self, method_name): 348 raise error.TestError('Method "%s" for testing "%s" not found in' 349 ' "%s"' % (method_name, test_name, self_name)) 350 351 logging.info('Starting test: "%s"', test_name) 352 utils.cherry_pick_call(getattr(self, method_name), *args, **dargs) 353 354 def cleanup(self): 355 """Autotest cleanup function.""" 356 # Unset state checker in case it's set by subclass 357 logging.info('FirmwareTest cleaning up (id=%s)', self.run_id) 358 359 # Capture UART before doing anything else, so we can guarantee we get 360 # some uart results. 361 try: 362 self._record_uart_capture() 363 except: 364 logging.warn('Failed initial uart capture during cleanup') 365 366 try: 367 self.faft_client.system.is_available() 368 except: 369 # Remote is not responding. Revive DUT so that subsequent tests 370 # don't fail. 371 self._restore_routine_from_timeout() 372 373 if hasattr(self, 'switcher'): 374 self.switcher.restore_mode() 375 376 self._restore_ec_write_protect() 377 self._restore_servo_v4_role() 378 379 if hasattr(self, 'faft_client'): 380 self._restore_gbb_flags() 381 self.faft_client.updater.start_daemon() 382 self.faft_client.updater.cleanup() 383 self._remove_faft_lockfile() 384 self._remove_old_faft_lockfile() 385 self._record_faft_client_log() 386 self.faft_client.quit() 387 388 # Capture any new uart output, then discard log messages again. 389 self._cleanup_uart_capture() 390 391 super(FirmwareTest, self).cleanup() 392 logging.info('FirmwareTest cleanup done (id=%s)', self.run_id) 393 394 def _record_system_info(self): 395 """Record some critical system info to the attr keyval. 396 397 This info is used by generate_test_report later. 398 """ 399 system_info = { 400 'hwid': self.faft_client.system.get_crossystem_value('hwid'), 401 'ec_version': self.faft_client.ec.get_version(), 402 'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'), 403 'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'), 404 'servo_host_os_version' : self.servo.get_os_version(), 405 'servod_version': self.servo.get_servod_version(), 406 'os_version': self._client.get_release_builder_path(), 407 'servo_type': self.servo.get_servo_version() 408 } 409 410 # Record the servo v4 and servo micro versions when possible 411 system_info.update(self.servo.get_servo_fw_versions()) 412 413 if hasattr(self, 'cr50'): 414 system_info['cr50_version'] = self.cr50.get_full_version() 415 416 logging.info('System info:\n%s', pprint.pformat(system_info)) 417 self.write_attr_keyval(system_info) 418 419 def invalidate_firmware_setup(self): 420 """Invalidate all firmware related setup state. 421 422 This method is called when the firmware is re-flashed. It resets all 423 firmware related setup states so that the next test setup properly 424 again. 425 """ 426 self.unmark_setup_done('gbb_flags') 427 428 def _retrieve_recovery_reason_from_trap(self): 429 """Try to retrieve the recovery reason from a trapped recovery screen. 430 431 @return: The recovery_reason, 0 if any error. 432 """ 433 recovery_reason = 0 434 logging.info('Try to retrieve recovery reason...') 435 if self.servo.get_usbkey_state() == 'dut': 436 self.switcher.bypass_rec_mode() 437 else: 438 self.servo.switch_usbkey('dut') 439 440 try: 441 self.switcher.wait_for_client() 442 lines = self.faft_client.system.run_shell_command_get_output( 443 'crossystem recovery_reason') 444 recovery_reason = int(lines[0]) 445 logging.info('Got the recovery reason %d.', recovery_reason) 446 except ConnectionError: 447 logging.error('Failed to get the recovery reason due to connection ' 448 'error.') 449 return recovery_reason 450 451 def _reset_client(self): 452 """Reset client to a workable state. 453 454 This method is called when the client is not responsive. It may be 455 caused by the following cases: 456 - halt on a firmware screen without timeout, e.g. REC_INSERT screen; 457 - corrupted firmware; 458 - corrutped OS image. 459 """ 460 # DUT may halt on a firmware screen. Try cold reboot. 461 logging.info('Try cold reboot...') 462 self.switcher.mode_aware_reboot(reboot_type='cold', 463 sync_before_boot=False, 464 wait_for_dut_up=False) 465 self.switcher.wait_for_client_offline() 466 self.switcher.bypass_dev_mode() 467 try: 468 self.switcher.wait_for_client() 469 return 470 except ConnectionError: 471 logging.warn('Cold reboot doesn\'t help, still connection error.') 472 473 # DUT may be broken by a corrupted firmware. Restore firmware. 474 # We assume the recovery boot still works fine. Since the recovery 475 # code is in RO region and all FAFT tests don't change the RO region 476 # except GBB. 477 if self.is_firmware_saved(): 478 self._ensure_client_in_recovery() 479 logging.info('Try restore the original firmware...') 480 if self.is_firmware_changed(): 481 try: 482 self.restore_firmware() 483 return 484 except ConnectionError: 485 logging.warn('Restoring firmware doesn\'t help, still ' 486 'connection error.') 487 488 # Perhaps it's kernel that's broken. Let's try restoring it. 489 if self.is_kernel_saved(): 490 self._ensure_client_in_recovery() 491 logging.info('Try restore the original kernel...') 492 if self.is_kernel_changed(): 493 try: 494 self.restore_kernel() 495 return 496 except ConnectionError: 497 logging.warn('Restoring kernel doesn\'t help, still ' 498 'connection error.') 499 500 # DUT may be broken by a corrupted OS image. Restore OS image. 501 self._ensure_client_in_recovery() 502 logging.info('Try restore the OS image...') 503 self.faft_client.system.run_shell_command('chromeos-install --yes') 504 self.switcher.mode_aware_reboot(wait_for_dut_up=False) 505 self.switcher.wait_for_client_offline() 506 self.switcher.bypass_dev_mode() 507 try: 508 self.switcher.wait_for_client() 509 logging.info('Successfully restore OS image.') 510 return 511 except ConnectionError: 512 logging.warn('Restoring OS image doesn\'t help, still connection ' 513 'error.') 514 515 def _ensure_client_in_recovery(self): 516 """Ensure client in recovery boot; reboot into it if necessary. 517 518 @raise TestError: if failed to boot the USB image. 519 """ 520 logging.info('Try boot into USB image...') 521 self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False, 522 wait_for_dut_up=False) 523 self.servo.switch_usbkey('host') 524 self.switcher.bypass_rec_mode() 525 try: 526 self.switcher.wait_for_client() 527 except ConnectionError: 528 raise error.TestError('Failed to boot the USB image.') 529 530 def _restore_routine_from_timeout(self): 531 """A routine to try to restore the system from a timeout error. 532 533 This method is called when FAFT failed to connect DUT after reboot. 534 535 @raise TestFail: This exception is already raised, with a decription 536 why it failed. 537 """ 538 # DUT is disconnected. Capture the UART output for debug. 539 self._record_uart_capture() 540 541 # TODO(waihong@chromium.org): Implement replugging the Ethernet to 542 # identify if it is a network flaky. 543 544 recovery_reason = self._retrieve_recovery_reason_from_trap() 545 546 # Reset client to a workable state. 547 self._reset_client() 548 549 # Raise the proper TestFail exception. 550 if recovery_reason: 551 raise error.TestFail('Trapped in the recovery screen (reason: %d) ' 552 'and timed out' % recovery_reason) 553 else: 554 raise error.TestFail('Timed out waiting for DUT reboot') 555 556 def assert_test_image_in_usb_disk(self, usb_dev=None): 557 """Assert an USB disk plugged-in on servo and a test image inside. 558 559 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 560 If None, it is detected automatically. 561 @raise TestError: if USB disk not detected or not a test image. 562 """ 563 if self.check_setup_done('usb_check'): 564 return 565 if usb_dev: 566 assert self.servo.get_usbkey_state() == 'host' 567 else: 568 self.servo.switch_usbkey('host') 569 usb_dev = self.servo.probe_host_usb_dev() 570 if not usb_dev: 571 raise error.TestError( 572 'An USB disk should be plugged in the servo board. %s' % 573 telemetry.collect_usb_state(self.servo)) 574 575 rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER) 576 logging.info('usb dev is %s', usb_dev) 577 tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX') 578 # After the USB key is muxed from the DUT to the servo host, there 579 # appears to be a delay between when servod can confirm that a sysfs 580 # entry exists for the disk (as done by probe_host_usb_dev) and when 581 # sysfs entries get populated for the disk's partitions. 582 @retry.retry(error.AutoservRunError, 583 timeout_min=PARTITION_TABLE_READINESS_TIMEOUT, 584 delay_sec=PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY) 585 def confirm_rootfs_partition_device_node_readable(): 586 """Repeatedly poll for the RootFS partition sysfs node.""" 587 self.servo.system('ls {}'.format(rootfs)) 588 589 try: 590 confirm_rootfs_partition_device_node_readable() 591 except error.AutoservRunError as e: 592 usb_info = telemetry.collect_usb_state(self.servo) 593 raise error.TestError( 594 ('Could not ls the device node for the RootFS on the USB ' 595 'device. %s: %s\nMore telemetry: %s') % 596 (type(e).__name__, e, usb_info)) 597 try: 598 self.servo.system('mount -o ro %s %s' % (rootfs, tmpd)) 599 except error.AutoservRunError as e: 600 usb_info = telemetry.collect_usb_state(self.servo) 601 raise error.TestError( 602 ('Could not mount the partition on USB device. %s: %s\n' 603 'More telemetry: %s') % (type(e).__name__, e, usb_info)) 604 605 try: 606 usb_lsb = self.servo.system_output('cat %s' % 607 os.path.join(tmpd, 'etc/lsb-release')) 608 logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb) 609 dut_lsb = '\n'.join(self.faft_client.system. 610 run_shell_command_get_output('cat /etc/lsb-release')) 611 logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb) 612 if not re.search(r'RELEASE_TRACK=.*test', usb_lsb): 613 raise error.TestError('USB stick in servo is no test image') 614 usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1) 615 dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1) 616 if usb_board != dut_board: 617 raise error.TestError('USB stick in servo contains a %s ' 618 'image, but DUT is a %s' % (usb_board, dut_board)) 619 finally: 620 for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd): 621 self.servo.system(cmd) 622 623 self.mark_setup_done('usb_check') 624 625 def setup_pdtester(self, flip_cc=False, dts_mode=False, pd_faft=True, 626 min_batt_level=None): 627 """Setup the PDTester to a given state. 628 629 @param flip_cc: True to flip CC polarity; False to not flip it. 630 @param dts_mode: True to config PDTester to DTS mode; False to not. 631 @param pd_faft: True to config PD FAFT setup. 632 @param min_batt_level: An int for minimum battery level, or None for 633 skip. 634 @raise TestError: If Servo v4 not setup properly. 635 """ 636 637 # PD FAFT is only tested with a least a servo V4 with servo micro 638 # or C2D2. 639 if pd_faft and ( 640 'servo_v4_with_servo_micro' not in self.pdtester.servo_type 641 ) and ('servo_v4_with_c2d2' not in self.pdtester.servo_type): 642 raise error.TestError('servo_v4_with_servo_micro or ' 643 'servo_v4_with_c2d2 is a mandatory setup ' 644 'for PD FAFT. Got %s.' % 645 self.pdtester.servo_type) 646 647 # Ensure the battery is enough for testing, this should be done before 648 # all the following setup. 649 if (min_batt_level is not None) and self._client.has_battery(): 650 logging.info('Start charging if batt level < %d', min_batt_level) 651 PowerUtils.put_host_battery_in_range(self._client, min_batt_level, 652 100, 600) 653 654 # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects 655 # the behaviors of what PD FAFT tests. So we want it disabled. 656 if 'servo_v4' in self.pdtester.servo_type: 657 self.servo.set_dts_mode('on' if dts_mode else 'off') 658 else: 659 logging.warn('Configuring DTS mode only supported on Servo v4') 660 661 self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1') 662 # Make it sourcing max voltage. 663 self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE) 664 665 time.sleep(self.PD_RESYNC_DELAY) 666 667 # Servo v4 requires an external charger to source power. Make sure 668 # this setup is correct. 669 if 'servo_v4' in self.pdtester.servo_type: 670 role = self.pdtester.get('servo_v4_role') 671 if role != 'src': 672 raise error.TestError( 673 'Servo v4 is not sourcing power! Make sure the servo ' 674 '"DUT POWER" port is connected to a working charger. ' 675 'servo_v4_role:%s' % role) 676 677 def setup_usbkey(self, usbkey, host=None, used_for_recovery=None): 678 """Setup the USB disk for the test. 679 680 It checks the setup of USB disk and a valid ChromeOS test image inside. 681 It also muxes the USB disk to either the host or DUT by request. 682 683 @param usbkey: True if the USB disk is required for the test, False if 684 not required. 685 @param host: Optional, True to mux the USB disk to host, False to mux it 686 to DUT, default to do nothing. 687 @param used_for_recovery: Optional, True if the USB disk is used for 688 recovery boot; False if the USB disk is not 689 used for recovery boot, like Ctrl-U USB boot. 690 """ 691 if usbkey: 692 self.stage_build_to_usbkey() 693 self.assert_test_image_in_usb_disk() 694 elif host is None: 695 # USB disk is not required for the test. Better to mux it to host. 696 host = True 697 698 if host is True: 699 self.servo.switch_usbkey('host') 700 elif host is False: 701 self.servo.switch_usbkey('dut') 702 703 if used_for_recovery is None: 704 # Default value is True if usbkey == True. 705 # As the common usecase of USB disk is for recovery boot. Tests 706 # can define it explicitly if not. 707 used_for_recovery = usbkey 708 709 if used_for_recovery: 710 # In recovery boot, the locked EC RO doesn't support PD for most 711 # of the CrOS devices. The default servo v4 power role is a SRC. 712 # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the 713 # data role swap from UFP to DFP; as a result, DUT can't see the 714 # USB disk and the Ethernet dongle on servo v4. 715 # 716 # This is a workaround to set servo v4 as a SNK, for every FAFT 717 # test which boots into the USB disk in the recovery mode. 718 # 719 # TODO(waihong): Add a check to see if the battery level is too 720 # low and sleep for a while for charging. 721 self.set_servo_v4_role_to_snk() 722 723 def set_servo_v4_role_to_snk(self, pd_comm=False): 724 """Set the servo v4 role to SNK. 725 726 @param pd_comm: a bool. Enable PD communication if True, else otherwise 727 """ 728 self._needed_restore_servo_v4_role = True 729 self.servo.set_servo_v4_role('snk') 730 if pd_comm: 731 self.servo.set_servo_v4_pd_comm('on') 732 733 def _restore_servo_v4_role(self): 734 """Restore the servo v4 role to default SRC.""" 735 if not hasattr(self, '_needed_restore_servo_v4_role'): 736 return 737 if self._needed_restore_servo_v4_role: 738 self.servo.set_servo_v4_role('src') 739 740 def set_dut_low_power_idle_delay(self, delay): 741 """Set EC low power idle delay 742 743 @param delay: Delay in seconds 744 """ 745 if not self.ec.has_command('dsleep'): 746 logging.info("Can't set low power idle delay.") 747 return 748 self._previous_ec_low_power_delay = int( 749 self.ec.send_command_get_output("dsleep", 750 ["timeout:\s+(\d+)\ssec"])[0][1]) 751 self.ec.send_command("dsleep " + str(delay)) 752 753 def restore_dut_low_power_idle_delay(self): 754 """Restore EC low power idle delay""" 755 if getattr(self, '_previous_ec_low_power_delay', None): 756 self.ec.send_command("dsleep " + str( 757 self._previous_ec_low_power_delay)) 758 759 def get_usbdisk_path_on_dut(self): 760 """Get the path of the USB disk device plugged-in the servo on DUT. 761 762 Returns: 763 A string representing USB disk path, like '/dev/sdb', or None if 764 no USB disk is found. 765 """ 766 cmd = 'ls -d /dev/s*[a-z]' 767 original_value = self.servo.get_usbkey_state() 768 769 # Make the dut unable to see the USB disk. 770 self.servo.switch_usbkey('off') 771 time.sleep(self.faft_config.usb_unplug) 772 no_usb_set = set( 773 self.faft_client.system.run_shell_command_get_output(cmd)) 774 775 # Make the dut able to see the USB disk. 776 self.servo.switch_usbkey('dut') 777 time.sleep(self.faft_config.usb_plug) 778 has_usb_set = set( 779 self.faft_client.system.run_shell_command_get_output(cmd)) 780 781 # Back to its original value. 782 if original_value != self.servo.get_usbkey_state(): 783 self.servo.switch_usbkey(original_value) 784 785 diff_set = has_usb_set - no_usb_set 786 if len(diff_set) == 1: 787 return diff_set.pop() 788 else: 789 return None 790 791 def _create_faft_lockfile(self): 792 """Creates the FAFT lockfile.""" 793 logging.info('Creating FAFT lockfile...') 794 command = 'touch %s' % (self.lockfile) 795 self.faft_client.system.run_shell_command(command) 796 797 def _create_old_faft_lockfile(self): 798 """ 799 Creates the FAFT lockfile in its legacy location. 800 801 TODO (once M83 is stable, approx. June 9 2020): 802 Delete this function, as platform/installer/chromeos-setgoodkernel 803 will look for the lockfile in the new location 804 (/usr/local/tmp/faft/lock) 805 """ 806 logging.info('Creating legacy FAFT lockfile...') 807 self.faft_client.system.run_shell_command('mkdir -p /var/tmp/faft') 808 self.faft_client.system.run_shell_command('touch /var/tmp/faft/lock') 809 810 def _remove_faft_lockfile(self): 811 """Removes the FAFT lockfile.""" 812 logging.info('Removing FAFT lockfile...') 813 command = 'rm -f %s' % (self.lockfile) 814 self.faft_client.system.run_shell_command(command) 815 816 def _remove_old_faft_lockfile(self): 817 """ 818 Removes the FAFT lockfile from its legacy location. 819 820 TODO (once M83 is stable, approx. June 9 2020): 821 Delete this function, as platform/installer/chromeos-setgoodkernel 822 will look for the lockfile in the new location 823 (/usr/local/tmp/faft/lock) 824 """ 825 logging.info('Removing legacy FAFT lockfile...') 826 self.faft_client.system.run_shell_command('rm -rf /var/tmp/faft') 827 828 def clear_set_gbb_flags(self, clear_mask, set_mask): 829 """Clear and set the GBB flags in the current flashrom. 830 831 @param clear_mask: A mask of flags to be cleared. 832 @param set_mask: A mask of flags to be set. 833 """ 834 gbb_flags = self.faft_client.bios.get_gbb_flags() 835 new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask 836 self.gbb_flags = new_flags 837 if new_flags != gbb_flags: 838 self._backup_gbb_flags = gbb_flags 839 logging.info('Changing GBB flags from 0x%x to 0x%x.', 840 gbb_flags, new_flags) 841 self.faft_client.bios.set_gbb_flags(new_flags) 842 # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag, 843 # reboot to get a clear state 844 if ((gbb_flags ^ new_flags) & 845 (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON | 846 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)): 847 self.switcher.mode_aware_reboot() 848 else: 849 logging.info('Current GBB flags look good for test: 0x%x.', 850 gbb_flags) 851 852 853 def _check_capability(self, target, required_cap, suppress_warning): 854 """Check if current platform has required capabilities for the target. 855 856 @param required_cap: A list containing required capabilities. 857 @param suppress_warning: True to suppress any warning messages. 858 @return: True if requirements are met. Otherwise, False. 859 """ 860 if not required_cap: 861 return True 862 863 if target not in ['ec', 'cr50']: 864 raise error.TestError('Invalid capability target %r' % target) 865 866 for cap in required_cap: 867 if cap not in getattr(self.faft_config, target + '_capability'): 868 if not suppress_warning: 869 logging.warn('Requires %s capability "%s" to run this ' 870 'test.', target, cap) 871 return False 872 873 return True 874 875 876 def check_ec_capability(self, required_cap=None, suppress_warning=False): 877 """Check if current platform has required EC capabilities. 878 879 @param required_cap: A list containing required EC capabilities. Pass in 880 None to only check for presence of Chrome EC. 881 @param suppress_warning: True to suppress any warning messages. 882 @return: True if requirements are met. Otherwise, False. 883 """ 884 if not self.faft_config.chrome_ec: 885 if not suppress_warning: 886 logging.warn('Requires Chrome EC to run this test.') 887 return False 888 return self._check_capability('ec', required_cap, suppress_warning) 889 890 891 def check_cr50_capability(self, required_cap=None, suppress_warning=False): 892 """Check if current platform has required Cr50 capabilities. 893 894 @param required_cap: A list containing required Cr50 capabilities. Pass 895 in None to only check for presence of cr50 uart. 896 @param suppress_warning: True to suppress any warning messages. 897 @return: True if requirements are met. Otherwise, False. 898 """ 899 if not hasattr(self, 'cr50'): 900 if not suppress_warning: 901 logging.warn('Requires Chrome Cr50 to run this test.') 902 return False 903 return self._check_capability('cr50', required_cap, suppress_warning) 904 905 906 def check_root_part_on_non_recovery(self, part): 907 """Check the partition number of root device and on normal/dev boot. 908 909 @param part: A string of partition number, e.g.'3'. 910 @return: True if the root device matched and on normal/dev boot; 911 otherwise, False. 912 """ 913 return self.checkers.root_part_checker(part) and \ 914 self.checkers.crossystem_checker({ 915 'mainfw_type': ('normal', 'developer'), 916 }) 917 918 def _join_part(self, dev, part): 919 """Return a concatenated string of device and partition number. 920 921 @param dev: A string of device, e.g.'/dev/sda'. 922 @param part: A string of partition number, e.g.'3'. 923 @return: A concatenated string of device and partition number, 924 e.g.'/dev/sda3'. 925 926 >>> seq = FirmwareTest() 927 >>> seq._join_part('/dev/sda', '3') 928 '/dev/sda3' 929 >>> seq._join_part('/dev/mmcblk0', '2') 930 '/dev/mmcblk0p2' 931 """ 932 if 'mmcblk' in dev: 933 return dev + 'p' + part 934 elif 'nvme' in dev: 935 return dev + 'p' + part 936 else: 937 return dev + part 938 939 def copy_kernel_and_rootfs(self, from_part, to_part): 940 """Copy kernel and rootfs from from_part to to_part. 941 942 @param from_part: A string of partition number to be copied from. 943 @param to_part: A string of partition number to be copied to. 944 """ 945 root_dev = self.faft_client.system.get_root_dev() 946 logging.info('Copying kernel from %s to %s. Please wait...', 947 from_part, to_part) 948 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 949 (self._join_part(root_dev, self.KERNEL_MAP[from_part]), 950 self._join_part(root_dev, self.KERNEL_MAP[to_part]))) 951 logging.info('Copying rootfs from %s to %s. Please wait...', 952 from_part, to_part) 953 self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' % 954 (self._join_part(root_dev, self.ROOTFS_MAP[from_part]), 955 self._join_part(root_dev, self.ROOTFS_MAP[to_part]))) 956 957 def ensure_kernel_boot(self, part): 958 """Ensure the request kernel boot. 959 960 If not, it duplicates the current kernel to the requested kernel 961 and sets the requested higher priority to ensure it boot. 962 963 @param part: A string of kernel partition number or 'a'/'b'. 964 """ 965 if not self.checkers.root_part_checker(part): 966 if self.faft_client.kernel.diff_a_b(): 967 self.copy_kernel_and_rootfs( 968 from_part=self.OTHER_KERNEL_MAP[part], 969 to_part=part) 970 self.reset_and_prioritize_kernel(part) 971 self.switcher.mode_aware_reboot() 972 973 def ensure_dev_internal_boot(self, original_dev_boot_usb): 974 """Ensure internal device boot in developer mode. 975 976 If not internal device boot, it will try to reboot the device and 977 bypass dev mode to boot into internal device. 978 979 @param original_dev_boot_usb: Original dev_boot_usb value. 980 """ 981 logging.info('Checking internal device boot.') 982 self.faft_client.system.set_dev_default_boot() 983 if self.faft_client.system.is_removable_device_boot(): 984 logging.info('Reboot into internal disk...') 985 self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb) 986 self.switcher.mode_aware_reboot() 987 self.check_state((self.checkers.dev_boot_usb_checker, False, 988 'Device not booted from internal disk properly.')) 989 990 def set_hardware_write_protect(self, enable): 991 """Set hardware write protect pin. 992 993 @param enable: True if asserting write protect pin. Otherwise, False. 994 """ 995 self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off') 996 997 def set_ap_write_protect_and_reboot(self, enable): 998 """Set AP write protect status and reboot to take effect. 999 1000 @param enable: True if asserting write protect. Otherwise, False. 1001 """ 1002 self.set_hardware_write_protect(enable) 1003 if hasattr(self, 'ec'): 1004 self.sync_and_ec_reboot() 1005 self.switcher.wait_for_client() 1006 1007 def run_chromeos_firmwareupdate(self, mode, append=None, options=(), 1008 ignore_status=False): 1009 """Use RPC to get the command to run, but do the actual run via ssh. 1010 1011 Running the command via SSH improves the reliability in cases where the 1012 USB network connection gets interrupted. SSH will still return the 1013 output, and won't hang like RPC would. 1014 """ 1015 update_cmd = self.faft_client.updater.get_firmwareupdate_command( 1016 mode, append, options) 1017 try: 1018 result = self._client.run( 1019 update_cmd, timeout=300, ignore_status=ignore_status) 1020 if result.exit_status == 255: 1021 self.faft_client.disconnect() 1022 return result 1023 except error.AutoservRunError as e: 1024 if e.result_obj.exit_status == 255: 1025 self.faft_client.disconnect() 1026 if ignore_status: 1027 return e.result_obj 1028 raise 1029 1030 def set_ec_write_protect_and_reboot(self, enable): 1031 """Set EC write protect status and reboot to take effect. 1032 1033 The write protect state is only activated if both hardware write 1034 protect pin is asserted and software write protect flag is set. 1035 This method asserts/deasserts hardware write protect pin first, and 1036 set corresponding EC software write protect flag. 1037 1038 If the device uses non-Chrome EC, set the software write protect via 1039 flashrom. 1040 1041 If the device uses Chrome EC, a reboot is required for write protect 1042 to take effect. Since the software write protect flag cannot be unset 1043 if hardware write protect pin is asserted, we need to deasserted the 1044 pin first if we are deactivating write protect. Similarly, a reboot 1045 is required before we can modify the software flag. 1046 1047 @param enable: True if activating EC write protect. Otherwise, False. 1048 """ 1049 self.set_hardware_write_protect(enable) 1050 if self.faft_config.chrome_ec: 1051 self.set_chrome_ec_write_protect_and_reboot(enable) 1052 else: 1053 self.faft_client.ec.set_write_protect(enable) 1054 self.switcher.mode_aware_reboot() 1055 1056 def set_chrome_ec_write_protect_and_reboot(self, enable): 1057 """Set Chrome EC write protect status and reboot to take effect. 1058 1059 @param enable: True if activating EC write protect. Otherwise, False. 1060 """ 1061 if enable: 1062 # Set write protect flag and reboot to take effect. 1063 self.ec.set_flash_write_protect(enable) 1064 self.sync_and_ec_reboot( 1065 flags='hard', 1066 extra_sleep=self.faft_config.ec_boot_to_wp_en) 1067 else: 1068 # Reboot after deasserting hardware write protect pin to deactivate 1069 # write protect. And then remove software write protect flag. 1070 # Some ITE ECs can only clear their WP status on a power-on reset, 1071 # no software-initiated reset will do. 1072 self.sync_and_ec_reboot(flags='cold') 1073 self.ec.set_flash_write_protect(enable) 1074 1075 def _setup_ec_write_protect(self, ec_wp): 1076 """Setup for EC write-protection. 1077 1078 It makes sure the EC in the requested write-protection state. If not, it 1079 flips the state. Flipping the write-protection requires DUT reboot. 1080 1081 @param ec_wp: True to request EC write-protected; False to request EC 1082 not write-protected; None to do nothing. 1083 """ 1084 if ec_wp is None: 1085 return 1086 self._old_wpsw_cur = self.checkers.crossystem_checker( 1087 {'wpsw_cur': '1'}, suppress_logging=True) 1088 if ec_wp != self._old_wpsw_cur: 1089 if not self.faft_config.ap_access_ec_flash: 1090 raise error.TestNAError( 1091 "Cannot change EC write-protect for this device") 1092 1093 logging.info('The test required EC is %swrite-protected. Reboot ' 1094 'and flip the state.', '' if ec_wp else 'not ') 1095 self.switcher.mode_aware_reboot( 1096 'custom', 1097 lambda:self.set_ec_write_protect_and_reboot(ec_wp)) 1098 wpsw_cur = '1' if ec_wp else '0' 1099 self.check_state((self.checkers.crossystem_checker, { 1100 'wpsw_cur': wpsw_cur})) 1101 1102 def _restore_ec_write_protect(self): 1103 """Restore the original EC write-protection.""" 1104 if (not hasattr(self, '_old_wpsw_cur')) or (self._old_wpsw_cur is 1105 None): 1106 return 1107 if not self.checkers.crossystem_checker({'wpsw_cur': '1' if 1108 self._old_wpsw_cur else '0'}, suppress_logging=True): 1109 logging.info('Restore original EC write protection and reboot.') 1110 self.switcher.mode_aware_reboot( 1111 'custom', 1112 lambda:self.set_ec_write_protect_and_reboot( 1113 self._old_wpsw_cur)) 1114 self.check_state((self.checkers.crossystem_checker, { 1115 'wpsw_cur': '1' if self._old_wpsw_cur else '0'})) 1116 1117 def _record_uart_capture(self): 1118 """Record the CPU/EC/PD UART output stream to files.""" 1119 self.servo.record_uart_capture(self.resultsdir) 1120 1121 def _cleanup_uart_capture(self): 1122 """Cleanup the CPU/EC/PD UART capture.""" 1123 self.servo.close(self.resultsdir) 1124 1125 def set_ap_off_power_mode(self, power_mode): 1126 """ 1127 Set the DUT power mode to suspend (S0ix/S3) or shutdown (G3/S5). 1128 The DUT must be in S0 when calling this method. 1129 1130 @param power_mode: a string for the expected power mode, either 1131 'suspend' or 'shutdown'. 1132 """ 1133 if power_mode == 'suspend': 1134 target_power_state = self.POWER_STATE_SUSPEND 1135 elif power_mode == 'shutdown': 1136 target_power_state = self.POWER_STATE_G3 1137 else: 1138 raise error.TestError('%s is not a valid ap-off power mode.' % 1139 power_mode) 1140 1141 if self.get_power_state() != self.POWER_STATE_S0: 1142 raise error.TestError('The DUT is not in S0.') 1143 1144 self._restore_power_mode = True 1145 1146 if target_power_state == self.POWER_STATE_G3: 1147 self.run_shutdown_cmd() 1148 time.sleep(self.faft_config.shutdown) 1149 elif target_power_state == self.POWER_STATE_SUSPEND: 1150 self.suspend() 1151 1152 if self.wait_power_state(target_power_state, self.DEFAULT_PWR_RETRIES): 1153 logging.info('System entered %s state.', target_power_state) 1154 else: 1155 self._restore_power_mode = False 1156 raise error.TestFail('System fail to enter %s state. ' 1157 'Current state: %s', target_power_state, 1158 self.get_power_state()) 1159 1160 def restore_ap_on_power_mode(self): 1161 """ 1162 Wake up the DUT to S0. If the DUT was not set to suspend or 1163 shutdown mode by set_ap_off_power_mode(), raise an error. 1164 """ 1165 if self.get_power_state() != self.POWER_STATE_S0: 1166 logging.info('Wake up the DUT to S0.') 1167 self.servo.power_normal_press() 1168 # If the DUT is ping-able, it must be in S0. 1169 self.switcher.wait_for_client() 1170 if self._restore_power_mode != True: 1171 raise error.TestFail('The DUT was not set to suspend/shutdown ' 1172 'mode by set_ap_off_power_mode().') 1173 self._restore_power_mode = False 1174 1175 def get_power_state(self): 1176 """ 1177 Return the current power state of the AP (via EC 'powerinfo' command) 1178 1179 @return the name of the power state, or None if a problem occurred 1180 """ 1181 if not hasattr(self, 'ec'): 1182 # Don't fail when EC not present or not fully initialized 1183 return None 1184 1185 pattern = r'power state (\w+) = (\w+),' 1186 1187 try: 1188 match = self.ec.send_command_get_output("powerinfo", [pattern]) 1189 except error.TestFail as err: 1190 logging.warn("powerinfo command encountered an error: %s", err) 1191 return None 1192 if not match: 1193 logging.warn("powerinfo output did not match pattern: %r", pattern) 1194 return None 1195 (line, state_num, state_name) = match[0] 1196 logging.debug("power state info %r", match) 1197 return state_name 1198 1199 def _check_power_state(self, power_state): 1200 """ 1201 Check for correct power state of the AP (via EC 'powerinfo' command) 1202 1203 @return: the line and the match, if the output matched. 1204 @raise error.TestFail: if output didn't match after the delay. 1205 """ 1206 if not isinstance(power_state, str): 1207 raise error.TestError('%s is not a string while it should be.' % 1208 power_state) 1209 return self.ec.send_command_get_output("powerinfo", 1210 ['\\b' + power_state + '\\b']) 1211 1212 def wait_power_state(self, power_state, retries, retry_delay=0): 1213 """ 1214 Wait for certain power state. 1215 1216 @param power_state: power state you are expecting 1217 @param retries: retries. This is necessary if AP is powering down 1218 and transitioning through different states. 1219 @param retry_delay: delay between retries in seconds 1220 """ 1221 logging.info('Checking power state "%s" maximum %d times.', 1222 power_state, retries) 1223 1224 # Reset the cache, in case previous calls silently changed it on servod 1225 self.ec.set_uart_regexp('None') 1226 1227 while retries > 0: 1228 logging.info("try count: %d", retries) 1229 start_time = time.time() 1230 try: 1231 retries = retries - 1 1232 if self._check_power_state(power_state): 1233 return True 1234 except error.TestFail: 1235 pass 1236 delay_time = retry_delay - time.time() + start_time 1237 if delay_time > 0: 1238 time.sleep(delay_time) 1239 return False 1240 1241 def run_shutdown_cmd(self): 1242 """Shut down the DUT by running '/sbin/shutdown -P now'.""" 1243 self.faft_client.disconnect() 1244 # Shut down in the background after sleeping so the call gets a reply. 1245 try: 1246 self._client.run_background('sleep 0.5; /sbin/shutdown -P now') 1247 except error.AutoservRunError as e: 1248 # From the ssh man page, error code 255 indicates ssh errors. 1249 if e.result_obj.exit_status == 255: 1250 logging.warn("Ignoring error from ssh: %s", e) 1251 else: 1252 raise 1253 self.switcher.wait_for_client_offline() 1254 1255 def suspend(self): 1256 """Suspends the DUT.""" 1257 cmd = 'sleep %d; powerd_dbus_suspend' % self.EC_SUSPEND_DELAY 1258 block = False 1259 self.faft_client.system.run_shell_command(cmd, block) 1260 time.sleep(self.EC_SUSPEND_DELAY) 1261 1262 def _record_faft_client_log(self): 1263 """Record the faft client log to the results directory.""" 1264 client_log = self.faft_client.system.dump_log(True) 1265 client_log_file = os.path.join(self.resultsdir, 'faft_client.log') 1266 with open(client_log_file, 'w') as f: 1267 f.write(client_log) 1268 1269 def _setup_gbb_flags(self): 1270 """Setup the GBB flags for FAFT test.""" 1271 if self.check_setup_done('gbb_flags'): 1272 return 1273 1274 logging.info('Set proper GBB flags for test.') 1275 # Ensure that GBB flags are set to 0x140. 1276 flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE | 1277 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM) 1278 # And if the "no_ec_sync" argument is set, then disable EC software 1279 # sync. 1280 if self._no_ec_sync: 1281 logging.info( 1282 'User selected to disable EC software sync') 1283 flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC 1284 1285 # And if the "no_fw_rollback_check" argument is set, then disable fw 1286 # rollback check. 1287 if self._no_fw_rollback_check: 1288 logging.info( 1289 'User selected to disable FW rollback check') 1290 flags_to_set |= vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK 1291 1292 self.clear_set_gbb_flags(0xffffffff, flags_to_set) 1293 self.mark_setup_done('gbb_flags') 1294 1295 def _restore_gbb_flags(self): 1296 """Restore GBB flags to their original state.""" 1297 if self._backup_gbb_flags is None: 1298 return 1299 # Setting up and restoring the GBB flags take a lot of time. For 1300 # speed-up purpose, don't restore it. 1301 logging.info('***') 1302 logging.info('*** Please manually restore the original GBB flags to: ' 1303 '0x%x ***', self._backup_gbb_flags) 1304 logging.info('***') 1305 self.unmark_setup_done('gbb_flags') 1306 1307 def setup_tried_fwb(self, tried_fwb): 1308 """Setup for fw B tried state. 1309 1310 It makes sure the system in the requested fw B tried state. If not, it 1311 tries to do so. 1312 1313 @param tried_fwb: True if requested in tried_fwb=1; 1314 False if tried_fwb=0. 1315 """ 1316 if tried_fwb: 1317 if not self.checkers.crossystem_checker({'tried_fwb': '1'}): 1318 logging.info( 1319 'Firmware is not booted with tried_fwb. Reboot into it.') 1320 self.faft_client.system.set_try_fw_b() 1321 else: 1322 if not self.checkers.crossystem_checker({'tried_fwb': '0'}): 1323 logging.info( 1324 'Firmware is booted with tried_fwb. Reboot to clear.') 1325 1326 def power_on(self): 1327 """Switch DUT AC power on.""" 1328 self._client.power_on(self.power_control) 1329 1330 def power_off(self): 1331 """Switch DUT AC power off.""" 1332 self._client.power_off(self.power_control) 1333 1334 def power_cycle(self): 1335 """Power cycle DUT AC power.""" 1336 self._client.power_cycle(self.power_control) 1337 1338 def setup_rw_boot(self, section='a'): 1339 """Make sure firmware is in RW-boot mode. 1340 1341 If the given firmware section is in RO-boot mode, turn off the RO-boot 1342 flag and reboot DUT into RW-boot mode. 1343 1344 @param section: A firmware section, either 'a' or 'b'. 1345 """ 1346 flags = self.faft_client.bios.get_preamble_flags(section) 1347 if flags & vboot.PREAMBLE_USE_RO_NORMAL: 1348 flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL 1349 self.faft_client.bios.set_preamble_flags(section, flags) 1350 self.switcher.mode_aware_reboot() 1351 1352 def setup_kernel(self, part): 1353 """Setup for kernel test. 1354 1355 It makes sure both kernel A and B bootable and the current boot is 1356 the requested kernel part. 1357 1358 @param part: A string of kernel partition number or 'a'/'b'. 1359 """ 1360 self.ensure_kernel_boot(part) 1361 logging.info('Checking the integrity of kernel B and rootfs B...') 1362 if (self.faft_client.kernel.diff_a_b() or 1363 not self.faft_client.rootfs.verify_rootfs('B')): 1364 logging.info('Copying kernel and rootfs from A to B...') 1365 self.copy_kernel_and_rootfs(from_part=part, 1366 to_part=self.OTHER_KERNEL_MAP[part]) 1367 self.reset_and_prioritize_kernel(part) 1368 1369 def reset_and_prioritize_kernel(self, part): 1370 """Make the requested partition highest priority. 1371 1372 This function also reset kerenl A and B to bootable. 1373 1374 @param part: A string of partition number to be prioritized. 1375 """ 1376 root_dev = self.faft_client.system.get_root_dev() 1377 # Reset kernel A and B to bootable. 1378 self.faft_client.system.run_shell_command( 1379 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev)) 1380 self.faft_client.system.run_shell_command( 1381 'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev)) 1382 # Set kernel part highest priority. 1383 self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' % 1384 (self.KERNEL_MAP[part], root_dev)) 1385 1386 def do_blocking_sync(self, device): 1387 """Run a blocking sync command.""" 1388 logging.info("Blocking sync for %s", device) 1389 1390 if 'mmcblk' in device: 1391 # For mmc devices, use `mmc status get` command to send an 1392 # empty command to wait for the disk to be available again. 1393 self.faft_client.system.run_shell_command('mmc status get %s' % 1394 device) 1395 elif 'nvme' in device: 1396 # For NVMe devices, use `nvme flush` command to commit data 1397 # and metadata to non-volatile media. 1398 1399 # Get a list of NVMe namespaces, and flush them individually. 1400 # The output is assumed to be in the following format: 1401 # [ 0]:0x1 1402 # [ 1]:0x2 1403 list_ns_cmd = "nvme list-ns %s" % device 1404 available_ns = self.faft_client.system.run_shell_command_get_output( 1405 list_ns_cmd) 1406 1407 if not available_ns: 1408 raise error.TestError( 1409 "Listing namespaces failed (empty output): %s" 1410 % list_ns_cmd) 1411 1412 for ns in available_ns: 1413 ns = ns.split(':')[-1] 1414 flush_cmd = 'nvme flush %s -n %s' % (device, ns) 1415 flush_rc = self.faft_client.system.run_shell_command_get_status( 1416 flush_cmd) 1417 if flush_rc != 0: 1418 raise error.TestError( 1419 "Flushing namespace %s failed (rc=%s): %s" 1420 % (ns, flush_rc, flush_cmd)) 1421 else: 1422 # For other devices, hdparm sends TUR to check if 1423 # a device is ready for transfer operation. 1424 self.faft_client.system.run_shell_command('hdparm -f %s' % device) 1425 1426 def blocking_sync(self, freeze_for_reset=False): 1427 """Sync root device and internal device, via script if possible. 1428 1429 The actual calls end up logged by the run() call, since they're printed 1430 to stdout/stderr in the script. 1431 1432 @param freeze_for_reset: if True, prepare for reset by blocking writes 1433 (only if enable_fs_sync_fsfreeze=True) 1434 """ 1435 1436 if self._use_sync_script: 1437 if freeze_for_reset: 1438 self.faft_client.quit() 1439 try: 1440 return self._client.blocking_sync(freeze_for_reset) 1441 except (AttributeError, ImportError, error.AutoservRunError) as e: 1442 logging.warn( 1443 'Falling back to old sync method due to error: %s', e) 1444 1445 # The double calls to sync fakes a blocking call 1446 # since the first call returns before the flush 1447 # is complete, but the second will wait for the 1448 # first to finish. 1449 self.faft_client.system.run_shell_command('sync') 1450 self.faft_client.system.run_shell_command('sync') 1451 1452 # sync only sends SYNCHRONIZE_CACHE but doesn't check the status. 1453 # This function will perform a device-specific sync command. 1454 root_dev = self.faft_client.system.get_root_dev() 1455 self.do_blocking_sync(root_dev) 1456 1457 # Also sync the internal device if booted from removable media. 1458 if self.faft_client.system.is_removable_device_boot(): 1459 internal_dev = self.faft_client.system.get_internal_device() 1460 self.do_blocking_sync(internal_dev) 1461 1462 def sync_and_ec_reboot(self, flags='', extra_sleep=0): 1463 """Request the client sync and do a EC triggered reboot. 1464 1465 @param flags: Optional, a space-separated string of flags passed to EC 1466 reboot command, including: 1467 default: EC soft reboot; 1468 'hard': EC hard reboot. 1469 'cold': Cold reboot via servo. 1470 @param extra_sleep: Optional, int or float for extra wait time for EC 1471 reboot in seconds. 1472 """ 1473 self.blocking_sync(freeze_for_reset=True) 1474 if flags == 'cold': 1475 self.servo.get_power_state_controller().reset() 1476 else: 1477 self.ec.reboot(flags) 1478 time.sleep(self.faft_config.ec_boot_to_console + extra_sleep) 1479 self.check_lid_and_power_on() 1480 1481 def reboot_and_reset_tpm(self): 1482 """Reboot into recovery mode, reset TPM, then reboot back to disk.""" 1483 self.switcher.reboot_to_mode(to_mode='rec') 1484 self.faft_client.system.run_shell_command('chromeos-tpm-recovery') 1485 self.switcher.mode_aware_reboot() 1486 1487 def full_power_off_and_on(self): 1488 """Shutdown the device by pressing power button and power on again.""" 1489 boot_id = self.get_bootid() 1490 self.faft_client.disconnect() 1491 1492 # Press power button to trigger Chrome OS normal shutdown process. 1493 # We use a customized delay since the normal-press 1.2s is not enough. 1494 self.servo.power_key(self.faft_config.hold_pwr_button_poweroff) 1495 # device can take 44-51 seconds to restart, 1496 # add buffer from the default timeout of 60 seconds. 1497 self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id) 1498 time.sleep(self.faft_config.shutdown) 1499 if self.faft_config.chrome_ec: 1500 self.check_shutdown_power_state(self.POWER_STATE_G3, 1501 orig_boot_id=boot_id) 1502 # Short press power button to boot DUT again. 1503 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1504 1505 def check_shutdown_power_state(self, power_state, 1506 pwr_retries=DEFAULT_PWR_RETRIES, 1507 orig_boot_id=None): 1508 """Check whether the device shut down and entered the given power state. 1509 1510 If orig_boot_id is specified, it will check whether the DUT responds to 1511 ssh requests, then use orig_boot_id to check if it rebooted. 1512 1513 @param power_state: EC power state has to be checked. Either S5 or G3. 1514 @param pwr_retries: Times to check if the DUT in expected power state. 1515 @param orig_boot_id: Old boot_id, to check for unexpected reboots. 1516 @raise TestFail: If device failed to enter into requested power state. 1517 """ 1518 if not self.wait_power_state(power_state, pwr_retries): 1519 current_state = self.get_power_state() 1520 if current_state == self.POWER_STATE_S0 and self._client.wait_up(): 1521 # DUT is unexpectedly up, so check whether it rebooted instead. 1522 new_boot_id = self.get_bootid() 1523 logging.debug('orig_boot_id=%s, new_boot_id=%s', 1524 orig_boot_id, new_boot_id) 1525 if orig_boot_id is None or new_boot_id is None: 1526 # Can't say anything more specific without values to compare 1527 raise error.TestFail( 1528 "Expected state %s, but the system is unexpectedly" 1529 " still up. Current state: %s" 1530 % (power_state, current_state)) 1531 if new_boot_id == orig_boot_id: 1532 raise error.TestFail( 1533 "Expected state %s, but the system didn't shut" 1534 " down. Current state: %s" 1535 % (power_state, current_state)) 1536 else: 1537 raise error.TestFail( 1538 "Expected state %s, but the system rebooted instead" 1539 " of shutting down. Current state: %s" 1540 % (power_state, current_state)) 1541 1542 if current_state is None: 1543 current_state = '(unknown)' 1544 1545 if current_state == power_state: 1546 raise error.TestFail( 1547 "Expected state %s, but the system didn't reach it" 1548 " until after the limit of %s tries." 1549 % (power_state, pwr_retries)) 1550 1551 raise error.TestFail('System not shutdown properly and EC fails' 1552 ' to enter into %s state. Current state: %s' 1553 % (power_state, current_state)) 1554 logging.info('System entered into %s state..', power_state) 1555 1556 def check_lid_and_power_on(self): 1557 """ 1558 On devices with EC software sync, system powers on after EC reboots if 1559 lid is open. Otherwise, the EC shuts down CPU after about 3 seconds. 1560 This method checks lid switch state and presses power button if 1561 necessary. 1562 """ 1563 if self.servo.get("lid_open") == "no": 1564 time.sleep(self.faft_config.software_sync) 1565 self.servo.power_short_press() 1566 1567 def stop_powerd(self): 1568 """Stop the powerd daemon on the AP. 1569 1570 This will cause the AP to ignore power button presses sent by the EC. 1571 """ 1572 powerd_running = self.faft_client.system.run_shell_command_check_output( 1573 'status powerd', 'start/running') 1574 if powerd_running: 1575 logging.debug('Stopping powerd') 1576 self.faft_client.system.run_shell_command("stop powerd") 1577 1578 def _modify_usb_kernel(self, usb_dev, from_magic, to_magic): 1579 """Modify the kernel header magic in USB stick. 1580 1581 The kernel header magic is the first 8-byte of kernel partition. 1582 We modify it to make it fail on kernel verification check. 1583 1584 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1585 @param from_magic: A string of magic which we change it from. 1586 @param to_magic: A string of magic which we change it to. 1587 @raise TestError: if failed to change magic. 1588 """ 1589 assert len(from_magic) == 8 1590 assert len(to_magic) == 8 1591 # USB image only contains one kernel. 1592 kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a']) 1593 read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part 1594 current_magic = self.servo.system_output(read_cmd) 1595 if current_magic == to_magic: 1596 logging.info("The kernel magic is already %s.", current_magic) 1597 return 1598 if current_magic != from_magic: 1599 raise error.TestError("Invalid kernel image on USB: wrong magic.") 1600 1601 logging.info('Modify the kernel magic in USB, from %s to %s.', 1602 from_magic, to_magic) 1603 write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc " 1604 " 2>/dev/null" % (to_magic, kernel_part)) 1605 self.servo.system(write_cmd) 1606 1607 if self.servo.system_output(read_cmd) != to_magic: 1608 raise error.TestError("Failed to write new magic.") 1609 1610 def corrupt_usb_kernel(self, usb_dev): 1611 """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD. 1612 1613 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1614 """ 1615 self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC, 1616 self.CORRUPTED_MAGIC) 1617 1618 def restore_usb_kernel(self, usb_dev): 1619 """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS. 1620 1621 @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'. 1622 """ 1623 self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC, 1624 self.CHROMEOS_MAGIC) 1625 1626 def _call_action(self, action_tuple, check_status=False): 1627 """Call the action function with/without arguments. 1628 1629 @param action_tuple: A function, or a tuple (function, args, error_msg), 1630 in which, args and error_msg are optional. args is 1631 either a value or a tuple if multiple arguments. 1632 This can also be a list containing multiple 1633 function or tuple. In this case, these actions are 1634 called in sequence. 1635 @param check_status: Check the return value of action function. If not 1636 succeed, raises a TestFail exception. 1637 @return: The result value of the action function. 1638 @raise TestError: An error when the action function is not callable. 1639 @raise TestFail: When check_status=True, action function not succeed. 1640 """ 1641 if isinstance(action_tuple, list): 1642 return all([self._call_action(action, check_status=check_status) 1643 for action in action_tuple]) 1644 1645 action = action_tuple 1646 args = () 1647 error_msg = 'Not succeed' 1648 if isinstance(action_tuple, tuple): 1649 action = action_tuple[0] 1650 if len(action_tuple) >= 2: 1651 args = action_tuple[1] 1652 if not isinstance(args, tuple): 1653 args = (args,) 1654 if len(action_tuple) >= 3: 1655 error_msg = action_tuple[2] 1656 1657 if action is None: 1658 return 1659 1660 if not callable(action): 1661 raise error.TestError('action is not callable!') 1662 1663 info_msg = 'calling %s' % action.__name__ 1664 if args: 1665 info_msg += ' with args %s' % str(args) 1666 logging.info(info_msg) 1667 ret = action(*args) 1668 1669 if check_status and not ret: 1670 raise error.TestFail('%s: %s returning %s' % 1671 (error_msg, info_msg, str(ret))) 1672 return ret 1673 1674 def run_shutdown_process(self, shutdown_action, pre_power_action=None, 1675 run_power_action=True, post_power_action=None, 1676 shutdown_timeout=None): 1677 """Run shutdown_action(), which makes DUT shutdown, and power it on. 1678 1679 @param shutdown_action: function which makes DUT shutdown, like 1680 pressing power key. 1681 @param pre_power_action: function which is called before next power on. 1682 @param run_power_action: power_key press by default, set to None to skip. 1683 @param post_power_action: function which is called after next power on. 1684 @param shutdown_timeout: a timeout to confirm DUT shutdown. 1685 @raise TestFail: if the shutdown_action() failed to turn DUT off. 1686 """ 1687 self._call_action(shutdown_action) 1688 logging.info('Wait to ensure DUT shut down...') 1689 try: 1690 if shutdown_timeout is None: 1691 shutdown_timeout = self.faft_config.shutdown_timeout 1692 self.switcher.wait_for_client(timeout=shutdown_timeout) 1693 raise error.TestFail( 1694 'Should shut the device down after calling %s.' % 1695 shutdown_action.__name__) 1696 except ConnectionError: 1697 if self.faft_config.chrome_ec: 1698 self.check_shutdown_power_state(self.POWER_STATE_G3) 1699 logging.info( 1700 'DUT is surely shutdown. We are going to power it on again...') 1701 1702 if pre_power_action: 1703 self._call_action(pre_power_action) 1704 if run_power_action: 1705 self.servo.power_key(self.faft_config.hold_pwr_button_poweron) 1706 if post_power_action: 1707 self._call_action(post_power_action) 1708 1709 def get_bootid(self, retry=3): 1710 """ 1711 Return the bootid. 1712 """ 1713 boot_id = None 1714 while retry: 1715 try: 1716 boot_id = self._client.get_boot_id() 1717 break 1718 except error.AutoservRunError: 1719 retry -= 1 1720 if retry: 1721 logging.info('Retry to get boot_id...') 1722 else: 1723 logging.warning('Failed to get boot_id.') 1724 logging.info('boot_id: %s', boot_id) 1725 return boot_id 1726 1727 def check_state(self, func): 1728 """ 1729 Wrapper around _call_action with check_status set to True. This is a 1730 helper function to be used by tests and is currently implemented by 1731 calling _call_action with check_status=True. 1732 1733 TODO: This function's arguments need to be made more stringent. And 1734 its functionality should be moved over to check functions directly in 1735 the future. 1736 1737 @param func: A function, or a tuple (function, args, error_msg), 1738 in which, args and error_msg are optional. args is 1739 either a value or a tuple if multiple arguments. 1740 This can also be a list containing multiple 1741 function or tuple. In this case, these actions are 1742 called in sequence. 1743 @return: The result value of the action function. 1744 @raise TestFail: If the function does notsucceed. 1745 """ 1746 logging.info("-[FAFT]-[ start stepstate_checker ]----------") 1747 self._call_action(func, check_status=True) 1748 logging.info("-[FAFT]-[ end state_checker ]----------------") 1749 1750 def get_current_firmware_identity(self): 1751 """Get current firmware sha and fwids of body and vblock. 1752 1753 @return: Current firmware checksums and fwids, as a dict 1754 """ 1755 1756 current_checksums = { 1757 'VBOOTA': self.faft_client.bios.get_sig_sha('a'), 1758 'FVMAINA': self.faft_client.bios.get_body_sha('a'), 1759 'VBOOTB': self.faft_client.bios.get_sig_sha('b'), 1760 'FVMAINB': self.faft_client.bios.get_body_sha('b'), 1761 } 1762 if not all(current_checksums.values()): 1763 raise error.TestError( 1764 'Failed to get firmware sha: %s', current_checksums) 1765 1766 current_fwids = { 1767 'RO_FRID': self.faft_client.bios.get_section_fwid('ro'), 1768 'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'), 1769 'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'), 1770 } 1771 if not all(current_fwids.values()): 1772 raise error.TestError( 1773 'Failed to get firmware fwid(s): %s', current_fwids) 1774 1775 identifying_info = dict(current_fwids) 1776 identifying_info.update(current_checksums) 1777 return identifying_info 1778 1779 def is_firmware_changed(self): 1780 """Check if the current firmware changed, by comparing its SHA and fwid. 1781 1782 @return: True if it is changed, otherwise False. 1783 """ 1784 # Device may not be rebooted after test. 1785 self.faft_client.bios.reload() 1786 1787 current_info = self.get_current_firmware_identity() 1788 prev_info = self._backup_firmware_identity 1789 1790 if current_info == prev_info: 1791 return False 1792 else: 1793 changed = set() 1794 for section in set(current_info.keys()) | set(prev_info.keys()): 1795 if current_info.get(section) != prev_info.get(section): 1796 changed.add(section) 1797 1798 logging.info('Firmware changed: %s', ', '.join(sorted(changed))) 1799 return True 1800 1801 def backup_firmware(self, suffix='.original'): 1802 """Backup firmware to file, and then send it to host. 1803 1804 @param suffix: a string appended to backup file name 1805 """ 1806 remote_temp_dir = self.faft_client.system.create_temp_dir() 1807 remote_bios_path = os.path.join(remote_temp_dir, 'bios') 1808 self.faft_client.bios.dump_whole(remote_bios_path) 1809 self._client.get_file(remote_bios_path, 1810 os.path.join(self.resultsdir, 'bios' + suffix)) 1811 1812 if self.faft_config.chrome_ec: 1813 remote_ec_path = os.path.join(remote_temp_dir, 'ec') 1814 self.faft_client.ec.dump_whole(remote_ec_path) 1815 self._client.get_file(remote_ec_path, 1816 os.path.join(self.resultsdir, 'ec' + suffix)) 1817 1818 self._client.run('rm -rf %s' % remote_temp_dir) 1819 logging.info('Backup firmware stored in %s with suffix %s', 1820 self.resultsdir, suffix) 1821 1822 self._backup_firmware_identity = self.get_current_firmware_identity() 1823 1824 def is_firmware_saved(self): 1825 """Check if a firmware saved (called backup_firmware before). 1826 1827 @return: True if the firmware is backed up; otherwise False. 1828 """ 1829 return bool(self._backup_firmware_identity) 1830 1831 def restore_firmware(self, suffix='.original', restore_ec=True, 1832 reboot_ec=False): 1833 """Restore firmware from host in resultsdir. 1834 1835 @param suffix: a string appended to backup file name 1836 @param restore_ec: True to restore the ec firmware; False not to do. 1837 @param reboot_ec: True to reboot EC after restore (if it was restored) 1838 @return: True if firmware needed to be restored 1839 """ 1840 if not self.is_firmware_changed(): 1841 return False 1842 1843 # Backup current corrupted firmware. 1844 self.backup_firmware(suffix='.corrupt') 1845 1846 # Restore firmware. 1847 remote_temp_dir = self.faft_client.system.create_temp_dir() 1848 1849 bios_local = os.path.join(self.resultsdir, 'bios%s' % suffix) 1850 bios_remote = os.path.join(remote_temp_dir, 'bios%s' % suffix) 1851 self._client.send_file(bios_local, bios_remote) 1852 self.faft_client.bios.write_whole(bios_remote) 1853 1854 if self.faft_config.chrome_ec and restore_ec: 1855 ec_local = os.path.join(self.resultsdir, 'ec%s' % suffix) 1856 ec_remote = os.path.join(remote_temp_dir, 'ec%s' % suffix) 1857 self._client.send_file(ec_local, ec_remote) 1858 ec_cmd = self.faft_client.ec.get_write_cmd(ec_remote) 1859 try: 1860 self._client.run(ec_cmd, timeout=300) 1861 except error.AutoservSSHTimeout: 1862 logging.warn("DUT connection died during EC restore") 1863 self.faft_client.disconnect() 1864 1865 except error.GenericHostRunError: 1866 logging.warn("DUT command failed during EC restore") 1867 logging.debug("Full exception:", exc_info=True) 1868 if reboot_ec: 1869 self.switcher.mode_aware_reboot( 1870 'custom', lambda: self.sync_and_ec_reboot('hard')) 1871 else: 1872 self.switcher.mode_aware_reboot() 1873 else: 1874 self.switcher.mode_aware_reboot() 1875 logging.info('Successfully restored firmware.') 1876 return True 1877 1878 def setup_firmwareupdate_shellball(self, shellball=None): 1879 """Setup a shellball to use in firmware update test. 1880 1881 Check if there is a given shellball, and it is a shell script. Then, 1882 send it to the remote host. Otherwise, use the 1883 /usr/sbin/chromeos-firmwareupdate in the image and replace its inside 1884 BIOS and EC images with the active firmware images. 1885 1886 @param shellball: path of a shellball or default to None. 1887 """ 1888 if shellball: 1889 # Determine the firmware file is a shellball or a raw binary. 1890 is_shellball = (utils.system_output("file %s" % shellball).find( 1891 "shell script") != -1) 1892 if is_shellball: 1893 logging.info('Device will update firmware with shellball %s', 1894 shellball) 1895 temp_path = self.faft_client.updater.get_temp_path() 1896 working_shellball = os.path.join(temp_path, 1897 'chromeos-firmwareupdate') 1898 self._client.send_file(shellball, working_shellball) 1899 self.faft_client.updater.extract_shellball() 1900 else: 1901 raise error.TestFail( 1902 'The given shellball is not a shell script.') 1903 else: 1904 logging.info('No shellball given, use the original shellball and ' 1905 'replace its BIOS and EC images.') 1906 work_path = self.faft_client.updater.get_work_path() 1907 bios_in_work_path = os.path.join( 1908 work_path, self.faft_client.updater.get_bios_relative_path()) 1909 ec_in_work_path = os.path.join( 1910 work_path, self.faft_client.updater.get_ec_relative_path()) 1911 logging.info('Writing current BIOS to: %s', bios_in_work_path) 1912 self.faft_client.bios.dump_whole(bios_in_work_path) 1913 if self.faft_config.chrome_ec: 1914 logging.info('Writing current EC to: %s', ec_in_work_path) 1915 self.faft_client.ec.dump_firmware(ec_in_work_path) 1916 self.faft_client.updater.repack_shellball() 1917 1918 def is_kernel_changed(self): 1919 """Check if the current kernel is changed, by comparing its SHA1 hash. 1920 1921 @return: True if it is changed; otherwise, False. 1922 """ 1923 changed = False 1924 for p in ('A', 'B'): 1925 backup_sha = self._backup_kernel_sha.get(p, None) 1926 current_sha = self.faft_client.kernel.get_sha(p) 1927 if backup_sha != current_sha: 1928 changed = True 1929 logging.info('Kernel %s is changed', p) 1930 return changed 1931 1932 def backup_kernel(self, suffix='.original'): 1933 """Backup kernel to files, and the send them to host. 1934 1935 @param suffix: a string appended to backup file name. 1936 """ 1937 remote_temp_dir = self.faft_client.system.create_temp_dir() 1938 for p in ('A', 'B'): 1939 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1940 self.faft_client.kernel.dump(p, remote_path) 1941 self._client.get_file( 1942 remote_path, 1943 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix))) 1944 self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p) 1945 logging.info('Backup kernel stored in %s with suffix %s', 1946 self.resultsdir, suffix) 1947 1948 def is_kernel_saved(self): 1949 """Check if kernel images are saved (backup_kernel called before). 1950 1951 @return: True if the kernel is saved; otherwise, False. 1952 """ 1953 return len(self._backup_kernel_sha) != 0 1954 1955 def restore_kernel(self, suffix='.original'): 1956 """Restore kernel from host in resultsdir. 1957 1958 @param suffix: a string appended to backup file name. 1959 """ 1960 if not self.is_kernel_changed(): 1961 return 1962 1963 # Backup current corrupted kernel. 1964 self.backup_kernel(suffix='.corrupt') 1965 1966 # Restore kernel. 1967 remote_temp_dir = self.faft_client.system.create_temp_dir() 1968 for p in ('A', 'B'): 1969 remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p) 1970 self._client.send_file( 1971 os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)), 1972 remote_path) 1973 self.faft_client.kernel.write(p, remote_path) 1974 1975 self.switcher.mode_aware_reboot() 1976 logging.info('Successfully restored kernel.') 1977 1978 def backup_cgpt_attributes(self): 1979 """Backup CGPT partition table attributes.""" 1980 self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes() 1981 1982 def restore_cgpt_attributes(self): 1983 """Restore CGPT partition table attributes.""" 1984 current_table = self.faft_client.cgpt.get_attributes() 1985 if current_table == self._backup_cgpt_attr: 1986 return 1987 logging.info('CGPT table is changed. Original: %r. Current: %r.', 1988 self._backup_cgpt_attr, 1989 current_table) 1990 self.faft_client.cgpt.set_attributes( 1991 self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B']) 1992 1993 self.switcher.mode_aware_reboot() 1994 logging.info('Successfully restored CGPT table.') 1995 1996 def try_fwb(self, count=0): 1997 """set to try booting FWB count # times 1998 1999 Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for 2000 vboot2 2001 2002 @param count: an integer specifying value to program into 2003 fwb_tries(vb1)/fw_try_next(vb2) 2004 """ 2005 if self.fw_vboot2: 2006 self.faft_client.system.set_fw_try_next('B', count) 2007 else: 2008 # vboot1: we need to boot into fwb at least once 2009 if not count: 2010 count = count + 1 2011 self.faft_client.system.set_try_fw_b(count) 2012 2013 def identify_shellball(self, include_ec=None): 2014 """Get the FWIDs of all targets and sections in the shellball 2015 2016 @param include_ec: if True, get EC fwids. 2017 If None (default), assume True if board has an EC 2018 @return: the dict of versions in the shellball 2019 """ 2020 fwids = dict() 2021 fwids['bios'] = self.faft_client.updater.get_image_fwids('bios') 2022 2023 if include_ec is None: 2024 if self.faft_config.platform.lower() == 'samus': 2025 include_ec = False # no ec.bin in shellball 2026 else: 2027 include_ec = self.faft_config.chrome_ec 2028 2029 if include_ec: 2030 fwids['ec'] = self.faft_client.updater.get_image_fwids('ec') 2031 return fwids 2032 2033 def modify_shellball(self, append, modify_ro=True, modify_ec=False): 2034 """Modify the FWIDs of targets and sections in the shellball 2035 2036 @return: the full path of the shellball 2037 """ 2038 2039 if modify_ro: 2040 self.faft_client.updater.modify_image_fwids( 2041 'bios', ['ro', 'a', 'b']) 2042 else: 2043 self.faft_client.updater.modify_image_fwids( 2044 'bios', ['a', 'b']) 2045 2046 if modify_ec: 2047 if modify_ro: 2048 self.faft_client.updater.modify_image_fwids( 2049 'ec', ['ro', 'rw']) 2050 else: 2051 self.faft_client.updater.modify_image_fwids( 2052 'ec', ['rw']) 2053 2054 modded_shellball = self.faft_client.updater.repack_shellball(append) 2055 2056 return modded_shellball 2057 2058 @staticmethod 2059 def check_fwids_written(before_fwids, image_fwids, after_fwids, 2060 expected_written): 2061 """Check the dicts of fwids for correctness after an update is applied. 2062 2063 The targets checked come from the keys of expected_written. 2064 The sections checked come from the inner dicts of the fwids parameters. 2065 2066 The fwids should be keyed by target (flash type), then by section: 2067 {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'}, 2068 'ec': {'ro': '<fwid>', 'rw': '<fwid>'} 2069 2070 For expected_written, the dict should be keyed by flash type only: 2071 {'bios': ['ro'], 'ec': ['ro', 'rw']} 2072 To expect the contents completely unchanged, give only the keys: 2073 {'bios': [], 'ec': []} or {'bios': None, 'ec': None} 2074 2075 @param before_fwids: dict of versions from before the update 2076 @param image_fwids: dict of versions in the update 2077 @param after_fwids: dict of actual versions after the update 2078 @param expected_written: dict indicating which ones should have changed 2079 @return: list of error lines for mismatches 2080 2081 @type before_fwids: dict 2082 @type image_fwids: dict | None 2083 @type after_fwids: dict 2084 @type expected_written: dict 2085 @rtype: list 2086 """ 2087 errors = [] 2088 2089 if image_fwids is None: 2090 image_fwids = {} 2091 2092 for target in sorted(expected_written.keys()): 2093 # target is BIOS or EC 2094 2095 before_missing = (target not in before_fwids) 2096 after_missing = (target not in after_fwids) 2097 if before_missing or after_missing: 2098 if before_missing: 2099 errors.append("...no before_fwids[%s]" % target) 2100 if after_missing: 2101 errors.append("...no after_fwids[%s]" % target) 2102 continue 2103 2104 written_sections = expected_written.get(target) or list() 2105 written_sections = set(written_sections) 2106 2107 before_sections = set(before_fwids.get(target) or dict()) 2108 image_sections = set(image_fwids.get(target) or dict()) 2109 after_sections = set(after_fwids.get(target) or dict()) 2110 2111 for section in before_sections | image_sections | after_sections: 2112 # section is RO, RW, A, or B 2113 2114 before_fwid = before_fwids[target][section] 2115 image_fwid = image_fwids.get(target, {}).get(section, None) 2116 actual_fwid = after_fwids[target][section] 2117 2118 if section in written_sections: 2119 expected_fwid = image_fwid 2120 expected_desc = 'rewritten fwid (%s)' % expected_fwid 2121 if image_fwid == before_fwid: 2122 expected_desc = ('rewritten (no changes) fwid (%s)' % 2123 expected_fwid) 2124 else: 2125 expected_fwid = before_fwid 2126 expected_desc = 'original fwid (%s)' % expected_fwid 2127 2128 if actual_fwid == expected_fwid: 2129 actual_desc = 'correct value' 2130 2131 elif actual_fwid == image_fwid: 2132 actual_desc = 'rewritten fwid (%s)' % actual_fwid 2133 if image_fwid == before_fwid: 2134 # The flash could have been rewritten with the same fwid 2135 actual_desc = 'possibly written fwid (%s)' % actual_fwid 2136 2137 elif actual_fwid == before_fwid: 2138 actual_desc = 'original fwid (%s)' % actual_fwid 2139 2140 else: 2141 actual_desc = 'unknown fwid (%s)' % actual_fwid 2142 2143 msg = ("...FWID (%s %s): expected %s, got %s" % 2144 (target.upper(), section.upper(), 2145 expected_desc, actual_desc)) 2146 2147 if actual_fwid != expected_fwid: 2148 errors.append(msg) 2149 return errors 2150 2151 2152 def fwmp_is_cleared(self): 2153 """Return True if the FWMP has been created""" 2154 res = self.host.run('cryptohome ' 2155 '--action=get_firmware_management_parameters', 2156 ignore_status=True) 2157 if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS: 2158 raise error.TestError('Could not run cryptohome command %r' % res) 2159 return self.FWMP_CLEARED_ERROR_MSG in res.stdout 2160 2161 2162 def _tpm_is_owned(self): 2163 """Returns True if the tpm is owned""" 2164 result = self.host.run('tpm_manager_client status --nonsensitive', 2165 ignore_status=True) 2166 logging.debug(result) 2167 return result.exit_status == 0 and 'is_owned: true' in result.stdout 2168 2169 def clear_fwmp(self): 2170 """Clear the FWMP""" 2171 if self.fwmp_is_cleared(): 2172 return 2173 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 2174 self.host.run('tpm_manager_client take_ownership') 2175 if not utils.wait_for_value(self._tpm_is_owned, expected_value=True): 2176 raise error.TestError('Unable to own tpm while clearing fwmp.') 2177 self.host.run('cryptohome ' 2178 '--action=remove_firmware_management_parameters') 2179 2180 def wait_for(self, cfg_field, action_msg=None, extra_time=0): 2181 """Waits for time specified in a config. 2182 2183 @ivar cfg_field: The name of the config field that specifies the 2184 time to wait. 2185 @ivar action_msg: Optional log message describing the action that 2186 will occur after the wait. 2187 @ivar extra_time: Additional time to be added to time from config. 2188 """ 2189 wait_time = self.faft_config.__getattr__(cfg_field) + extra_time 2190 if extra_time: 2191 wait_src = "%s + %s" % (cfg_field, extra_time) 2192 else: 2193 wait_src = cfg_field 2194 2195 units = 'second' if wait_time==1 else 'seconds' 2196 start_msg = "Waiting %s(%s) %s" % (wait_time, wait_src, units) 2197 if action_msg: 2198 start_msg += ", before '%s'" % action_msg 2199 start_msg += "." 2200 2201 logging.info(start_msg) 2202 time.sleep(wait_time) 2203 logging.info("Done waiting.") 2204 2205 def _try_to_bring_dut_up(self): 2206 """Try to quickly get the dut in a pingable state""" 2207 if not hasattr(self, 'cr50'): 2208 raise error.TestNAError('Test can only be run on devices with ' 2209 'access to the Cr50 console') 2210 2211 logging.info('checking dut state') 2212 2213 self.servo.set_nocheck('cold_reset', 'off') 2214 try: 2215 self.servo.set_nocheck('warm_reset', 'off') 2216 except error.TestFail as e: 2217 # TODO(b/159338538): remove once the kukui remap issue is resolved. 2218 if 'Timed out waiting for interfaces to become available' in str(e): 2219 logging.warn('Ignoring warm_reset interface issue b/159338538') 2220 else: 2221 raise 2222 2223 time.sleep(self.cr50.SHORT_WAIT) 2224 if not self.cr50.ap_is_on(): 2225 logging.info('Pressing power button to turn on AP') 2226 self.servo.power_short_press() 2227 2228 end_time = time.time() + self.RESPONSE_TIMEOUT 2229 while not self.host.ping_wait_up( 2230 self.faft_config.delay_reboot_to_ping * 2): 2231 if time.time() > end_time: 2232 logging.warn('DUT is unresponsive after trying to bring it up') 2233 return 2234 self.servo.get_power_state_controller().reset() 2235 logging.info('DUT did not respond. Resetting it.') 2236 2237 def _check_open_and_press_power_button(self): 2238 """Check stdout and press the power button if prompted. 2239 2240 Returns: 2241 True if the process is still running. 2242 """ 2243 if not hasattr(self, 'cr50'): 2244 raise error.TestNAError('Test can only be run on devices with ' 2245 'access to the Cr50 console') 2246 2247 logging.info(self._get_ccd_open_output()) 2248 self.servo.power_short_press() 2249 logging.info('long int power button press') 2250 # power button press cr50 erases nvmem and resets the dut before setting 2251 # the state to open. Wait a bit so we don't check the ccd state in the 2252 # middle of this reset process. Power button requests happen once a 2253 # minute, so waiting 10 seconds isn't a big deal. 2254 time.sleep(10) 2255 return (self.cr50.OPEN == self.cr50.get_ccd_level() or 2256 self._ccd_open_job.sp.poll() is not None) 2257 2258 def _get_ccd_open_output(self): 2259 """Read the new output.""" 2260 if not hasattr(self, 'cr50'): 2261 raise error.TestNAError('Test can only be run on devices with ' 2262 'access to the Cr50 console') 2263 2264 self._ccd_open_job.process_output() 2265 self._ccd_open_stdout.seek(self._ccd_open_last_len) 2266 output = self._ccd_open_stdout.read() 2267 self._ccd_open_last_len = self._ccd_open_stdout.len 2268 return output 2269 2270 def _close_ccd_open_job(self): 2271 """Terminate the process and check the results.""" 2272 if not hasattr(self, 'cr50'): 2273 raise error.TestNAError('Test can only be run on devices with ' 2274 'access to the Cr50 console') 2275 2276 exit_status = utils.nuke_subprocess(self._ccd_open_job.sp) 2277 stdout = self._ccd_open_stdout.getvalue().strip() 2278 delattr(self, '_ccd_open_job') 2279 if stdout: 2280 logging.info('stdout of ccd open:\n%s', stdout) 2281 if exit_status: 2282 logging.info('exit status: %d', exit_status) 2283 if 'Error' in stdout: 2284 raise error.TestFail('ccd open Error %s' % 2285 stdout.split('Error')[-1]) 2286 if self.cr50.OPEN != self.cr50.get_ccd_level(): 2287 raise error.TestFail('unable to open cr50: %s' % stdout) 2288 else: 2289 logging.info('Opened Cr50') 2290 2291 def ccd_open_from_ap(self): 2292 """Start the open process and press the power button.""" 2293 if not hasattr(self, 'cr50'): 2294 raise error.TestNAError('Test can only be run on devices with ' 2295 'access to the Cr50 console') 2296 2297 # Opening CCD requires power button presses. If those presses would 2298 # power off the AP and prevent CCD open from completing, ignore them. 2299 if self.faft_config.ec_forwards_short_pp_press: 2300 self.stop_powerd() 2301 2302 # Make sure the test waits long enough to avoid ccd rate limiting. 2303 time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT) 2304 2305 self._ccd_open_last_len = 0 2306 2307 self._ccd_open_stdout = StringIO.StringIO() 2308 2309 ccd_open_cmd = utils.sh_escape('gsctool -a -o') 2310 full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'), 2311 ccd_open_cmd) 2312 # Start running the Cr50 Open process in the background. 2313 self._ccd_open_job = utils.BgJob(full_ssh_cmd, 2314 nickname='ccd_open', 2315 stdout_tee=self._ccd_open_stdout, 2316 stderr_tee=utils.TEE_TO_LOGS) 2317 if self._ccd_open_job == None: 2318 raise error.TestFail('could not start ccd open') 2319 2320 try: 2321 # Wait for the first gsctool power button prompt before starting the 2322 # open process. 2323 logging.info(self._get_ccd_open_output()) 2324 # Cr50 starts out by requesting 5 quick presses then 4 longer 2325 # power button presses. Run the quick presses without looking at the 2326 # command output, because getting the output can take some time. For 2327 # the presses that require a 1 minute wait check the output between 2328 # presses, so we can catch errors 2329 # 2330 # run quick presses for 30 seconds. It may take a couple of seconds 2331 # for open to start. 10 seconds should be enough. 30 is just used 2332 # because it will definitely be enough, and this process takes 300 2333 # seconds, so doing quick presses for 30 seconds won't matter. 2334 end_time = time.time() + 30 2335 while time.time() < end_time: 2336 self.servo.power_short_press() 2337 logging.info('short int power button press') 2338 time.sleep(self.PP_SHORT_INTERVAL) 2339 # Poll the output and press the power button for the longer presses. 2340 utils.wait_for_value(self._check_open_and_press_power_button, 2341 expected_value=True, 2342 timeout_sec=self.cr50.PP_LONG) 2343 except Exception as e: 2344 logging.info(e) 2345 raise 2346 finally: 2347 self._close_ccd_open_job() 2348 self._try_to_bring_dut_up() 2349 logging.info(self.cr50.get_ccd_info()) 2350 2351 def enter_mode_after_checking_cr50_state(self, mode): 2352 """Reboot to mode if cr50 doesn't already match the state""" 2353 if not hasattr(self, 'cr50'): 2354 raise error.TestNAError('Test can only be run on devices with ' 2355 'access to the Cr50 console') 2356 2357 # If the device is already in the correct mode, don't do anything 2358 if (mode == 'dev') == self.cr50.in_dev_mode(): 2359 logging.info('already in %r mode', mode) 2360 return 2361 2362 self.switcher.reboot_to_mode(to_mode=mode) 2363 2364 if (mode == 'dev') != self.cr50.in_dev_mode(): 2365 raise error.TestError('Unable to enter %r mode' % mode) 2366 2367 def fast_ccd_open(self, enable_testlab=False, reset_ccd=True, 2368 dev_mode=False): 2369 """Try to use ccd testlab open. If that fails, do regular ap open. 2370 2371 Args: 2372 enable_testlab: If True, enable testlab mode after cr50 is open. 2373 reset_ccd: If True, reset ccd after open. 2374 dev_mode: True if the device should be in dev mode after ccd is 2375 is opened. 2376 """ 2377 if not hasattr(self, 'cr50'): 2378 raise error.TestNAError('Test can only be run on devices with ' 2379 'access to the Cr50 console') 2380 2381 if self.servo.main_device_is_ccd(): 2382 error_txt = 'because the main servo device is CCD.' 2383 if enable_testlab: 2384 raise error.TestNAError('Cannot enable testlab: %s' % error_txt) 2385 elif reset_ccd: 2386 raise error.TestNAError('CCD reset not allowed: %s' % error_txt) 2387 2388 if not self.faft_config.has_powerbutton: 2389 logging.warning('No power button', exc_info=True) 2390 enable_testlab = False 2391 2392 # Try to use testlab open first, so we don't have to wait for the 2393 # physical presence check. 2394 self.cr50.send_command('ccd testlab open') 2395 if self.cr50.OPEN != self.cr50.get_ccd_level(): 2396 if self.servo.has_control('chassis_open'): 2397 self.servo.set('chassis_open', 'yes') 2398 pw = '' if self.cr50.password_is_reset() else self.CCD_PASSWORD 2399 # Use the console to open cr50 without entering dev mode if 2400 # possible. Ittakes longer and relies on more systems to enter dev 2401 # mode and ssh into the AP. Skip the steps that aren't required. 2402 if not (pw or self.cr50.get_cap( 2403 'OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]): 2404 self.enter_mode_after_checking_cr50_state('dev') 2405 2406 if pw or self.cr50.get_cap( 2407 'OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]: 2408 self.cr50.set_ccd_level(self.cr50.OPEN, pw) 2409 else: 2410 self.ccd_open_from_ap() 2411 2412 if self.servo.has_control('chassis_open'): 2413 self.servo.set('chassis_open', 'no') 2414 2415 if enable_testlab: 2416 self.cr50.set_ccd_testlab('on') 2417 2418 if reset_ccd: 2419 self.cr50.send_command('ccd reset') 2420 2421 # In default, the device should be in normal mode. After opening cr50, 2422 # the TPM should be cleared and the device should automatically reset to 2423 # normal mode. However, some tests might want the device in 'dev' mode. 2424 self.enter_mode_after_checking_cr50_state('dev' if dev_mode else 2425 'normal') 2426