• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import pprint
10import re
11import time
12import uuid
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import test
17from autotest_lib.server.cros import vboot_constants as vboot
18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
20from autotest_lib.server.cros.faft.utils import mode_switcher
21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
22from autotest_lib.server.cros.servo import chrome_base_ec
23from autotest_lib.server.cros.servo import chrome_cr50
24from autotest_lib.server.cros.servo import chrome_ec
25
26ConnectionError = mode_switcher.ConnectionError
27
28
class FAFTBase(test.test):
    """Common base of the FAFT test classes.

    It keeps a handle on the servo and the DUT host and creates the RPC
    proxy through which a test reaches the firmware functions and
    interfaces of the FAFTClient on the DUT.
    """
    def initialize(self, host):
        """Initialize the DUT through servo and create the RPC proxy.

        @param host: The host object of the DUT; its servo attribute is used
                     to control the DUT.
        """
        servo = host.servo
        self.servo = servo
        servo.initialize_dut()

        # Keep the host around; later helpers reach the DUT through it.
        self._client = host
        self.faft_client = RPCProxy(host)

        # Path of the lockfile created on the DUT while FAFT is running.
        self.lockfile = '/var/tmp/faft/lock'
44
45
class FirmwareTest(FAFTBase):
    """
    Base class that sets up helper objects/functions for firmware tests.

    TODO: add documentation as the FAFT rework progresses.
    """
    version = 1

    # Mapping of partition number of kernel and rootfs.
    # Each map accepts 'a'/'b' or a kernel/rootfs partition number string.
    # KERNEL_MAP/ROOTFS_MAP give the kernel/rootfs partition of the same
    # slot; the OTHER_* maps give the partition of the opposite slot.
    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}

    CHROMEOS_MAGIC = "CHROMEOS"
    CORRUPTED_MAGIC = "CORRUPTD"

    # Delay for waiting client to return before EC suspend
    EC_SUSPEND_DELAY = 5

    # Delay between EC suspend and wake
    WAKE_DELAY = 10

    # Delay between closing and opening lid
    LID_DELAY = 1

    _SERVOD_LOG = '/var/log/servod.log'

    # Rootfs partition number appended to the USB device path when checking
    # the image on the USB disk.
    _ROOTFS_PARTITION_NUMBER = 3

    # Backups of the original DUT state.
    # NOTE(review): the dict() defaults are class-level objects and are
    # therefore shared between instances unless rebound - confirm intended.
    _backup_firmware_sha = ()
    _backup_kernel_sha = dict()
    _backup_cgpt_attr = dict()
    _backup_gbb_flags = None
    _backup_dev_mode = None

    # Class level variable, keep track the states of one time setup.
    # This variable is preserved across tests which inherit this class.
    _global_setup_done = {
        'gbb_flags': False,
        'reimage': False,
        'usb_check': False,
    }
89
90    @classmethod
91    def check_setup_done(cls, label):
92        """Check if the given setup is done.
93
94        @param label: The label of the setup.
95        """
96        return cls._global_setup_done[label]
97
98    @classmethod
99    def mark_setup_done(cls, label):
100        """Mark the given setup done.
101
102        @param label: The label of the setup.
103        """
104        cls._global_setup_done[label] = True
105
106    @classmethod
107    def unmark_setup_done(cls, label):
108        """Mark the given setup not done.
109
110        @param label: The label of the setup.
111        """
112        cls._global_setup_done[label] = False
113
114    def initialize(self, host, cmdline_args, ec_wp=None):
115        super(FirmwareTest, self).initialize(host)
116        self.run_id = str(uuid.uuid4())
117        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
118        # Parse arguments from command line
119        args = {}
120        self.power_control = host.POWER_CONTROL_RPM
121        for arg in cmdline_args:
122            match = re.search("^(\w+)=(.+)", arg)
123            if match:
124                args[match.group(1)] = match.group(2)
125        if 'power_control' in args:
126            self.power_control = args['power_control']
127            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
128                raise error.TestError('Valid values for --args=power_control '
129                                      'are %s. But you entered wrong argument '
130                                      'as "%s".'
131                                       % (host.POWER_CONTROL_VALID_ARGS,
132                                       self.power_control))
133        self._no_ec_sync = False
134        if 'no_ec_sync' in args:
135            if 'true' in args['no_ec_sync'].lower():
136                self._no_ec_sync = True
137
138        if not self.faft_client.system.dev_tpm_present():
139            raise error.TestError('/dev/tpm0 does not exist on the client')
140
141        self.faft_config = FAFTConfig(
142                self.faft_client.system.get_platform_name())
143        self.checkers = FAFTCheckers(self)
144        self.switcher = mode_switcher.create_mode_switcher(self)
145
146        if self.faft_config.chrome_ec:
147            self.ec = chrome_ec.ChromeEC(self.servo)
148        # Check for presence of a USBPD console
149        if self.faft_config.chrome_usbpd:
150            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
151        elif self.faft_config.chrome_ec:
152            # If no separate USBPD console, then PD exists on EC console
153            self.usbpd = self.ec
154        # Get plankton console
155        self.plankton = host.plankton
156        self.plankton_host = host._plankton_host
157
158        # Create the BaseEC object. None if not available.
159        self.base_ec = chrome_base_ec.create_base_ec(self.servo)
160
161        self._setup_uart_capture()
162        self._setup_servo_log()
163        self._record_system_info()
164        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
165        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
166        if self.fw_vboot2:
167            self.faft_client.system.set_fw_try_next('A')
168            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
169                logging.info('mainfw_act is B. rebooting to set it A')
170                self.switcher.mode_aware_reboot()
171        self._setup_gbb_flags()
172        self.faft_client.updater.stop_daemon()
173        self._create_faft_lockfile()
174        self._setup_ec_write_protect(ec_wp)
175        # See chromium:239034 regarding needing this sync.
176        self.blocking_sync()
177        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
178
179    def cleanup(self):
180        """Autotest cleanup function."""
181        # Unset state checker in case it's set by subclass
182        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
183        try:
184            self.faft_client.system.is_available()
185        except:
186            # Remote is not responding. Revive DUT so that subsequent tests
187            # don't fail.
188            self._restore_routine_from_timeout()
189        self.switcher.restore_mode()
190        self._restore_ec_write_protect()
191        self._restore_servo_v4_role()
192        self._restore_gbb_flags()
193        self.faft_client.updater.start_daemon()
194        self.faft_client.updater.cleanup()
195        self._remove_faft_lockfile()
196        self._record_servo_log()
197        self._record_faft_client_log()
198        self._cleanup_uart_capture()
199        super(FirmwareTest, self).cleanup()
200        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
201
202    def _record_system_info(self):
203        """Record some critical system info to the attr keyval.
204
205        This info is used by generate_test_report later.
206        """
207        system_info = {
208            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
209            'ec_version': self.faft_client.ec.get_version(),
210            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
211            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
212            'servod_version': self._client._servo_host.run(
213                'servod --version').stdout.strip(),
214            'os_version': self._client.get_release_builder_path(),
215            'servo_type': self.servo.get_servo_version()
216        }
217
218        # Record the servo v4 and servo micro versions when possible
219        if 'servo_micro' in system_info['servo_type']:
220            system_info['servo_micro_version'] = self.servo.get(
221                    'servo_micro_version')
222
223        if 'servo_v4' in system_info['servo_type']:
224            system_info['servo_v4_version'] = self.servo.get('servo_v4_version')
225
226        if hasattr(self, 'cr50'):
227            system_info['cr50_version'] = self.servo.get('cr50_version')
228
229        logging.info('System info:\n%s', pprint.pformat(system_info))
230        self.write_attr_keyval(system_info)
231
232    def invalidate_firmware_setup(self):
233        """Invalidate all firmware related setup state.
234
235        This method is called when the firmware is re-flashed. It resets all
236        firmware related setup states so that the next test setup properly
237        again.
238        """
239        self.unmark_setup_done('gbb_flags')
240
241    def _retrieve_recovery_reason_from_trap(self):
242        """Try to retrieve the recovery reason from a trapped recovery screen.
243
244        @return: The recovery_reason, 0 if any error.
245        """
246        recovery_reason = 0
247        logging.info('Try to retrieve recovery reason...')
248        if self.servo.get_usbkey_direction() == 'dut':
249            self.switcher.bypass_rec_mode()
250        else:
251            self.servo.switch_usbkey('dut')
252
253        try:
254            self.switcher.wait_for_client()
255            lines = self.faft_client.system.run_shell_command_get_output(
256                        'crossystem recovery_reason')
257            recovery_reason = int(lines[0])
258            logging.info('Got the recovery reason %d.', recovery_reason)
259        except ConnectionError:
260            logging.error('Failed to get the recovery reason due to connection '
261                          'error.')
262        return recovery_reason
263
264    def _reset_client(self):
265        """Reset client to a workable state.
266
267        This method is called when the client is not responsive. It may be
268        caused by the following cases:
269          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
270          - corrupted firmware;
271          - corrutped OS image.
272        """
273        # DUT may halt on a firmware screen. Try cold reboot.
274        logging.info('Try cold reboot...')
275        self.switcher.mode_aware_reboot(reboot_type='cold',
276                                        sync_before_boot=False,
277                                        wait_for_dut_up=False)
278        self.switcher.wait_for_client_offline()
279        self.switcher.bypass_dev_mode()
280        try:
281            self.switcher.wait_for_client()
282            return
283        except ConnectionError:
284            logging.warn('Cold reboot doesn\'t help, still connection error.')
285
286        # DUT may be broken by a corrupted firmware. Restore firmware.
287        # We assume the recovery boot still works fine. Since the recovery
288        # code is in RO region and all FAFT tests don't change the RO region
289        # except GBB.
290        if self.is_firmware_saved():
291            self._ensure_client_in_recovery()
292            logging.info('Try restore the original firmware...')
293            if self.is_firmware_changed():
294                try:
295                    self.restore_firmware()
296                    return
297                except ConnectionError:
298                    logging.warn('Restoring firmware doesn\'t help, still '
299                                 'connection error.')
300
301        # Perhaps it's kernel that's broken. Let's try restoring it.
302        if self.is_kernel_saved():
303            self._ensure_client_in_recovery()
304            logging.info('Try restore the original kernel...')
305            if self.is_kernel_changed():
306                try:
307                    self.restore_kernel()
308                    return
309                except ConnectionError:
310                    logging.warn('Restoring kernel doesn\'t help, still '
311                                 'connection error.')
312
313        # DUT may be broken by a corrupted OS image. Restore OS image.
314        self._ensure_client_in_recovery()
315        logging.info('Try restore the OS image...')
316        self.faft_client.system.run_shell_command('chromeos-install --yes')
317        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
318        self.switcher.wait_for_client_offline()
319        self.switcher.bypass_dev_mode()
320        try:
321            self.switcher.wait_for_client()
322            logging.info('Successfully restore OS image.')
323            return
324        except ConnectionError:
325            logging.warn('Restoring OS image doesn\'t help, still connection '
326                         'error.')
327
    def _ensure_client_in_recovery(self):
        """Ensure client in recovery boot; reboot into it if necessary.

        The DUT is rebooted to recovery mode without waiting for it to come
        up, the USB key is switched to 'host', the recovery screen is
        bypassed through the mode switcher, and then the client is awaited.

        @raise TestError: if failed to boot the USB image.
        """
        logging.info('Try boot into USB image...')
        # The client may be unresponsive here, so neither sync before the
        # reboot nor wait for the DUT to come up.
        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
                                     wait_for_dut_up=False)
        self.servo.switch_usbkey('host')
        self.switcher.bypass_rec_mode()
        try:
            self.switcher.wait_for_client()
        except ConnectionError:
            raise error.TestError('Failed to boot the USB image.')
342
343    def _restore_routine_from_timeout(self):
344        """A routine to try to restore the system from a timeout error.
345
346        This method is called when FAFT failed to connect DUT after reboot.
347
348        @raise TestFail: This exception is already raised, with a decription
349                         why it failed.
350        """
351        # DUT is disconnected. Capture the UART output for debug.
352        self._record_uart_capture()
353
354        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
355        # identify if it is a network flaky.
356
357        recovery_reason = self._retrieve_recovery_reason_from_trap()
358
359        # Reset client to a workable state.
360        self._reset_client()
361
362        # Raise the proper TestFail exception.
363        if recovery_reason:
364            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
365                                 'and timed out' % recovery_reason)
366        else:
367            raise error.TestFail('Timed out waiting for DUT reboot')
368
369    def assert_test_image_in_usb_disk(self, usb_dev=None):
370        """Assert an USB disk plugged-in on servo and a test image inside.
371
372        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
373                        If None, it is detected automatically.
374        @raise TestError: if USB disk not detected or not a test image.
375        """
376        if self.check_setup_done('usb_check'):
377            return
378        if usb_dev:
379            assert self.servo.get_usbkey_direction() == 'host'
380        else:
381            self.servo.switch_usbkey('host')
382            usb_dev = self.servo.probe_host_usb_dev()
383            if not usb_dev:
384                raise error.TestError(
385                        'An USB disk should be plugged in the servo board.')
386
387        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
388        logging.info('usb dev is %s', usb_dev)
389        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
390        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
391
392        try:
393            usb_lsb = self.servo.system_output('cat %s' %
394                os.path.join(tmpd, 'etc/lsb-release'))
395            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
396            dut_lsb = '\n'.join(self.faft_client.system.
397                run_shell_command_get_output('cat /etc/lsb-release'))
398            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
399            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
400                raise error.TestError('USB stick in servo is no test image')
401            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
402            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
403            if usb_board != dut_board:
404                raise error.TestError('USB stick in servo contains a %s '
405                    'image, but DUT is a %s' % (usb_board, dut_board))
406        finally:
407            for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd):
408                self.servo.system(cmd)
409
410        self.mark_setup_done('usb_check')
411
412    def setup_usbkey(self, usbkey, host=None, used_for_recovery=None):
413        """Setup the USB disk for the test.
414
415        It checks the setup of USB disk and a valid ChromeOS test image inside.
416        It also muxes the USB disk to either the host or DUT by request.
417
418        @param usbkey: True if the USB disk is required for the test, False if
419                       not required.
420        @param host: Optional, True to mux the USB disk to host, False to mux it
421                    to DUT, default to do nothing.
422        @param used_for_recovery: Optional, True if the USB disk is used for
423                                  recovery boot; False if the USB disk is not
424                                  used for recovery boot, like Ctrl-U USB boot.
425        """
426        if usbkey:
427            self.assert_test_image_in_usb_disk()
428        elif host is None:
429            # USB disk is not required for the test. Better to mux it to host.
430            host = True
431
432        if host is True:
433            self.servo.switch_usbkey('host')
434        elif host is False:
435            self.servo.switch_usbkey('dut')
436
437        if used_for_recovery is None:
438            # Default value is True if usbkey == True.
439            # As the common usecase of USB disk is for recovery boot. Tests
440            # can define it explicitly if not.
441            used_for_recovery = usbkey
442
443        if used_for_recovery:
444            # In recovery boot, the locked EC RO doesn't support PD for most
445            # of the CrOS devices. The default servo v4 power role is a SRC.
446            # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the
447            # data role swap from UFP to DFP; as a result, DUT can't see the
448            # USB disk and the Ethernet dongle on servo v4.
449            #
450            # This is a workaround to set servo v4 as a SNK, for every FAFT
451            # test which boots into the USB disk in the recovery mode.
452            #
453            # TODO(waihong): Add a check to see if the battery level is too
454            # low and sleep for a while for charging.
455            self.set_servo_v4_role_to_snk()
456
    def set_servo_v4_role_to_snk(self):
        """Set the servo v4 role to SNK.

        It also remembers that the role was changed, so that
        _restore_servo_v4_role() switches it back to SRC later.
        """
        # Set the flag first, so it is recorded even if the servo call fails.
        self._needed_restore_servo_v4_role = True
        self.servo.set_servo_v4_role('snk')
461
462    def _restore_servo_v4_role(self):
463        """Restore the servo v4 role to default SRC."""
464        if not hasattr(self, '_needed_restore_servo_v4_role'):
465            return
466        if self._needed_restore_servo_v4_role:
467            self.servo.set_servo_v4_role('src')
468
469    def get_usbdisk_path_on_dut(self):
470        """Get the path of the USB disk device plugged-in the servo on DUT.
471
472        Returns:
473          A string representing USB disk path, like '/dev/sdb', or None if
474          no USB disk is found.
475        """
476        cmd = 'ls -d /dev/s*[a-z]'
477        original_value = self.servo.get_usbkey_direction()
478
479        # Make the dut unable to see the USB disk.
480        self.servo.switch_usbkey('off')
481        no_usb_set = set(
482            self.faft_client.system.run_shell_command_get_output(cmd))
483
484        # Make the dut able to see the USB disk.
485        self.servo.switch_usbkey('dut')
486        time.sleep(self.faft_config.usb_plug)
487        has_usb_set = set(
488            self.faft_client.system.run_shell_command_get_output(cmd))
489
490        # Back to its original value.
491        if original_value != self.servo.get_usbkey_direction():
492            self.servo.switch_usbkey(original_value)
493
494        diff_set = has_usb_set - no_usb_set
495        if len(diff_set) == 1:
496            return diff_set.pop()
497        else:
498            return None
499
500    def _create_faft_lockfile(self):
501        """Creates the FAFT lockfile."""
502        logging.info('Creating FAFT lockfile...')
503        command = 'touch %s' % (self.lockfile)
504        self.faft_client.system.run_shell_command(command)
505
506    def _remove_faft_lockfile(self):
507        """Removes the FAFT lockfile."""
508        logging.info('Removing FAFT lockfile...')
509        command = 'rm -f %s' % (self.lockfile)
510        self.faft_client.system.run_shell_command(command)
511
512    def clear_set_gbb_flags(self, clear_mask, set_mask):
513        """Clear and set the GBB flags in the current flashrom.
514
515        @param clear_mask: A mask of flags to be cleared.
516        @param set_mask: A mask of flags to be set.
517        """
518        gbb_flags = self.faft_client.bios.get_gbb_flags()
519        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
520        self.gbb_flags = new_flags
521        if new_flags != gbb_flags:
522            self._backup_gbb_flags = gbb_flags
523            logging.info('Changing GBB flags from 0x%x to 0x%x.',
524                         gbb_flags, new_flags)
525            self.faft_client.bios.set_gbb_flags(new_flags)
526            # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
527            # reboot to get a clear state
528            if ((gbb_flags ^ new_flags) &
529                (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
530                 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
531                self.switcher.mode_aware_reboot()
532        else:
533            logging.info('Current GBB flags look good for test: 0x%x.',
534                         gbb_flags)
535
536    def check_ec_capability(self, required_cap=None, suppress_warning=False):
537        """Check if current platform has required EC capabilities.
538
539        @param required_cap: A list containing required EC capabilities. Pass in
540                             None to only check for presence of Chrome EC.
541        @param suppress_warning: True to suppress any warning messages.
542        @return: True if requirements are met. Otherwise, False.
543        """
544        if not self.faft_config.chrome_ec:
545            if not suppress_warning:
546                logging.warn('Requires Chrome EC to run this test.')
547            return False
548
549        if not required_cap:
550            return True
551
552        for cap in required_cap:
553            if cap not in self.faft_config.ec_capability:
554                if not suppress_warning:
555                    logging.warn('Requires EC capability "%s" to run this '
556                                 'test.', cap)
557                return False
558
559        return True
560
561    def check_root_part_on_non_recovery(self, part):
562        """Check the partition number of root device and on normal/dev boot.
563
564        @param part: A string of partition number, e.g.'3'.
565        @return: True if the root device matched and on normal/dev boot;
566                 otherwise, False.
567        """
568        return self.checkers.root_part_checker(part) and \
569                self.checkers.crossystem_checker({
570                    'mainfw_type': ('normal', 'developer'),
571                })
572
573    def _join_part(self, dev, part):
574        """Return a concatenated string of device and partition number.
575
576        @param dev: A string of device, e.g.'/dev/sda'.
577        @param part: A string of partition number, e.g.'3'.
578        @return: A concatenated string of device and partition number,
579                 e.g.'/dev/sda3'.
580
581        >>> seq = FirmwareTest()
582        >>> seq._join_part('/dev/sda', '3')
583        '/dev/sda3'
584        >>> seq._join_part('/dev/mmcblk0', '2')
585        '/dev/mmcblk0p2'
586        """
587        if 'mmcblk' in dev:
588            return dev + 'p' + part
589        elif 'nvme' in dev:
590            return dev + 'p' + part
591        else:
592            return dev + part
593
594    def copy_kernel_and_rootfs(self, from_part, to_part):
595        """Copy kernel and rootfs from from_part to to_part.
596
597        @param from_part: A string of partition number to be copied from.
598        @param to_part: A string of partition number to be copied to.
599        """
600        root_dev = self.faft_client.system.get_root_dev()
601        logging.info('Copying kernel from %s to %s. Please wait...',
602                     from_part, to_part)
603        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
604                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
605                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
606        logging.info('Copying rootfs from %s to %s. Please wait...',
607                     from_part, to_part)
608        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
609                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
610                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
611
612    def ensure_kernel_boot(self, part):
613        """Ensure the request kernel boot.
614
615        If not, it duplicates the current kernel to the requested kernel
616        and sets the requested higher priority to ensure it boot.
617
618        @param part: A string of kernel partition number or 'a'/'b'.
619        """
620        if not self.checkers.root_part_checker(part):
621            if self.faft_client.kernel.diff_a_b():
622                self.copy_kernel_and_rootfs(
623                        from_part=self.OTHER_KERNEL_MAP[part],
624                        to_part=part)
625            self.reset_and_prioritize_kernel(part)
626            self.switcher.mode_aware_reboot()
627
628    def ensure_dev_internal_boot(self, original_dev_boot_usb):
629        """Ensure internal device boot in developer mode.
630
631        If not internal device boot, it will try to reboot the device and
632        bypass dev mode to boot into internal device.
633
634        @param original_dev_boot_usb: Original dev_boot_usb value.
635        """
636        logging.info('Checking internal device boot.')
637        if self.faft_client.system.is_removable_device_boot():
638            logging.info('Reboot into internal disk...')
639            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
640            self.switcher.mode_aware_reboot()
641        self.check_state((self.checkers.dev_boot_usb_checker, False,
642                          'Device not booted from internal disk properly.'))
643
644    def set_hardware_write_protect(self, enable):
645        """Set hardware write protect pin.
646
647        @param enable: True if asserting write protect pin. Otherwise, False.
648        """
649        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
650
651    def set_ec_write_protect_and_reboot(self, enable):
652        """Set EC write protect status and reboot to take effect.
653
654        The write protect state is only activated if both hardware write
655        protect pin is asserted and software write protect flag is set.
656        This method asserts/deasserts hardware write protect pin first, and
657        set corresponding EC software write protect flag.
658
659        If the device uses non-Chrome EC, set the software write protect via
660        flashrom.
661
662        If the device uses Chrome EC, a reboot is required for write protect
663        to take effect. Since the software write protect flag cannot be unset
664        if hardware write protect pin is asserted, we need to deasserted the
665        pin first if we are deactivating write protect. Similarly, a reboot
666        is required before we can modify the software flag.
667
668        @param enable: True if activating EC write protect. Otherwise, False.
669        """
670        self.set_hardware_write_protect(enable)
671        if self.faft_config.chrome_ec:
672            self.set_chrome_ec_write_protect_and_reboot(enable)
673        else:
674            self.faft_client.ec.set_write_protect(enable)
675            self.switcher.mode_aware_reboot()
676
677    def set_chrome_ec_write_protect_and_reboot(self, enable):
678        """Set Chrome EC write protect status and reboot to take effect.
679
680        @param enable: True if activating EC write protect. Otherwise, False.
681        """
682        if enable:
683            # Set write protect flag and reboot to take effect.
684            self.ec.set_flash_write_protect(enable)
685            self.sync_and_ec_reboot()
686        else:
687            # Reboot after deasserting hardware write protect pin to deactivate
688            # write protect. And then remove software write protect flag.
689            self.sync_and_ec_reboot()
690            self.ec.set_flash_write_protect(enable)
691
    def _setup_ec_write_protect(self, ec_wp):
        """Setup for EC write-protection.

        It makes sure the EC in the requested write-protection state. If not, it
        flips the state. Flipping the write-protection requires DUT reboot.

        The original wpsw_cur/wpsw_boot states are stored in
        self._old_wpsw_cur and self._old_wpsw_boot; _old_wpsw_boot is later
        read by _restore_ec_write_protect().

        @param ec_wp: True to request EC write-protected; False to request EC
                      not write-protected; None to do nothing.
        @raise TestNAError: if the state has to be flipped but the platform
                            config has ap_access_ec_flash unset.
        """
        if ec_wp is None:
            # None marks "nothing to restore" for _restore_ec_write_protect().
            self._old_wpsw_boot = None
            return
        self._old_wpsw_cur = self.checkers.crossystem_checker(
                                    {'wpsw_cur': '1'}, suppress_logging=True)
        self._old_wpsw_boot = self.checkers.crossystem_checker(
                                   {'wpsw_boot': '1'}, suppress_logging=True)
        # Flip the state unless both current and boot-time states already
        # equal the requested one.
        if not (ec_wp == self._old_wpsw_cur == self._old_wpsw_boot):
            if not self.faft_config.ap_access_ec_flash:
                raise error.TestNAError(
                        "Cannot change EC write-protect for this device")

            logging.info('The test required EC is %swrite-protected. Reboot '
                         'and flip the state.', '' if ec_wp else 'not ')
            # The flip is passed to the mode switcher as a custom reboot
            # action.
            self.switcher.mode_aware_reboot(
                    'custom',
                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
        # Verify that both crossystem values now reflect the request.
        wpsw_boot = wpsw_cur = '1' if ec_wp == True else '0'
        self.check_state((self.checkers.crossystem_checker, {
                               'wpsw_boot': wpsw_boot, 'wpsw_cur': wpsw_cur}))
721
722    def _restore_ec_write_protect(self):
723        """Restore the original EC write-protection."""
724        if (not hasattr(self, '_old_wpsw_boot')) or (self._old_wpsw_boot is
725                                                     None):
726            return
727        if not self.checkers.crossystem_checker({'wpsw_boot': '1' if
728                       self._old_wpsw_boot else '0'}, suppress_logging=True):
729            logging.info('Restore original EC write protection and reboot.')
730            self.switcher.mode_aware_reboot(
731                    'custom',
732                    lambda:self.set_ec_write_protect_and_reboot(
733                            self._old_wpsw_boot))
734        self.check_state((self.checkers.crossystem_checker, {
735                           'wpsw_boot': '1' if self._old_wpsw_boot else '0'}))
736
737    def _setup_uart_capture(self):
738        """Setup the CPU/EC/PD UART capture."""
739        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
740        self.servo.set('cpu_uart_capture', 'on')
741        self.cr50_uart_file = None
742        self.ec_uart_file = None
743        self.servo_micro_uart_file = None
744        self.servo_v4_uart_file = None
745        self.usbpd_uart_file = None
746        try:
747            # Check that the console works before declaring the cr50 console
748            # connection exists and enabling uart capture.
749            self.servo.get('cr50_version')
750            self.servo.set('cr50_uart_capture', 'on')
751            self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt')
752            self.cr50 = chrome_cr50.ChromeCr50(self.servo)
753        except error.TestFail as e:
754            if 'No control named' in str(e):
755                logging.warn('cr50 console not supported.')
756        if self.faft_config.chrome_ec:
757            try:
758                self.servo.set('ec_uart_capture', 'on')
759                self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
760            except error.TestFail as e:
761                if 'No control named' in str(e):
762                    logging.warn('The servod is too old that ec_uart_capture '
763                                 'not supported.')
764            # Log separate PD console if supported
765            if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
766                try:
767                    self.servo.set('usbpd_uart_capture', 'on')
768                    self.usbpd_uart_file = os.path.join(self.resultsdir,
769                                                        'usbpd_uart.txt')
770                except error.TestFail as e:
771                    if 'No control named' in str(e):
772                        logging.warn('The servod is too old that '
773                                     'usbpd_uart_capture is not supported.')
774        else:
775            logging.info('Not a Google EC, cannot capture ec console output.')
776        try:
777            self.servo.set('servo_micro_uart_capture', 'on')
778            self.servo_micro_uart_file = os.path.join(self.resultsdir,
779                                                      'servo_micro_uart.txt')
780        except error.TestFail as e:
781            if 'No control named' in str(e):
782                logging.warn('servo micro console not supported.')
783        try:
784            self.servo.set('servo_v4_uart_capture', 'on')
785            self.servo_v4_uart_file = os.path.join(self.resultsdir,
786                                                   'servo_v4_uart.txt')
787        except error.TestFail as e:
788            if 'No control named' in str(e):
789                logging.warn('servo v4 console not supported.')
790
791    def _record_uart_capture(self):
792        """Record the CPU/EC/PD UART output stream to files."""
793        if self.cpu_uart_file:
794            with open(self.cpu_uart_file, 'a') as f:
795                f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
796        if self.cr50_uart_file:
797            with open(self.cr50_uart_file, 'a') as f:
798                f.write(ast.literal_eval(self.servo.get('cr50_uart_stream')))
799        if self.ec_uart_file and self.faft_config.chrome_ec:
800            with open(self.ec_uart_file, 'a') as f:
801                f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
802        if self.servo_micro_uart_file:
803            with open(self.servo_micro_uart_file, 'a') as f:
804                f.write(ast.literal_eval(self.servo.get(
805                        'servo_micro_uart_stream')))
806        if self.servo_v4_uart_file:
807            with open(self.servo_v4_uart_file, 'a') as f:
808                f.write(ast.literal_eval(self.servo.get(
809                        'servo_v4_uart_stream')))
810        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
811            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
812            with open(self.usbpd_uart_file, 'a') as f:
813                f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
814
815    def _cleanup_uart_capture(self):
816        """Cleanup the CPU/EC/PD UART capture."""
817        # Flush the remaining UART output.
818        self._record_uart_capture()
819        self.servo.set('cpu_uart_capture', 'off')
820        if self.cr50_uart_file:
821            self.servo.set('cr50_uart_capture', 'off')
822        if self.ec_uart_file and self.faft_config.chrome_ec:
823            self.servo.set('ec_uart_capture', 'off')
824        if self.servo_micro_uart_file:
825            self.servo.set('servo_micro_uart_capture', 'off')
826        if self.servo_v4_uart_file:
827            self.servo.set('servo_v4_uart_capture', 'off')
828        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
829            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
830            self.servo.set('usbpd_uart_capture', 'off')
831
832    def _get_power_state(self, power_state):
833        """
834        Return the current power state of the AP
835        """
836        return self.ec.send_command_get_output("powerinfo", [power_state])
837
838    def wait_power_state(self, power_state, retries):
839        """
840        Wait for certain power state.
841
842        @param power_state: power state you are expecting
843        @param retries: retries.  This is necessary if AP is powering down
844        and transitioning through different states.
845        """
846        logging.info('Checking power state "%s" maximum %d times.',
847                     power_state, retries)
848        while retries > 0:
849            logging.info("try count: %d", retries)
850            try:
851                retries = retries - 1
852                ret = self._get_power_state(power_state)
853                return True
854            except error.TestFail:
855                pass
856        return False
857
858    def suspend(self):
859        """Suspends the DUT."""
860        cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
861        self.faft_client.system.run_shell_command(cmd)
862        time.sleep(self.EC_SUSPEND_DELAY)
863
864    def _fetch_servo_log(self):
865        """Fetch the servo log."""
866        cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
867        servo_log = self.servo.system_output(cmd)
868        return None if servo_log == 'NOTFOUND' else servo_log
869
870    def _setup_servo_log(self):
871        """Setup the servo log capturing."""
872        self.servo_log_original_len = -1
873        if self.servo.is_localhost():
874            # No servo log recorded when servod runs locally.
875            return
876
877        servo_log = self._fetch_servo_log()
878        if servo_log:
879            self.servo_log_original_len = len(servo_log)
880        else:
881            logging.warn('Servo log file not found.')
882
883    def _record_servo_log(self):
884        """Record the servo log to the results directory."""
885        if self.servo_log_original_len != -1:
886            servo_log = self._fetch_servo_log()
887            servo_log_file = os.path.join(self.resultsdir, 'servod.log')
888            with open(servo_log_file, 'a') as f:
889                f.write(servo_log[self.servo_log_original_len:])
890
891    def _record_faft_client_log(self):
892        """Record the faft client log to the results directory."""
893        client_log = self.faft_client.system.dump_log(True)
894        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
895        with open(client_log_file, 'w') as f:
896            f.write(client_log)
897
898    def _setup_gbb_flags(self):
899        """Setup the GBB flags for FAFT test."""
900        if self.check_setup_done('gbb_flags'):
901            return
902
903        logging.info('Set proper GBB flags for test.')
904        # Ensure that GBB flags are set to 0x140.
905        flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE |
906                        vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM)
907        # And if the "no_ec_sync" argument is set, then disable EC software
908        # sync.
909        if self._no_ec_sync:
910            flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC
911
912        self.clear_set_gbb_flags(0xffffffff, flags_to_set)
913        self.mark_setup_done('gbb_flags')
914
915    def drop_backup_gbb_flags(self):
916        """Drops the backup GBB flags.
917
918        This can be used when a test intends to permanently change GBB flags.
919        """
920        self._backup_gbb_flags = None
921
922    def _restore_gbb_flags(self):
923        """Restore GBB flags to their original state."""
924        if self._backup_gbb_flags is None:
925            return
926        # Setting up and restoring the GBB flags take a lot of time. For
927        # speed-up purpose, don't restore it.
928        logging.info('***')
929        logging.info('*** Please manually restore the original GBB flags to: '
930                     '0x%x ***', self._backup_gbb_flags)
931        logging.info('***')
932        self.unmark_setup_done('gbb_flags')
933
934    def setup_tried_fwb(self, tried_fwb):
935        """Setup for fw B tried state.
936
937        It makes sure the system in the requested fw B tried state. If not, it
938        tries to do so.
939
940        @param tried_fwb: True if requested in tried_fwb=1;
941                          False if tried_fwb=0.
942        """
943        if tried_fwb:
944            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
945                logging.info(
946                    'Firmware is not booted with tried_fwb. Reboot into it.')
947                self.faft_client.system.set_try_fw_b()
948        else:
949            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
950                logging.info(
951                    'Firmware is booted with tried_fwb. Reboot to clear.')
952
953    def power_on(self):
954        """Switch DUT AC power on."""
955        self._client.power_on(self.power_control)
956
957    def power_off(self):
958        """Switch DUT AC power off."""
959        self._client.power_off(self.power_control)
960
961    def power_cycle(self):
962        """Power cycle DUT AC power."""
963        self._client.power_cycle(self.power_control)
964
965    def setup_rw_boot(self, section='a'):
966        """Make sure firmware is in RW-boot mode.
967
968        If the given firmware section is in RO-boot mode, turn off the RO-boot
969        flag and reboot DUT into RW-boot mode.
970
971        @param section: A firmware section, either 'a' or 'b'.
972        """
973        flags = self.faft_client.bios.get_preamble_flags(section)
974        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
975            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
976            self.faft_client.bios.set_preamble_flags(section, flags)
977            self.switcher.mode_aware_reboot()
978
979    def setup_kernel(self, part):
980        """Setup for kernel test.
981
982        It makes sure both kernel A and B bootable and the current boot is
983        the requested kernel part.
984
985        @param part: A string of kernel partition number or 'a'/'b'.
986        """
987        self.ensure_kernel_boot(part)
988        logging.info('Checking the integrity of kernel B and rootfs B...')
989        if (self.faft_client.kernel.diff_a_b() or
990                not self.faft_client.rootfs.verify_rootfs('B')):
991            logging.info('Copying kernel and rootfs from A to B...')
992            self.copy_kernel_and_rootfs(from_part=part,
993                                        to_part=self.OTHER_KERNEL_MAP[part])
994        self.reset_and_prioritize_kernel(part)
995
996    def reset_and_prioritize_kernel(self, part):
997        """Make the requested partition highest priority.
998
999        This function also reset kerenl A and B to bootable.
1000
1001        @param part: A string of partition number to be prioritized.
1002        """
1003        root_dev = self.faft_client.system.get_root_dev()
1004        # Reset kernel A and B to bootable.
1005        self.faft_client.system.run_shell_command(
1006            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
1007        self.faft_client.system.run_shell_command(
1008            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
1009        # Set kernel part highest priority.
1010        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
1011                (self.KERNEL_MAP[part], root_dev))
1012
1013    def do_blocking_sync(self, device):
1014        """Run a blocking sync command."""
1015        logging.info("Blocking sync for %s", device)
1016        if 'mmcblk' in device:
1017            # For mmc devices, use `mmc status get` command to send an
1018            # empty command to wait for the disk to be available again.
1019            self.faft_client.system.run_shell_command('mmc status get %s' %
1020                                                      device)
1021        elif 'nvme' in device:
1022            # Get a list of NVMe namespace and flush them individually
1023            # Assumes the output format from nvme list-ns command will
1024            # be something like follows:
1025            # [ 0]:0x1
1026            # [ 1]:0x2
1027            available_ns = self.faft_client.system.run_shell_command_get_output(
1028                                                  'nvme list-ns %s -a' % device)
1029            for ns in available_ns:
1030                ns = ns.split(':')[-1]
1031                # For NVMe devices, use `nvme flush` command to commit data
1032                # and metadata to non-volatile media.
1033                self.faft_client.system.run_shell_command(
1034                                 'nvme flush %s -n %s' % (device, ns))
1035        else:
1036            # For other devices, hdparm sends TUR to check if
1037            # a device is ready for transfer operation.
1038            self.faft_client.system.run_shell_command('hdparm -f %s' % device)
1039
1040    def blocking_sync(self):
1041        """Sync root device and internal device."""
1042        # The double calls to sync fakes a blocking call
1043        # since the first call returns before the flush
1044        # is complete, but the second will wait for the
1045        # first to finish.
1046        self.faft_client.system.run_shell_command('sync')
1047        self.faft_client.system.run_shell_command('sync')
1048
1049        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
1050        # This function will perform a device-specific sync command.
1051        root_dev = self.faft_client.system.get_root_dev()
1052        self.do_blocking_sync(root_dev)
1053
1054        # Also sync the internal device if booted from removable media.
1055        if self.faft_client.system.is_removable_device_boot():
1056            internal_dev = self.faft_client.system.get_internal_device()
1057            self.do_blocking_sync(internal_dev)
1058
1059    def sync_and_ec_reboot(self, flags=''):
1060        """Request the client sync and do a EC triggered reboot.
1061
1062        @param flags: Optional, a space-separated string of flags passed to EC
1063                      reboot command, including:
1064                          default: EC soft reboot;
1065                          'hard': EC cold/hard reboot.
1066        """
1067        self.blocking_sync()
1068        self.ec.reboot(flags)
1069        time.sleep(self.faft_config.ec_boot_to_console)
1070        self.check_lid_and_power_on()
1071
1072    def reboot_and_reset_tpm(self):
1073        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
1074        self.switcher.reboot_to_mode(to_mode='rec')
1075        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
1076        self.switcher.mode_aware_reboot()
1077
1078    def full_power_off_and_on(self):
1079        """Shutdown the device by pressing power button and power on again."""
1080        boot_id = self.get_bootid()
1081        # Press power button to trigger Chrome OS normal shutdown process.
1082        # We use a customized delay since the normal-press 1.2s is not enough.
1083        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
1084        # device can take 44-51 seconds to restart,
1085        # add buffer from the default timeout of 60 seconds.
1086        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
1087        time.sleep(self.faft_config.shutdown)
1088        if self.check_ec_capability(['x86'], suppress_warning=True):
1089            self.check_shutdown_power_state("G3", pwr_retries=5)
1090        # Short press power button to boot DUT again.
1091        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1092
1093    def check_shutdown_power_state(self, power_state, pwr_retries):
1094        """Check whether the device entered into requested EC power state
1095        after shutdown.
1096
1097        @param power_state: EC power state has to be checked. Either S5 or G3.
1098        @param pwr_retries: Times to check if the DUT in expected power state.
1099        @raise TestFail: If device failed to enter into requested power state.
1100        """
1101        if not self.wait_power_state(power_state, pwr_retries):
1102            raise error.TestFail('System not shutdown properly and EC fails '
1103                                 'to enter into %s state.' % power_state)
1104        logging.info('System entered into %s state..', power_state)
1105
1106    def check_lid_and_power_on(self):
1107        """
1108        On devices with EC software sync, system powers on after EC reboots if
1109        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1110        This method checks lid switch state and presses power button if
1111        necessary.
1112        """
1113        if self.servo.get("lid_open") == "no":
1114            time.sleep(self.faft_config.software_sync)
1115            self.servo.power_short_press()
1116
1117    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1118        """Modify the kernel header magic in USB stick.
1119
1120        The kernel header magic is the first 8-byte of kernel partition.
1121        We modify it to make it fail on kernel verification check.
1122
1123        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1124        @param from_magic: A string of magic which we change it from.
1125        @param to_magic: A string of magic which we change it to.
1126        @raise TestError: if failed to change magic.
1127        """
1128        assert len(from_magic) == 8
1129        assert len(to_magic) == 8
1130        # USB image only contains one kernel.
1131        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1132        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1133        current_magic = self.servo.system_output(read_cmd)
1134        if current_magic == to_magic:
1135            logging.info("The kernel magic is already %s.", current_magic)
1136            return
1137        if current_magic != from_magic:
1138            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1139
1140        logging.info('Modify the kernel magic in USB, from %s to %s.',
1141                     from_magic, to_magic)
1142        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1143                     " 2>/dev/null" % (to_magic, kernel_part))
1144        self.servo.system(write_cmd)
1145
1146        if self.servo.system_output(read_cmd) != to_magic:
1147            raise error.TestError("Failed to write new magic.")
1148
1149    def corrupt_usb_kernel(self, usb_dev):
1150        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1151
1152        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1153        """
1154        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1155                                self.CORRUPTED_MAGIC)
1156
1157    def restore_usb_kernel(self, usb_dev):
1158        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1159
1160        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1161        """
1162        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1163                                self.CHROMEOS_MAGIC)
1164
1165    def _call_action(self, action_tuple, check_status=False):
1166        """Call the action function with/without arguments.
1167
1168        @param action_tuple: A function, or a tuple (function, args, error_msg),
1169                             in which, args and error_msg are optional. args is
1170                             either a value or a tuple if multiple arguments.
1171                             This can also be a list containing multiple
1172                             function or tuple. In this case, these actions are
1173                             called in sequence.
1174        @param check_status: Check the return value of action function. If not
1175                             succeed, raises a TestFail exception.
1176        @return: The result value of the action function.
1177        @raise TestError: An error when the action function is not callable.
1178        @raise TestFail: When check_status=True, action function not succeed.
1179        """
1180        if isinstance(action_tuple, list):
1181            return all([self._call_action(action, check_status=check_status)
1182                        for action in action_tuple])
1183
1184        action = action_tuple
1185        args = ()
1186        error_msg = 'Not succeed'
1187        if isinstance(action_tuple, tuple):
1188            action = action_tuple[0]
1189            if len(action_tuple) >= 2:
1190                args = action_tuple[1]
1191                if not isinstance(args, tuple):
1192                    args = (args,)
1193            if len(action_tuple) >= 3:
1194                error_msg = action_tuple[2]
1195
1196        if action is None:
1197            return
1198
1199        if not callable(action):
1200            raise error.TestError('action is not callable!')
1201
1202        info_msg = 'calling %s' % action.__name__
1203        if args:
1204            info_msg += ' with args %s' % str(args)
1205        logging.info(info_msg)
1206        ret = action(*args)
1207
1208        if check_status and not ret:
1209            raise error.TestFail('%s: %s returning %s' %
1210                                 (error_msg, info_msg, str(ret)))
1211        return ret
1212
1213    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1214                             run_power_action=True, post_power_action=None,
1215                             shutdown_timeout=None):
1216        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1217
1218        @param shutdown_action: function which makes DUT shutdown, like
1219                                pressing power key.
1220        @param pre_power_action: function which is called before next power on.
1221        @param run_power_action: power_key press by default, set to None to skip.
1222        @param post_power_action: function which is called after next power on.
1223        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1224        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1225        """
1226        self._call_action(shutdown_action)
1227        logging.info('Wait to ensure DUT shut down...')
1228        try:
1229            if shutdown_timeout is None:
1230                shutdown_timeout = self.faft_config.shutdown_timeout
1231            self.switcher.wait_for_client(timeout=shutdown_timeout)
1232            raise error.TestFail(
1233                    'Should shut the device down after calling %s.' %
1234                    shutdown_action.__name__)
1235        except ConnectionError:
1236            if self.check_ec_capability(['x86'], suppress_warning=True):
1237                self.check_shutdown_power_state("G3", pwr_retries=5)
1238            logging.info(
1239                'DUT is surely shutdown. We are going to power it on again...')
1240
1241        if pre_power_action:
1242            self._call_action(pre_power_action)
1243        if run_power_action:
1244            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1245        if post_power_action:
1246            self._call_action(post_power_action)
1247
1248    def get_bootid(self, retry=3):
1249        """
1250        Return the bootid.
1251        """
1252        boot_id = None
1253        while retry:
1254            try:
1255                boot_id = self._client.get_boot_id()
1256                break
1257            except error.AutoservRunError:
1258                retry -= 1
1259                if retry:
1260                    logging.info('Retry to get boot_id...')
1261                else:
1262                    logging.warning('Failed to get boot_id.')
1263        logging.info('boot_id: %s', boot_id)
1264        return boot_id
1265
1266    def check_state(self, func):
1267        """
1268        Wrapper around _call_action with check_status set to True. This is a
1269        helper function to be used by tests and is currently implemented by
1270        calling _call_action with check_status=True.
1271
1272        TODO: This function's arguments need to be made more stringent. And
1273        its functionality should be moved over to check functions directly in
1274        the future.
1275
1276        @param func: A function, or a tuple (function, args, error_msg),
1277                             in which, args and error_msg are optional. args is
1278                             either a value or a tuple if multiple arguments.
1279                             This can also be a list containing multiple
1280                             function or tuple. In this case, these actions are
1281                             called in sequence.
1282        @return: The result value of the action function.
1283        @raise TestFail: If the function does notsucceed.
1284        """
1285        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1286        self._call_action(func, check_status=True)
1287        logging.info("-[FAFT]-[ end state_checker ]----------------")
1288
1289    def get_current_firmware_sha(self):
1290        """Get current firmware sha of body and vblock.
1291
1292        @return: Current firmware sha follows the order (
1293                 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1294        """
1295        current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1296                                self.faft_client.bios.get_body_sha('a'),
1297                                self.faft_client.bios.get_sig_sha('b'),
1298                                self.faft_client.bios.get_body_sha('b'))
1299        if not all(current_firmware_sha):
1300            raise error.TestError('Failed to get firmware sha.')
1301        return current_firmware_sha
1302
1303    def is_firmware_changed(self):
1304        """Check if the current firmware changed, by comparing its SHA.
1305
1306        @return: True if it is changed, otherwise Flase.
1307        """
1308        # Device may not be rebooted after test.
1309        self.faft_client.bios.reload()
1310
1311        current_sha = self.get_current_firmware_sha()
1312
1313        if current_sha == self._backup_firmware_sha:
1314            return False
1315        else:
1316            corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1317            corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1318            corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1319            corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1320            logging.info('Firmware changed:')
1321            logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1322            logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1323            logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1324            logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1325            return True
1326
1327    def backup_firmware(self, suffix='.original'):
1328        """Backup firmware to file, and then send it to host.
1329
1330        @param suffix: a string appended to backup file name
1331        """
1332        remote_temp_dir = self.faft_client.system.create_temp_dir()
1333        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1334        self.faft_client.bios.dump_whole(remote_bios_path)
1335        self._client.get_file(remote_bios_path,
1336                              os.path.join(self.resultsdir, 'bios' + suffix))
1337
1338        if self.faft_config.chrome_ec:
1339            remote_ec_path = os.path.join(remote_temp_dir, 'ec')
1340            self.faft_client.ec.dump_whole(remote_ec_path)
1341            self._client.get_file(remote_ec_path,
1342                              os.path.join(self.resultsdir, 'ec' + suffix))
1343
1344        self._client.run('rm -rf %s' % remote_temp_dir)
1345        logging.info('Backup firmware stored in %s with suffix %s',
1346            self.resultsdir, suffix)
1347
1348        self._backup_firmware_sha = self.get_current_firmware_sha()
1349
1350    def is_firmware_saved(self):
1351        """Check if a firmware saved (called backup_firmware before).
1352
1353        @return: True if the firmware is backuped; otherwise False.
1354        """
1355        return self._backup_firmware_sha != ()
1356
1357    def clear_saved_firmware(self):
1358        """Clear the firmware saved by the method backup_firmware."""
1359        self._backup_firmware_sha = ()
1360
1361    def restore_firmware(self, suffix='.original', restore_ec=True):
1362        """Restore firmware from host in resultsdir.
1363
1364        @param suffix: a string appended to backup file name
1365        @param restore_ec: True to restore the ec firmware; False not to do.
1366        """
1367        if not self.is_firmware_changed():
1368            return
1369
1370        # Backup current corrupted firmware.
1371        self.backup_firmware(suffix='.corrupt')
1372
1373        # Restore firmware.
1374        remote_temp_dir = self.faft_client.system.create_temp_dir()
1375        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1376                               os.path.join(remote_temp_dir, 'bios'))
1377
1378        self.faft_client.bios.write_whole(
1379            os.path.join(remote_temp_dir, 'bios'))
1380
1381        if self.faft_config.chrome_ec and restore_ec:
1382            self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix),
1383                os.path.join(remote_temp_dir, 'ec'))
1384            self.faft_client.ec.write_whole(
1385                os.path.join(remote_temp_dir, 'ec'))
1386
1387        self.switcher.mode_aware_reboot()
1388        logging.info('Successfully restore firmware.')
1389
1390    def setup_firmwareupdate_shellball(self, shellball=None):
1391        """Setup a shellball to use in firmware update test.
1392
1393        Check if there is a given shellball, and it is a shell script. Then,
1394        send it to the remote host. Otherwise, use the
1395        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1396        BIOS and EC images with the active firmware images.
1397
1398        @param shellball: path of a shellball or default to None.
1399        """
1400        if shellball:
1401            # Determine the firmware file is a shellball or a raw binary.
1402            is_shellball = (utils.system_output("file %s" % shellball).find(
1403                    "shell script") != -1)
1404            if is_shellball:
1405                logging.info('Device will update firmware with shellball %s',
1406                             shellball)
1407                temp_path = self.faft_client.updater.get_temp_path()
1408                working_shellball = os.path.join(temp_path,
1409                                                 'chromeos-firmwareupdate')
1410                self._client.send_file(shellball, working_shellball)
1411                self.faft_client.updater.extract_shellball()
1412            else:
1413                raise error.TestFail(
1414                    'The given shellball is not a shell script.')
1415        else:
1416            logging.info('No shellball given, use the original shellball and '
1417                         'replace its BIOS and EC images.')
1418            work_path = self.faft_client.updater.get_work_path()
1419            bios_in_work_path = os.path.join(
1420                work_path, self.faft_client.updater.get_bios_relative_path())
1421            ec_in_work_path = os.path.join(
1422                work_path, self.faft_client.updater.get_ec_relative_path())
1423            logging.info('Writing current BIOS to: %s', bios_in_work_path)
1424            self.faft_client.bios.dump_whole(bios_in_work_path)
1425            if self.faft_config.chrome_ec:
1426                logging.info('Writing current EC to: %s', ec_in_work_path)
1427                self.faft_client.ec.dump_firmware(ec_in_work_path)
1428            self.faft_client.updater.repack_shellball()
1429
1430    def is_kernel_changed(self):
1431        """Check if the current kernel is changed, by comparing its SHA1 hash.
1432
1433        @return: True if it is changed; otherwise, False.
1434        """
1435        changed = False
1436        for p in ('A', 'B'):
1437            backup_sha = self._backup_kernel_sha.get(p, None)
1438            current_sha = self.faft_client.kernel.get_sha(p)
1439            if backup_sha != current_sha:
1440                changed = True
1441                logging.info('Kernel %s is changed', p)
1442        return changed
1443
1444    def backup_kernel(self, suffix='.original'):
1445        """Backup kernel to files, and the send them to host.
1446
1447        @param suffix: a string appended to backup file name.
1448        """
1449        remote_temp_dir = self.faft_client.system.create_temp_dir()
1450        for p in ('A', 'B'):
1451            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1452            self.faft_client.kernel.dump(p, remote_path)
1453            self._client.get_file(
1454                    remote_path,
1455                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1456            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1457        logging.info('Backup kernel stored in %s with suffix %s',
1458            self.resultsdir, suffix)
1459
1460    def is_kernel_saved(self):
1461        """Check if kernel images are saved (backup_kernel called before).
1462
1463        @return: True if the kernel is saved; otherwise, False.
1464        """
1465        return len(self._backup_kernel_sha) != 0
1466
1467    def clear_saved_kernel(self):
1468        """Clear the kernel saved by backup_kernel()."""
1469        self._backup_kernel_sha = dict()
1470
1471    def restore_kernel(self, suffix='.original'):
1472        """Restore kernel from host in resultsdir.
1473
1474        @param suffix: a string appended to backup file name.
1475        """
1476        if not self.is_kernel_changed():
1477            return
1478
1479        # Backup current corrupted kernel.
1480        self.backup_kernel(suffix='.corrupt')
1481
1482        # Restore kernel.
1483        remote_temp_dir = self.faft_client.system.create_temp_dir()
1484        for p in ('A', 'B'):
1485            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1486            self._client.send_file(
1487                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1488                    remote_path)
1489            self.faft_client.kernel.write(p, remote_path)
1490
1491        self.switcher.mode_aware_reboot()
1492        logging.info('Successfully restored kernel.')
1493
1494    def backup_cgpt_attributes(self):
1495        """Backup CGPT partition table attributes."""
1496        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1497
1498    def restore_cgpt_attributes(self):
1499        """Restore CGPT partition table attributes."""
1500        current_table = self.faft_client.cgpt.get_attributes()
1501        if current_table == self._backup_cgpt_attr:
1502            return
1503        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1504                     self._backup_cgpt_attr,
1505                     current_table)
1506        self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1507
1508        self.switcher.mode_aware_reboot()
1509        logging.info('Successfully restored CGPT table.')
1510
1511    def try_fwb(self, count=0):
1512        """set to try booting FWB count # times
1513
1514        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1515        vboot2
1516
1517        @param count: an integer specifying value to program into
1518                      fwb_tries(vb1)/fw_try_next(vb2)
1519        """
1520        if self.fw_vboot2:
1521            self.faft_client.system.set_fw_try_next('B', count)
1522        else:
1523            # vboot1: we need to boot into fwb at least once
1524            if not count:
1525                count = count + 1
1526            self.faft_client.system.set_try_fw_b(count)
1527
1528