• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import print_function
7
8import ctypes
9import logging
10import os
11import pprint
12import re
13import time
14import uuid
15from xml.parsers import expat
16
17import six
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import error, global_config
20from autotest_lib.client.common_lib.cros import dev_server, retry, tpm_utils
21from autotest_lib.server import test
22from autotest_lib.server.cros import vboot_constants as vboot
23from autotest_lib.server.cros.faft import telemetry
24from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
25from autotest_lib.server.cros.faft.utils import (menu_mode_switcher,
26                                                 menu_navigator, mode_switcher)
27from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
28from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
29from autotest_lib.server.cros.power import utils as PowerUtils
30from autotest_lib.server.cros.servo import (chrome_base_ec, chrome_cr50,
31                                            chrome_ec, chrome_ti50, servo)
32from autotest_lib.site_utils import test_runner_utils
33
34# Experimentally tuned time in minutes to wait for partition device nodes on a
35# USB stick to be ready after plugging in the stick.
36PARTITION_TABLE_READINESS_TIMEOUT = 0.1  # minutes
37# Experimentally tuned time in seconds to wait for the first retry of reading
38# the sysfs node of a USB stick's partition device node.
39PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY = 1  # seconds
40
41ConnectionError = mode_switcher.ConnectionError
42
43
44class FirmwareTest(test.test):
45    """
46    Base class that sets up helper objects/functions for firmware tests.
47
48    It launches the FAFTClient on DUT, such that the test can access its
49    firmware functions and interfaces. It also provides some methods to
50    handle the reboot mechanism, in order to ensure FAFTClient is still
51    connected after reboot.
52    @type servo: servo.Servo
53    @type _client: autotest_lib.server.hosts.ssh_host.SSHHost |
54                   autotest_lib.server.hosts.cros_host.CrosHost
55
56    TODO: add documentaion as the FAFT rework progresses.
57    """
58    version = 1
59
60    # Set this to True in test classes that need to boot from the USB stick.
61    # When True, initialize() will raise TestWarn if USB stick is marked bad.
62    NEEDS_SERVO_USB = False
63
64    # Mapping of partition number of kernel and rootfs.
65    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
66    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
67    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
68    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
69
70    # Mapping of kernel type and name.
71    KERNEL_TYPE_NAME_MAP = {'KERN': 'kernel', 'MINIOS': 'minios'}
72
73    CHROMEOS_MAGIC = "CHROMEOS"
74    CORRUPTED_MAGIC = "CORRUPTD"
75
76    # System power states
77    POWER_STATE_S0 = 'S0'
78    POWER_STATE_S0IX = 'S0ix'
79    POWER_STATE_S3 = 'S3'
80    POWER_STATE_S5 = 'S5'
81    POWER_STATE_G3 = 'G3'
82    POWER_STATE_SUSPEND = '|'.join([POWER_STATE_S0IX, POWER_STATE_S3])
83
84    # Delay for waiting client to return before EC suspend
85    EC_SUSPEND_DELAY = 5
86
87    # Delay between EC suspend and wake
88    WAKE_DELAY = 10
89
90    # Delay between closing and opening lid
91    LID_DELAY = 1
92
93    # Delay for establishing state after changing PD settings
94    PD_RESYNC_DELAY = 2
95
96    # Delay to wait for servo USB to work after power role swap:
97    # tPSSourceOff (920ms) + tPSSourceOn (480ms) + buffer
98    POWER_ROLE_SWAP_DELAY = 2
99
100    # The default number of power state check retries (each try takes 3 secs)
101    DEFAULT_PWR_RETRIES = 5
102
103    # FWMP space constants
104    FWMP_CLEARED_EXIT_STATUS = 1
105    FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
106                              '_INVALID')
107
108    _ROOTFS_PARTITION_NUMBER = 3
109
110    # Class level variable, keep track the states of one time setup.
111    # This variable is preserved across tests which inherit this class.
112    _global_setup_done = {
113        'gbb_flags': False,
114        'reimage': False,
115        'usb_check': False,
116    }
117
118    # CCD password used by tests.
119    CCD_PASSWORD = 'Password'
120
121    RESPONSE_TIMEOUT = 180
122
123    @classmethod
124    def check_setup_done(cls, label):
125        """Check if the given setup is done.
126
127        @param label: The label of the setup.
128        """
129        return cls._global_setup_done[label]
130
131    @classmethod
132    def mark_setup_done(cls, label):
133        """Mark the given setup done.
134
135        @param label: The label of the setup.
136        """
137        cls._global_setup_done[label] = True
138
139    @classmethod
140    def unmark_setup_done(cls, label):
141        """Mark the given setup not done.
142
143        @param label: The label of the setup.
144        """
145        cls._global_setup_done[label] = False
146
147    def initialize(self, host, cmdline_args, ec_wp=None):
148        """Initialize the FirmwareTest.
149
150        This method interacts with the Servo, FAFT RPC client, FAFT Config,
151        Mode Switcher, EC consoles, write-protection, GBB flags, and a lockfile.
152
153        @type host: autotest_lib.server.hosts.CrosHost
154        """
155        self.run_id = str(uuid.uuid4())
156        self._client = host
157        self.servo = host.servo
158        if self.servo is None:
159            raise error.TestError('FirmwareTest failed to set up servo')
160
161        self.lockfile = '/usr/local/tmp/faft/lock'
162        self._backup_gbb_flags = None
163        self._backup_firmware_identity = dict()
164        self._backup_kernel_sha = dict()
165        for kernel_type in self.KERNEL_TYPE_NAME_MAP:
166            self._backup_kernel_sha[kernel_type] = dict()
167        self._backup_cgpt_attr = dict()
168        self._backup_dev_mode = None
169        self._restore_power_mode = None
170        self._uart_file_dict = {}
171
172        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
173
174        # Parse arguments from command line
175        args = {}
176        self.power_control = host.POWER_CONTROL_RPM
177        for arg in cmdline_args:
178            match = re.search("^(\w+)=(.+)", arg)
179            if match:
180                args[match.group(1)] = match.group(2)
181
182        self._no_fw_rollback_check = False
183        if 'no_fw_rollback_check' in args:
184            if 'true' in args['no_fw_rollback_check'].lower():
185                self._no_fw_rollback_check = True
186
187        self._no_ec_sync = False
188        if 'no_ec_sync' in args:
189            if 'true' in args['no_ec_sync'].lower():
190                self._no_ec_sync = True
191
192        self._use_sync_script = global_config.global_config.get_config_value(
193                'CROS', 'enable_fs_sync_script', type=bool, default=False)
194
195        self.servo.initialize_dut()
196        self.faft_client = RPCProxy(host)
197        self.faft_config = FAFTConfig(
198                self.faft_client.system.get_platform_name(),
199                self.faft_client.system.get_model_name())
200        self.checkers = FAFTCheckers(self)
201
202        # Mapping of kernel type and kernel servicer class
203        self.kernel_servicer = {
204                'KERN': self.faft_client.kernel,
205                'MINIOS': self.faft_client.minios,
206        }
207
208        if self.faft_config.chrome_ec:
209            self.ec = chrome_ec.ChromeEC(self.servo)
210        self.menu_navigator = menu_navigator.create_menu_navigator(self)
211        self.switcher = mode_switcher.create_mode_switcher(
212                self, self.menu_navigator)
213        # This will be None for menuless UI
214        self.menu_switcher = menu_mode_switcher.create_menu_mode_switcher(
215                self, self.menu_navigator)
216        # Check for presence of a USBPD console
217        if self.faft_config.chrome_usbpd:
218            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
219        elif self.faft_config.chrome_ec:
220            # If no separate USBPD console, then PD exists on EC console
221            self.usbpd = self.ec
222        # Get pdtester console
223        self.pdtester = host.pdtester
224        self.pdtester_host = host._pdtester_host
225        gsc = None
226        if self.servo.has_control('ti50_version') or \
227            self.servo.has_control('ti50_version', 'ccd_gsc'):
228            gsc = chrome_ti50.ChromeTi50(self.servo, self.faft_config)
229        elif self.servo.has_control('cr50_version'):
230            gsc = chrome_cr50.ChromeCr50(self.servo, self.faft_config)
231        if gsc:
232            try:
233                # Check that the gsc console works before declaring the
234                # connection exists and enabling uart capture.
235                gsc.get_version()
236                self.cr50 = gsc
237            except servo.ControlUnavailableError:
238                logging.warning('gsc console not supported.')
239            except Exception as e:
240                logging.warning('Ignored unknown gsc version error: %s',
241                                str(e))
242
243        if 'power_control' in args:
244            self.power_control = args['power_control']
245            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
246                raise error.TestError('Valid values for --args=power_control '
247                                      'are %s. But you entered wrong argument '
248                                      'as "%s".'
249                                      % (host.POWER_CONTROL_VALID_ARGS,
250                                         self.power_control))
251
252        if self.NEEDS_SERVO_USB and not host.is_servo_usb_usable():
253            usb_state = host.get_servo_usb_state()
254            raise error.TestWarn(
255                    "Servo USB disk unusable (%s); canceling test." %
256                    usb_state)
257
258        if not self.faft_client.system.dev_tpm_present():
259            raise error.TestError('/dev/tpm0 does not exist on the client')
260
261        # Initialize servo role to src
262        self.servo.set_servo_v4_role('src')
263
264        # Create the BaseEC object. None if not available.
265        self.base_ec = chrome_base_ec.create_base_ec(self.servo)
266
267        self._record_uart_capture()
268        self._record_system_info()
269        self.faft_client.system.set_dev_default_boot()
270        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
271        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
272        if self.fw_vboot2:
273            self.faft_client.system.set_fw_try_next('A')
274            if self.faft_client.system.get_crossystem_value(
275                    'mainfw_act') == 'B':
276                logging.info('mainfw_act is B. rebooting to set it A')
277                # TODO(crbug.com/1018322): remove try/catch once that bug is
278                # marked as fixed and verified. In that case the overlay for
279                # the board itself will map warm_reset to cold_reset.
280                try:
281                    self.switcher.mode_aware_reboot()
282                except ConnectionError as e:
283                    if 'DUT is still up unexpectedly' in str(e):
284                        # In this case, try doing a cold_reset instead
285                        self.switcher.mode_aware_reboot(reboot_type='cold')
286                    else:
287                        raise
288
289        # Check flashrom before first use, to avoid xmlrpclib.Fault.
290        if not self.faft_client.bios.is_available():
291            raise error.TestError(
292                    "flashrom is broken; check 'flashrom -p host'"
293                    "and rpc server log.")
294
295        self._setup_gbb_flags()
296        self.faft_client.updater.stop_daemon()
297        self._create_faft_lockfile()
298        self._setup_ec_write_protect(ec_wp)
299        # See chromium:239034 regarding needing this sync.
300        self.blocking_sync()
301        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
302
303    def stage_build_to_usbkey(self):
304        """Downloads host's build to the USB key attached to servo.
305
306        @return: True if build is verified to be on USB key, False otherwise.
307        """
308        info = self._client.host_info_store.get()
309        if info.build and info.build != test_runner_utils.NO_BUILD:
310            current_build = self._client._servo_host.validate_image_usbkey()
311            if current_build != info.build:
312                logging.debug('Current build on USB: %s differs from test'
313                              ' build: %s, proceed with download.',
314                              current_build, info.build)
315                try:
316                    self._client.stage_build_to_usb(info.build)
317                    return True
318                except (error.AutotestError,
319                        dev_server.DevServerException) as e:
320                    logging.warning(
321                            'Stage build to USB failed, tests that require'
322                            ' test image on Servo USB may fail: {}'.format(e))
323                    return False
324            else:
325                logging.debug('Current build on USB: %s is same as test'
326                              ' build, skip download.', current_build)
327                return True
328        else:
329            logging.warning('Failed to get build label from the DUT, will use'
330                            ' existing image in Servo USB.')
331            return False
332
333    def run_once(self, *args, **dargs):
334        """Delegates testing to a test method.
335
336        test_name is either the 1st positional argument or a named argument.
337
338        test_name will be mapped to a test method as follows:
339        test_name                     method
340        --------------                -----------
341        <TestClass>                   test
342        <TestClass>.<Case>            test_<Case>
343        <TestClass>.<Case>.<SubCase>  test_<Case>_<SubCase>
344
345        Any arguments not consumed by FirmwareTest are passed to the test method.
346
347        @param test_name: Should be set to NAME in the control file.
348
349        @raise TestError: If test_name wasn't found in args, does not start
350                          with test class, or if the method is not found.
351        """
352        self_name = type(self).__name__
353
354        # Parse and remove test name from args.
355        if 'test_name' in dargs:
356            test_name = dargs.pop('test_name')
357        elif len(args) >= 1:
358            test_name = args[0]
359            args = args[1:]
360        else:
361            raise error.TestError('"%s" class must define run_once, or the'
362                                  ' control file must specify "test_name".' %
363                                  self_name)
364
365        # Check that test_name starts with the test class name.
366        name_parts = test_name.split('.')
367
368        test_class = name_parts.pop(0)
369        if test_class != self_name:
370            raise error.TestError('Class "%s" does not match that found in test'
371                                  ' name "%s"' % (self_name, test_class))
372
373        # Construct and call the test method.
374        method_name = '_'.join(['test'] + name_parts)
375        if not hasattr(self, method_name):
376            raise error.TestError('Method "%s" for testing "%s" not found in'
377                                  ' "%s"' % (method_name, test_name, self_name))
378
379        logging.info('Starting test: "%s"', test_name)
380        utils.cherry_pick_call(getattr(self, method_name), *args, **dargs)
381
382    def cleanup(self):
383        """Autotest cleanup function."""
384        # Unset state checker in case it's set by subclass
385        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
386
387        # Capture UART before doing anything else, so we can guarantee we get
388        # some uart results.
389        try:
390            self._record_uart_capture()
391        except:
392            logging.warning('Failed initial uart capture during cleanup')
393
394        try:
395            self.faft_client.system.is_available()
396        except:
397            # Remote is not responding. Revive DUT so that subsequent tests
398            # don't fail.
399            self._restore_routine_from_timeout()
400
401        if hasattr(self, 'switcher'):
402            self.switcher.restore_mode()
403
404        self._restore_ec_write_protect()
405        self._restore_servo_v4_role()
406
407        if hasattr(self, 'faft_client'):
408            self._restore_gbb_flags()
409            self.faft_client.updater.start_daemon()
410            self.faft_client.updater.cleanup()
411            self._remove_faft_lockfile()
412            self.faft_client.quit()
413            self.faft_client.collect_logfiles(self.resultsdir)
414
415        # Capture any new uart output, then discard log messages again.
416        self._cleanup_uart_capture()
417
418        super(FirmwareTest, self).cleanup()
419        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
420
421    def _record_system_info(self):
422        """Record some critical system info to the attr keyval.
423
424        This info is used by generate_test_report later.
425        """
426        system_info = {
427            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
428            'ec_version': self.faft_client.ec.get_version(),
429            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
430            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
431            'servo_host_os_version' : self.servo.get_os_version(),
432            'servod_version': self.servo.get_servod_version(),
433            'os_version': self._client.get_release_builder_path(),
434            'servo_type': self.servo.get_servo_version()
435        }
436
437        # Record the servo v4 and servo micro versions when possible
438        system_info.update(self.servo.get_servo_fw_versions())
439
440        if hasattr(self, 'cr50'):
441            system_info['cr50_version'] = self.cr50.get_full_version()
442
443        logging.info('System info:\n%s', pprint.pformat(system_info))
444        self.write_attr_keyval(system_info)
445
446    def invalidate_firmware_setup(self):
447        """Invalidate all firmware related setup state.
448
449        This method is called when the firmware is re-flashed. It resets all
450        firmware related setup states so that the next test setup properly
451        again.
452        """
453        self.unmark_setup_done('gbb_flags')
454
455    def _retrieve_recovery_reason_from_trap(self):
456        """Try to retrieve the recovery reason from a trapped recovery screen.
457
458        @return: The recovery_reason, 0 if any error.
459        """
460        recovery_reason = 0
461        logging.info('Try to retrieve recovery reason...')
462        if self.servo.get_usbkey_state() == 'dut':
463            self.switcher.bypass_rec_mode()
464        else:
465            self.servo.switch_usbkey('dut')
466
467        try:
468            self.switcher.wait_for_client()
469            lines = self.faft_client.system.run_shell_command_get_output(
470                        'crossystem recovery_reason')
471            recovery_reason = int(lines[0])
472            logging.info('Got the recovery reason %d.', recovery_reason)
473        except ConnectionError:
474            logging.error('Failed to get the recovery reason due to connection '
475                          'error.')
476        return recovery_reason
477
478    def _reset_client(self):
479        """Reset client to a workable state.
480
481        This method is called when the client is not responsive. It may be
482        caused by the following cases:
483          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
484          - corrupted firmware;
485          - corrutped OS image.
486        """
487        # DUT may halt on a firmware screen. Try cold reboot.
488        logging.info('Try cold reboot...')
489        self.switcher.mode_aware_reboot(reboot_type='cold',
490                                        sync_before_boot=False,
491                                        wait_for_dut_up=False)
492        self.switcher.wait_for_client_offline()
493        self.switcher.bypass_dev_mode()
494        try:
495            self.switcher.wait_for_client()
496            return
497        except ConnectionError:
498            logging.warning("Cold reboot didn't help, still connection error.")
499
500        # DUT may be broken by a corrupted firmware. Restore firmware.
501        # We assume the recovery boot still works fine. Since the recovery
502        # code is in RO region and all FAFT tests don't change the RO region
503        # except GBB.
504        if self.is_firmware_saved():
505            self._ensure_client_in_recovery()
506            logging.info('Try restoring the original firmware...')
507            try:
508                if self.restore_firmware():
509                    return
510            except ConnectionError:
511                logging.warning("Restoring firmware didn't help, still "
512                                "connection error.")
513
514        # Perhaps it's kernel that's broken. Let's try restoring it.
515        for kernel_type, kernel_name in self.KERNEL_TYPE_NAME_MAP.items():
516            if self.is_kernel_saved(kernel_type):
517                self._ensure_client_in_recovery()
518                logging.info('Try restoring the original %s...', kernel_name)
519                try:
520                    if self.restore_kernel(kernel_type=kernel_type):
521                        return
522                except ConnectionError:
523                    logging.warning(
524                            "Restoring %s didn't help, still "
525                            "connection error.", kernel_name)
526
527        # DUT may be broken by a corrupted OS image. Restore OS image.
528        self._ensure_client_in_recovery()
529        logging.info('Try restoring the OS image...')
530        self.faft_client.system.run_shell_command('chromeos-install --yes')
531        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
532        self.switcher.wait_for_client_offline()
533        self.switcher.bypass_dev_mode()
534        try:
535            self.switcher.wait_for_client()
536            logging.info('Successfully restored OS image.')
537            return
538        except ConnectionError:
539            logging.warning("Restoring OS image didn't help, still connection "
540                            "error.")
541
542    def _ensure_client_in_recovery(self):
543        """Ensure client in recovery boot; reboot into it if necessary.
544
545        @raise TestError: if failed to boot the USB image.
546        """
547        logging.info('Try booting into USB image...')
548        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
549                                     wait_for_dut_up=False)
550        self.servo.switch_usbkey('host')
551        self.switcher.bypass_rec_mode()
552        try:
553            self.switcher.wait_for_client()
554        except ConnectionError:
555            raise error.TestError('Failed to boot the USB image.')
556
557    def _restore_routine_from_timeout(self):
558        """A routine to try to restore the system from a timeout error.
559
560        This method is called when FAFT failed to connect DUT after reboot.
561
562        @raise TestFail: This exception is already raised, with a decription
563                         why it failed.
564        """
565        # DUT is disconnected. Capture the UART output for debug.
566        self._record_uart_capture()
567
568        # Replug the Ethernet to identify if it is a network flaky.
569        self.servo.eth_power_reset()
570
571        recovery_reason = self._retrieve_recovery_reason_from_trap()
572
573        # Reset client to a workable state.
574        self._reset_client()
575
576        # Raise the proper TestFail exception.
577        if recovery_reason:
578            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
579                                 'and timed out' % recovery_reason)
580        else:
581            raise error.TestFail('Timed out waiting for DUT reboot')
582
583    def assert_test_image_in_usb_disk(self, usb_dev=None):
584        """Assert an USB disk plugged-in on servo and a test image inside.
585
586        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
587                        If None, it is detected automatically.
588        @raise TestError: if USB disk not detected or not a test image.
589        """
590        if self.check_setup_done('usb_check'):
591            return
592        if usb_dev:
593            assert self.servo.get_usbkey_state() == 'host'
594        else:
595            self.servo.switch_usbkey('host')
596            usb_dev = self.servo.probe_host_usb_dev()
597            if not usb_dev:
598                raise error.TestError(
599                    'An USB disk should be plugged in the servo board. %s' %
600                    telemetry.collect_usb_state(self.servo))
601
602        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
603        logging.info('usb dev is %s', usb_dev)
604        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
605        # After the USB key is muxed from the DUT to the servo host, there
606        # appears to be a delay between when servod can confirm that a sysfs
607        # entry exists for the disk (as done by probe_host_usb_dev) and when
608        # sysfs entries get populated for the disk's partitions.
609        @retry.retry(error.AutoservRunError,
610                     timeout_min=PARTITION_TABLE_READINESS_TIMEOUT,
611                     delay_sec=PARTITION_TABLE_READINESS_FIRST_RETRY_DELAY)
612        def confirm_rootfs_partition_device_node_readable():
613            """Repeatedly poll for the RootFS partition sysfs node."""
614            self.servo.system('ls {}'.format(rootfs))
615
616        try:
617            confirm_rootfs_partition_device_node_readable()
618        except error.AutoservRunError as e:
619            usb_info = telemetry.collect_usb_state(self.servo)
620            raise error.TestError(
621                    ('Could not ls the device node for the RootFS on the USB '
622                     'device. %s: %s\nMore telemetry: %s') %
623                    (type(e).__name__, e, usb_info))
624        try:
625            self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
626        except error.AutoservRunError as e:
627            usb_info = telemetry.collect_usb_state(self.servo)
628            raise error.TestError(
629                ('Could not mount the partition on USB device. %s: %s\n'
630                 'More telemetry: %s') % (type(e).__name__, e, usb_info))
631
632        try:
633            usb_lsb = self.servo.system_output('cat %s' %
634                os.path.join(tmpd, 'etc/lsb-release'))
635            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
636            dut_lsb = '\n'.join(self.faft_client.system.
637                run_shell_command_get_output('cat /etc/lsb-release'))
638            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
639            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
640                raise error.TestError('USB stick in servo is no test image')
641            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
642            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
643            if usb_board != dut_board:
644                raise error.TestError('USB stick in servo contains a %s '
645                    'image, but DUT is a %s' % (usb_board, dut_board))
646        finally:
647            for cmd in ('umount -l %s' % tmpd, 'sync %s' % usb_dev,
648                        'rm -rf %s' % tmpd):
649                self.servo.system(cmd)
650
651        self.mark_setup_done('usb_check')
652
653    def setup_pdtester(self, flip_cc=False, dts_mode=False, pd_faft=True,
654                       min_batt_level=None):
655        """Setup the PDTester to a given state.
656
657        @param flip_cc: True to flip CC polarity; False to not flip it.
658        @param dts_mode: True to config PDTester to DTS mode; False to not.
659        @param pd_faft: True to config PD FAFT setup.
660        @param min_batt_level: An int for minimum battery level, or None for
661                               skip.
662        @raise TestError: If Servo v4 not setup properly.
663        """
664
665        # PD FAFT is only tested with a combination of servo_v4 or servo_v4p1
666        # with servo micro or C2D2.
667        pd_setup = []
668        for first in self.pdtester.FIRST_PD_SETUP_ELEMENT:
669            for second in self.pdtester.SECOND_PD_SETUP_ELEMENT:
670                pd_setup.append(first + '_with_' + second)
671
672        if pd_faft and self.pdtester.servo_type not in pd_setup:
673            raise error.TestError(', '.join(pd_setup) +
674                                  ' is a mandatory setup '
675                                  'for PD FAFT. Got %s.' %
676                                  self.pdtester.servo_type)
677
678        # Ensure the battery is enough for testing, this should be done before
679        # all the following setup.
680        if (min_batt_level is not None) and self._client.has_battery():
681            logging.info('Start charging if batt level < %d', min_batt_level)
682            PowerUtils.put_host_battery_in_range(self._client, min_batt_level,
683                                                 100, 600)
684
685        # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects
686        # the behaviors of what PD FAFT tests. So we want it disabled.
687        pd_tester_device = self.pdtester.servo_type.split('_with_')[0]
688        if pd_tester_device in self.pdtester.FIRST_PD_SETUP_ELEMENT:
689            self.servo.set_dts_mode('on' if dts_mode else 'off')
690        else:
691            logging.warning('Configuring DTS mode only supported on %s',
692                            pd_tester_device)
693
694        self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1')
695        # Make it sourcing max voltage.
696        self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE)
697
698        time.sleep(self.PD_RESYNC_DELAY)
699
700        # Servo v4 requires an external charger to source power. Make sure
701        # this setup is correct.
702        if pd_tester_device in self.pdtester.FIRST_PD_SETUP_ELEMENT:
703            role = self.pdtester.get('servo_pd_role')
704            if role != 'src':
705                raise error.TestError(
706                        '%s is not sourcing power! Make sure the servo '
707                        '"DUT POWER" port is connected to a working charger. '
708                        'servo_pd_role:%s' % (pd_tester_device, role))
709
710    def setup_usbkey(self, usbkey, host=None, used_for_recovery=None):
711        """Setup the USB disk for the test.
712
713        It checks the setup of USB disk and a valid ChromeOS test image inside.
714        It also muxes the USB disk to either the host or DUT by request.
715
716        @param usbkey: True if the USB disk is required for the test, False if
717                       not required.
718        @param host: Optional, True to mux the USB disk to host, False to mux it
719                    to DUT, default to do nothing.
720        @param used_for_recovery: Optional, True if the USB disk is used for
721                                  recovery boot; False if the USB disk is not
722                                  used for recovery boot, like Ctrl-U USB boot.
723        """
724        if usbkey:
725            self.stage_build_to_usbkey()
726            self.assert_test_image_in_usb_disk()
727        elif host is None:
728            # USB disk is not required for the test. Better to mux it to host.
729            host = True
730
731        if host is True:
732            self.servo.switch_usbkey('host')
733        elif host is False:
734            self.servo.switch_usbkey('dut')
735
736        if used_for_recovery is None:
737            # Default value is True if usbkey == True.
738            # As the common usecase of USB disk is for recovery boot. Tests
739            # can define it explicitly if not.
740            used_for_recovery = usbkey
741
742        if used_for_recovery:
743            # In recovery boot, the locked EC RO doesn't support PD for most
744            # of the CrOS devices. The default servo v4 power role is a SRC.
745            # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the
746            # data role swap from UFP to DFP; as a result, DUT can't see the
747            # USB disk and the Ethernet dongle on servo v4.
748            #
749            # This is a workaround to set servo v4 as a SNK, for every FAFT
750            # test which boots into the USB disk in the recovery mode.
751            #
752            # TODO(waihong): Add a check to see if the battery level is too
753            # low and sleep for a while for charging.
754            self.set_servo_v4_role_to_snk()
755
756            # Force reconnection; otherwise, the next RPC call will timeout
757            logging.info('Waiting for reconnection after power role swap...')
758            time.sleep(self.POWER_ROLE_SWAP_DELAY)
759            self.faft_client.disconnect()
760            self.faft_client.connect()
761
762    def set_servo_v4_role_to_snk(self, pd_comm=False):
763        """Set the servo v4 role to SNK.
764
765        @param pd_comm: a bool. Enable PD communication if True, else otherwise
766        """
767        self._needed_restore_servo_v4_role = True
768        self.servo.set_servo_v4_role('snk')
769        if pd_comm:
770            self.servo.set_servo_v4_pd_comm('on')
771
772    def _restore_servo_v4_role(self):
773        """Restore the servo v4 role to default SRC."""
774        if not hasattr(self, '_needed_restore_servo_v4_role'):
775            return
776        if self._needed_restore_servo_v4_role:
777            self.servo.set_servo_v4_role('src')
778
779    def set_dut_low_power_idle_delay(self, delay):
780        """Set EC low power idle delay
781
782        @param delay: Delay in seconds
783        """
784        if not self.ec.has_command('dsleep'):
785            logging.info("Can't set low power idle delay.")
786            return
787        self._previous_ec_low_power_delay = int(
788                self.ec.send_command_get_output("dsleep",
789                ["timeout:\s+(\d+)\ssec"])[0][1])
790        self.ec.send_command("dsleep " + str(delay))
791
792    def restore_dut_low_power_idle_delay(self):
793        """Restore EC low power idle delay"""
794        if getattr(self, '_previous_ec_low_power_delay', None):
795            self.ec.send_command("dsleep " + str(
796                    self._previous_ec_low_power_delay))
797
798    def get_usbdisk_path_on_dut(self):
799        """Get the path of the USB disk device plugged-in the servo on DUT.
800
801        Returns:
802          A string representing USB disk path, like '/dev/sdb', or None if
803          no USB disk is found.
804        """
805        cmd = 'ls -d /dev/s*[a-z]'
806        original_value = self.servo.get_usbkey_state()
807
808        # Make the dut unable to see the USB disk.
809        self.servo.switch_usbkey('off')
810        time.sleep(self.faft_config.usb_unplug)
811        no_usb_set = set(
812            self.faft_client.system.run_shell_command_get_output(cmd))
813
814        # Make the dut able to see the USB disk.
815        self.servo.switch_usbkey('dut')
816        time.sleep(self.faft_config.usb_plug)
817        has_usb_set = set(
818            self.faft_client.system.run_shell_command_get_output(cmd))
819
820        # Back to its original value.
821        if original_value != self.servo.get_usbkey_state():
822            self.servo.switch_usbkey(original_value)
823
824        diff_set = has_usb_set - no_usb_set
825        if len(diff_set) == 1:
826            return diff_set.pop()
827        else:
828            return None
829
830    def _create_faft_lockfile(self):
831        """Creates the FAFT lockfile."""
832        logging.info('Creating FAFT lockfile...')
833        command = 'touch %s' % (self.lockfile)
834        self.faft_client.system.run_shell_command(command)
835
836    def _remove_faft_lockfile(self):
837        """Removes the FAFT lockfile."""
838        logging.info('Removing FAFT lockfile...')
839        command = 'rm -f %s' % (self.lockfile)
840        self.faft_client.system.run_shell_command(command)
841
842    def clear_set_gbb_flags(self, clear_mask, set_mask, reboot=True):
843        """Clear and set the GBB flags in the current flashrom.
844
845        @param clear_mask: A mask of flags to be cleared.
846        @param set_mask: A mask of flags to be set.
847        @param reboot: If true, then this method will reboot the DUT if certain
848                       flags are modified.
849        """
850        gbb_flags = self.faft_client.bios.get_gbb_flags()
851        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
852        if new_flags == gbb_flags:
853            logging.info(
854                    'Current GBB flags (0x%x) match desired clear mask '
855                    '(0x%x) and desired set mask (0x%x)', gbb_flags,
856                    clear_mask, set_mask)
857            return
858
859        logging.info('Changing GBB flags from 0x%x to 0x%x.', gbb_flags,
860                     new_flags)
861        if self._backup_gbb_flags is None:
862            self._backup_gbb_flags = gbb_flags
863        self.faft_client.bios.set_gbb_flags(new_flags)
864
865        # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
866        # reboot to get a clear state
867        if reboot and bool((gbb_flags ^ new_flags)
868                           & (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON
869                              | vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
870            self.switcher.mode_aware_reboot()
871
872
873    def _check_capability(self, target, required_cap, suppress_warning):
874        """Check if current platform has required capabilities for the target.
875
876        @param required_cap: A list containing required capabilities.
877        @param suppress_warning: True to suppress any warning messages.
878        @return: True if requirements are met. Otherwise, False.
879        """
880        if not required_cap:
881            return True
882
883        if target not in ['ec', 'cr50']:
884            raise error.TestError('Invalid capability target %r' % target)
885
886        for cap in required_cap:
887            if cap not in getattr(self.faft_config, target + '_capability'):
888                if not suppress_warning:
889                    logging.warning(
890                            'Requires %s capability "%s" to run this '
891                            'test.', target, cap)
892                return False
893
894        return True
895
896
897    def check_ec_capability(self, required_cap=None, suppress_warning=False):
898        """Check if current platform has required EC capabilities.
899
900        @param required_cap: A list containing required EC capabilities. Pass in
901                             None to only check for presence of Chrome EC.
902        @param suppress_warning: True to suppress any warning messages.
903        @return: True if requirements are met. Otherwise, False.
904        """
905        if not self.faft_config.chrome_ec:
906            if not suppress_warning:
907                logging.warning('Requires Chrome EC to run this test.')
908            return False
909        return self._check_capability('ec', required_cap, suppress_warning)
910
911
912    def check_cr50_capability(self, required_cap=None, suppress_warning=False):
913        """Check if current platform has required Cr50 capabilities.
914
915        @param required_cap: A list containing required Cr50 capabilities. Pass
916                             in None to only check for presence of cr50 uart.
917        @param suppress_warning: True to suppress any warning messages.
918        @return: True if requirements are met. Otherwise, False.
919        """
920        if not hasattr(self, 'cr50'):
921            if not suppress_warning:
922                logging.warning('Requires Chrome Cr50 to run this test.')
923            return False
924        return self._check_capability('cr50', required_cap, suppress_warning)
925
926
927    def check_root_part_on_non_recovery(self, part):
928        """Check the partition number of root device and on normal/dev boot.
929
930        @param part: A string of partition number, e.g.'3'.
931        @return: True if the root device matched and on normal/dev boot;
932                 otherwise, False.
933        """
934        return self.checkers.root_part_checker(part) and \
935                self.checkers.crossystem_checker({
936                    'mainfw_type': ('normal', 'developer'),
937                })
938
939    def _join_part(self, dev, part):
940        """Return a concatenated string of device and partition number.
941
942        @param dev: A string of device, e.g.'/dev/sda'.
943        @param part: A string of partition number, e.g.'3'.
944        @return: A concatenated string of device and partition number,
945                 e.g.'/dev/sda3'.
946
947        >>> seq = FirmwareTest()
948        >>> seq._join_part('/dev/sda', '3')
949        '/dev/sda3'
950        >>> seq._join_part('/dev/mmcblk0', '2')
951        '/dev/mmcblk0p2'
952        """
953        if 'mmcblk' in dev:
954            return dev + 'p' + part
955        elif 'nvme' in dev:
956            return dev + 'p' + part
957        else:
958            return dev + part
959
960    def copy_kernel_and_rootfs(self, from_part, to_part):
961        """Copy kernel and rootfs from from_part to to_part.
962
963        @param from_part: A string of partition number to be copied from.
964        @param to_part: A string of partition number to be copied to.
965        """
966        root_dev = self.faft_client.system.get_root_dev()
967        logging.info('Copying kernel from %s to %s. Please wait...',
968                     from_part, to_part)
969        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
970                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
971                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
972        logging.info('Copying rootfs from %s to %s. Please wait...',
973                     from_part, to_part)
974        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
975                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
976                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
977
978    def ensure_kernel_boot(self, part):
979        """Ensure the request kernel boot.
980
981        If not, it duplicates the current kernel to the requested kernel
982        and sets the requested higher priority to ensure it boot.
983
984        @param part: A string of kernel partition number or 'a'/'b'.
985        """
986        if not self.checkers.root_part_checker(part):
987            if self.faft_client.kernel.diff_a_b():
988                self.copy_kernel_and_rootfs(
989                        from_part=self.OTHER_KERNEL_MAP[part],
990                        to_part=part)
991            self.reset_and_prioritize_kernel(part)
992            self.switcher.mode_aware_reboot()
993
994    def ensure_dev_internal_boot(self, original_dev_boot_usb):
995        """Ensure internal device boot in developer mode.
996
997        If not internal device boot, it will try to reboot the device and
998        bypass dev mode to boot into internal device.
999
1000        @param original_dev_boot_usb: Original dev_boot_usb value.
1001        """
1002        self.faft_client.system.set_dev_default_boot()
1003        if self.faft_client.system.is_removable_device_boot():
1004            logging.info('Reboot into internal disk...')
1005            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
1006            self.switcher.mode_aware_reboot()
1007        self.check_state((self.checkers.dev_boot_usb_checker, False,
1008                          'Device not booted from internal disk properly.'))
1009
1010    def set_hardware_write_protect(self, enable):
1011        """Set hardware write protect pin.
1012
1013        @param enable: True if asserting write protect pin. Otherwise, False.
1014        """
1015        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
1016
1017    def set_ap_write_protect_and_reboot(self, enable):
1018        """Set AP write protect status and reboot to take effect.
1019
1020        @param enable: True if asserting write protect. Otherwise, False.
1021        """
1022        self.set_hardware_write_protect(enable)
1023        if hasattr(self, 'ec'):
1024            self.sync_and_ec_reboot()
1025            self.switcher.wait_for_client()
1026
1027    def run_chromeos_firmwareupdate(self, mode, append=None, options=(),
1028            ignore_status=False):
1029        """Use RPC to get the command to run, but do the actual run via ssh.
1030
1031        Running the command via SSH improves the reliability in cases where the
1032        USB network connection gets interrupted.  SSH will still return the
1033        output, and won't hang like RPC would.
1034        """
1035        update_cmd = self.faft_client.updater.get_firmwareupdate_command(
1036                mode, append, options)
1037        try:
1038            result = self._client.run(
1039                    update_cmd, timeout=300, ignore_status=ignore_status)
1040            if result.exit_status == 255:
1041                self.faft_client.disconnect()
1042            return result
1043        except error.AutoservRunError as e:
1044            if e.result_obj.exit_status == 255:
1045                self.faft_client.disconnect()
1046            if ignore_status:
1047                return e.result_obj
1048            raise
1049
1050    def set_ec_write_protect_and_reboot(self, enable):
1051        """Set EC write protect status and reboot to take effect.
1052
1053        The write protect state is only activated if both hardware write
1054        protect pin is asserted and software write protect flag is set.
1055        This method asserts/deasserts hardware write protect pin first, and
1056        set corresponding EC software write protect flag.
1057
1058        If the device uses non-Chrome EC, set the software write protect via
1059        flashrom.
1060
1061        If the device uses Chrome EC, a reboot is required for write protect
1062        to take effect. Since the software write protect flag cannot be unset
1063        if hardware write protect pin is asserted, we need to deasserted the
1064        pin first if we are deactivating write protect. Similarly, a reboot
1065        is required before we can modify the software flag.
1066
1067        @param enable: True if activating EC write protect. Otherwise, False.
1068        """
1069        self.set_hardware_write_protect(enable)
1070        if self.faft_config.chrome_ec:
1071            self.set_chrome_ec_write_protect_and_reboot(enable)
1072        else:
1073            self.faft_client.ec.set_write_protect(enable)
1074            self.switcher.mode_aware_reboot()
1075
1076    def set_chrome_ec_write_protect_and_reboot(self, enable):
1077        """Set Chrome EC write protect status and reboot to take effect.
1078
1079        @param enable: True if activating EC write protect. Otherwise, False.
1080        """
1081        if enable:
1082            # Set write protect flag and reboot to take effect.
1083            self.ec.set_flash_write_protect(enable)
1084            self.sync_and_ec_reboot(
1085                    flags='hard',
1086                    extra_sleep=self.faft_config.ec_boot_to_wp_en)
1087        else:
1088            # Reboot after deasserting hardware write protect pin to deactivate
1089            # write protect. And then remove software write protect flag.
1090            # Some ITE ECs can only clear their WP status on a power-on reset,
1091            # no software-initiated reset will do.
1092            self.sync_and_ec_reboot(flags='cold')
1093            self.ec.set_flash_write_protect(enable)
1094
1095    def _setup_ec_write_protect(self, ec_wp):
1096        """Setup for EC write-protection.
1097
1098        It makes sure the EC in the requested write-protection state. If not, it
1099        flips the state. Flipping the write-protection requires DUT reboot.
1100
1101        @param ec_wp: True to request EC write-protected; False to request EC
1102                      not write-protected; None to do nothing.
1103        """
1104        if ec_wp is None:
1105            return
1106        self._old_wpsw_cur = self.checkers.crossystem_checker(
1107                                    {'wpsw_cur': '1'}, suppress_logging=True)
1108        if ec_wp != self._old_wpsw_cur:
1109            if not self.faft_config.ap_access_ec_flash:
1110                raise error.TestNAError(
1111                        "Cannot change EC write-protect for this device")
1112
1113            logging.info('The test required EC is %swrite-protected. Reboot '
1114                         'and flip the state.', '' if ec_wp else 'not ')
1115            self.switcher.mode_aware_reboot(
1116                    'custom',
1117                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
1118        wpsw_cur = '1' if ec_wp else '0'
1119        self.check_state((self.checkers.crossystem_checker, {
1120                               'wpsw_cur': wpsw_cur}))
1121
1122    def _restore_ec_write_protect(self):
1123        """Restore the original EC write-protection."""
1124        if (not hasattr(self, '_old_wpsw_cur')) or (self._old_wpsw_cur is
1125                                                    None):
1126            return
1127        if not self.checkers.crossystem_checker({'wpsw_cur': '1' if
1128                       self._old_wpsw_cur else '0'}, suppress_logging=True):
1129            logging.info('Restore original EC write protection and reboot.')
1130            self.switcher.mode_aware_reboot(
1131                    'custom',
1132                    lambda:self.set_ec_write_protect_and_reboot(
1133                            self._old_wpsw_cur))
1134        self.check_state((self.checkers.crossystem_checker, {
1135                          'wpsw_cur': '1' if self._old_wpsw_cur else '0'}))
1136
1137    def _record_uart_capture(self):
1138        """Record the CPU/EC/PD UART output stream to files."""
1139        self.servo.record_uart_capture(self.resultsdir)
1140
1141    def _cleanup_uart_capture(self):
1142        """Cleanup the CPU/EC/PD UART capture."""
1143        self.servo.close(self.resultsdir)
1144
1145    def set_ap_off_power_mode(self, power_mode):
1146        """
1147        Set the DUT power mode to suspend (S0ix/S3) or shutdown (G3/S5).
1148        The DUT must be in S0 when calling this method.
1149
1150        @param power_mode: a string for the expected power mode, either
1151                           'suspend' or 'shutdown'.
1152        """
1153        if power_mode == 'suspend':
1154            target_power_state = self.POWER_STATE_SUSPEND
1155        elif power_mode == 'shutdown':
1156            target_power_state = self.POWER_STATE_G3
1157        else:
1158            raise error.TestError('%s is not a valid ap-off power mode.' %
1159                                  power_mode)
1160
1161        if self.get_power_state() != self.POWER_STATE_S0:
1162            raise error.TestError('The DUT is not in S0.')
1163
1164        self._restore_power_mode = True
1165
1166        if target_power_state == self.POWER_STATE_G3:
1167            self.run_shutdown_cmd()
1168            time.sleep(self.faft_config.shutdown)
1169        elif target_power_state == self.POWER_STATE_SUSPEND:
1170            self.suspend()
1171
1172        if self.wait_power_state(target_power_state, self.DEFAULT_PWR_RETRIES):
1173            logging.info('System entered %s state.', target_power_state)
1174        else:
1175            self._restore_power_mode = False
1176            raise error.TestFail('System fail to enter %s state. '
1177                                 'Current state: %s' %
1178                                 (target_power_state, self.get_power_state()))
1179
1180    def restore_ap_on_power_mode(self):
1181        """
1182        Wake up the DUT to S0. If the DUT was not set to suspend or
1183        shutdown mode by set_ap_off_power_mode(), raise an error.
1184        """
1185        if self.get_power_state() != self.POWER_STATE_S0:
1186            logging.info('Wake up the DUT to S0.')
1187            self.servo.power_normal_press()
1188            # If the DUT is ping-able, it must be in S0.
1189            self.switcher.wait_for_client()
1190            if self._restore_power_mode != True:
1191                raise error.TestFail('The DUT was not set to suspend/shutdown '
1192                        'mode by set_ap_off_power_mode().')
1193            self._restore_power_mode = False
1194
1195    def get_power_state(self):
1196        """
1197        Return the current power state of the AP (via EC 'powerinfo' command)
1198
1199        @return the name of the power state, or None if a problem occurred
1200        """
1201        if not hasattr(self, 'ec'):
1202            # Don't fail when EC not present or not fully initialized
1203            return None
1204
1205        pattern = r'power state (\w+) = (\w+),'
1206
1207        try:
1208            match = self.ec.send_command_get_output("powerinfo", [pattern],
1209                                                    retries=3)
1210        except (error.TestFail, expat.ExpatError) as err:
1211            logging.warning("powerinfo command encountered an error: %s", err)
1212            return None
1213        if not match:
1214            logging.warning("powerinfo output did not match pattern: %r",
1215                            pattern)
1216            return None
1217        (line, state_num, state_name) = match[0]
1218        logging.debug("power state info %r", match)
1219        return state_name
1220
1221    def _check_power_state(self, expected_power_state, actual_power_state):
1222        """
1223        Check for correct power state of the AP (via EC 'powerinfo' command)
1224
1225        @param expected_power_state: full-string regex of power state you are
1226        expecting
1227        @param actual_power_state: the power state returned from get_power_state
1228        @return: the line and the match, if the output matched.
1229        @raise error.TestFail: if output didn't match after the delay.
1230        """
1231        if not isinstance(expected_power_state, six.string_types):
1232            raise error.TestError('%s is not a string while it should be.' %
1233                                  expected_power_state)
1234        if not isinstance(actual_power_state, six.string_types):
1235            raise error.TestError('%s is not a string while it should be.' %
1236                                  actual_power_state)
1237        if re.match('^' + expected_power_state + '$', actual_power_state):
1238            return True
1239        return False
1240
1241    def wait_power_state(self, power_state, retries, retry_delay=3):
1242        """
1243        Wait for certain power state.
1244
1245        @param power_state: full-string regex of power state you are expecting
1246        @param retries: retries.  This is necessary if AP is powering down
1247        and transitioning through different states.
1248        @param retry_delay: delay between retries in seconds
1249        """
1250        logging.info('Checking power state "%s" maximum %d times.',
1251                     power_state, retries)
1252
1253        last_power_state = ''
1254        while retries > 0:
1255            logging.debug("try count: %d", retries)
1256            start_time = time.time()
1257            try:
1258                retries = retries - 1
1259                actual_power_state = self.get_power_state()
1260                if last_power_state != actual_power_state:
1261                    logging.info("power state: %s", actual_power_state)
1262                if actual_power_state is None:
1263                    continue
1264                if self._check_power_state(power_state, actual_power_state):
1265                    return True
1266                last_power_state = actual_power_state
1267            except (error.TestFail, expat.ExpatError):
1268                pass
1269            delay_time = retry_delay - time.time() + start_time
1270            if delay_time > 0:
1271                time.sleep(delay_time)
1272        return False
1273
1274    def run_shutdown_cmd(self, wait_for_offline=True):
1275        """Shut down the DUT by running '/sbin/shutdown -P now'."""
1276        self.faft_client.disconnect()
1277        # Shut down in the background after sleeping so the call gets a reply.
1278        try:
1279            self._client.run_background('sleep 0.5; /sbin/shutdown -P now')
1280        except error.AutoservRunError as e:
1281            # From the ssh man page, error code 255 indicates ssh errors.
1282            if e.result_obj.exit_status == 255:
1283                logging.warning("Ignoring error from ssh: %s", e)
1284            else:
1285                raise
1286        if wait_for_offline:
1287            self.switcher.wait_for_client_offline()
1288        self._client.close_main_ssh()
1289
1290    def suspend(self):
1291        """Suspends the DUT."""
1292        cmd = 'sleep %d; powerd_dbus_suspend' % self.EC_SUSPEND_DELAY
1293        block = False
1294        self.faft_client.system.run_shell_command(cmd, block)
1295        time.sleep(self.EC_SUSPEND_DELAY)
1296
1297    def _setup_gbb_flags(self):
1298        """Setup the GBB flags for FAFT test."""
1299        if self.check_setup_done('gbb_flags'):
1300            return
1301
1302        logging.info('Setting proper GBB flags for test.')
1303        # Ensure that GBB flags are set to 0x140.
1304        flags_to_set = (vboot.GBB_FLAG_RUNNING_FAFT |
1305                        vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM)
1306        # And if the "no_ec_sync" argument is set, then disable EC software
1307        # sync.
1308        if self._no_ec_sync:
1309            logging.info(
1310                    'User selected to disable EC software sync')
1311            flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC
1312
1313        # And if the "no_fw_rollback_check" argument is set, then disable fw
1314        # rollback check.
1315        if self._no_fw_rollback_check:
1316            logging.info(
1317                    'User selected to disable FW rollback check')
1318            flags_to_set |= vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK
1319
1320        self.clear_set_gbb_flags(0xffffffff, flags_to_set)
1321        self.mark_setup_done('gbb_flags')
1322
1323    def _restore_gbb_flags(self):
1324        """Restore GBB flags to their original state."""
1325        if self._backup_gbb_flags is None:
1326            return
1327        # Setting up and restoring the GBB flags take a lot of time. For
1328        # speed-up purpose, don't restore it.
1329        logging.info('***')
1330        logging.info('*** Please manually restore the original GBB flags to: '
1331                     '0x%x ***', self._backup_gbb_flags)
1332        logging.info('***')
1333        self.unmark_setup_done('gbb_flags')
1334
1335    def setup_tried_fwb(self, tried_fwb):
1336        """Setup for fw B tried state.
1337
1338        It makes sure the system in the requested fw B tried state. If not, it
1339        tries to do so.
1340
1341        @param tried_fwb: True if requested in tried_fwb=1;
1342                          False if tried_fwb=0.
1343        """
1344        if tried_fwb:
1345            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
1346                logging.info(
1347                    'Firmware is not booted with tried_fwb. Reboot into it.')
1348                self.faft_client.system.set_try_fw_b()
1349        else:
1350            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
1351                logging.info(
1352                    'Firmware is booted with tried_fwb. Reboot to clear.')
1353
1354    def power_on(self):
1355        """Switch DUT AC power on."""
1356        self._client.power_on(self.power_control)
1357
1358    def power_off(self):
1359        """Switch DUT AC power off."""
1360        self._client.power_off(self.power_control)
1361
1362    def power_cycle(self):
1363        """Power cycle DUT AC power."""
1364        self._client.power_cycle(self.power_control)
1365
1366    def setup_rw_boot(self, section='a'):
1367        """Make sure firmware is in RW-boot mode.
1368
1369        If the given firmware section is in RO-boot mode, turn off the RO-boot
1370        flag and reboot DUT into RW-boot mode.
1371
1372        @param section: A firmware section, either 'a' or 'b'.
1373        """
1374        flags = self.faft_client.bios.get_preamble_flags(section)
1375        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
1376            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
1377            self.faft_client.bios.set_preamble_flags(section, flags)
1378            self.switcher.mode_aware_reboot()
1379
1380    def setup_kernel(self, part):
1381        """Setup for kernel test.
1382
1383        It makes sure both kernel A and B bootable and the current boot is
1384        the requested kernel part.
1385
1386        @param part: A string of kernel partition number or 'a'/'b'.
1387        """
1388        self.ensure_kernel_boot(part)
1389        logging.info('Checking the integrity of kernel B and rootfs B...')
1390        if (self.faft_client.kernel.diff_a_b() or
1391                not self.faft_client.rootfs.verify_rootfs('B')):
1392            logging.info('Copying kernel and rootfs from A to B...')
1393            self.copy_kernel_and_rootfs(from_part=part,
1394                                        to_part=self.OTHER_KERNEL_MAP[part])
1395        self.reset_and_prioritize_kernel(part)
1396
1397    def reset_and_prioritize_kernel(self, part):
1398        """Make the requested partition highest priority.
1399
1400        This function also reset kerenl A and B to bootable.
1401
1402        @param part: A string of partition number to be prioritized.
1403        """
1404        root_dev = self.faft_client.system.get_root_dev()
1405        # Reset kernel A and B to bootable.
1406        self.faft_client.system.run_shell_command(
1407            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
1408        self.faft_client.system.run_shell_command(
1409            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
1410        # Set kernel part highest priority.
1411        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
1412                (self.KERNEL_MAP[part], root_dev))
1413
1414    def do_blocking_sync(self, device):
1415        """Run a blocking sync command."""
1416        logging.debug("Blocking sync for %s", device)
1417
1418        if 'mmcblk' in device:
1419            # For mmc devices, use `mmc status get` command to send an
1420            # empty command to wait for the disk to be available again.
1421            self.faft_client.system.run_shell_command('mmc status get %s' %
1422                                                      device)
1423        elif 'nvme' in device:
1424            # For NVMe devices, use `nvme flush` command to commit data
1425            # and metadata to non-volatile media.
1426
1427            # Get a list of NVMe namespaces, and flush them individually.
1428            # The output is assumed to be in the following format:
1429            # [ 0]:0x1
1430            # [ 1]:0x2
1431            list_ns_cmd = "nvme list-ns %s" % device
1432            available_ns = self.faft_client.system.run_shell_command_get_output(
1433                list_ns_cmd)
1434
1435            if not available_ns:
1436                raise error.TestError(
1437                    "Listing namespaces failed (empty output): %s"
1438                    % list_ns_cmd)
1439
1440            for ns in available_ns:
1441                ns = ns.split(':')[-1]
1442                flush_cmd = 'nvme flush %s -n %s' % (device, ns)
1443                flush_rc = self.faft_client.system.run_shell_command_get_status(
1444                    flush_cmd)
1445                if flush_rc != 0:
1446                    raise error.TestError(
1447                        "Flushing namespace %s failed (rc=%s): %s"
1448                        % (ns, flush_rc, flush_cmd))
1449        else:
1450            # For other devices, hdparm sends TUR to check if
1451            # a device is ready for transfer operation.
1452            self.faft_client.system.run_shell_command('hdparm -f %s' % device)
1453
1454    def blocking_sync(self, freeze_for_reset=False):
1455        """Sync root device and internal device, via script if possible.
1456
1457        The actual calls end up logged by the run() call, since they're printed
1458        to stdout/stderr in the script.
1459
1460        @param freeze_for_reset: if True, prepare for reset by blocking writes
1461                                 (only if enable_fs_sync_fsfreeze=True)
1462        """
1463
1464        if self._use_sync_script:
1465            if freeze_for_reset:
1466                self.faft_client.quit()
1467            try:
1468                return self._client.blocking_sync(freeze_for_reset)
1469            except (AttributeError, ImportError, error.AutoservRunError) as e:
1470                logging.warning(
1471                        'Falling back to old sync method due to error: %s', e)
1472
1473        # The double calls to sync fakes a blocking call
1474        # since the first call returns before the flush
1475        # is complete, but the second will wait for the
1476        # first to finish.
1477        self.faft_client.system.run_shell_command('sync')
1478        self.faft_client.system.run_shell_command('sync')
1479
1480        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
1481        # This function will perform a device-specific sync command.
1482        root_dev = self.faft_client.system.get_root_dev()
1483        self.do_blocking_sync(root_dev)
1484
1485        # Also sync the internal device if booted from removable media.
1486        if self.faft_client.system.is_removable_device_boot():
1487            internal_dev = self.faft_client.system.get_internal_device()
1488            self.do_blocking_sync(internal_dev)
1489
1490    def sync_and_ec_reboot(self, flags='', extra_sleep=0):
1491        """Request the client sync and do a EC triggered reboot.
1492
1493        @param flags: Optional, a space-separated string of flags passed to EC
1494                      reboot command, including:
1495                          default: EC soft reboot;
1496                          'hard': EC hard reboot.
1497                          'cold': Cold reboot via servo.
1498        @param extra_sleep: Optional, int or float for extra wait time for EC
1499                            reboot in seconds.
1500        """
1501        self.blocking_sync(freeze_for_reset=True)
1502        if flags == 'cold':
1503            self.servo.get_power_state_controller().reset()
1504        else:
1505            self.ec.reboot(flags)
1506        time.sleep(self.faft_config.ec_boot_to_console + extra_sleep)
1507        self.check_lid_and_power_on()
1508
1509    def reboot_and_reset_tpm(self):
1510        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
1511        self.switcher.reboot_to_mode(to_mode='rec')
1512        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
1513        self.switcher.mode_aware_reboot()
1514
1515    def full_power_off_and_on(self):
1516        """Shutdown the device by pressing power button and power on again."""
1517        boot_id = self.get_bootid()
1518        self.faft_client.disconnect()
1519
1520        # Press power button to trigger ChromeOS normal shutdown process.
1521        # We use a customized delay since the normal-press 1.2s is not enough.
1522        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
1523        # device can take 44-51 seconds to restart,
1524        # add buffer from the default timeout of 60 seconds.
1525        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
1526        time.sleep(self.faft_config.shutdown)
1527        if self.faft_config.chrome_ec:
1528            self.check_shutdown_power_state(self.POWER_STATE_G3,
1529                                            orig_boot_id=boot_id)
1530        # Short press power button to boot DUT again.
1531        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1532
1533    def check_shutdown_power_state(self, power_state,
1534                                   pwr_retries=DEFAULT_PWR_RETRIES,
1535                                   orig_boot_id=None):
1536        """Check whether the device shut down and entered the given power state.
1537
1538        If orig_boot_id is specified, it will check whether the DUT responds to
1539        ssh requests, then use orig_boot_id to check if it rebooted.
1540
1541        @param power_state: EC power state has to be checked. Either S5 or G3.
1542        @param pwr_retries: Times to check if the DUT in expected power state.
1543        @param orig_boot_id: Old boot_id, to check for unexpected reboots.
1544        @raise TestFail: If device failed to enter into requested power state.
1545        """
1546        if not self.wait_power_state(power_state, pwr_retries):
1547            current_state = self.get_power_state()
1548            if current_state == self.POWER_STATE_S0 and self._client.wait_up():
1549                # DUT is unexpectedly up, so check whether it rebooted instead.
1550                new_boot_id = self.get_bootid()
1551                logging.debug('orig_boot_id=%s, new_boot_id=%s',
1552                              orig_boot_id, new_boot_id)
1553                if orig_boot_id is None or new_boot_id is None:
1554                    # Can't say anything more specific without values to compare
1555                    raise error.TestFail(
1556                            "Expected state %s, but the system is unexpectedly"
1557                            " still up.  Current state: %s"
1558                            % (power_state, current_state))
1559                if new_boot_id == orig_boot_id:
1560                    raise error.TestFail(
1561                            "Expected state %s, but the system didn't shut"
1562                            " down.  Current state: %s"
1563                            % (power_state, current_state))
1564                else:
1565                    raise error.TestFail(
1566                            "Expected state %s, but the system rebooted instead"
1567                            " of shutting down.  Current state: %s"
1568                            % (power_state, current_state))
1569
1570            if current_state is None:
1571                current_state = '(unknown)'
1572
1573            if current_state == power_state:
1574                raise error.TestFail(
1575                        "Expected state %s, but the system didn't reach it"
1576                        " until after the limit of %s tries."
1577                        % (power_state, pwr_retries))
1578
1579            raise error.TestFail('System not shutdown properly and EC fails'
1580                                 ' to enter into %s state.  Current state: %s'
1581                                 % (power_state, current_state))
1582        logging.info('System entered into %s state..', power_state)
1583
1584    def check_lid_and_power_on(self):
1585        """
1586        On devices with EC software sync, system powers on after EC reboots if
1587        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1588        This method checks lid switch state and presses power button if
1589        necessary.
1590        """
1591        if self.servo.get("lid_open") == "no":
1592            time.sleep(self.faft_config.software_sync)
1593            self.servo.power_short_press()
1594
1595    def stop_powerd(self):
1596        """Stop the powerd daemon on the AP.
1597
1598        This will cause the AP to ignore power button presses sent by the EC.
1599        """
1600        powerd_running = self.faft_client.system.run_shell_command_check_output(
1601                'status powerd', 'start/running')
1602        if powerd_running:
1603            logging.debug('Stopping powerd')
1604            self.faft_client.system.run_shell_command("stop powerd")
1605
1606    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1607        """Modify the kernel header magic in USB stick.
1608
1609        The kernel header magic is the first 8-byte of kernel partition.
1610        We modify it to make it fail on kernel verification check.
1611
1612        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1613        @param from_magic: A string of magic which we change it from.
1614        @param to_magic: A string of magic which we change it to.
1615        @raise TestError: if failed to change magic.
1616        """
1617        assert len(from_magic) == 8
1618        assert len(to_magic) == 8
1619        # USB image only contains one kernel.
1620        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1621        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1622        current_magic = self.servo.system_output(read_cmd)
1623        if current_magic == to_magic:
1624            logging.info("The kernel magic is already %s.", current_magic)
1625            return
1626        if current_magic != from_magic:
1627            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1628
1629        logging.info('Modify the kernel magic in USB, from %s to %s.',
1630                     from_magic, to_magic)
1631        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1632                     " 2>/dev/null" % (to_magic, kernel_part))
1633        self.servo.system(write_cmd)
1634
1635        if self.servo.system_output(read_cmd) != to_magic:
1636            raise error.TestError("Failed to write new magic.")
1637
1638    def corrupt_usb_kernel(self, usb_dev):
1639        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1640
1641        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1642        """
1643        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1644                                self.CORRUPTED_MAGIC)
1645
1646    def restore_usb_kernel(self, usb_dev):
1647        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1648
1649        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1650        """
1651        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1652                                self.CHROMEOS_MAGIC)
1653
1654    def _call_action(self, action_tuple, check_status=False):
1655        """Call the action function with/without arguments.
1656
1657        @param action_tuple: A function, or a tuple (function, args, error_msg),
1658                             in which, args and error_msg are optional. args is
1659                             either a value or a tuple if multiple arguments.
1660                             This can also be a list containing multiple
1661                             function or tuple. In this case, these actions are
1662                             called in sequence.
1663        @param check_status: Check the return value of action function. If not
1664                             succeed, raises a TestFail exception.
1665        @return: The result value of the action function.
1666        @raise TestError: An error when the action function is not callable.
1667        @raise TestFail: When check_status=True, action function not succeed.
1668        """
1669        if isinstance(action_tuple, list):
1670            return all([self._call_action(action, check_status=check_status)
1671                        for action in action_tuple])
1672
1673        action = action_tuple
1674        args = ()
1675        error_msg = 'Not succeed'
1676        if isinstance(action_tuple, tuple):
1677            action = action_tuple[0]
1678            if len(action_tuple) >= 2:
1679                args = action_tuple[1]
1680                if not isinstance(args, tuple):
1681                    args = (args,)
1682            if len(action_tuple) >= 3:
1683                error_msg = action_tuple[2]
1684
1685        if action is None:
1686            return
1687
1688        if not callable(action):
1689            raise error.TestError('action is not callable!')
1690
1691        info_msg = 'calling %s' % action.__name__
1692        if args:
1693            info_msg += ' with args %s' % str(args)
1694        logging.info(info_msg)
1695        ret = action(*args)
1696
1697        if check_status and not ret:
1698            raise error.TestFail('%s: %s returning %s' %
1699                                 (error_msg, info_msg, str(ret)))
1700        return ret
1701
1702    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1703                             run_power_action=True, post_power_action=None,
1704                             shutdown_timeout=None):
1705        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1706
1707        @param shutdown_action: function which makes DUT shutdown, like
1708                                pressing power key.
1709        @param pre_power_action: function which is called before next power on.
1710        @param run_power_action: power_key press by default, set to None to skip.
1711        @param post_power_action: function which is called after next power on.
1712        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1713        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1714        """
1715        self._call_action(shutdown_action)
1716        logging.info('Wait to ensure DUT shut down...')
1717        try:
1718            if shutdown_timeout is None:
1719                shutdown_timeout = self.faft_config.shutdown_timeout
1720            self.switcher.wait_for_client(timeout=shutdown_timeout)
1721            raise error.TestFail(
1722                    'Should shut the device down after calling %s.' %
1723                    shutdown_action.__name__)
1724        except ConnectionError:
1725            if self.faft_config.chrome_ec:
1726                self.check_shutdown_power_state(self.POWER_STATE_G3)
1727            logging.info(
1728                'DUT is surely shutdown. We are going to power it on again...')
1729
1730        if pre_power_action:
1731            self._call_action(pre_power_action)
1732        if run_power_action:
1733            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1734        if post_power_action:
1735            self._call_action(post_power_action)
1736
1737    def get_bootid(self, retry=3):
1738        """
1739        Return the bootid.
1740        """
1741        boot_id = None
1742        while retry:
1743            try:
1744                boot_id = self._client.get_boot_id()
1745                break
1746            except error.AutoservRunError:
1747                retry -= 1
1748                if retry:
1749                    logging.debug('Retry to get boot_id...')
1750                else:
1751                    logging.warning('Failed to get boot_id.')
1752        logging.debug('boot_id: %s', boot_id)
1753        return boot_id
1754
1755    def check_state(self, func):
1756        """
1757        Wrapper around _call_action with check_status set to True. This is a
1758        helper function to be used by tests and is currently implemented by
1759        calling _call_action with check_status=True.
1760
1761        TODO: This function's arguments need to be made more stringent. And
1762        its functionality should be moved over to check functions directly in
1763        the future.
1764
1765        @param func: A function, or a tuple (function, args, error_msg),
1766                             in which, args and error_msg are optional. args is
1767                             either a value or a tuple if multiple arguments.
1768                             This can also be a list containing multiple
1769                             function or tuple. In this case, these actions are
1770                             called in sequence.
1771        @return: The result value of the action function.
1772        @raise TestFail: If the function does notsucceed.
1773        """
1774        self._call_action(func, check_status=True)
1775
1776    def get_current_firmware_identity(self):
1777        """Get current firmware sha and fwids of body and vblock.
1778
1779        @return: Current firmware checksums and fwids, as a dict
1780        """
1781
1782        current_checksums = {
1783            'VBOOTA': self.faft_client.bios.get_sig_sha('a'),
1784            'FVMAINA': self.faft_client.bios.get_body_sha('a'),
1785            'VBOOTB': self.faft_client.bios.get_sig_sha('b'),
1786            'FVMAINB': self.faft_client.bios.get_body_sha('b'),
1787        }
1788        if not all(current_checksums.values()):
1789            raise error.TestError(
1790                    'Failed to get firmware sha: %s', current_checksums)
1791
1792        current_fwids = {
1793            'RO_FRID': self.faft_client.bios.get_section_fwid('ro'),
1794            'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'),
1795            'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'),
1796        }
1797        if not all(current_fwids.values()):
1798            raise error.TestError(
1799                    'Failed to get firmware fwid(s): %s', current_fwids)
1800
1801        identifying_info = dict(current_fwids)
1802        identifying_info.update(current_checksums)
1803        return identifying_info
1804
1805    def is_firmware_changed(self):
1806        """Check if the current firmware changed, by comparing its SHA and fwid.
1807
1808        @return: True if it is changed, otherwise False.
1809        """
1810        # Device may not be rebooted after test.
1811        self.faft_client.bios.reload()
1812
1813        current_info = self.get_current_firmware_identity()
1814        prev_info = self._backup_firmware_identity
1815
1816        if current_info == prev_info:
1817            return False
1818        else:
1819            changed = set()
1820            for section in set(current_info.keys()) | set(prev_info.keys()):
1821                if current_info.get(section) != prev_info.get(section):
1822                    changed.add(section)
1823
1824            logging.info('Firmware changed: %s', ', '.join(sorted(changed)))
1825            return True
1826
1827    def backup_firmware(self, suffix='.original'):
1828        """Backup firmware to file, and then send it to host.
1829
1830        @param suffix: a string appended to backup file name
1831        """
1832        remote_temp_dir = self.faft_client.system.create_temp_dir()
1833        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1834        self.faft_client.bios.dump_whole(remote_bios_path)
1835        self._client.get_file(remote_bios_path,
1836                              os.path.join(self.resultsdir, 'bios' + suffix))
1837
1838        if self.faft_config.chrome_ec:
1839            remote_ec_path = os.path.join(remote_temp_dir, 'ec')
1840            self.faft_client.ec.dump_whole(remote_ec_path)
1841            self._client.get_file(remote_ec_path,
1842                              os.path.join(self.resultsdir, 'ec' + suffix))
1843
1844        self._client.run('rm -rf %s' % remote_temp_dir)
1845        logging.info('Backup firmware stored in %s with suffix %s',
1846            self.resultsdir, suffix)
1847
1848        self._backup_firmware_identity = self.get_current_firmware_identity()
1849
1850    def is_firmware_saved(self):
1851        """Check if a firmware saved (called backup_firmware before).
1852
1853        @return: True if the firmware is backed up; otherwise False.
1854        """
1855        return bool(self._backup_firmware_identity)
1856
1857    def restore_firmware(self, suffix='.original', restore_ec=True,
1858                         reboot_ec=False):
1859        """Restore firmware from host in resultsdir.
1860
1861        @param suffix: a string appended to backup file name
1862        @param restore_ec: True to restore the ec firmware; False not to do.
1863        @param reboot_ec: True to reboot EC after restore (if it was restored)
1864        @return: True if firmware needed to be restored
1865        """
1866        if not self.is_firmware_changed():
1867            return False
1868
1869        # Backup current corrupted firmware.
1870        self.backup_firmware(suffix='.corrupt')
1871
1872        # Restore firmware.
1873        remote_temp_dir = self.faft_client.system.create_temp_dir()
1874
1875        bios_local = os.path.join(self.resultsdir, 'bios%s' % suffix)
1876        bios_remote = os.path.join(remote_temp_dir, 'bios%s' % suffix)
1877        self._client.send_file(bios_local, bios_remote)
1878        self.faft_client.bios.write_whole(bios_remote)
1879
1880        if self.faft_config.chrome_ec and restore_ec:
1881            ec_local = os.path.join(self.resultsdir, 'ec%s' % suffix)
1882            ec_remote = os.path.join(remote_temp_dir, 'ec%s' % suffix)
1883            self._client.send_file(ec_local, ec_remote)
1884            ec_cmd = self.faft_client.ec.get_write_cmd(ec_remote)
1885            try:
1886                self._client.run(ec_cmd, timeout=300)
1887            except error.AutoservSSHTimeout:
1888                logging.warning("DUT connection died during EC restore")
1889                self.faft_client.disconnect()
1890
1891            except error.GenericHostRunError:
1892                logging.warning("DUT command failed during EC restore")
1893                logging.debug("Full exception:", exc_info=True)
1894            if reboot_ec:
1895                self.switcher.mode_aware_reboot(
1896                        'custom', lambda: self.sync_and_ec_reboot('hard'))
1897            else:
1898                self.switcher.mode_aware_reboot()
1899        else:
1900            self.switcher.mode_aware_reboot()
1901        logging.info('Successfully restored firmware.')
1902        return True
1903
1904    def setup_firmwareupdate_shellball(self, shellball=None):
1905        """Setup a shellball to use in firmware update test.
1906
1907        Check if there is a given shellball, and it is a shell script. Then,
1908        send it to the remote host. Otherwise, use the
1909        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1910        BIOS and EC images with the active firmware images.
1911
1912        @param shellball: path of a shellball or default to None.
1913        """
1914        if shellball:
1915            # Determine the firmware file is a shellball or a raw binary.
1916            is_shellball = (utils.system_output("file %s" % shellball).find(
1917                    "shell script") != -1)
1918            if is_shellball:
1919                logging.info('Device will update firmware with shellball %s',
1920                             shellball)
1921                temp_path = self.faft_client.updater.get_temp_path()
1922                working_shellball = os.path.join(temp_path,
1923                                                 'chromeos-firmwareupdate')
1924                self._client.send_file(shellball, working_shellball)
1925                self.faft_client.updater.extract_shellball()
1926            else:
1927                raise error.TestFail(
1928                    'The given shellball is not a shell script.')
1929        else:
1930            logging.info('No shellball given, use the original shellball and '
1931                         'replace its BIOS and EC images.')
1932            work_path = self.faft_client.updater.get_work_path()
1933            bios_in_work_path = os.path.join(
1934                work_path, self.faft_client.updater.get_bios_relative_path())
1935            ec_in_work_path = os.path.join(
1936                work_path, self.faft_client.updater.get_ec_relative_path())
1937            logging.info('Writing current BIOS to: %s', bios_in_work_path)
1938            self.faft_client.bios.dump_whole(bios_in_work_path)
1939            if self.faft_config.chrome_ec:
1940                logging.info('Writing current EC to: %s', ec_in_work_path)
1941                self.faft_client.ec.dump_firmware(ec_in_work_path)
1942            self.faft_client.updater.repack_shellball()
1943
1944    def is_kernel_changed(self, kernel_type='KERN'):
1945        """Check if the current kernel is changed, by comparing its SHA1 hash.
1946
1947        @param kernel_type: The type name of kernel ('KERN' or 'MINIOS').
1948        @return: True if it is changed; otherwise, False.
1949        """
1950        changed = False
1951        for p in ('A', 'B'):
1952            backup_sha = self._backup_kernel_sha[kernel_type].get(p, None)
1953            current_sha = self.kernel_servicer[kernel_type].get_sha(p)
1954            if backup_sha != current_sha:
1955                changed = True
1956                logging.info('Kernel %s-%s is changed', kernel_type, p)
1957        return changed
1958
1959    def backup_kernel(self, suffix='.original', kernel_type='KERN'):
1960        """Backup kernel to files, and the send them to host.
1961
1962        @param kernel_type: The type name of kernel ('KERN' or 'MINIOS').
1963        @param suffix: a string appended to backup file name.
1964        """
1965        kernel_name = self.KERNEL_TYPE_NAME_MAP[kernel_type]
1966        servicer = self.kernel_servicer[kernel_type]
1967        remote_temp_dir = self.faft_client.system.create_temp_dir()
1968        for p in ('A', 'B'):
1969            remote_path = os.path.join(remote_temp_dir,
1970                                       '%s_%s' % (kernel_name, p))
1971            servicer.dump(p, remote_path)
1972            self._client.get_file(
1973                    remote_path,
1974                    os.path.join(self.resultsdir,
1975                                 '%s_%s%s' % (kernel_name, p, suffix)))
1976            self._backup_kernel_sha[kernel_type][p] = servicer.get_sha(p)
1977        logging.info('Backup %s stored in %s with suffix %s', kernel_name,
1978                     self.resultsdir, suffix)
1979
1980    def is_kernel_saved(self, kernel_type='KERN'):
1981        """Check if kernel images are saved (backup_kernel called before).
1982
1983        @param kernel_type: The type name of kernel ('KERN' or 'MINIOS').
1984        @return: True if the kernel is saved; otherwise, False.
1985        """
1986        return len(self._backup_kernel_sha[kernel_type]) != 0
1987
1988    def restore_kernel(self, suffix='.original', kernel_type='KERN'):
1989        """Restore kernel from host in resultsdir.
1990
1991        @param kernel_type: The type name of kernel ('KERN' or 'MINIOS').
1992        @param suffix: a string appended to backup file name.
1993        @return: True if kernel needed to be restored
1994        """
1995        if not self.is_kernel_changed(kernel_type):
1996            return False
1997
1998        # Backup current corrupted kernel.
1999        self.backup_kernel(suffix='.corrupt', kernel_type=kernel_type)
2000
2001        # Restore kernel.
2002        kernel_name = self.KERNEL_TYPE_NAME_MAP[kernel_type]
2003        servicer = self.kernel_servicer[kernel_type]
2004        remote_temp_dir = self.faft_client.system.create_temp_dir()
2005        for p in ('A', 'B'):
2006            remote_path = os.path.join(remote_temp_dir,
2007                                       '%s_%s' % (kernel_name, p))
2008            servicer.dump(p, remote_path)
2009            self._client.send_file(
2010                    os.path.join(self.resultsdir,
2011                                 '%s_%s%s' % (kernel_name, p, suffix)),
2012                    remote_path)
2013            servicer.write(p, remote_path)
2014
2015        self.switcher.mode_aware_reboot()
2016        logging.info('Successfully restored %s.', kernel_type)
2017        return True
2018
2019    def backup_cgpt_attributes(self):
2020        """Backup CGPT partition table attributes."""
2021        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
2022
2023    def restore_cgpt_attributes(self):
2024        """Restore CGPT partition table attributes."""
2025        current_table = self.faft_client.cgpt.get_attributes()
2026        if current_table == self._backup_cgpt_attr:
2027            return
2028        logging.info('CGPT table is changed. Original: %r. Current: %r.',
2029                     self._backup_cgpt_attr,
2030                     current_table)
2031        self.faft_client.cgpt.set_attributes(
2032                self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B'])
2033
2034        self.switcher.mode_aware_reboot()
2035        logging.info('Successfully restored CGPT table.')
2036
2037    def try_fwb(self, count=0):
2038        """set to try booting FWB count # times
2039
2040        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
2041        vboot2
2042
2043        @param count: an integer specifying value to program into
2044                      fwb_tries(vb1)/fw_try_next(vb2)
2045        """
2046        if self.fw_vboot2:
2047            self.faft_client.system.set_fw_try_next('B', count)
2048        else:
2049            # vboot1: we need to boot into fwb at least once
2050            if not count:
2051                count = count + 1
2052            self.faft_client.system.set_try_fw_b(count)
2053
2054    def identify_shellball(self, include_ec=None):
2055        """Get the FWIDs of all targets and sections in the shellball
2056
2057        @param include_ec: if True, get EC fwids.
2058                           If None (default), assume True if board has an EC
2059        @return: the dict of versions in the shellball
2060        """
2061        fwids = dict()
2062        fwids['bios'] = self.faft_client.updater.get_image_fwids('bios')
2063
2064        if include_ec is None:
2065            if self.faft_config.platform.lower() == 'samus':
2066                include_ec = False  # no ec.bin in shellball
2067            else:
2068                include_ec = self.faft_config.chrome_ec
2069
2070        if include_ec:
2071            fwids['ec'] = self.faft_client.updater.get_image_fwids('ec')
2072        return fwids
2073
2074    def modify_shellball(self, append, modify_ro=True, modify_ec=False):
2075        """Modify the FWIDs of targets and sections in the shellball
2076
2077        @return: the full path of the shellball
2078        """
2079
2080        if modify_ro:
2081            self.faft_client.updater.modify_image_fwids(
2082                    'bios', ['ro', 'a', 'b'])
2083        else:
2084            self.faft_client.updater.modify_image_fwids(
2085                    'bios', ['a', 'b'])
2086
2087        if modify_ec:
2088            if modify_ro:
2089                self.faft_client.updater.modify_image_fwids(
2090                        'ec', ['ro', 'rw'])
2091            else:
2092                self.faft_client.updater.modify_image_fwids(
2093                        'ec', ['rw'])
2094
2095        modded_shellball = self.faft_client.updater.repack_shellball(append)
2096
2097        return modded_shellball
2098
2099    @staticmethod
2100    def check_fwids_written(before_fwids, image_fwids, after_fwids,
2101                            expected_written):
2102        """Check the dicts of fwids for correctness after an update is applied.
2103
2104        The targets checked come from the keys of expected_written.
2105        The sections checked come from the inner dicts of the fwids parameters.
2106
2107        The fwids should be keyed by target (flash type), then by section:
2108        {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'},
2109         'ec': {'ro': '<fwid>', 'rw': '<fwid>'}
2110
2111        For expected_written, the dict should be keyed by flash type only:
2112        {'bios': ['ro'], 'ec': ['ro', 'rw']}
2113        To expect the contents completely unchanged, give only the keys:
2114        {'bios': [], 'ec': []} or {'bios': None, 'ec': None}
2115
2116        @param before_fwids: dict of versions from before the update
2117        @param image_fwids: dict of versions in the update
2118        @param after_fwids: dict of actual versions after the update
2119        @param expected_written: dict indicating which ones should have changed
2120        @return: list of error lines for mismatches
2121
2122        @type before_fwids: dict
2123        @type image_fwids: dict | None
2124        @type after_fwids: dict
2125        @type expected_written: dict
2126        @rtype: list
2127        """
2128        errors = []
2129
2130        if image_fwids is None:
2131            image_fwids = {}
2132
2133        for target in sorted(expected_written.keys()):
2134            # target is BIOS or EC
2135
2136            before_missing = (target not in before_fwids)
2137            after_missing = (target not in after_fwids)
2138            if before_missing or after_missing:
2139                if before_missing:
2140                    errors.append("...no before_fwids[%s]" % target)
2141                if after_missing:
2142                    errors.append("...no after_fwids[%s]" % target)
2143                continue
2144
2145            written_sections = expected_written.get(target) or list()
2146            written_sections = set(written_sections)
2147
2148            before_sections = set(before_fwids.get(target) or dict())
2149            image_sections = set(image_fwids.get(target) or dict())
2150            after_sections = set(after_fwids.get(target) or dict())
2151
2152            for section in before_sections | image_sections | after_sections:
2153                # section is RO, RW, A, or B
2154
2155                before_fwid = before_fwids[target][section]
2156                image_fwid = image_fwids.get(target, {}).get(section, None)
2157                actual_fwid = after_fwids[target][section]
2158
2159                if section in written_sections:
2160                    expected_fwid = image_fwid
2161                    expected_desc = 'rewritten fwid (%s)' % expected_fwid
2162                    if image_fwid == before_fwid:
2163                        expected_desc = ('rewritten (no changes) fwid (%s)' %
2164                                         expected_fwid)
2165                else:
2166                    expected_fwid = before_fwid
2167                    expected_desc = 'original fwid (%s)' % expected_fwid
2168
2169                if actual_fwid == expected_fwid:
2170                    actual_desc = 'correct value'
2171
2172                elif actual_fwid == image_fwid:
2173                    actual_desc = 'rewritten fwid (%s)' % actual_fwid
2174                    if image_fwid == before_fwid:
2175                        # The flash could have been rewritten with the same fwid
2176                        actual_desc = 'possibly written fwid (%s)' % actual_fwid
2177
2178                elif actual_fwid == before_fwid:
2179                    actual_desc = 'original fwid (%s)' % actual_fwid
2180
2181                else:
2182                    actual_desc = 'unknown fwid (%s)' % actual_fwid
2183
2184                msg = ("...FWID (%s %s): expected %s, got %s" %
2185                       (target.upper(), section.upper(),
2186                        expected_desc, actual_desc))
2187
2188                if actual_fwid != expected_fwid:
2189                    errors.append(msg)
2190        return errors
2191
2192
2193    def fwmp_is_cleared(self):
2194        """Return True if the FWMP has been created"""
2195        res = self.host.run('cryptohome '
2196                            '--action=get_firmware_management_parameters',
2197                            ignore_status=True)
2198        if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS:
2199            raise error.TestError('Could not run cryptohome command %r' % res)
2200        return (self.FWMP_CLEARED_ERROR_MSG in res.stdout
2201                or tpm_utils.FwmpIsAllZero(res.stdout))
2202
2203
2204    def _tpm_is_owned(self):
2205        """Returns True if the tpm is owned"""
2206        result = self.host.run('tpm_manager_client status --nonsensitive',
2207                               ignore_status=True)
2208        logging.debug(result)
2209        return result.exit_status == 0 and 'is_owned: true' in result.stdout
2210
2211    def clear_fwmp(self):
2212        """Clear the FWMP"""
2213        if self.fwmp_is_cleared():
2214            return
2215        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
2216        self.host.run('tpm_manager_client take_ownership')
2217        if not utils.wait_for_value(self._tpm_is_owned, expected_value=True):
2218            raise error.TestError('Unable to own tpm while clearing fwmp.')
2219        self.host.run('cryptohome '
2220                      '--action=remove_firmware_management_parameters')
2221
2222    def wait_for(self, cfg_field, action_msg=None, extra_time=0):
2223        """Waits for time specified in a config.
2224
2225        @ivar cfg_field: The name of the config field that specifies the
2226                            time to wait.
2227        @ivar action_msg: Optional log message describing the action that
2228                            will occur after the wait.
2229        @ivar extra_time: Additional time to be added to time from config.
2230        """
2231        wait_time = self.faft_config.__getattr__(cfg_field) + extra_time
2232        if extra_time:
2233            wait_src = "%s + %s" % (cfg_field, extra_time)
2234        else:
2235            wait_src = cfg_field
2236
2237        units = 'second' if wait_time==1 else 'seconds'
2238        start_msg = "Waiting %s(%s) %s" % (wait_time, wait_src, units)
2239        if action_msg:
2240            start_msg += ", before '%s'" % action_msg
2241        start_msg += "."
2242
2243        logging.info(start_msg)
2244        time.sleep(wait_time)
2245        logging.info("Done waiting.")
2246
2247    def _try_to_bring_dut_up(self):
2248        """Try to quickly get the dut in a pingable state"""
2249        if not hasattr(self, 'cr50'):
2250            raise error.TestNAError('Test can only be run on devices with '
2251                                    'access to the Cr50 console')
2252        logging.info('Bringing DUT up')
2253
2254        self.servo.set_nocheck('cold_reset', 'off')
2255        self.servo.set_nocheck('warm_reset', 'off')
2256
2257        time.sleep(self.cr50.SHORT_WAIT)
2258        if not self.cr50.ap_is_on():
2259            logging.info('Pressing power button to turn on AP')
2260            self.servo.power_short_press()
2261
2262        end_time = time.time() + self.RESPONSE_TIMEOUT
2263        while not self.host.ping_wait_up(
2264                self.faft_config.delay_reboot_to_ping * 2):
2265            if time.time() > end_time:
2266                logging.warning(
2267                        'DUT is unresponsive after trying to bring it up')
2268                return
2269            self.servo.get_power_state_controller().reset()
2270            logging.info('DUT did not respond. Resetting it.')
2271
2272    def _check_open_and_press_power_button(self):
2273        """Check stdout and press the power button if prompted.
2274
2275        Returns:
2276            True if the process is still running.
2277        """
2278        if not hasattr(self, 'cr50'):
2279            raise error.TestNAError('Test can only be run on devices with '
2280                                    'access to the Cr50 console')
2281
2282        logging.info(self._get_ccd_open_output())
2283        self.servo.power_short_press()
2284        logging.info('long int power button press')
2285        # power button press cr50 erases nvmem and resets the dut before setting
2286        # the state to open. Wait a bit so we don't check the ccd state in the
2287        # middle of this reset process. Power button requests happen once a
2288        # minute, so waiting 10 seconds isn't a big deal.
2289        time.sleep(10)
2290        return (self.cr50.OPEN == self.cr50.get_ccd_level() or
2291                self._ccd_open_job.sp.poll() is not None)
2292
2293    def _get_ccd_open_output(self):
2294        """Read the new output."""
2295        if not hasattr(self, 'cr50'):
2296            raise error.TestNAError('Test can only be run on devices with '
2297                                    'access to the Cr50 console')
2298
2299        self._ccd_open_job.process_output()
2300        output = self._ccd_open_stdout.getvalue()
2301        self._ccd_open_stdout.seek(self._ccd_open_last_len)
2302        self._ccd_open_last_len = len(output)
2303        return self._ccd_open_stdout.read().strip()
2304
2305    def _close_ccd_open_job(self):
2306        """Terminate the process and check the results."""
2307        if not hasattr(self, 'cr50'):
2308            raise error.TestNAError('Test can only be run on devices with '
2309                                    'access to the Cr50 console')
2310
2311        exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
2312        stdout = self._ccd_open_stdout.getvalue().strip()
2313        delattr(self, '_ccd_open_job')
2314        if stdout:
2315            logging.info('stdout of ccd open:\n%s', stdout)
2316        if exit_status:
2317            logging.info('exit status: %d', exit_status)
2318        if 'Error' in stdout:
2319            raise error.TestFail('ccd open Error %s' %
2320                                 stdout.split('Error')[-1])
2321        if self.cr50.OPEN != self.cr50.get_ccd_level():
2322            raise error.TestFail('unable to open cr50: %s' % stdout)
2323        else:
2324            logging.info('Opened Cr50')
2325
2326    def ccd_open_from_ap(self):
2327        """Start the open process and press the power button."""
2328        if not hasattr(self, 'cr50'):
2329            raise error.TestNAError('Test can only be run on devices with '
2330                                    'access to the Cr50 console')
2331
2332        # Opening CCD requires power button presses. If those presses would
2333        # power off the AP and prevent CCD open from completing, ignore them.
2334        if self.faft_config.ec_forwards_short_pp_press:
2335            self.stop_powerd()
2336
2337        # Make sure the test waits long enough to avoid ccd rate limiting.
2338        time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT)
2339
2340        self._ccd_open_last_len = 0
2341
2342        self._ccd_open_stdout = six.StringIO()
2343
2344        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
2345        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
2346                                    ccd_open_cmd)
2347        # Start running the Cr50 Open process in the background.
2348        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
2349                                         nickname='ccd_open',
2350                                         stdout_tee=self._ccd_open_stdout,
2351                                         stderr_tee=utils.TEE_TO_LOGS)
2352        if self._ccd_open_job == None:
2353            raise error.TestFail('could not start ccd open')
2354
2355        try:
2356            # Wait for the first gsctool power button prompt before starting the
2357            # open process.
2358            logging.info(self._get_ccd_open_output())
2359            # Cr50 starts out by requesting 5 quick presses then 4 longer
2360            # power button presses. Run the quick presses without looking at the
2361            # command output, because getting the output can take some time. For
2362            # the presses that require a 1 minute wait check the output between
2363            # presses, so we can catch errors
2364            #
2365            # run quick presses for 30 seconds. It may take a couple of seconds
2366            # for open to start. 10 seconds should be enough. 30 is just used
2367            # because it will definitely be enough, and this process takes 300
2368            # seconds, so doing quick presses for 30 seconds won't matter.
2369            end_time = time.time() + 30
2370            while time.time() < end_time:
2371                self.servo.power_short_press()
2372                logging.info('short int power button press')
2373                time.sleep(self.PP_SHORT_INTERVAL)
2374            # Poll the output and press the power button for the longer presses.
2375            utils.wait_for_value(self._check_open_and_press_power_button,
2376                                 expected_value=True,
2377                                 timeout_sec=self.cr50.PP_LONG)
2378        except Exception as e:
2379            logging.info(e)
2380            raise
2381        finally:
2382            self._close_ccd_open_job()
2383            self._try_to_bring_dut_up()
2384        logging.info(self.cr50.get_ccd_info())
2385
2386    def enter_mode_after_checking_cr50_state(self, mode):
2387        """Reboot to mode if cr50 doesn't already match the state"""
2388        if not hasattr(self, 'cr50'):
2389            raise error.TestNAError('Test can only be run on devices with '
2390                                    'access to the Cr50 console')
2391
2392        # If the device is already in the correct mode, don't do anything
2393        if (mode == 'dev') == self.cr50.in_dev_mode():
2394            logging.info('already in %r mode', mode)
2395            return
2396
2397        self.switcher.reboot_to_mode(to_mode=mode)
2398
2399        if (mode == 'dev') != self.cr50.in_dev_mode():
2400            raise error.TestError('Unable to enter %r mode' % mode)
2401
2402    def fast_ccd_open(self, enable_testlab=False, reset_ccd=True,
2403                      dev_mode=False):
2404        """Try to use ccd testlab open. If that fails, do regular ap open.
2405
2406        Args:
2407            enable_testlab: If True, enable testlab mode after cr50 is open.
2408            reset_ccd: If True, reset ccd after open.
2409            dev_mode: True if the device should be in dev mode after ccd is
2410                      is opened.
2411        """
2412        if not hasattr(self, 'cr50'):
2413            raise error.TestNAError('Test can only be run on devices with '
2414                                    'access to the Cr50 console')
2415
2416        if self.servo.main_device_is_ccd() and not self.cr50.testlab_is_on():
2417            error_txt = 'because the main servo device is CCD.'
2418            if enable_testlab:
2419                raise error.TestNAError('Cannot enable testlab: %s' % error_txt)
2420            elif reset_ccd:
2421                raise error.TestNAError('CCD reset not allowed: %s' % error_txt)
2422
2423        if not self.faft_config.has_powerbutton:
2424            logging.warning('No power button', exc_info=True)
2425            enable_testlab = False
2426
2427        # Try to use testlab open first, so we don't have to wait for the
2428        # physical presence check.
2429        self.cr50.send_command('ccd testlab open')
2430        if self.cr50.OPEN != self.cr50.get_ccd_level():
2431            if self.servo.has_control('chassis_open'):
2432                self.servo.set('chassis_open', 'yes')
2433            pw = '' if self.cr50.password_is_reset() else self.CCD_PASSWORD
2434            # Use the console to open cr50 without entering dev mode if
2435            # possible. Ittakes longer and relies on more systems to enter dev
2436            # mode and ssh into the AP. Skip the steps that aren't required.
2437            if not (pw or self.cr50.get_cap(
2438                            'OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]):
2439                self.enter_mode_after_checking_cr50_state('dev')
2440
2441            if pw or self.cr50.get_cap(
2442                            'OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
2443                self.cr50.set_ccd_level(self.cr50.OPEN, pw)
2444            else:
2445                self.ccd_open_from_ap()
2446
2447            if self.servo.has_control('chassis_open'):
2448                self.servo.set('chassis_open', 'no')
2449
2450            if enable_testlab:
2451                self.cr50.set_ccd_testlab('on')
2452
2453        if reset_ccd:
2454            self.cr50.ccd_reset()
2455
2456        # In default, the device should be in normal mode. After opening cr50,
2457        # the TPM should be cleared and the device should automatically reset to
2458        # normal mode. However, some tests might want the device in 'dev' mode.
2459        self.enter_mode_after_checking_cr50_state('dev' if dev_mode else
2460                                                 'normal')
2461