• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import print_function
6
7import ctypes
8import logging
9import os
10import pprint
11import re
12import StringIO
13import time
14import uuid
15
16from autotest_lib.client.bin import utils
17from autotest_lib.client.common_lib import error
18from autotest_lib.client.common_lib import global_config
19from autotest_lib.client.common_lib.cros import retry
20from autotest_lib.client.common_lib.cros import tpm_utils
21from autotest_lib.server import test
22from autotest_lib.server.cros import vboot_constants as vboot
23from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
24from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
25from autotest_lib.server.cros.faft.utils import mode_switcher
26from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
27from autotest_lib.server.cros.power import utils as PowerUtils
28from autotest_lib.server.cros.servo import chrome_base_ec
29from autotest_lib.server.cros.servo import chrome_cr50
30from autotest_lib.server.cros.servo import chrome_ec
31from autotest_lib.server.cros.servo import servo
32from autotest_lib.server.cros.faft import telemetry
33
34# Experimentally tuned time in minutes to wait for partition device nodes on a
35# USB stick to be ready after plugging in the stick.
36PARTITION_TABLE_READINESS_TIMEOUT = 0.1  # minutes
37# Experimentally tuned time in seconds to wait for the first retry of reading
38# the sysfs node of a USB stick's partition device node.
39PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY = 1  # seconds
40
41ConnectionError = mode_switcher.ConnectionError
42
43
44class FirmwareTest(test.test):
45    """
46    Base class that sets up helper objects/functions for firmware tests.
47
48    It launches the FAFTClient on DUT, such that the test can access its
49    firmware functions and interfaces. It also provides some methods to
50    handle the reboot mechanism, in order to ensure FAFTClient is still
51    connected after reboot.
52    @type servo: servo.Servo
53    @type _client: autotest_lib.server.hosts.ssh_host.SSHHost |
54                   autotest_lib.server.hosts.cros_host.CrosHost
55
56    TODO: add documentaion as the FAFT rework progresses.
57    """
58    version = 1
59
60    # Set this to True in test classes that need to boot from the USB stick.
61    # When True, initialize() will raise TestWarn if USB stick is marked bad.
62    NEEDS_SERVO_USB = False
63
64    # Mapping of partition number of kernel and rootfs.
65    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
66    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
67    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
68    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
69
70    CHROMEOS_MAGIC = "CHROMEOS"
71    CORRUPTED_MAGIC = "CORRUPTD"
72
73    # System power states
74    POWER_STATE_S0 = 'S0'
75    POWER_STATE_S0IX = 'S0ix'
76    POWER_STATE_S3 = 'S3'
77    POWER_STATE_S5 = 'S5'
78    POWER_STATE_G3 = 'G3'
79    POWER_STATE_SUSPEND = '|'.join([POWER_STATE_S0IX, POWER_STATE_S3])
80
81    # Delay for waiting client to return before EC suspend
82    EC_SUSPEND_DELAY = 5
83
84    # Delay between EC suspend and wake
85    WAKE_DELAY = 10
86
87    # Delay between closing and opening lid
88    LID_DELAY = 1
89
90    # Delay for establishing state after changing PD settings
91    PD_RESYNC_DELAY = 2
92
93    # The default number of power state check retries (each try takes 3 secs)
94    DEFAULT_PWR_RETRIES = 5
95
96    # FWMP space constants
97    FWMP_CLEARED_EXIT_STATUS = 1
98    FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
99                              '_INVALID')
100
101    _ROOTFS_PARTITION_NUMBER = 3
102
103    # Class level variable, keep track the states of one time setup.
104    # This variable is preserved across tests which inherit this class.
105    _global_setup_done = {
106        'gbb_flags': False,
107        'reimage': False,
108        'usb_check': False,
109    }
110
111    # CCD password used by tests.
112    CCD_PASSWORD = 'Password'
113
114    RESPONSE_TIMEOUT = 180
115
116    @classmethod
117    def check_setup_done(cls, label):
118        """Check if the given setup is done.
119
120        @param label: The label of the setup.
121        """
122        return cls._global_setup_done[label]
123
124    @classmethod
125    def mark_setup_done(cls, label):
126        """Mark the given setup done.
127
128        @param label: The label of the setup.
129        """
130        cls._global_setup_done[label] = True
131
132    @classmethod
133    def unmark_setup_done(cls, label):
134        """Mark the given setup not done.
135
136        @param label: The label of the setup.
137        """
138        cls._global_setup_done[label] = False
139
140    def initialize(self, host, cmdline_args, ec_wp=None):
141        """Initialize the FirmwareTest.
142
143        This method interacts with the Servo, FAFT RPC client, FAFT Config,
144        Mode Switcher, EC consoles, write-protection, GBB flags, and a lockfile.
145
146        @type host: autotest_lib.server.hosts.CrosHost
147        """
148        self.run_id = str(uuid.uuid4())
149        self._client = host
150        self.servo = host.servo
151
152        self.lockfile = '/usr/local/tmp/faft/lock'
153        self._backup_gbb_flags = None
154        self._backup_firmware_identity = dict()
155        self._backup_kernel_sha = dict()
156        self._backup_cgpt_attr = dict()
157        self._backup_dev_mode = None
158        self._restore_power_mode = None
159        self._uart_file_dict = {}
160
161        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
162
163        # Parse arguments from command line
164        args = {}
165        self.power_control = host.POWER_CONTROL_RPM
166        for arg in cmdline_args:
167            match = re.search("^(\w+)=(.+)", arg)
168            if match:
169                args[match.group(1)] = match.group(2)
170
171        self._no_fw_rollback_check = False
172        if 'no_fw_rollback_check' in args:
173            if 'true' in args['no_fw_rollback_check'].lower():
174                self._no_fw_rollback_check = True
175
176        self._no_ec_sync = False
177        if 'no_ec_sync' in args:
178            if 'true' in args['no_ec_sync'].lower():
179                self._no_ec_sync = True
180
181        self._use_sync_script = global_config.global_config.get_config_value(
182                'CROS', 'enable_fs_sync_script', type=bool, default=False)
183
184        self.servo.initialize_dut()
185        self.faft_client = RPCProxy(host)
186        self.faft_config = FAFTConfig(
187                self.faft_client.system.get_platform_name(),
188                self.faft_client.system.get_model_name())
189        self.checkers = FAFTCheckers(self)
190
191        if self.faft_config.chrome_ec:
192            self.ec = chrome_ec.ChromeEC(self.servo)
193        self.switcher = mode_switcher.create_mode_switcher(self)
194        # Check for presence of a USBPD console
195        if self.faft_config.chrome_usbpd:
196            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
197        elif self.faft_config.chrome_ec:
198            # If no separate USBPD console, then PD exists on EC console
199            self.usbpd = self.ec
200        # Get pdtester console
201        self.pdtester = host.pdtester
202        self.pdtester_host = host._pdtester_host
203        # Check for presence of a working Cr50 console
204        if self.servo.has_control('cr50_version'):
205            try:
206                # Check that the console works before declaring the cr50 console
207                # connection exists and enabling uart capture.
208                cr50 = chrome_cr50.ChromeCr50(self.servo, self.faft_config)
209                cr50.get_version()
210                self.cr50 = cr50
211            except servo.ControlUnavailableError:
212                logging.warn('cr50 console not supported.')
213            except Exception as e:
214                logging.warn('Ignored unknown cr50 version error: %s', str(e))
215
216        if 'power_control' in args:
217            self.power_control = args['power_control']
218            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
219                raise error.TestError('Valid values for --args=power_control '
220                                      'are %s. But you entered wrong argument '
221                                      'as "%s".'
222                                      % (host.POWER_CONTROL_VALID_ARGS,
223                                         self.power_control))
224
225        if self.NEEDS_SERVO_USB and not host.is_servo_usb_usable():
226            usb_state = host.get_servo_usb_state()
227            raise error.TestWarn(
228                    "Servo USB disk unusable (%s); canceling test." %
229                    usb_state)
230
231        if not self.faft_client.system.dev_tpm_present():
232            raise error.TestError('/dev/tpm0 does not exist on the client')
233
234        # Initialize servo role to src
235        self.servo.set_servo_v4_role('src')
236
237        # Create the BaseEC object. None if not available.
238        self.base_ec = chrome_base_ec.create_base_ec(self.servo)
239
240        self._record_uart_capture()
241        self._record_system_info()
242        self.faft_client.system.set_dev_default_boot()
243        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
244        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
245        if self.fw_vboot2:
246            self.faft_client.system.set_fw_try_next('A')
247            if self.faft_client.system.get_crossystem_value(
248                    'mainfw_act') == 'B':
249                logging.info('mainfw_act is B. rebooting to set it A')
250                # TODO(crbug.com/1018322): remove try/catch once that bug is
251                # marked as fixed and verified. In that case the overlay for
252                # the board itself will map warm_reset to cold_reset.
253                try:
254                    self.switcher.mode_aware_reboot()
255                except ConnectionError as e:
256                    if 'DUT is still up unexpectedly' in str(e):
257                        # In this case, try doing a cold_reset instead
258                        self.switcher.mode_aware_reboot(reboot_type='cold')
259                    else:
260                        raise
261
262        # Check flashrom before first use, to avoid xmlrpclib.Fault.
263        if not self.faft_client.bios.is_available():
264            raise error.TestError(
265                    "flashrom is broken; check 'flashrom -p host'"
266                    "and rpc server log.")
267
268        self._setup_gbb_flags()
269        self.faft_client.updater.stop_daemon()
270        self._create_faft_lockfile()
271        self._create_old_faft_lockfile()
272        self._setup_ec_write_protect(ec_wp)
273        # See chromium:239034 regarding needing this sync.
274        self.blocking_sync()
275        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
276
277    def stage_build_to_usbkey(self):
278        """Downloads host's build to the USB key attached to servo.
279
280        @return: True if build is verified to be on USB key, False otherwise.
281        """
282        info = self._client.host_info_store.get()
283        if info.build:
284            current_build = self._client._servo_host.validate_image_usbkey()
285            if current_build != info.build:
286                logging.debug('Current build on USB: %s differs from test'
287                              ' build: %s, proceed with download.',
288                              current_build, info.build)
289                try:
290                    self._client.stage_build_to_usb(info.build)
291                    return True
292                except error.AutotestError as e:
293                    logging.warn('Stage build to USB failed, tests that require'
294                                 ' test image on Servo USB may fail: {}'.format(e))
295                    return False
296            else:
297                logging.debug('Current build on USB: %s is same as test'
298                              ' build, skip download.', current_build)
299                return True
300        else:
301            logging.warn('Failed to get build label from the DUT, will use'
302                         ' existing image in Servo USB.')
303            return False
304
305    def run_once(self, *args, **dargs):
306        """Delegates testing to a test method.
307
308        test_name is either the 1st positional argument or a named argument.
309
310        test_name will be mapped to a test method as follows:
311        test_name                     method
312        --------------                -----------
313        <TestClass>                   test
314        <TestClass>.<Case>            test_<Case>
315        <TestClass>.<Case>.<SubCase>  test_<Case>_<SubCase>
316
317        Any arguments not consumed by FirmwareTest are passed to the test method.
318
319        @param test_name: Should be set to NAME in the control file.
320
321        @raise TestError: If test_name wasn't found in args, does not start
322                          with test class, or if the method is not found.
323        """
324        self_name = type(self).__name__
325
326        # Parse and remove test name from args.
327        if 'test_name' in dargs:
328            test_name = dargs.pop('test_name')
329        elif len(args) >= 1:
330            test_name = args[0]
331            args = args[1:]
332        else:
333            raise error.TestError('"%s" class must define run_once, or the'
334                                  ' control file must specify "test_name".' %
335                                  self_name)
336
337        # Check that test_name starts with the test class name.
338        name_parts = test_name.split('.')
339
340        test_class = name_parts.pop(0)
341        if test_class != self_name:
342            raise error.TestError('Class "%s" does not match that found in test'
343                                  ' name "%s"' % (self_name, test_class))
344
345        # Construct and call the test method.
346        method_name = '_'.join(['test'] + name_parts)
347        if not hasattr(self, method_name):
348            raise error.TestError('Method "%s" for testing "%s" not found in'
349                                  ' "%s"' % (method_name, test_name, self_name))
350
351        logging.info('Starting test: "%s"', test_name)
352        utils.cherry_pick_call(getattr(self, method_name), *args, **dargs)
353
354    def cleanup(self):
355        """Autotest cleanup function."""
356        # Unset state checker in case it's set by subclass
357        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
358
359        # Capture UART before doing anything else, so we can guarantee we get
360        # some uart results.
361        try:
362            self._record_uart_capture()
363        except:
364            logging.warn('Failed initial uart capture during cleanup')
365
366        try:
367            self.faft_client.system.is_available()
368        except:
369            # Remote is not responding. Revive DUT so that subsequent tests
370            # don't fail.
371            self._restore_routine_from_timeout()
372
373        if hasattr(self, 'switcher'):
374            self.switcher.restore_mode()
375
376        self._restore_ec_write_protect()
377        self._restore_servo_v4_role()
378
379        if hasattr(self, 'faft_client'):
380            self._restore_gbb_flags()
381            self.faft_client.updater.start_daemon()
382            self.faft_client.updater.cleanup()
383            self._remove_faft_lockfile()
384            self._remove_old_faft_lockfile()
385            self._record_faft_client_log()
386            self.faft_client.quit()
387
388        # Capture any new uart output, then discard log messages again.
389        self._cleanup_uart_capture()
390
391        super(FirmwareTest, self).cleanup()
392        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
393
394    def _record_system_info(self):
395        """Record some critical system info to the attr keyval.
396
397        This info is used by generate_test_report later.
398        """
399        system_info = {
400            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
401            'ec_version': self.faft_client.ec.get_version(),
402            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
403            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
404            'servo_host_os_version' : self.servo.get_os_version(),
405            'servod_version': self.servo.get_servod_version(),
406            'os_version': self._client.get_release_builder_path(),
407            'servo_type': self.servo.get_servo_version()
408        }
409
410        # Record the servo v4 and servo micro versions when possible
411        system_info.update(self.servo.get_servo_fw_versions())
412
413        if hasattr(self, 'cr50'):
414            system_info['cr50_version'] = self.cr50.get_full_version()
415
416        logging.info('System info:\n%s', pprint.pformat(system_info))
417        self.write_attr_keyval(system_info)
418
419    def invalidate_firmware_setup(self):
420        """Invalidate all firmware related setup state.
421
422        This method is called when the firmware is re-flashed. It resets all
423        firmware related setup states so that the next test setup properly
424        again.
425        """
426        self.unmark_setup_done('gbb_flags')
427
428    def _retrieve_recovery_reason_from_trap(self):
429        """Try to retrieve the recovery reason from a trapped recovery screen.
430
431        @return: The recovery_reason, 0 if any error.
432        """
433        recovery_reason = 0
434        logging.info('Try to retrieve recovery reason...')
435        if self.servo.get_usbkey_state() == 'dut':
436            self.switcher.bypass_rec_mode()
437        else:
438            self.servo.switch_usbkey('dut')
439
440        try:
441            self.switcher.wait_for_client()
442            lines = self.faft_client.system.run_shell_command_get_output(
443                        'crossystem recovery_reason')
444            recovery_reason = int(lines[0])
445            logging.info('Got the recovery reason %d.', recovery_reason)
446        except ConnectionError:
447            logging.error('Failed to get the recovery reason due to connection '
448                          'error.')
449        return recovery_reason
450
451    def _reset_client(self):
452        """Reset client to a workable state.
453
454        This method is called when the client is not responsive. It may be
455        caused by the following cases:
456          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
457          - corrupted firmware;
458          - corrutped OS image.
459        """
460        # DUT may halt on a firmware screen. Try cold reboot.
461        logging.info('Try cold reboot...')
462        self.switcher.mode_aware_reboot(reboot_type='cold',
463                                        sync_before_boot=False,
464                                        wait_for_dut_up=False)
465        self.switcher.wait_for_client_offline()
466        self.switcher.bypass_dev_mode()
467        try:
468            self.switcher.wait_for_client()
469            return
470        except ConnectionError:
471            logging.warn('Cold reboot doesn\'t help, still connection error.')
472
473        # DUT may be broken by a corrupted firmware. Restore firmware.
474        # We assume the recovery boot still works fine. Since the recovery
475        # code is in RO region and all FAFT tests don't change the RO region
476        # except GBB.
477        if self.is_firmware_saved():
478            self._ensure_client_in_recovery()
479            logging.info('Try restore the original firmware...')
480            if self.is_firmware_changed():
481                try:
482                    self.restore_firmware()
483                    return
484                except ConnectionError:
485                    logging.warn('Restoring firmware doesn\'t help, still '
486                                 'connection error.')
487
488        # Perhaps it's kernel that's broken. Let's try restoring it.
489        if self.is_kernel_saved():
490            self._ensure_client_in_recovery()
491            logging.info('Try restore the original kernel...')
492            if self.is_kernel_changed():
493                try:
494                    self.restore_kernel()
495                    return
496                except ConnectionError:
497                    logging.warn('Restoring kernel doesn\'t help, still '
498                                 'connection error.')
499
500        # DUT may be broken by a corrupted OS image. Restore OS image.
501        self._ensure_client_in_recovery()
502        logging.info('Try restore the OS image...')
503        self.faft_client.system.run_shell_command('chromeos-install --yes')
504        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
505        self.switcher.wait_for_client_offline()
506        self.switcher.bypass_dev_mode()
507        try:
508            self.switcher.wait_for_client()
509            logging.info('Successfully restore OS image.')
510            return
511        except ConnectionError:
512            logging.warn('Restoring OS image doesn\'t help, still connection '
513                         'error.')
514
515    def _ensure_client_in_recovery(self):
516        """Ensure client in recovery boot; reboot into it if necessary.
517
518        @raise TestError: if failed to boot the USB image.
519        """
520        logging.info('Try boot into USB image...')
521        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
522                                     wait_for_dut_up=False)
523        self.servo.switch_usbkey('host')
524        self.switcher.bypass_rec_mode()
525        try:
526            self.switcher.wait_for_client()
527        except ConnectionError:
528            raise error.TestError('Failed to boot the USB image.')
529
530    def _restore_routine_from_timeout(self):
531        """A routine to try to restore the system from a timeout error.
532
533        This method is called when FAFT failed to connect DUT after reboot.
534
535        @raise TestFail: This exception is already raised, with a decription
536                         why it failed.
537        """
538        # DUT is disconnected. Capture the UART output for debug.
539        self._record_uart_capture()
540
541        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
542        # identify if it is a network flaky.
543
544        recovery_reason = self._retrieve_recovery_reason_from_trap()
545
546        # Reset client to a workable state.
547        self._reset_client()
548
549        # Raise the proper TestFail exception.
550        if recovery_reason:
551            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
552                                 'and timed out' % recovery_reason)
553        else:
554            raise error.TestFail('Timed out waiting for DUT reboot')
555
556    def assert_test_image_in_usb_disk(self, usb_dev=None):
557        """Assert an USB disk plugged-in on servo and a test image inside.
558
559        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
560                        If None, it is detected automatically.
561        @raise TestError: if USB disk not detected or not a test image.
562        """
563        if self.check_setup_done('usb_check'):
564            return
565        if usb_dev:
566            assert self.servo.get_usbkey_state() == 'host'
567        else:
568            self.servo.switch_usbkey('host')
569            usb_dev = self.servo.probe_host_usb_dev()
570            if not usb_dev:
571                raise error.TestError(
572                    'An USB disk should be plugged in the servo board. %s' %
573                    telemetry.collect_usb_state(self.servo))
574
575        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
576        logging.info('usb dev is %s', usb_dev)
577        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
578        # After the USB key is muxed from the DUT to the servo host, there
579        # appears to be a delay between when servod can confirm that a sysfs
580        # entry exists for the disk (as done by probe_host_usb_dev) and when
581        # sysfs entries get populated for the disk's partitions.
582        @retry.retry(error.AutoservRunError,
583                     timeout_min=PARTITION_TABLE_READINESS_TIMEOUT,
584                     delay_sec=PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY)
585        def confirm_rootfs_partition_device_node_readable():
586            """Repeatedly poll for the RootFS partition sysfs node."""
587            self.servo.system('ls {}'.format(rootfs))
588
589        try:
590            confirm_rootfs_partition_device_node_readable()
591        except error.AutoservRunError as e:
592            usb_info = telemetry.collect_usb_state(self.servo)
593            raise error.TestError(
594                    ('Could not ls the device node for the RootFS on the USB '
595                     'device. %s: %s\nMore telemetry: %s') %
596                    (type(e).__name__, e, usb_info))
597        try:
598            self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
599        except error.AutoservRunError as e:
600            usb_info = telemetry.collect_usb_state(self.servo)
601            raise error.TestError(
602                ('Could not mount the partition on USB device. %s: %s\n'
603                 'More telemetry: %s') % (type(e).__name__, e, usb_info))
604
605        try:
606            usb_lsb = self.servo.system_output('cat %s' %
607                os.path.join(tmpd, 'etc/lsb-release'))
608            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
609            dut_lsb = '\n'.join(self.faft_client.system.
610                run_shell_command_get_output('cat /etc/lsb-release'))
611            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
612            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
613                raise error.TestError('USB stick in servo is no test image')
614            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
615            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
616            if usb_board != dut_board:
617                raise error.TestError('USB stick in servo contains a %s '
618                    'image, but DUT is a %s' % (usb_board, dut_board))
619        finally:
620            for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd):
621                self.servo.system(cmd)
622
623        self.mark_setup_done('usb_check')
624
625    def setup_pdtester(self, flip_cc=False, dts_mode=False, pd_faft=True,
626                       min_batt_level=None):
627        """Setup the PDTester to a given state.
628
629        @param flip_cc: True to flip CC polarity; False to not flip it.
630        @param dts_mode: True to config PDTester to DTS mode; False to not.
631        @param pd_faft: True to config PD FAFT setup.
632        @param min_batt_level: An int for minimum battery level, or None for
633                               skip.
634        @raise TestError: If Servo v4 not setup properly.
635        """
636
637        # PD FAFT is only tested with a least a servo V4 with servo micro
638        # or C2D2.
639        if pd_faft and (
640                'servo_v4_with_servo_micro' not in self.pdtester.servo_type
641        ) and ('servo_v4_with_c2d2' not in self.pdtester.servo_type):
642            raise error.TestError('servo_v4_with_servo_micro or '
643                                  'servo_v4_with_c2d2 is a mandatory setup '
644                                  'for PD FAFT. Got %s.' %
645                                  self.pdtester.servo_type)
646
647        # Ensure the battery is enough for testing, this should be done before
648        # all the following setup.
649        if (min_batt_level is not None) and self._client.has_battery():
650            logging.info('Start charging if batt level < %d', min_batt_level)
651            PowerUtils.put_host_battery_in_range(self._client, min_batt_level,
652                                                 100, 600)
653
654        # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects
655        # the behaviors of what PD FAFT tests. So we want it disabled.
656        if 'servo_v4' in self.pdtester.servo_type:
657            self.servo.set_dts_mode('on' if dts_mode else 'off')
658        else:
659            logging.warn('Configuring DTS mode only supported on Servo v4')
660
661        self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1')
662        # Make it sourcing max voltage.
663        self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE)
664
665        time.sleep(self.PD_RESYNC_DELAY)
666
667        # Servo v4 requires an external charger to source power. Make sure
668        # this setup is correct.
669        if 'servo_v4' in self.pdtester.servo_type:
670            role = self.pdtester.get('servo_v4_role')
671            if role != 'src':
672                raise error.TestError(
673                        'Servo v4 is not sourcing power! Make sure the servo '
674                        '"DUT POWER" port is connected to a working charger. '
675                        'servo_v4_role:%s' % role)
676
677    def setup_usbkey(self, usbkey, host=None, used_for_recovery=None):
678        """Setup the USB disk for the test.
679
680        It checks the setup of USB disk and a valid ChromeOS test image inside.
681        It also muxes the USB disk to either the host or DUT by request.
682
683        @param usbkey: True if the USB disk is required for the test, False if
684                       not required.
685        @param host: Optional, True to mux the USB disk to host, False to mux it
686                    to DUT, default to do nothing.
687        @param used_for_recovery: Optional, True if the USB disk is used for
688                                  recovery boot; False if the USB disk is not
689                                  used for recovery boot, like Ctrl-U USB boot.
690        """
691        if usbkey:
692            self.stage_build_to_usbkey()
693            self.assert_test_image_in_usb_disk()
694        elif host is None:
695            # USB disk is not required for the test. Better to mux it to host.
696            host = True
697
698        if host is True:
699            self.servo.switch_usbkey('host')
700        elif host is False:
701            self.servo.switch_usbkey('dut')
702
703        if used_for_recovery is None:
704            # Default value is True if usbkey == True.
705            # As the common usecase of USB disk is for recovery boot. Tests
706            # can define it explicitly if not.
707            used_for_recovery = usbkey
708
709        if used_for_recovery:
710            # In recovery boot, the locked EC RO doesn't support PD for most
711            # of the CrOS devices. The default servo v4 power role is a SRC.
712            # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the
713            # data role swap from UFP to DFP; as a result, DUT can't see the
714            # USB disk and the Ethernet dongle on servo v4.
715            #
716            # This is a workaround to set servo v4 as a SNK, for every FAFT
717            # test which boots into the USB disk in the recovery mode.
718            #
719            # TODO(waihong): Add a check to see if the battery level is too
720            # low and sleep for a while for charging.
721            self.set_servo_v4_role_to_snk()
722
723    def set_servo_v4_role_to_snk(self, pd_comm=False):
724        """Set the servo v4 role to SNK.
725
726        @param pd_comm: a bool. Enable PD communication if True, else otherwise
727        """
728        self._needed_restore_servo_v4_role = True
729        self.servo.set_servo_v4_role('snk')
730        if pd_comm:
731            self.servo.set_servo_v4_pd_comm('on')
732
733    def _restore_servo_v4_role(self):
734        """Restore the servo v4 role to default SRC."""
735        if not hasattr(self, '_needed_restore_servo_v4_role'):
736            return
737        if self._needed_restore_servo_v4_role:
738            self.servo.set_servo_v4_role('src')
739
740    def set_dut_low_power_idle_delay(self, delay):
741        """Set EC low power idle delay
742
743        @param delay: Delay in seconds
744        """
745        if not self.ec.has_command('dsleep'):
746            logging.info("Can't set low power idle delay.")
747            return
748        self._previous_ec_low_power_delay = int(
749                self.ec.send_command_get_output("dsleep",
750                ["timeout:\s+(\d+)\ssec"])[0][1])
751        self.ec.send_command("dsleep " + str(delay))
752
753    def restore_dut_low_power_idle_delay(self):
754        """Restore EC low power idle delay"""
755        if getattr(self, '_previous_ec_low_power_delay', None):
756            self.ec.send_command("dsleep " + str(
757                    self._previous_ec_low_power_delay))
758
759    def get_usbdisk_path_on_dut(self):
760        """Get the path of the USB disk device plugged-in the servo on DUT.
761
762        Returns:
763          A string representing USB disk path, like '/dev/sdb', or None if
764          no USB disk is found.
765        """
766        cmd = 'ls -d /dev/s*[a-z]'
767        original_value = self.servo.get_usbkey_state()
768
769        # Make the dut unable to see the USB disk.
770        self.servo.switch_usbkey('off')
771        time.sleep(self.faft_config.usb_unplug)
772        no_usb_set = set(
773            self.faft_client.system.run_shell_command_get_output(cmd))
774
775        # Make the dut able to see the USB disk.
776        self.servo.switch_usbkey('dut')
777        time.sleep(self.faft_config.usb_plug)
778        has_usb_set = set(
779            self.faft_client.system.run_shell_command_get_output(cmd))
780
781        # Back to its original value.
782        if original_value != self.servo.get_usbkey_state():
783            self.servo.switch_usbkey(original_value)
784
785        diff_set = has_usb_set - no_usb_set
786        if len(diff_set) == 1:
787            return diff_set.pop()
788        else:
789            return None
790
791    def _create_faft_lockfile(self):
792        """Creates the FAFT lockfile."""
793        logging.info('Creating FAFT lockfile...')
794        command = 'touch %s' % (self.lockfile)
795        self.faft_client.system.run_shell_command(command)
796
797    def _create_old_faft_lockfile(self):
798        """
799        Creates the FAFT lockfile in its legacy location.
800
801        TODO (once M83 is stable, approx. June 9 2020):
802        Delete this function, as platform/installer/chromeos-setgoodkernel
803        will look for the lockfile in the new location
804        (/usr/local/tmp/faft/lock)
805        """
806        logging.info('Creating legacy FAFT lockfile...')
807        self.faft_client.system.run_shell_command('mkdir -p /var/tmp/faft')
808        self.faft_client.system.run_shell_command('touch /var/tmp/faft/lock')
809
810    def _remove_faft_lockfile(self):
811        """Removes the FAFT lockfile."""
812        logging.info('Removing FAFT lockfile...')
813        command = 'rm -f %s' % (self.lockfile)
814        self.faft_client.system.run_shell_command(command)
815
816    def _remove_old_faft_lockfile(self):
817        """
818        Removes the FAFT lockfile from its legacy location.
819
820        TODO (once M83 is stable, approx. June 9 2020):
821        Delete this function, as platform/installer/chromeos-setgoodkernel
822        will look for the lockfile in the new location
823        (/usr/local/tmp/faft/lock)
824        """
825        logging.info('Removing legacy FAFT lockfile...')
826        self.faft_client.system.run_shell_command('rm -rf /var/tmp/faft')
827
828    def clear_set_gbb_flags(self, clear_mask, set_mask):
829        """Clear and set the GBB flags in the current flashrom.
830
831        @param clear_mask: A mask of flags to be cleared.
832        @param set_mask: A mask of flags to be set.
833        """
834        gbb_flags = self.faft_client.bios.get_gbb_flags()
835        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
836        self.gbb_flags = new_flags
837        if new_flags != gbb_flags:
838            self._backup_gbb_flags = gbb_flags
839            logging.info('Changing GBB flags from 0x%x to 0x%x.',
840                         gbb_flags, new_flags)
841            self.faft_client.bios.set_gbb_flags(new_flags)
842            # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
843            # reboot to get a clear state
844            if ((gbb_flags ^ new_flags) &
845                (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
846                 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
847                self.switcher.mode_aware_reboot()
848        else:
849            logging.info('Current GBB flags look good for test: 0x%x.',
850                         gbb_flags)
851
852
853    def _check_capability(self, target, required_cap, suppress_warning):
854        """Check if current platform has required capabilities for the target.
855
856        @param required_cap: A list containing required capabilities.
857        @param suppress_warning: True to suppress any warning messages.
858        @return: True if requirements are met. Otherwise, False.
859        """
860        if not required_cap:
861            return True
862
863        if target not in ['ec', 'cr50']:
864            raise error.TestError('Invalid capability target %r' % target)
865
866        for cap in required_cap:
867            if cap not in getattr(self.faft_config, target + '_capability'):
868                if not suppress_warning:
869                    logging.warn('Requires %s capability "%s" to run this '
870                                 'test.', target, cap)
871                return False
872
873        return True
874
875
876    def check_ec_capability(self, required_cap=None, suppress_warning=False):
877        """Check if current platform has required EC capabilities.
878
879        @param required_cap: A list containing required EC capabilities. Pass in
880                             None to only check for presence of Chrome EC.
881        @param suppress_warning: True to suppress any warning messages.
882        @return: True if requirements are met. Otherwise, False.
883        """
884        if not self.faft_config.chrome_ec:
885            if not suppress_warning:
886                logging.warn('Requires Chrome EC to run this test.')
887            return False
888        return self._check_capability('ec', required_cap, suppress_warning)
889
890
891    def check_cr50_capability(self, required_cap=None, suppress_warning=False):
892        """Check if current platform has required Cr50 capabilities.
893
894        @param required_cap: A list containing required Cr50 capabilities. Pass
895                             in None to only check for presence of cr50 uart.
896        @param suppress_warning: True to suppress any warning messages.
897        @return: True if requirements are met. Otherwise, False.
898        """
899        if not hasattr(self, 'cr50'):
900            if not suppress_warning:
901                logging.warn('Requires Chrome Cr50 to run this test.')
902            return False
903        return self._check_capability('cr50', required_cap, suppress_warning)
904
905
906    def check_root_part_on_non_recovery(self, part):
907        """Check the partition number of root device and on normal/dev boot.
908
909        @param part: A string of partition number, e.g.'3'.
910        @return: True if the root device matched and on normal/dev boot;
911                 otherwise, False.
912        """
913        return self.checkers.root_part_checker(part) and \
914                self.checkers.crossystem_checker({
915                    'mainfw_type': ('normal', 'developer'),
916                })
917
918    def _join_part(self, dev, part):
919        """Return a concatenated string of device and partition number.
920
921        @param dev: A string of device, e.g.'/dev/sda'.
922        @param part: A string of partition number, e.g.'3'.
923        @return: A concatenated string of device and partition number,
924                 e.g.'/dev/sda3'.
925
926        >>> seq = FirmwareTest()
927        >>> seq._join_part('/dev/sda', '3')
928        '/dev/sda3'
929        >>> seq._join_part('/dev/mmcblk0', '2')
930        '/dev/mmcblk0p2'
931        """
932        if 'mmcblk' in dev:
933            return dev + 'p' + part
934        elif 'nvme' in dev:
935            return dev + 'p' + part
936        else:
937            return dev + part
938
939    def copy_kernel_and_rootfs(self, from_part, to_part):
940        """Copy kernel and rootfs from from_part to to_part.
941
942        @param from_part: A string of partition number to be copied from.
943        @param to_part: A string of partition number to be copied to.
944        """
945        root_dev = self.faft_client.system.get_root_dev()
946        logging.info('Copying kernel from %s to %s. Please wait...',
947                     from_part, to_part)
948        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
949                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
950                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
951        logging.info('Copying rootfs from %s to %s. Please wait...',
952                     from_part, to_part)
953        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
954                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
955                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
956
957    def ensure_kernel_boot(self, part):
958        """Ensure the request kernel boot.
959
960        If not, it duplicates the current kernel to the requested kernel
961        and sets the requested higher priority to ensure it boot.
962
963        @param part: A string of kernel partition number or 'a'/'b'.
964        """
965        if not self.checkers.root_part_checker(part):
966            if self.faft_client.kernel.diff_a_b():
967                self.copy_kernel_and_rootfs(
968                        from_part=self.OTHER_KERNEL_MAP[part],
969                        to_part=part)
970            self.reset_and_prioritize_kernel(part)
971            self.switcher.mode_aware_reboot()
972
973    def ensure_dev_internal_boot(self, original_dev_boot_usb):
974        """Ensure internal device boot in developer mode.
975
976        If not internal device boot, it will try to reboot the device and
977        bypass dev mode to boot into internal device.
978
979        @param original_dev_boot_usb: Original dev_boot_usb value.
980        """
981        logging.info('Checking internal device boot.')
982        self.faft_client.system.set_dev_default_boot()
983        if self.faft_client.system.is_removable_device_boot():
984            logging.info('Reboot into internal disk...')
985            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
986            self.switcher.mode_aware_reboot()
987        self.check_state((self.checkers.dev_boot_usb_checker, False,
988                          'Device not booted from internal disk properly.'))
989
990    def set_hardware_write_protect(self, enable):
991        """Set hardware write protect pin.
992
993        @param enable: True if asserting write protect pin. Otherwise, False.
994        """
995        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
996
997    def set_ap_write_protect_and_reboot(self, enable):
998        """Set AP write protect status and reboot to take effect.
999
1000        @param enable: True if asserting write protect. Otherwise, False.
1001        """
1002        self.set_hardware_write_protect(enable)
1003        if hasattr(self, 'ec'):
1004            self.sync_and_ec_reboot()
1005            self.switcher.wait_for_client()
1006
1007    def run_chromeos_firmwareupdate(self, mode, append=None, options=(),
1008            ignore_status=False):
1009        """Use RPC to get the command to run, but do the actual run via ssh.
1010
1011        Running the command via SSH improves the reliability in cases where the
1012        USB network connection gets interrupted.  SSH will still return the
1013        output, and won't hang like RPC would.
1014        """
1015        update_cmd = self.faft_client.updater.get_firmwareupdate_command(
1016                mode, append, options)
1017        try:
1018            result = self._client.run(
1019                    update_cmd, timeout=300, ignore_status=ignore_status)
1020            if result.exit_status == 255:
1021                self.faft_client.disconnect()
1022            return result
1023        except error.AutoservRunError as e:
1024            if e.result_obj.exit_status == 255:
1025                self.faft_client.disconnect()
1026            if ignore_status:
1027                return e.result_obj
1028            raise
1029
1030    def set_ec_write_protect_and_reboot(self, enable):
1031        """Set EC write protect status and reboot to take effect.
1032
1033        The write protect state is only activated if both hardware write
1034        protect pin is asserted and software write protect flag is set.
1035        This method asserts/deasserts hardware write protect pin first, and
1036        set corresponding EC software write protect flag.
1037
1038        If the device uses non-Chrome EC, set the software write protect via
1039        flashrom.
1040
1041        If the device uses Chrome EC, a reboot is required for write protect
1042        to take effect. Since the software write protect flag cannot be unset
1043        if hardware write protect pin is asserted, we need to deasserted the
1044        pin first if we are deactivating write protect. Similarly, a reboot
1045        is required before we can modify the software flag.
1046
1047        @param enable: True if activating EC write protect. Otherwise, False.
1048        """
1049        self.set_hardware_write_protect(enable)
1050        if self.faft_config.chrome_ec:
1051            self.set_chrome_ec_write_protect_and_reboot(enable)
1052        else:
1053            self.faft_client.ec.set_write_protect(enable)
1054            self.switcher.mode_aware_reboot()
1055
1056    def set_chrome_ec_write_protect_and_reboot(self, enable):
1057        """Set Chrome EC write protect status and reboot to take effect.
1058
1059        @param enable: True if activating EC write protect. Otherwise, False.
1060        """
1061        if enable:
1062            # Set write protect flag and reboot to take effect.
1063            self.ec.set_flash_write_protect(enable)
1064            self.sync_and_ec_reboot(
1065                    flags='hard',
1066                    extra_sleep=self.faft_config.ec_boot_to_wp_en)
1067        else:
1068            # Reboot after deasserting hardware write protect pin to deactivate
1069            # write protect. And then remove software write protect flag.
1070            # Some ITE ECs can only clear their WP status on a power-on reset,
1071            # no software-initiated reset will do.
1072            self.sync_and_ec_reboot(flags='cold')
1073            self.ec.set_flash_write_protect(enable)
1074
1075    def _setup_ec_write_protect(self, ec_wp):
1076        """Setup for EC write-protection.
1077
1078        It makes sure the EC in the requested write-protection state. If not, it
1079        flips the state. Flipping the write-protection requires DUT reboot.
1080
1081        @param ec_wp: True to request EC write-protected; False to request EC
1082                      not write-protected; None to do nothing.
1083        """
1084        if ec_wp is None:
1085            return
1086        self._old_wpsw_cur = self.checkers.crossystem_checker(
1087                                    {'wpsw_cur': '1'}, suppress_logging=True)
1088        if ec_wp != self._old_wpsw_cur:
1089            if not self.faft_config.ap_access_ec_flash:
1090                raise error.TestNAError(
1091                        "Cannot change EC write-protect for this device")
1092
1093            logging.info('The test required EC is %swrite-protected. Reboot '
1094                         'and flip the state.', '' if ec_wp else 'not ')
1095            self.switcher.mode_aware_reboot(
1096                    'custom',
1097                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
1098        wpsw_cur = '1' if ec_wp else '0'
1099        self.check_state((self.checkers.crossystem_checker, {
1100                               'wpsw_cur': wpsw_cur}))
1101
1102    def _restore_ec_write_protect(self):
1103        """Restore the original EC write-protection."""
1104        if (not hasattr(self, '_old_wpsw_cur')) or (self._old_wpsw_cur is
1105                                                    None):
1106            return
1107        if not self.checkers.crossystem_checker({'wpsw_cur': '1' if
1108                       self._old_wpsw_cur else '0'}, suppress_logging=True):
1109            logging.info('Restore original EC write protection and reboot.')
1110            self.switcher.mode_aware_reboot(
1111                    'custom',
1112                    lambda:self.set_ec_write_protect_and_reboot(
1113                            self._old_wpsw_cur))
1114        self.check_state((self.checkers.crossystem_checker, {
1115                          'wpsw_cur': '1' if self._old_wpsw_cur else '0'}))
1116
1117    def _record_uart_capture(self):
1118        """Record the CPU/EC/PD UART output stream to files."""
1119        self.servo.record_uart_capture(self.resultsdir)
1120
1121    def _cleanup_uart_capture(self):
1122        """Cleanup the CPU/EC/PD UART capture."""
1123        self.servo.close(self.resultsdir)
1124
1125    def set_ap_off_power_mode(self, power_mode):
1126        """
1127        Set the DUT power mode to suspend (S0ix/S3) or shutdown (G3/S5).
1128        The DUT must be in S0 when calling this method.
1129
1130        @param power_mode: a string for the expected power mode, either
1131                           'suspend' or 'shutdown'.
1132        """
1133        if power_mode == 'suspend':
1134            target_power_state = self.POWER_STATE_SUSPEND
1135        elif power_mode == 'shutdown':
1136            target_power_state = self.POWER_STATE_G3
1137        else:
1138            raise error.TestError('%s is not a valid ap-off power mode.' %
1139                                  power_mode)
1140
1141        if self.get_power_state() != self.POWER_STATE_S0:
1142            raise error.TestError('The DUT is not in S0.')
1143
1144        self._restore_power_mode = True
1145
1146        if target_power_state == self.POWER_STATE_G3:
1147            self.run_shutdown_cmd()
1148            time.sleep(self.faft_config.shutdown)
1149        elif target_power_state == self.POWER_STATE_SUSPEND:
1150            self.suspend()
1151
1152        if self.wait_power_state(target_power_state, self.DEFAULT_PWR_RETRIES):
1153            logging.info('System entered %s state.', target_power_state)
1154        else:
1155            self._restore_power_mode = False
1156            raise error.TestFail('System fail to enter %s state. '
1157                    'Current state: %s', target_power_state,
1158                    self.get_power_state())
1159
1160    def restore_ap_on_power_mode(self):
1161        """
1162        Wake up the DUT to S0. If the DUT was not set to suspend or
1163        shutdown mode by set_ap_off_power_mode(), raise an error.
1164        """
1165        if self.get_power_state() != self.POWER_STATE_S0:
1166            logging.info('Wake up the DUT to S0.')
1167            self.servo.power_normal_press()
1168            # If the DUT is ping-able, it must be in S0.
1169            self.switcher.wait_for_client()
1170            if self._restore_power_mode != True:
1171                raise error.TestFail('The DUT was not set to suspend/shutdown '
1172                        'mode by set_ap_off_power_mode().')
1173            self._restore_power_mode = False
1174
1175    def get_power_state(self):
1176        """
1177        Return the current power state of the AP (via EC 'powerinfo' command)
1178
1179        @return the name of the power state, or None if a problem occurred
1180        """
1181        if not hasattr(self, 'ec'):
1182            # Don't fail when EC not present or not fully initialized
1183            return None
1184
1185        pattern = r'power state (\w+) = (\w+),'
1186
1187        try:
1188            match = self.ec.send_command_get_output("powerinfo", [pattern])
1189        except error.TestFail as err:
1190            logging.warn("powerinfo command encountered an error: %s", err)
1191            return None
1192        if not match:
1193            logging.warn("powerinfo output did not match pattern: %r", pattern)
1194            return None
1195        (line, state_num, state_name) = match[0]
1196        logging.debug("power state info %r", match)
1197        return state_name
1198
1199    def _check_power_state(self, power_state):
1200        """
1201        Check for correct power state of the AP (via EC 'powerinfo' command)
1202
1203        @return: the line and the match, if the output matched.
1204        @raise error.TestFail: if output didn't match after the delay.
1205        """
1206        if not isinstance(power_state, str):
1207            raise error.TestError('%s is not a string while it should be.' %
1208                                  power_state)
1209        return self.ec.send_command_get_output("powerinfo",
1210            ['\\b' + power_state + '\\b'])
1211
1212    def wait_power_state(self, power_state, retries, retry_delay=0):
1213        """
1214        Wait for certain power state.
1215
1216        @param power_state: power state you are expecting
1217        @param retries: retries.  This is necessary if AP is powering down
1218        and transitioning through different states.
1219        @param retry_delay: delay between retries in seconds
1220        """
1221        logging.info('Checking power state "%s" maximum %d times.',
1222                     power_state, retries)
1223
1224        # Reset the cache, in case previous calls silently changed it on servod
1225        self.ec.set_uart_regexp('None')
1226
1227        while retries > 0:
1228            logging.info("try count: %d", retries)
1229            start_time = time.time()
1230            try:
1231                retries = retries - 1
1232                if self._check_power_state(power_state):
1233                    return True
1234            except error.TestFail:
1235                pass
1236            delay_time = retry_delay - time.time() + start_time
1237            if delay_time > 0:
1238                time.sleep(delay_time)
1239        return False
1240
1241    def run_shutdown_cmd(self):
1242        """Shut down the DUT by running '/sbin/shutdown -P now'."""
1243        self.faft_client.disconnect()
1244        # Shut down in the background after sleeping so the call gets a reply.
1245        try:
1246            self._client.run_background('sleep 0.5; /sbin/shutdown -P now')
1247        except error.AutoservRunError as e:
1248            # From the ssh man page, error code 255 indicates ssh errors.
1249            if e.result_obj.exit_status == 255:
1250                logging.warn("Ignoring error from ssh: %s", e)
1251            else:
1252                raise
1253        self.switcher.wait_for_client_offline()
1254
1255    def suspend(self):
1256        """Suspends the DUT."""
1257        cmd = 'sleep %d; powerd_dbus_suspend' % self.EC_SUSPEND_DELAY
1258        block = False
1259        self.faft_client.system.run_shell_command(cmd, block)
1260        time.sleep(self.EC_SUSPEND_DELAY)
1261
1262    def _record_faft_client_log(self):
1263        """Record the faft client log to the results directory."""
1264        client_log = self.faft_client.system.dump_log(True)
1265        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
1266        with open(client_log_file, 'w') as f:
1267            f.write(client_log)
1268
1269    def _setup_gbb_flags(self):
1270        """Setup the GBB flags for FAFT test."""
1271        if self.check_setup_done('gbb_flags'):
1272            return
1273
1274        logging.info('Set proper GBB flags for test.')
1275        # Ensure that GBB flags are set to 0x140.
1276        flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE |
1277                        vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM)
1278        # And if the "no_ec_sync" argument is set, then disable EC software
1279        # sync.
1280        if self._no_ec_sync:
1281            logging.info(
1282                    'User selected to disable EC software sync')
1283            flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC
1284
1285        # And if the "no_fw_rollback_check" argument is set, then disable fw
1286        # rollback check.
1287        if self._no_fw_rollback_check:
1288            logging.info(
1289                    'User selected to disable FW rollback check')
1290            flags_to_set |= vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK
1291
1292        self.clear_set_gbb_flags(0xffffffff, flags_to_set)
1293        self.mark_setup_done('gbb_flags')
1294
1295    def _restore_gbb_flags(self):
1296        """Restore GBB flags to their original state."""
1297        if self._backup_gbb_flags is None:
1298            return
1299        # Setting up and restoring the GBB flags take a lot of time. For
1300        # speed-up purpose, don't restore it.
1301        logging.info('***')
1302        logging.info('*** Please manually restore the original GBB flags to: '
1303                     '0x%x ***', self._backup_gbb_flags)
1304        logging.info('***')
1305        self.unmark_setup_done('gbb_flags')
1306
1307    def setup_tried_fwb(self, tried_fwb):
1308        """Setup for fw B tried state.
1309
1310        It makes sure the system in the requested fw B tried state. If not, it
1311        tries to do so.
1312
1313        @param tried_fwb: True if requested in tried_fwb=1;
1314                          False if tried_fwb=0.
1315        """
1316        if tried_fwb:
1317            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
1318                logging.info(
1319                    'Firmware is not booted with tried_fwb. Reboot into it.')
1320                self.faft_client.system.set_try_fw_b()
1321        else:
1322            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
1323                logging.info(
1324                    'Firmware is booted with tried_fwb. Reboot to clear.')
1325
1326    def power_on(self):
1327        """Switch DUT AC power on."""
1328        self._client.power_on(self.power_control)
1329
1330    def power_off(self):
1331        """Switch DUT AC power off."""
1332        self._client.power_off(self.power_control)
1333
1334    def power_cycle(self):
1335        """Power cycle DUT AC power."""
1336        self._client.power_cycle(self.power_control)
1337
1338    def setup_rw_boot(self, section='a'):
1339        """Make sure firmware is in RW-boot mode.
1340
1341        If the given firmware section is in RO-boot mode, turn off the RO-boot
1342        flag and reboot DUT into RW-boot mode.
1343
1344        @param section: A firmware section, either 'a' or 'b'.
1345        """
1346        flags = self.faft_client.bios.get_preamble_flags(section)
1347        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
1348            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
1349            self.faft_client.bios.set_preamble_flags(section, flags)
1350            self.switcher.mode_aware_reboot()
1351
1352    def setup_kernel(self, part):
1353        """Setup for kernel test.
1354
1355        It makes sure both kernel A and B bootable and the current boot is
1356        the requested kernel part.
1357
1358        @param part: A string of kernel partition number or 'a'/'b'.
1359        """
1360        self.ensure_kernel_boot(part)
1361        logging.info('Checking the integrity of kernel B and rootfs B...')
1362        if (self.faft_client.kernel.diff_a_b() or
1363                not self.faft_client.rootfs.verify_rootfs('B')):
1364            logging.info('Copying kernel and rootfs from A to B...')
1365            self.copy_kernel_and_rootfs(from_part=part,
1366                                        to_part=self.OTHER_KERNEL_MAP[part])
1367        self.reset_and_prioritize_kernel(part)
1368
1369    def reset_and_prioritize_kernel(self, part):
1370        """Make the requested partition highest priority.
1371
1372        This function also reset kerenl A and B to bootable.
1373
1374        @param part: A string of partition number to be prioritized.
1375        """
1376        root_dev = self.faft_client.system.get_root_dev()
1377        # Reset kernel A and B to bootable.
1378        self.faft_client.system.run_shell_command(
1379            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
1380        self.faft_client.system.run_shell_command(
1381            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
1382        # Set kernel part highest priority.
1383        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
1384                (self.KERNEL_MAP[part], root_dev))
1385
1386    def do_blocking_sync(self, device):
1387        """Run a blocking sync command."""
1388        logging.info("Blocking sync for %s", device)
1389
1390        if 'mmcblk' in device:
1391            # For mmc devices, use `mmc status get` command to send an
1392            # empty command to wait for the disk to be available again.
1393            self.faft_client.system.run_shell_command('mmc status get %s' %
1394                                                      device)
1395        elif 'nvme' in device:
1396            # For NVMe devices, use `nvme flush` command to commit data
1397            # and metadata to non-volatile media.
1398
1399            # Get a list of NVMe namespaces, and flush them individually.
1400            # The output is assumed to be in the following format:
1401            # [ 0]:0x1
1402            # [ 1]:0x2
1403            list_ns_cmd = "nvme list-ns %s" % device
1404            available_ns = self.faft_client.system.run_shell_command_get_output(
1405                list_ns_cmd)
1406
1407            if not available_ns:
1408                raise error.TestError(
1409                    "Listing namespaces failed (empty output): %s"
1410                    % list_ns_cmd)
1411
1412            for ns in available_ns:
1413                ns = ns.split(':')[-1]
1414                flush_cmd = 'nvme flush %s -n %s' % (device, ns)
1415                flush_rc = self.faft_client.system.run_shell_command_get_status(
1416                    flush_cmd)
1417                if flush_rc != 0:
1418                    raise error.TestError(
1419                        "Flushing namespace %s failed (rc=%s): %s"
1420                        % (ns, flush_rc, flush_cmd))
1421        else:
1422            # For other devices, hdparm sends TUR to check if
1423            # a device is ready for transfer operation.
1424            self.faft_client.system.run_shell_command('hdparm -f %s' % device)
1425
1426    def blocking_sync(self, freeze_for_reset=False):
1427        """Sync root device and internal device, via script if possible.
1428
1429        The actual calls end up logged by the run() call, since they're printed
1430        to stdout/stderr in the script.
1431
1432        @param freeze_for_reset: if True, prepare for reset by blocking writes
1433                                 (only if enable_fs_sync_fsfreeze=True)
1434        """
1435
1436        if self._use_sync_script:
1437            if freeze_for_reset:
1438                self.faft_client.quit()
1439            try:
1440                return self._client.blocking_sync(freeze_for_reset)
1441            except (AttributeError, ImportError, error.AutoservRunError) as e:
1442                logging.warn(
1443                        'Falling back to old sync method due to error: %s', e)
1444
1445        # The double calls to sync fakes a blocking call
1446        # since the first call returns before the flush
1447        # is complete, but the second will wait for the
1448        # first to finish.
1449        self.faft_client.system.run_shell_command('sync')
1450        self.faft_client.system.run_shell_command('sync')
1451
1452        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
1453        # This function will perform a device-specific sync command.
1454        root_dev = self.faft_client.system.get_root_dev()
1455        self.do_blocking_sync(root_dev)
1456
1457        # Also sync the internal device if booted from removable media.
1458        if self.faft_client.system.is_removable_device_boot():
1459            internal_dev = self.faft_client.system.get_internal_device()
1460            self.do_blocking_sync(internal_dev)
1461
1462    def sync_and_ec_reboot(self, flags='', extra_sleep=0):
1463        """Request the client sync and do a EC triggered reboot.
1464
1465        @param flags: Optional, a space-separated string of flags passed to EC
1466                      reboot command, including:
1467                          default: EC soft reboot;
1468                          'hard': EC hard reboot.
1469                          'cold': Cold reboot via servo.
1470        @param extra_sleep: Optional, int or float for extra wait time for EC
1471                            reboot in seconds.
1472        """
1473        self.blocking_sync(freeze_for_reset=True)
1474        if flags == 'cold':
1475            self.servo.get_power_state_controller().reset()
1476        else:
1477            self.ec.reboot(flags)
1478        time.sleep(self.faft_config.ec_boot_to_console + extra_sleep)
1479        self.check_lid_and_power_on()
1480
1481    def reboot_and_reset_tpm(self):
1482        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
1483        self.switcher.reboot_to_mode(to_mode='rec')
1484        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
1485        self.switcher.mode_aware_reboot()
1486
1487    def full_power_off_and_on(self):
1488        """Shutdown the device by pressing power button and power on again."""
1489        boot_id = self.get_bootid()
1490        self.faft_client.disconnect()
1491
1492        # Press power button to trigger Chrome OS normal shutdown process.
1493        # We use a customized delay since the normal-press 1.2s is not enough.
1494        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
1495        # device can take 44-51 seconds to restart,
1496        # add buffer from the default timeout of 60 seconds.
1497        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
1498        time.sleep(self.faft_config.shutdown)
1499        if self.faft_config.chrome_ec:
1500            self.check_shutdown_power_state(self.POWER_STATE_G3,
1501                                            orig_boot_id=boot_id)
1502        # Short press power button to boot DUT again.
1503        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1504
1505    def check_shutdown_power_state(self, power_state,
1506                                   pwr_retries=DEFAULT_PWR_RETRIES,
1507                                   orig_boot_id=None):
1508        """Check whether the device shut down and entered the given power state.
1509
1510        If orig_boot_id is specified, it will check whether the DUT responds to
1511        ssh requests, then use orig_boot_id to check if it rebooted.
1512
1513        @param power_state: EC power state has to be checked. Either S5 or G3.
1514        @param pwr_retries: Times to check if the DUT in expected power state.
1515        @param orig_boot_id: Old boot_id, to check for unexpected reboots.
1516        @raise TestFail: If device failed to enter into requested power state.
1517        """
1518        if not self.wait_power_state(power_state, pwr_retries):
1519            current_state = self.get_power_state()
1520            if current_state == self.POWER_STATE_S0 and self._client.wait_up():
1521                # DUT is unexpectedly up, so check whether it rebooted instead.
1522                new_boot_id = self.get_bootid()
1523                logging.debug('orig_boot_id=%s, new_boot_id=%s',
1524                              orig_boot_id, new_boot_id)
1525                if orig_boot_id is None or new_boot_id is None:
1526                    # Can't say anything more specific without values to compare
1527                    raise error.TestFail(
1528                            "Expected state %s, but the system is unexpectedly"
1529                            " still up.  Current state: %s"
1530                            % (power_state, current_state))
1531                if new_boot_id == orig_boot_id:
1532                    raise error.TestFail(
1533                            "Expected state %s, but the system didn't shut"
1534                            " down.  Current state: %s"
1535                            % (power_state, current_state))
1536                else:
1537                    raise error.TestFail(
1538                            "Expected state %s, but the system rebooted instead"
1539                            " of shutting down.  Current state: %s"
1540                            % (power_state, current_state))
1541
1542            if current_state is None:
1543                current_state = '(unknown)'
1544
1545            if current_state == power_state:
1546                raise error.TestFail(
1547                        "Expected state %s, but the system didn't reach it"
1548                        " until after the limit of %s tries."
1549                        % (power_state, pwr_retries))
1550
1551            raise error.TestFail('System not shutdown properly and EC fails'
1552                                 ' to enter into %s state.  Current state: %s'
1553                                 % (power_state, current_state))
1554        logging.info('System entered into %s state..', power_state)
1555
1556    def check_lid_and_power_on(self):
1557        """
1558        On devices with EC software sync, system powers on after EC reboots if
1559        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1560        This method checks lid switch state and presses power button if
1561        necessary.
1562        """
1563        if self.servo.get("lid_open") == "no":
1564            time.sleep(self.faft_config.software_sync)
1565            self.servo.power_short_press()
1566
1567    def stop_powerd(self):
1568        """Stop the powerd daemon on the AP.
1569
1570        This will cause the AP to ignore power button presses sent by the EC.
1571        """
1572        powerd_running = self.faft_client.system.run_shell_command_check_output(
1573                'status powerd', 'start/running')
1574        if powerd_running:
1575            logging.debug('Stopping powerd')
1576            self.faft_client.system.run_shell_command("stop powerd")
1577
1578    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1579        """Modify the kernel header magic in USB stick.
1580
1581        The kernel header magic is the first 8-byte of kernel partition.
1582        We modify it to make it fail on kernel verification check.
1583
1584        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1585        @param from_magic: A string of magic which we change it from.
1586        @param to_magic: A string of magic which we change it to.
1587        @raise TestError: if failed to change magic.
1588        """
1589        assert len(from_magic) == 8
1590        assert len(to_magic) == 8
1591        # USB image only contains one kernel.
1592        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1593        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1594        current_magic = self.servo.system_output(read_cmd)
1595        if current_magic == to_magic:
1596            logging.info("The kernel magic is already %s.", current_magic)
1597            return
1598        if current_magic != from_magic:
1599            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1600
1601        logging.info('Modify the kernel magic in USB, from %s to %s.',
1602                     from_magic, to_magic)
1603        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1604                     " 2>/dev/null" % (to_magic, kernel_part))
1605        self.servo.system(write_cmd)
1606
1607        if self.servo.system_output(read_cmd) != to_magic:
1608            raise error.TestError("Failed to write new magic.")
1609
1610    def corrupt_usb_kernel(self, usb_dev):
1611        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1612
1613        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1614        """
1615        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1616                                self.CORRUPTED_MAGIC)
1617
1618    def restore_usb_kernel(self, usb_dev):
1619        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1620
1621        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1622        """
1623        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1624                                self.CHROMEOS_MAGIC)
1625
1626    def _call_action(self, action_tuple, check_status=False):
1627        """Call the action function with/without arguments.
1628
1629        @param action_tuple: A function, or a tuple (function, args, error_msg),
1630                             in which, args and error_msg are optional. args is
1631                             either a value or a tuple if multiple arguments.
1632                             This can also be a list containing multiple
1633                             function or tuple. In this case, these actions are
1634                             called in sequence.
1635        @param check_status: Check the return value of action function. If not
1636                             succeed, raises a TestFail exception.
1637        @return: The result value of the action function.
1638        @raise TestError: An error when the action function is not callable.
1639        @raise TestFail: When check_status=True, action function not succeed.
1640        """
1641        if isinstance(action_tuple, list):
1642            return all([self._call_action(action, check_status=check_status)
1643                        for action in action_tuple])
1644
1645        action = action_tuple
1646        args = ()
1647        error_msg = 'Not succeed'
1648        if isinstance(action_tuple, tuple):
1649            action = action_tuple[0]
1650            if len(action_tuple) >= 2:
1651                args = action_tuple[1]
1652                if not isinstance(args, tuple):
1653                    args = (args,)
1654            if len(action_tuple) >= 3:
1655                error_msg = action_tuple[2]
1656
1657        if action is None:
1658            return
1659
1660        if not callable(action):
1661            raise error.TestError('action is not callable!')
1662
1663        info_msg = 'calling %s' % action.__name__
1664        if args:
1665            info_msg += ' with args %s' % str(args)
1666        logging.info(info_msg)
1667        ret = action(*args)
1668
1669        if check_status and not ret:
1670            raise error.TestFail('%s: %s returning %s' %
1671                                 (error_msg, info_msg, str(ret)))
1672        return ret
1673
1674    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1675                             run_power_action=True, post_power_action=None,
1676                             shutdown_timeout=None):
1677        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1678
1679        @param shutdown_action: function which makes DUT shutdown, like
1680                                pressing power key.
1681        @param pre_power_action: function which is called before next power on.
1682        @param run_power_action: power_key press by default, set to None to skip.
1683        @param post_power_action: function which is called after next power on.
1684        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1685        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1686        """
1687        self._call_action(shutdown_action)
1688        logging.info('Wait to ensure DUT shut down...')
1689        try:
1690            if shutdown_timeout is None:
1691                shutdown_timeout = self.faft_config.shutdown_timeout
1692            self.switcher.wait_for_client(timeout=shutdown_timeout)
1693            raise error.TestFail(
1694                    'Should shut the device down after calling %s.' %
1695                    shutdown_action.__name__)
1696        except ConnectionError:
1697            if self.faft_config.chrome_ec:
1698                self.check_shutdown_power_state(self.POWER_STATE_G3)
1699            logging.info(
1700                'DUT is surely shutdown. We are going to power it on again...')
1701
1702        if pre_power_action:
1703            self._call_action(pre_power_action)
1704        if run_power_action:
1705            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1706        if post_power_action:
1707            self._call_action(post_power_action)
1708
1709    def get_bootid(self, retry=3):
1710        """
1711        Return the bootid.
1712        """
1713        boot_id = None
1714        while retry:
1715            try:
1716                boot_id = self._client.get_boot_id()
1717                break
1718            except error.AutoservRunError:
1719                retry -= 1
1720                if retry:
1721                    logging.info('Retry to get boot_id...')
1722                else:
1723                    logging.warning('Failed to get boot_id.')
1724        logging.info('boot_id: %s', boot_id)
1725        return boot_id
1726
1727    def check_state(self, func):
1728        """
1729        Wrapper around _call_action with check_status set to True. This is a
1730        helper function to be used by tests and is currently implemented by
1731        calling _call_action with check_status=True.
1732
1733        TODO: This function's arguments need to be made more stringent. And
1734        its functionality should be moved over to check functions directly in
1735        the future.
1736
1737        @param func: A function, or a tuple (function, args, error_msg),
1738                             in which, args and error_msg are optional. args is
1739                             either a value or a tuple if multiple arguments.
1740                             This can also be a list containing multiple
1741                             function or tuple. In this case, these actions are
1742                             called in sequence.
1743        @return: The result value of the action function.
1744        @raise TestFail: If the function does notsucceed.
1745        """
1746        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1747        self._call_action(func, check_status=True)
1748        logging.info("-[FAFT]-[ end state_checker ]----------------")
1749
1750    def get_current_firmware_identity(self):
1751        """Get current firmware sha and fwids of body and vblock.
1752
1753        @return: Current firmware checksums and fwids, as a dict
1754        """
1755
1756        current_checksums = {
1757            'VBOOTA': self.faft_client.bios.get_sig_sha('a'),
1758            'FVMAINA': self.faft_client.bios.get_body_sha('a'),
1759            'VBOOTB': self.faft_client.bios.get_sig_sha('b'),
1760            'FVMAINB': self.faft_client.bios.get_body_sha('b'),
1761        }
1762        if not all(current_checksums.values()):
1763            raise error.TestError(
1764                    'Failed to get firmware sha: %s', current_checksums)
1765
1766        current_fwids = {
1767            'RO_FRID': self.faft_client.bios.get_section_fwid('ro'),
1768            'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'),
1769            'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'),
1770        }
1771        if not all(current_fwids.values()):
1772            raise error.TestError(
1773                    'Failed to get firmware fwid(s): %s', current_fwids)
1774
1775        identifying_info = dict(current_fwids)
1776        identifying_info.update(current_checksums)
1777        return identifying_info
1778
1779    def is_firmware_changed(self):
1780        """Check if the current firmware changed, by comparing its SHA and fwid.
1781
1782        @return: True if it is changed, otherwise False.
1783        """
1784        # Device may not be rebooted after test.
1785        self.faft_client.bios.reload()
1786
1787        current_info = self.get_current_firmware_identity()
1788        prev_info = self._backup_firmware_identity
1789
1790        if current_info == prev_info:
1791            return False
1792        else:
1793            changed = set()
1794            for section in set(current_info.keys()) | set(prev_info.keys()):
1795                if current_info.get(section) != prev_info.get(section):
1796                    changed.add(section)
1797
1798            logging.info('Firmware changed: %s', ', '.join(sorted(changed)))
1799            return True
1800
1801    def backup_firmware(self, suffix='.original'):
1802        """Backup firmware to file, and then send it to host.
1803
1804        @param suffix: a string appended to backup file name
1805        """
1806        remote_temp_dir = self.faft_client.system.create_temp_dir()
1807        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1808        self.faft_client.bios.dump_whole(remote_bios_path)
1809        self._client.get_file(remote_bios_path,
1810                              os.path.join(self.resultsdir, 'bios' + suffix))
1811
1812        if self.faft_config.chrome_ec:
1813            remote_ec_path = os.path.join(remote_temp_dir, 'ec')
1814            self.faft_client.ec.dump_whole(remote_ec_path)
1815            self._client.get_file(remote_ec_path,
1816                              os.path.join(self.resultsdir, 'ec' + suffix))
1817
1818        self._client.run('rm -rf %s' % remote_temp_dir)
1819        logging.info('Backup firmware stored in %s with suffix %s',
1820            self.resultsdir, suffix)
1821
1822        self._backup_firmware_identity = self.get_current_firmware_identity()
1823
1824    def is_firmware_saved(self):
1825        """Check if a firmware saved (called backup_firmware before).
1826
1827        @return: True if the firmware is backed up; otherwise False.
1828        """
1829        return bool(self._backup_firmware_identity)
1830
1831    def restore_firmware(self, suffix='.original', restore_ec=True,
1832                         reboot_ec=False):
1833        """Restore firmware from host in resultsdir.
1834
1835        @param suffix: a string appended to backup file name
1836        @param restore_ec: True to restore the ec firmware; False not to do.
1837        @param reboot_ec: True to reboot EC after restore (if it was restored)
1838        @return: True if firmware needed to be restored
1839        """
1840        if not self.is_firmware_changed():
1841            return False
1842
1843        # Backup current corrupted firmware.
1844        self.backup_firmware(suffix='.corrupt')
1845
1846        # Restore firmware.
1847        remote_temp_dir = self.faft_client.system.create_temp_dir()
1848
1849        bios_local = os.path.join(self.resultsdir, 'bios%s' % suffix)
1850        bios_remote = os.path.join(remote_temp_dir, 'bios%s' % suffix)
1851        self._client.send_file(bios_local, bios_remote)
1852        self.faft_client.bios.write_whole(bios_remote)
1853
1854        if self.faft_config.chrome_ec and restore_ec:
1855            ec_local = os.path.join(self.resultsdir, 'ec%s' % suffix)
1856            ec_remote = os.path.join(remote_temp_dir, 'ec%s' % suffix)
1857            self._client.send_file(ec_local, ec_remote)
1858            ec_cmd = self.faft_client.ec.get_write_cmd(ec_remote)
1859            try:
1860                self._client.run(ec_cmd, timeout=300)
1861            except error.AutoservSSHTimeout:
1862                logging.warn("DUT connection died during EC restore")
1863                self.faft_client.disconnect()
1864
1865            except error.GenericHostRunError:
1866                logging.warn("DUT command failed during EC restore")
1867                logging.debug("Full exception:", exc_info=True)
1868            if reboot_ec:
1869                self.switcher.mode_aware_reboot(
1870                        'custom', lambda: self.sync_and_ec_reboot('hard'))
1871            else:
1872                self.switcher.mode_aware_reboot()
1873        else:
1874            self.switcher.mode_aware_reboot()
1875        logging.info('Successfully restored firmware.')
1876        return True
1877
1878    def setup_firmwareupdate_shellball(self, shellball=None):
1879        """Setup a shellball to use in firmware update test.
1880
1881        Check if there is a given shellball, and it is a shell script. Then,
1882        send it to the remote host. Otherwise, use the
1883        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1884        BIOS and EC images with the active firmware images.
1885
1886        @param shellball: path of a shellball or default to None.
1887        """
1888        if shellball:
1889            # Determine the firmware file is a shellball or a raw binary.
1890            is_shellball = (utils.system_output("file %s" % shellball).find(
1891                    "shell script") != -1)
1892            if is_shellball:
1893                logging.info('Device will update firmware with shellball %s',
1894                             shellball)
1895                temp_path = self.faft_client.updater.get_temp_path()
1896                working_shellball = os.path.join(temp_path,
1897                                                 'chromeos-firmwareupdate')
1898                self._client.send_file(shellball, working_shellball)
1899                self.faft_client.updater.extract_shellball()
1900            else:
1901                raise error.TestFail(
1902                    'The given shellball is not a shell script.')
1903        else:
1904            logging.info('No shellball given, use the original shellball and '
1905                         'replace its BIOS and EC images.')
1906            work_path = self.faft_client.updater.get_work_path()
1907            bios_in_work_path = os.path.join(
1908                work_path, self.faft_client.updater.get_bios_relative_path())
1909            ec_in_work_path = os.path.join(
1910                work_path, self.faft_client.updater.get_ec_relative_path())
1911            logging.info('Writing current BIOS to: %s', bios_in_work_path)
1912            self.faft_client.bios.dump_whole(bios_in_work_path)
1913            if self.faft_config.chrome_ec:
1914                logging.info('Writing current EC to: %s', ec_in_work_path)
1915                self.faft_client.ec.dump_firmware(ec_in_work_path)
1916            self.faft_client.updater.repack_shellball()
1917
1918    def is_kernel_changed(self):
1919        """Check if the current kernel is changed, by comparing its SHA1 hash.
1920
1921        @return: True if it is changed; otherwise, False.
1922        """
1923        changed = False
1924        for p in ('A', 'B'):
1925            backup_sha = self._backup_kernel_sha.get(p, None)
1926            current_sha = self.faft_client.kernel.get_sha(p)
1927            if backup_sha != current_sha:
1928                changed = True
1929                logging.info('Kernel %s is changed', p)
1930        return changed
1931
1932    def backup_kernel(self, suffix='.original'):
1933        """Backup kernel to files, and the send them to host.
1934
1935        @param suffix: a string appended to backup file name.
1936        """
1937        remote_temp_dir = self.faft_client.system.create_temp_dir()
1938        for p in ('A', 'B'):
1939            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1940            self.faft_client.kernel.dump(p, remote_path)
1941            self._client.get_file(
1942                    remote_path,
1943                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1944            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1945        logging.info('Backup kernel stored in %s with suffix %s',
1946            self.resultsdir, suffix)
1947
1948    def is_kernel_saved(self):
1949        """Check if kernel images are saved (backup_kernel called before).
1950
1951        @return: True if the kernel is saved; otherwise, False.
1952        """
1953        return len(self._backup_kernel_sha) != 0
1954
1955    def restore_kernel(self, suffix='.original'):
1956        """Restore kernel from host in resultsdir.
1957
1958        @param suffix: a string appended to backup file name.
1959        """
1960        if not self.is_kernel_changed():
1961            return
1962
1963        # Backup current corrupted kernel.
1964        self.backup_kernel(suffix='.corrupt')
1965
1966        # Restore kernel.
1967        remote_temp_dir = self.faft_client.system.create_temp_dir()
1968        for p in ('A', 'B'):
1969            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1970            self._client.send_file(
1971                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1972                    remote_path)
1973            self.faft_client.kernel.write(p, remote_path)
1974
1975        self.switcher.mode_aware_reboot()
1976        logging.info('Successfully restored kernel.')
1977
1978    def backup_cgpt_attributes(self):
1979        """Backup CGPT partition table attributes."""
1980        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1981
1982    def restore_cgpt_attributes(self):
1983        """Restore CGPT partition table attributes."""
1984        current_table = self.faft_client.cgpt.get_attributes()
1985        if current_table == self._backup_cgpt_attr:
1986            return
1987        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1988                     self._backup_cgpt_attr,
1989                     current_table)
1990        self.faft_client.cgpt.set_attributes(
1991                self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B'])
1992
1993        self.switcher.mode_aware_reboot()
1994        logging.info('Successfully restored CGPT table.')
1995
1996    def try_fwb(self, count=0):
1997        """set to try booting FWB count # times
1998
1999        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
2000        vboot2
2001
2002        @param count: an integer specifying value to program into
2003                      fwb_tries(vb1)/fw_try_next(vb2)
2004        """
2005        if self.fw_vboot2:
2006            self.faft_client.system.set_fw_try_next('B', count)
2007        else:
2008            # vboot1: we need to boot into fwb at least once
2009            if not count:
2010                count = count + 1
2011            self.faft_client.system.set_try_fw_b(count)
2012
2013    def identify_shellball(self, include_ec=None):
2014        """Get the FWIDs of all targets and sections in the shellball
2015
2016        @param include_ec: if True, get EC fwids.
2017                           If None (default), assume True if board has an EC
2018        @return: the dict of versions in the shellball
2019        """
2020        fwids = dict()
2021        fwids['bios'] = self.faft_client.updater.get_image_fwids('bios')
2022
2023        if include_ec is None:
2024            if self.faft_config.platform.lower() == 'samus':
2025                include_ec = False  # no ec.bin in shellball
2026            else:
2027                include_ec = self.faft_config.chrome_ec
2028
2029        if include_ec:
2030            fwids['ec'] = self.faft_client.updater.get_image_fwids('ec')
2031        return fwids
2032
2033    def modify_shellball(self, append, modify_ro=True, modify_ec=False):
2034        """Modify the FWIDs of targets and sections in the shellball
2035
2036        @return: the full path of the shellball
2037        """
2038
2039        if modify_ro:
2040            self.faft_client.updater.modify_image_fwids(
2041                    'bios', ['ro', 'a', 'b'])
2042        else:
2043            self.faft_client.updater.modify_image_fwids(
2044                    'bios', ['a', 'b'])
2045
2046        if modify_ec:
2047            if modify_ro:
2048                self.faft_client.updater.modify_image_fwids(
2049                        'ec', ['ro', 'rw'])
2050            else:
2051                self.faft_client.updater.modify_image_fwids(
2052                        'ec', ['rw'])
2053
2054        modded_shellball = self.faft_client.updater.repack_shellball(append)
2055
2056        return modded_shellball
2057
2058    @staticmethod
2059    def check_fwids_written(before_fwids, image_fwids, after_fwids,
2060                            expected_written):
2061        """Check the dicts of fwids for correctness after an update is applied.
2062
2063        The targets checked come from the keys of expected_written.
2064        The sections checked come from the inner dicts of the fwids parameters.
2065
2066        The fwids should be keyed by target (flash type), then by section:
2067        {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'},
2068         'ec': {'ro': '<fwid>', 'rw': '<fwid>'}
2069
2070        For expected_written, the dict should be keyed by flash type only:
2071        {'bios': ['ro'], 'ec': ['ro', 'rw']}
2072        To expect the contents completely unchanged, give only the keys:
2073        {'bios': [], 'ec': []} or {'bios': None, 'ec': None}
2074
2075        @param before_fwids: dict of versions from before the update
2076        @param image_fwids: dict of versions in the update
2077        @param after_fwids: dict of actual versions after the update
2078        @param expected_written: dict indicating which ones should have changed
2079        @return: list of error lines for mismatches
2080
2081        @type before_fwids: dict
2082        @type image_fwids: dict | None
2083        @type after_fwids: dict
2084        @type expected_written: dict
2085        @rtype: list
2086        """
2087        errors = []
2088
2089        if image_fwids is None:
2090            image_fwids = {}
2091
2092        for target in sorted(expected_written.keys()):
2093            # target is BIOS or EC
2094
2095            before_missing = (target not in before_fwids)
2096            after_missing = (target not in after_fwids)
2097            if before_missing or after_missing:
2098                if before_missing:
2099                    errors.append("...no before_fwids[%s]" % target)
2100                if after_missing:
2101                    errors.append("...no after_fwids[%s]" % target)
2102                continue
2103
2104            written_sections = expected_written.get(target) or list()
2105            written_sections = set(written_sections)
2106
2107            before_sections = set(before_fwids.get(target) or dict())
2108            image_sections = set(image_fwids.get(target) or dict())
2109            after_sections = set(after_fwids.get(target) or dict())
2110
2111            for section in before_sections | image_sections | after_sections:
2112                # section is RO, RW, A, or B
2113
2114                before_fwid = before_fwids[target][section]
2115                image_fwid = image_fwids.get(target, {}).get(section, None)
2116                actual_fwid = after_fwids[target][section]
2117
2118                if section in written_sections:
2119                    expected_fwid = image_fwid
2120                    expected_desc = 'rewritten fwid (%s)' % expected_fwid
2121                    if image_fwid == before_fwid:
2122                        expected_desc = ('rewritten (no changes) fwid (%s)' %
2123                                         expected_fwid)
2124                else:
2125                    expected_fwid = before_fwid
2126                    expected_desc = 'original fwid (%s)' % expected_fwid
2127
2128                if actual_fwid == expected_fwid:
2129                    actual_desc = 'correct value'
2130
2131                elif actual_fwid == image_fwid:
2132                    actual_desc = 'rewritten fwid (%s)' % actual_fwid
2133                    if image_fwid == before_fwid:
2134                        # The flash could have been rewritten with the same fwid
2135                        actual_desc = 'possibly written fwid (%s)' % actual_fwid
2136
2137                elif actual_fwid == before_fwid:
2138                    actual_desc = 'original fwid (%s)' % actual_fwid
2139
2140                else:
2141                    actual_desc = 'unknown fwid (%s)' % actual_fwid
2142
2143                msg = ("...FWID (%s %s): expected %s, got %s" %
2144                       (target.upper(), section.upper(),
2145                        expected_desc, actual_desc))
2146
2147                if actual_fwid != expected_fwid:
2148                    errors.append(msg)
2149        return errors
2150
2151
2152    def fwmp_is_cleared(self):
2153        """Return True if the FWMP has been created"""
2154        res = self.host.run('cryptohome '
2155                            '--action=get_firmware_management_parameters',
2156                            ignore_status=True)
2157        if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS:
2158            raise error.TestError('Could not run cryptohome command %r' % res)
2159        return self.FWMP_CLEARED_ERROR_MSG in res.stdout
2160
2161
2162    def _tpm_is_owned(self):
2163        """Returns True if the tpm is owned"""
2164        result = self.host.run('tpm_manager_client status --nonsensitive',
2165                               ignore_status=True)
2166        logging.debug(result)
2167        return result.exit_status == 0 and 'is_owned: true' in result.stdout
2168
2169    def clear_fwmp(self):
2170        """Clear the FWMP"""
2171        if self.fwmp_is_cleared():
2172            return
2173        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
2174        self.host.run('tpm_manager_client take_ownership')
2175        if not utils.wait_for_value(self._tpm_is_owned, expected_value=True):
2176            raise error.TestError('Unable to own tpm while clearing fwmp.')
2177        self.host.run('cryptohome '
2178                      '--action=remove_firmware_management_parameters')
2179
2180    def wait_for(self, cfg_field, action_msg=None, extra_time=0):
2181        """Waits for time specified in a config.
2182
2183        @ivar cfg_field: The name of the config field that specifies the
2184                            time to wait.
2185        @ivar action_msg: Optional log message describing the action that
2186                            will occur after the wait.
2187        @ivar extra_time: Additional time to be added to time from config.
2188        """
2189        wait_time = self.faft_config.__getattr__(cfg_field) + extra_time
2190        if extra_time:
2191            wait_src = "%s + %s" % (cfg_field, extra_time)
2192        else:
2193            wait_src = cfg_field
2194
2195        units = 'second' if wait_time==1 else 'seconds'
2196        start_msg = "Waiting %s(%s) %s" % (wait_time, wait_src, units)
2197        if action_msg:
2198            start_msg += ", before '%s'" % action_msg
2199        start_msg += "."
2200
2201        logging.info(start_msg)
2202        time.sleep(wait_time)
2203        logging.info("Done waiting.")
2204
2205    def _try_to_bring_dut_up(self):
2206        """Try to quickly get the dut in a pingable state"""
2207        if not hasattr(self, 'cr50'):
2208            raise error.TestNAError('Test can only be run on devices with '
2209                                    'access to the Cr50 console')
2210
2211        logging.info('checking dut state')
2212
2213        self.servo.set_nocheck('cold_reset', 'off')
2214        try:
2215            self.servo.set_nocheck('warm_reset', 'off')
2216        except error.TestFail as e:
2217            # TODO(b/159338538): remove once the kukui remap issue is resolved.
2218            if 'Timed out waiting for interfaces to become available' in str(e):
2219                logging.warn('Ignoring warm_reset interface issue b/159338538')
2220            else:
2221                raise
2222
2223        time.sleep(self.cr50.SHORT_WAIT)
2224        if not self.cr50.ap_is_on():
2225            logging.info('Pressing power button to turn on AP')
2226            self.servo.power_short_press()
2227
2228        end_time = time.time() + self.RESPONSE_TIMEOUT
2229        while not self.host.ping_wait_up(
2230                self.faft_config.delay_reboot_to_ping * 2):
2231            if time.time() > end_time:
2232                logging.warn('DUT is unresponsive after trying to bring it up')
2233                return
2234            self.servo.get_power_state_controller().reset()
2235            logging.info('DUT did not respond. Resetting it.')
2236
2237    def _check_open_and_press_power_button(self):
2238        """Check stdout and press the power button if prompted.
2239
2240        Returns:
2241            True if the process is still running.
2242        """
2243        if not hasattr(self, 'cr50'):
2244            raise error.TestNAError('Test can only be run on devices with '
2245                                    'access to the Cr50 console')
2246
2247        logging.info(self._get_ccd_open_output())
2248        self.servo.power_short_press()
2249        logging.info('long int power button press')
2250        # power button press cr50 erases nvmem and resets the dut before setting
2251        # the state to open. Wait a bit so we don't check the ccd state in the
2252        # middle of this reset process. Power button requests happen once a
2253        # minute, so waiting 10 seconds isn't a big deal.
2254        time.sleep(10)
2255        return (self.cr50.OPEN == self.cr50.get_ccd_level() or
2256                self._ccd_open_job.sp.poll() is not None)
2257
2258    def _get_ccd_open_output(self):
2259        """Read the new output."""
2260        if not hasattr(self, 'cr50'):
2261            raise error.TestNAError('Test can only be run on devices with '
2262                                    'access to the Cr50 console')
2263
2264        self._ccd_open_job.process_output()
2265        self._ccd_open_stdout.seek(self._ccd_open_last_len)
2266        output = self._ccd_open_stdout.read()
2267        self._ccd_open_last_len = self._ccd_open_stdout.len
2268        return output
2269
2270    def _close_ccd_open_job(self):
2271        """Terminate the process and check the results."""
2272        if not hasattr(self, 'cr50'):
2273            raise error.TestNAError('Test can only be run on devices with '
2274                                    'access to the Cr50 console')
2275
2276        exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
2277        stdout = self._ccd_open_stdout.getvalue().strip()
2278        delattr(self, '_ccd_open_job')
2279        if stdout:
2280            logging.info('stdout of ccd open:\n%s', stdout)
2281        if exit_status:
2282            logging.info('exit status: %d', exit_status)
2283        if 'Error' in stdout:
2284            raise error.TestFail('ccd open Error %s' %
2285                                 stdout.split('Error')[-1])
2286        if self.cr50.OPEN != self.cr50.get_ccd_level():
2287            raise error.TestFail('unable to open cr50: %s' % stdout)
2288        else:
2289            logging.info('Opened Cr50')
2290
2291    def ccd_open_from_ap(self):
2292        """Start the open process and press the power button."""
2293        if not hasattr(self, 'cr50'):
2294            raise error.TestNAError('Test can only be run on devices with '
2295                                    'access to the Cr50 console')
2296
2297        # Opening CCD requires power button presses. If those presses would
2298        # power off the AP and prevent CCD open from completing, ignore them.
2299        if self.faft_config.ec_forwards_short_pp_press:
2300            self.stop_powerd()
2301
2302        # Make sure the test waits long enough to avoid ccd rate limiting.
2303        time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT)
2304
2305        self._ccd_open_last_len = 0
2306
2307        self._ccd_open_stdout = StringIO.StringIO()
2308
2309        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
2310        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
2311                                    ccd_open_cmd)
2312        # Start running the Cr50 Open process in the background.
2313        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
2314                                         nickname='ccd_open',
2315                                         stdout_tee=self._ccd_open_stdout,
2316                                         stderr_tee=utils.TEE_TO_LOGS)
2317        if self._ccd_open_job == None:
2318            raise error.TestFail('could not start ccd open')
2319
2320        try:
2321            # Wait for the first gsctool power button prompt before starting the
2322            # open process.
2323            logging.info(self._get_ccd_open_output())
2324            # Cr50 starts out by requesting 5 quick presses then 4 longer
2325            # power button presses. Run the quick presses without looking at the
2326            # command output, because getting the output can take some time. For
2327            # the presses that require a 1 minute wait check the output between
2328            # presses, so we can catch errors
2329            #
2330            # run quick presses for 30 seconds. It may take a couple of seconds
2331            # for open to start. 10 seconds should be enough. 30 is just used
2332            # because it will definitely be enough, and this process takes 300
2333            # seconds, so doing quick presses for 30 seconds won't matter.
2334            end_time = time.time() + 30
2335            while time.time() < end_time:
2336                self.servo.power_short_press()
2337                logging.info('short int power button press')
2338                time.sleep(self.PP_SHORT_INTERVAL)
2339            # Poll the output and press the power button for the longer presses.
2340            utils.wait_for_value(self._check_open_and_press_power_button,
2341                                 expected_value=True,
2342                                 timeout_sec=self.cr50.PP_LONG)
2343        except Exception as e:
2344            logging.info(e)
2345            raise
2346        finally:
2347            self._close_ccd_open_job()
2348            self._try_to_bring_dut_up()
2349        logging.info(self.cr50.get_ccd_info())
2350
2351    def enter_mode_after_checking_cr50_state(self, mode):
2352        """Reboot to mode if cr50 doesn't already match the state"""
2353        if not hasattr(self, 'cr50'):
2354            raise error.TestNAError('Test can only be run on devices with '
2355                                    'access to the Cr50 console')
2356
2357        # If the device is already in the correct mode, don't do anything
2358        if (mode == 'dev') == self.cr50.in_dev_mode():
2359            logging.info('already in %r mode', mode)
2360            return
2361
2362        self.switcher.reboot_to_mode(to_mode=mode)
2363
2364        if (mode == 'dev') != self.cr50.in_dev_mode():
2365            raise error.TestError('Unable to enter %r mode' % mode)
2366
2367    def fast_ccd_open(self, enable_testlab=False, reset_ccd=True,
2368                      dev_mode=False):
2369        """Try to use ccd testlab open. If that fails, do regular ap open.
2370
2371        Args:
2372            enable_testlab: If True, enable testlab mode after cr50 is open.
2373            reset_ccd: If True, reset ccd after open.
2374            dev_mode: True if the device should be in dev mode after ccd is
2375                      is opened.
2376        """
2377        if not hasattr(self, 'cr50'):
2378            raise error.TestNAError('Test can only be run on devices with '
2379                                    'access to the Cr50 console')
2380
2381        if self.servo.main_device_is_ccd():
2382            error_txt = 'because the main servo device is CCD.'
2383            if enable_testlab:
2384                raise error.TestNAError('Cannot enable testlab: %s' % error_txt)
2385            elif reset_ccd:
2386                raise error.TestNAError('CCD reset not allowed: %s' % error_txt)
2387
2388        if not self.faft_config.has_powerbutton:
2389            logging.warning('No power button', exc_info=True)
2390            enable_testlab = False
2391
2392        # Try to use testlab open first, so we don't have to wait for the
2393        # physical presence check.
2394        self.cr50.send_command('ccd testlab open')
2395        if self.cr50.OPEN != self.cr50.get_ccd_level():
2396            if self.servo.has_control('chassis_open'):
2397                self.servo.set('chassis_open', 'yes')
2398            pw = '' if self.cr50.password_is_reset() else self.CCD_PASSWORD
2399            # Use the console to open cr50 without entering dev mode if
2400            # possible. Ittakes longer and relies on more systems to enter dev
2401            # mode and ssh into the AP. Skip the steps that aren't required.
2402            if not (pw or self.cr50.get_cap(
2403                            'OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]):
2404                self.enter_mode_after_checking_cr50_state('dev')
2405
2406            if pw or self.cr50.get_cap(
2407                            'OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
2408                self.cr50.set_ccd_level(self.cr50.OPEN, pw)
2409            else:
2410                self.ccd_open_from_ap()
2411
2412            if self.servo.has_control('chassis_open'):
2413                self.servo.set('chassis_open', 'no')
2414
2415            if enable_testlab:
2416                self.cr50.set_ccd_testlab('on')
2417
2418        if reset_ccd:
2419            self.cr50.send_command('ccd reset')
2420
2421        # In default, the device should be in normal mode. After opening cr50,
2422        # the TPM should be cleared and the device should automatically reset to
2423        # normal mode. However, some tests might want the device in 'dev' mode.
2424        self.enter_mode_after_checking_cr50_state('dev' if dev_mode else
2425                                                 'normal')
2426