• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import pprint
10import re
11import time
12import uuid
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.server import test
17from autotest_lib.server.cros import vboot_constants as vboot
18from autotest_lib.server.cros.faft.config.config import Config as FAFTConfig
19from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
20from autotest_lib.server.cros.faft.utils import mode_switcher
21from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
22from autotest_lib.server.cros.servo import chrome_base_ec
23from autotest_lib.server.cros.servo import chrome_cr50
24from autotest_lib.server.cros.servo import chrome_ec
25
26ConnectionError = mode_switcher.ConnectionError
27
28
29class FAFTBase(test.test):
30    """The base class of FAFT classes.
31
32    It launches the FAFTClient on DUT, such that the test can access its
33    firmware functions and interfaces. It also provides some methods to
34    handle the reboot mechanism, in order to ensure FAFTClient is still
35    connected after reboot.
36    """
37    def initialize(self, host):
38        """Create a FAFTClient object and install the dependency."""
39        self.servo = host.servo
40        self.servo.initialize_dut()
41        self._client = host
42        self.faft_client = RPCProxy(host)
43        self.lockfile = '/var/tmp/faft/lock'
44
45
46class FirmwareTest(FAFTBase):
47    """
48    Base class that sets up helper objects/functions for firmware tests.
49
50    TODO: add documentaion as the FAFT rework progresses.
51    """
52    version = 1
53
54    # Mapping of partition number of kernel and rootfs.
55    KERNEL_MAP = {'a':'2', 'b':'4', '2':'2', '4':'4', '3':'2', '5':'4'}
56    ROOTFS_MAP = {'a':'3', 'b':'5', '2':'3', '4':'5', '3':'3', '5':'5'}
57    OTHER_KERNEL_MAP = {'a':'4', 'b':'2', '2':'4', '4':'2', '3':'4', '5':'2'}
58    OTHER_ROOTFS_MAP = {'a':'5', 'b':'3', '2':'5', '4':'3', '3':'5', '5':'3'}
59
60    CHROMEOS_MAGIC = "CHROMEOS"
61    CORRUPTED_MAGIC = "CORRUPTD"
62
63    # Delay for waiting client to return before EC suspend
64    EC_SUSPEND_DELAY = 5
65
66    # Delay between EC suspend and wake
67    WAKE_DELAY = 10
68
69    # Delay between closing and opening lid
70    LID_DELAY = 1
71
72    _SERVOD_LOG = '/var/log/servod.log'
73
74    _ROOTFS_PARTITION_NUMBER = 3
75
76    _backup_firmware_sha = ()
77    _backup_kernel_sha = dict()
78    _backup_cgpt_attr = dict()
79    _backup_gbb_flags = None
80    _backup_dev_mode = None
81
82    # Class level variable, keep track the states of one time setup.
83    # This variable is preserved across tests which inherit this class.
84    _global_setup_done = {
85        'gbb_flags': False,
86        'reimage': False,
87        'usb_check': False,
88    }
89
90    @classmethod
91    def check_setup_done(cls, label):
92        """Check if the given setup is done.
93
94        @param label: The label of the setup.
95        """
96        return cls._global_setup_done[label]
97
98    @classmethod
99    def mark_setup_done(cls, label):
100        """Mark the given setup done.
101
102        @param label: The label of the setup.
103        """
104        cls._global_setup_done[label] = True
105
106    @classmethod
107    def unmark_setup_done(cls, label):
108        """Mark the given setup not done.
109
110        @param label: The label of the setup.
111        """
112        cls._global_setup_done[label] = False
113
114    def initialize(self, host, cmdline_args, ec_wp=None):
115        super(FirmwareTest, self).initialize(host)
116        self.run_id = str(uuid.uuid4())
117        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
118        # Parse arguments from command line
119        args = {}
120        self.power_control = host.POWER_CONTROL_RPM
121        for arg in cmdline_args:
122            match = re.search("^(\w+)=(.+)", arg)
123            if match:
124                args[match.group(1)] = match.group(2)
125        if 'power_control' in args:
126            self.power_control = args['power_control']
127            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
128                raise error.TestError('Valid values for --args=power_control '
129                                      'are %s. But you entered wrong argument '
130                                      'as "%s".'
131                                       % (host.POWER_CONTROL_VALID_ARGS,
132                                       self.power_control))
133
134        if not self.faft_client.system.dev_tpm_present():
135            raise error.TestError('/dev/tpm0 does not exist on the client')
136
137        self.faft_config = FAFTConfig(
138                self.faft_client.system.get_platform_name())
139        self.checkers = FAFTCheckers(self)
140        self.switcher = mode_switcher.create_mode_switcher(self)
141
142        if self.faft_config.chrome_ec:
143            self.ec = chrome_ec.ChromeEC(self.servo)
144        # Check for presence of a USBPD console
145        if self.faft_config.chrome_usbpd:
146            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
147        elif self.faft_config.chrome_ec:
148            # If no separate USBPD console, then PD exists on EC console
149            self.usbpd = self.ec
150        # Get plankton console
151        self.plankton = host.plankton
152        self.plankton_host = host._plankton_host
153
154        # Create the BaseEC object. None if not available.
155        self.base_ec = chrome_base_ec.create_base_ec(self.servo)
156
157        self._setup_uart_capture()
158        self._setup_servo_log()
159        self._record_system_info()
160        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
161        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
162        if self.fw_vboot2:
163            self.faft_client.system.set_fw_try_next('A')
164            if self.faft_client.system.get_crossystem_value('mainfw_act') == 'B':
165                logging.info('mainfw_act is B. rebooting to set it A')
166                self.switcher.mode_aware_reboot()
167        self._setup_gbb_flags()
168        self.faft_client.updater.stop_daemon()
169        self._create_faft_lockfile()
170        self._setup_ec_write_protect(ec_wp)
171        # See chromium:239034 regarding needing this sync.
172        self.blocking_sync()
173        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
174
175    def cleanup(self):
176        """Autotest cleanup function."""
177        # Unset state checker in case it's set by subclass
178        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
179        try:
180            self.faft_client.system.is_available()
181        except:
182            # Remote is not responding. Revive DUT so that subsequent tests
183            # don't fail.
184            self._restore_routine_from_timeout()
185        self.switcher.restore_mode()
186        self._restore_ec_write_protect()
187        self._restore_gbb_flags()
188        self.faft_client.updater.start_daemon()
189        self._remove_faft_lockfile()
190        self._record_servo_log()
191        self._record_faft_client_log()
192        self._cleanup_uart_capture()
193        super(FirmwareTest, self).cleanup()
194        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
195
196    def _record_system_info(self):
197        """Record some critical system info to the attr keyval.
198
199        This info is used by generate_test_report later.
200        """
201        system_info = {
202            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
203            'ec_version': self.faft_client.ec.get_version(),
204            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
205            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
206            'servod_version': self._client._servo_host.run(
207                'servod --version').stdout.strip(),
208        }
209
210        if hasattr(self, 'cr50'):
211            system_info['cr50_version'] = self.servo.get('cr50_version')
212
213        logging.info('System info:\n' + pprint.pformat(system_info))
214        self.write_attr_keyval(system_info)
215
216    def invalidate_firmware_setup(self):
217        """Invalidate all firmware related setup state.
218
219        This method is called when the firmware is re-flashed. It resets all
220        firmware related setup states so that the next test setup properly
221        again.
222        """
223        self.unmark_setup_done('gbb_flags')
224
225    def _retrieve_recovery_reason_from_trap(self):
226        """Try to retrieve the recovery reason from a trapped recovery screen.
227
228        @return: The recovery_reason, 0 if any error.
229        """
230        recovery_reason = 0
231        logging.info('Try to retrieve recovery reason...')
232        if self.servo.get_usbkey_direction() == 'dut':
233            self.switcher.bypass_rec_mode()
234        else:
235            self.servo.switch_usbkey('dut')
236
237        try:
238            self.switcher.wait_for_client()
239            lines = self.faft_client.system.run_shell_command_get_output(
240                        'crossystem recovery_reason')
241            recovery_reason = int(lines[0])
242            logging.info('Got the recovery reason %d.', recovery_reason)
243        except ConnectionError:
244            logging.error('Failed to get the recovery reason due to connection '
245                          'error.')
246        return recovery_reason
247
248    def _reset_client(self):
249        """Reset client to a workable state.
250
251        This method is called when the client is not responsive. It may be
252        caused by the following cases:
253          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
254          - corrupted firmware;
255          - corrutped OS image.
256        """
257        # DUT may halt on a firmware screen. Try cold reboot.
258        logging.info('Try cold reboot...')
259        self.switcher.mode_aware_reboot(reboot_type='cold',
260                                        sync_before_boot=False,
261                                        wait_for_dut_up=False)
262        self.switcher.wait_for_client_offline()
263        self.switcher.bypass_dev_mode()
264        try:
265            self.switcher.wait_for_client()
266            return
267        except ConnectionError:
268            logging.warn('Cold reboot doesn\'t help, still connection error.')
269
270        # DUT may be broken by a corrupted firmware. Restore firmware.
271        # We assume the recovery boot still works fine. Since the recovery
272        # code is in RO region and all FAFT tests don't change the RO region
273        # except GBB.
274        if self.is_firmware_saved():
275            self._ensure_client_in_recovery()
276            logging.info('Try restore the original firmware...')
277            if self.is_firmware_changed():
278                try:
279                    self.restore_firmware()
280                    return
281                except ConnectionError:
282                    logging.warn('Restoring firmware doesn\'t help, still '
283                                 'connection error.')
284
285        # Perhaps it's kernel that's broken. Let's try restoring it.
286        if self.is_kernel_saved():
287            self._ensure_client_in_recovery()
288            logging.info('Try restore the original kernel...')
289            if self.is_kernel_changed():
290                try:
291                    self.restore_kernel()
292                    return
293                except ConnectionError:
294                    logging.warn('Restoring kernel doesn\'t help, still '
295                                 'connection error.')
296
297        # DUT may be broken by a corrupted OS image. Restore OS image.
298        self._ensure_client_in_recovery()
299        logging.info('Try restore the OS image...')
300        self.faft_client.system.run_shell_command('chromeos-install --yes')
301        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
302        self.switcher.wait_for_client_offline()
303        self.switcher.bypass_dev_mode()
304        try:
305            self.switcher.wait_for_client()
306            logging.info('Successfully restore OS image.')
307            return
308        except ConnectionError:
309            logging.warn('Restoring OS image doesn\'t help, still connection '
310                         'error.')
311
312    def _ensure_client_in_recovery(self):
313        """Ensure client in recovery boot; reboot into it if necessary.
314
315        @raise TestError: if failed to boot the USB image.
316        """
317        logging.info('Try boot into USB image...')
318        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
319                                     wait_for_dut_up=False)
320        self.servo.switch_usbkey('host')
321        self.switcher.bypass_rec_mode()
322        try:
323            self.switcher.wait_for_client()
324        except ConnectionError:
325            raise error.TestError('Failed to boot the USB image.')
326
327    def _restore_routine_from_timeout(self):
328        """A routine to try to restore the system from a timeout error.
329
330        This method is called when FAFT failed to connect DUT after reboot.
331
332        @raise TestFail: This exception is already raised, with a decription
333                         why it failed.
334        """
335        # DUT is disconnected. Capture the UART output for debug.
336        self._record_uart_capture()
337
338        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
339        # identify if it is a network flaky.
340
341        recovery_reason = self._retrieve_recovery_reason_from_trap()
342
343        # Reset client to a workable state.
344        self._reset_client()
345
346        # Raise the proper TestFail exception.
347        if recovery_reason:
348            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
349                                 'and timed out' % recovery_reason)
350        else:
351            raise error.TestFail('Timed out waiting for DUT reboot')
352
353    def assert_test_image_in_usb_disk(self, usb_dev=None):
354        """Assert an USB disk plugged-in on servo and a test image inside.
355
356        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
357                        If None, it is detected automatically.
358        @raise TestError: if USB disk not detected or not a test image.
359        """
360        if self.check_setup_done('usb_check'):
361            return
362        if usb_dev:
363            assert self.servo.get_usbkey_direction() == 'host'
364        else:
365            self.servo.switch_usbkey('host')
366            usb_dev = self.servo.probe_host_usb_dev()
367            if not usb_dev:
368                raise error.TestError(
369                        'An USB disk should be plugged in the servo board.')
370
371        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
372        logging.info('usb dev is %s', usb_dev)
373        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
374        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
375
376        try:
377            usb_lsb = self.servo.system_output('cat %s' %
378                os.path.join(tmpd, 'etc/lsb-release'))
379            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
380            dut_lsb = '\n'.join(self.faft_client.system.
381                run_shell_command_get_output('cat /etc/lsb-release'))
382            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
383            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
384                raise error.TestError('USB stick in servo is no test image')
385            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
386            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
387            if usb_board != dut_board:
388                raise error.TestError('USB stick in servo contains a %s '
389                    'image, but DUT is a %s' % (usb_board, dut_board))
390        finally:
391            for cmd in ('umount -l %s' % rootfs, 'sync', 'rm -rf %s' % tmpd):
392                self.servo.system(cmd)
393
394        self.mark_setup_done('usb_check')
395
396    def setup_usbkey(self, usbkey, host=None):
397        """Setup the USB disk for the test.
398
399        It checks the setup of USB disk and a valid ChromeOS test image inside.
400        It also muxes the USB disk to either the host or DUT by request.
401
402        @param usbkey: True if the USB disk is required for the test, False if
403                       not required.
404        @param host: Optional, True to mux the USB disk to host, False to mux it
405                    to DUT, default to do nothing.
406        """
407        if usbkey:
408            self.assert_test_image_in_usb_disk()
409        elif host is None:
410            # USB disk is not required for the test. Better to mux it to host.
411            host = True
412
413        if host is True:
414            self.servo.switch_usbkey('host')
415        elif host is False:
416            self.servo.switch_usbkey('dut')
417
418    def get_usbdisk_path_on_dut(self):
419        """Get the path of the USB disk device plugged-in the servo on DUT.
420
421        Returns:
422          A string representing USB disk path, like '/dev/sdb', or None if
423          no USB disk is found.
424        """
425        cmd = 'ls -d /dev/s*[a-z]'
426        original_value = self.servo.get_usbkey_direction()
427
428        # Make the dut unable to see the USB disk.
429        self.servo.switch_usbkey('off')
430        no_usb_set = set(
431            self.faft_client.system.run_shell_command_get_output(cmd))
432
433        # Make the dut able to see the USB disk.
434        self.servo.switch_usbkey('dut')
435        time.sleep(self.faft_config.usb_plug)
436        has_usb_set = set(
437            self.faft_client.system.run_shell_command_get_output(cmd))
438
439        # Back to its original value.
440        if original_value != self.servo.get_usbkey_direction():
441            self.servo.switch_usbkey(original_value)
442
443        diff_set = has_usb_set - no_usb_set
444        if len(diff_set) == 1:
445            return diff_set.pop()
446        else:
447            return None
448
449    def _create_faft_lockfile(self):
450        """Creates the FAFT lockfile."""
451        logging.info('Creating FAFT lockfile...')
452        command = 'touch %s' % (self.lockfile)
453        self.faft_client.system.run_shell_command(command)
454
455    def _remove_faft_lockfile(self):
456        """Removes the FAFT lockfile."""
457        logging.info('Removing FAFT lockfile...')
458        command = 'rm -f %s' % (self.lockfile)
459        self.faft_client.system.run_shell_command(command)
460
461    def clear_set_gbb_flags(self, clear_mask, set_mask):
462        """Clear and set the GBB flags in the current flashrom.
463
464        @param clear_mask: A mask of flags to be cleared.
465        @param set_mask: A mask of flags to be set.
466        """
467        gbb_flags = self.faft_client.bios.get_gbb_flags()
468        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
469        if new_flags != gbb_flags:
470            self._backup_gbb_flags = gbb_flags
471            logging.info('Changing GBB flags from 0x%x to 0x%x.',
472                         gbb_flags, new_flags)
473            self.faft_client.bios.set_gbb_flags(new_flags)
474            # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
475            # reboot to get a clear state
476            if ((gbb_flags ^ new_flags) &
477                (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
478                 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
479                self.switcher.mode_aware_reboot()
480        else:
481            logging.info('Current GBB flags look good for test: 0x%x.',
482                         gbb_flags)
483
484    def check_ec_capability(self, required_cap=None, suppress_warning=False):
485        """Check if current platform has required EC capabilities.
486
487        @param required_cap: A list containing required EC capabilities. Pass in
488                             None to only check for presence of Chrome EC.
489        @param suppress_warning: True to suppress any warning messages.
490        @return: True if requirements are met. Otherwise, False.
491        """
492        if not self.faft_config.chrome_ec:
493            if not suppress_warning:
494                logging.warn('Requires Chrome EC to run this test.')
495            return False
496
497        if not required_cap:
498            return True
499
500        for cap in required_cap:
501            if cap not in self.faft_config.ec_capability:
502                if not suppress_warning:
503                    logging.warn('Requires EC capability "%s" to run this '
504                                 'test.', cap)
505                return False
506
507        return True
508
509    def check_root_part_on_non_recovery(self, part):
510        """Check the partition number of root device and on normal/dev boot.
511
512        @param part: A string of partition number, e.g.'3'.
513        @return: True if the root device matched and on normal/dev boot;
514                 otherwise, False.
515        """
516        return self.checkers.root_part_checker(part) and \
517                self.checkers.crossystem_checker({
518                    'mainfw_type': ('normal', 'developer'),
519                })
520
521    def _join_part(self, dev, part):
522        """Return a concatenated string of device and partition number.
523
524        @param dev: A string of device, e.g.'/dev/sda'.
525        @param part: A string of partition number, e.g.'3'.
526        @return: A concatenated string of device and partition number,
527                 e.g.'/dev/sda3'.
528
529        >>> seq = FirmwareTest()
530        >>> seq._join_part('/dev/sda', '3')
531        '/dev/sda3'
532        >>> seq._join_part('/dev/mmcblk0', '2')
533        '/dev/mmcblk0p2'
534        """
535        if 'mmcblk' in dev:
536            return dev + 'p' + part
537        elif 'nvme' in dev:
538            return dev + 'p' + part
539        else:
540            return dev + part
541
542    def copy_kernel_and_rootfs(self, from_part, to_part):
543        """Copy kernel and rootfs from from_part to to_part.
544
545        @param from_part: A string of partition number to be copied from.
546        @param to_part: A string of partition number to be copied to.
547        """
548        root_dev = self.faft_client.system.get_root_dev()
549        logging.info('Copying kernel from %s to %s. Please wait...',
550                     from_part, to_part)
551        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
552                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
553                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
554        logging.info('Copying rootfs from %s to %s. Please wait...',
555                     from_part, to_part)
556        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
557                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
558                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
559
560    def ensure_kernel_boot(self, part):
561        """Ensure the request kernel boot.
562
563        If not, it duplicates the current kernel to the requested kernel
564        and sets the requested higher priority to ensure it boot.
565
566        @param part: A string of kernel partition number or 'a'/'b'.
567        """
568        if not self.checkers.root_part_checker(part):
569            if self.faft_client.kernel.diff_a_b():
570                self.copy_kernel_and_rootfs(
571                        from_part=self.OTHER_KERNEL_MAP[part],
572                        to_part=part)
573            self.reset_and_prioritize_kernel(part)
574            self.switcher.mode_aware_reboot()
575
576    def ensure_dev_internal_boot(self, original_dev_boot_usb):
577        """Ensure internal device boot in developer mode.
578
579        If not internal device boot, it will try to reboot the device and
580        bypass dev mode to boot into internal device.
581
582        @param original_dev_boot_usb: Original dev_boot_usb value.
583        """
584        logging.info('Checking internal device boot.')
585        if self.faft_client.system.is_removable_device_boot():
586            logging.info('Reboot into internal disk...')
587            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
588            self.switcher.mode_aware_reboot()
589        self.check_state((self.checkers.dev_boot_usb_checker, False,
590                          'Device not booted from internal disk properly.'))
591
592    def set_hardware_write_protect(self, enable):
593        """Set hardware write protect pin.
594
595        @param enable: True if asserting write protect pin. Otherwise, False.
596        """
597        try:
598            self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
599        except:
600            # TODO(waihong): Remove this fallback when all servos have the
601            # above new fw_wp_state control.
602            self.servo.set('fw_wp_vref', self.faft_config.wp_voltage)
603            self.servo.set('fw_wp_en', 'on')
604            self.servo.set('fw_wp', 'on' if enable else 'off')
605
606    def set_ec_write_protect_and_reboot(self, enable):
607        """Set EC write protect status and reboot to take effect.
608
609        The write protect state is only activated if both hardware write
610        protect pin is asserted and software write protect flag is set.
611        This method asserts/deasserts hardware write protect pin first, and
612        set corresponding EC software write protect flag.
613
614        If the device uses non-Chrome EC, set the software write protect via
615        flashrom.
616
617        If the device uses Chrome EC, a reboot is required for write protect
618        to take effect. Since the software write protect flag cannot be unset
619        if hardware write protect pin is asserted, we need to deasserted the
620        pin first if we are deactivating write protect. Similarly, a reboot
621        is required before we can modify the software flag.
622
623        @param enable: True if activating EC write protect. Otherwise, False.
624        """
625        self.set_hardware_write_protect(enable)
626        if self.faft_config.chrome_ec:
627            self.set_chrome_ec_write_protect_and_reboot(enable)
628        else:
629            self.faft_client.ec.set_write_protect(enable)
630            self.switcher.mode_aware_reboot()
631
632    def set_chrome_ec_write_protect_and_reboot(self, enable):
633        """Set Chrome EC write protect status and reboot to take effect.
634
635        @param enable: True if activating EC write protect. Otherwise, False.
636        """
637        if enable:
638            # Set write protect flag and reboot to take effect.
639            self.ec.set_flash_write_protect(enable)
640            self.sync_and_ec_reboot()
641        else:
642            # Reboot after deasserting hardware write protect pin to deactivate
643            # write protect. And then remove software write protect flag.
644            self.sync_and_ec_reboot()
645            self.ec.set_flash_write_protect(enable)
646
647    def _setup_ec_write_protect(self, ec_wp):
648        """Setup for EC write-protection.
649
650        It makes sure the EC in the requested write-protection state. If not, it
651        flips the state. Flipping the write-protection requires DUT reboot.
652
653        @param ec_wp: True to request EC write-protected; False to request EC
654                      not write-protected; None to do nothing.
655        """
656        if ec_wp is None:
657            self._old_ec_wp = None
658            return
659        self._old_ec_wp = self.checkers.crossystem_checker({'wpsw_boot': '1'})
660        if ec_wp != self._old_ec_wp:
661            logging.info('The test required EC is %swrite-protected. Reboot '
662                         'and flip the state.', '' if ec_wp else 'not ')
663            self.switcher.mode_aware_reboot(
664                    'custom',
665                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
666
667    def _restore_ec_write_protect(self):
668        """Restore the original EC write-protection."""
669        if (not hasattr(self, '_old_ec_wp')) or (self._old_ec_wp is None):
670            return
671        if not self.checkers.crossystem_checker(
672                {'wpsw_boot': '1' if self._old_ec_wp else '0'}):
673            logging.info('Restore original EC write protection and reboot.')
674            self.switcher.mode_aware_reboot(
675                    'custom',
676                    lambda:self.set_ec_write_protect_and_reboot(
677                            self._old_ec_wp))
678
679    def _setup_uart_capture(self):
680        """Setup the CPU/EC/PD UART capture."""
681        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
682        self.servo.set('cpu_uart_capture', 'on')
683        self.cr50_uart_file = None
684        self.ec_uart_file = None
685        self.usbpd_uart_file = None
686        try:
687            # Check that the console works before declaring the cr50 console
688            # connection exists and enabling uart capture.
689            self.servo.get('cr50_version')
690            self.servo.set('cr50_uart_capture', 'on')
691            self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt')
692            self.cr50 = chrome_cr50.ChromeCr50(self.servo)
693        except error.TestFail as e:
694            if 'No control named' in str(e):
695                logging.warn('cr50 console not supported.')
696        if self.faft_config.chrome_ec:
697            try:
698                self.servo.set('ec_uart_capture', 'on')
699                self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
700            except error.TestFail as e:
701                if 'No control named' in str(e):
702                    logging.warn('The servod is too old that ec_uart_capture '
703                                 'not supported.')
704            # Log separate PD console if supported
705            if self.check_ec_capability(['usbpd_uart'], suppress_warning=True):
706                try:
707                    self.servo.set('usbpd_uart_capture', 'on')
708                    self.usbpd_uart_file = os.path.join(self.resultsdir,
709                                                        'usbpd_uart.txt')
710                except error.TestFail as e:
711                    if 'No control named' in str(e):
712                        logging.warn('The servod is too old that '
713                                     'usbpd_uart_capture is not supported.')
714        else:
715            logging.info('Not a Google EC, cannot capture ec console output.')
716
717    def _record_uart_capture(self):
718        """Record the CPU/EC/PD UART output stream to files."""
719        if self.cpu_uart_file:
720            with open(self.cpu_uart_file, 'a') as f:
721                f.write(ast.literal_eval(self.servo.get('cpu_uart_stream')))
722        if self.cr50_uart_file:
723            with open(self.cr50_uart_file, 'a') as f:
724                f.write(ast.literal_eval(self.servo.get('cr50_uart_stream')))
725        if self.ec_uart_file and self.faft_config.chrome_ec:
726            with open(self.ec_uart_file, 'a') as f:
727                f.write(ast.literal_eval(self.servo.get('ec_uart_stream')))
728        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
729            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
730            with open(self.usbpd_uart_file, 'a') as f:
731                f.write(ast.literal_eval(self.servo.get('usbpd_uart_stream')))
732
733    def _cleanup_uart_capture(self):
734        """Cleanup the CPU/EC/PD UART capture."""
735        # Flush the remaining UART output.
736        self._record_uart_capture()
737        self.servo.set('cpu_uart_capture', 'off')
738        if self.cr50_uart_file:
739            self.servo.set('cr50_uart_capture', 'off')
740        if self.ec_uart_file and self.faft_config.chrome_ec:
741            self.servo.set('ec_uart_capture', 'off')
742        if (self.usbpd_uart_file and self.faft_config.chrome_ec and
743            self.check_ec_capability(['usbpd_uart'], suppress_warning=True)):
744            self.servo.set('usbpd_uart_capture', 'off')
745
746    def _get_power_state(self, power_state):
747        """
748        Return the current power state of the AP
749        """
750        return self.ec.send_command_get_output("powerinfo", [power_state])
751
752    def wait_power_state(self, power_state, retries):
753        """
754        Wait for certain power state.
755
756        @param power_state: power state you are expecting
757        @param retries: retries.  This is necessary if AP is powering down
758        and transitioning through different states.
759        """
760        logging.info('Checking power state "%s" maximum %d times.',
761                     power_state, retries)
762        while retries > 0:
763            logging.info("try count: %d", retries)
764            try:
765                retries = retries - 1
766                ret = self._get_power_state(power_state)
767                return True
768            except error.TestFail:
769                pass
770        return False
771
772    def suspend(self):
773        """Suspends the DUT."""
774        cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
775        self.faft_client.system.run_shell_command(cmd)
776        time.sleep(self.EC_SUSPEND_DELAY)
777
778    def _fetch_servo_log(self):
779        """Fetch the servo log."""
780        cmd = '[ -e %s ] && cat %s || echo NOTFOUND' % ((self._SERVOD_LOG,) * 2)
781        servo_log = self.servo.system_output(cmd)
782        return None if servo_log == 'NOTFOUND' else servo_log
783
784    def _setup_servo_log(self):
785        """Setup the servo log capturing."""
786        self.servo_log_original_len = -1
787        if self.servo.is_localhost():
788            # No servo log recorded when servod runs locally.
789            return
790
791        servo_log = self._fetch_servo_log()
792        if servo_log:
793            self.servo_log_original_len = len(servo_log)
794        else:
795            logging.warn('Servo log file not found.')
796
797    def _record_servo_log(self):
798        """Record the servo log to the results directory."""
799        if self.servo_log_original_len != -1:
800            servo_log = self._fetch_servo_log()
801            servo_log_file = os.path.join(self.resultsdir, 'servod.log')
802            with open(servo_log_file, 'a') as f:
803                f.write(servo_log[self.servo_log_original_len:])
804
805    def _record_faft_client_log(self):
806        """Record the faft client log to the results directory."""
807        client_log = self.faft_client.system.dump_log(True)
808        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
809        with open(client_log_file, 'w') as f:
810            f.write(client_log)
811
812    def _setup_gbb_flags(self):
813        """Setup the GBB flags for FAFT test."""
814        if self.faft_config.gbb_version < 1.1:
815            logging.info('Skip modifying GBB on versions older than 1.1.')
816            return
817
818        if self.check_setup_done('gbb_flags'):
819            return
820
821        logging.info('Set proper GBB flags for test.')
822        self.clear_set_gbb_flags(vboot.GBB_FLAG_DEV_SCREEN_SHORT_DELAY |
823                                 vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
824                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_USB |
825                                 vboot.GBB_FLAG_DISABLE_FW_ROLLBACK_CHECK |
826                                 vboot.GBB_FLAG_FORCE_DEV_BOOT_FASTBOOT_FULL_CAP,
827                                 vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM |
828                                 vboot.GBB_FLAG_FAFT_KEY_OVERIDE)
829        self.mark_setup_done('gbb_flags')
830
831    def drop_backup_gbb_flags(self):
832        """Drops the backup GBB flags.
833
834        This can be used when a test intends to permanently change GBB flags.
835        """
836        self._backup_gbb_flags = None
837
838    def _restore_gbb_flags(self):
839        """Restore GBB flags to their original state."""
840        if self._backup_gbb_flags is None:
841            return
842        # Setting up and restoring the GBB flags take a lot of time. For
843        # speed-up purpose, don't restore it.
844        logging.info('***')
845        logging.info('*** Please manually restore the original GBB flags to: '
846                     '0x%x ***', self._backup_gbb_flags)
847        logging.info('***')
848        self.unmark_setup_done('gbb_flags')
849
850    def setup_tried_fwb(self, tried_fwb):
851        """Setup for fw B tried state.
852
853        It makes sure the system in the requested fw B tried state. If not, it
854        tries to do so.
855
856        @param tried_fwb: True if requested in tried_fwb=1;
857                          False if tried_fwb=0.
858        """
859        if tried_fwb:
860            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
861                logging.info(
862                    'Firmware is not booted with tried_fwb. Reboot into it.')
863                self.faft_client.system.set_try_fw_b()
864        else:
865            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
866                logging.info(
867                    'Firmware is booted with tried_fwb. Reboot to clear.')
868
869    def power_on(self):
870        """Switch DUT AC power on."""
871        self._client.power_on(self.power_control)
872
873    def power_off(self):
874        """Switch DUT AC power off."""
875        self._client.power_off(self.power_control)
876
877    def power_cycle(self):
878        """Power cycle DUT AC power."""
879        self._client.power_cycle(self.power_control)
880
881    def setup_rw_boot(self, section='a'):
882        """Make sure firmware is in RW-boot mode.
883
884        If the given firmware section is in RO-boot mode, turn off the RO-boot
885        flag and reboot DUT into RW-boot mode.
886
887        @param section: A firmware section, either 'a' or 'b'.
888        """
889        flags = self.faft_client.bios.get_preamble_flags(section)
890        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
891            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
892            self.faft_client.bios.set_preamble_flags(section, flags)
893            self.switcher.mode_aware_reboot()
894
895    def setup_kernel(self, part):
896        """Setup for kernel test.
897
898        It makes sure both kernel A and B bootable and the current boot is
899        the requested kernel part.
900
901        @param part: A string of kernel partition number or 'a'/'b'.
902        """
903        self.ensure_kernel_boot(part)
904        logging.info('Checking the integrity of kernel B and rootfs B...')
905        if (self.faft_client.kernel.diff_a_b() or
906                not self.faft_client.rootfs.verify_rootfs('B')):
907            logging.info('Copying kernel and rootfs from A to B...')
908            self.copy_kernel_and_rootfs(from_part=part,
909                                        to_part=self.OTHER_KERNEL_MAP[part])
910        self.reset_and_prioritize_kernel(part)
911
912    def reset_and_prioritize_kernel(self, part):
913        """Make the requested partition highest priority.
914
915        This function also reset kerenl A and B to bootable.
916
917        @param part: A string of partition number to be prioritized.
918        """
919        root_dev = self.faft_client.system.get_root_dev()
920        # Reset kernel A and B to bootable.
921        self.faft_client.system.run_shell_command(
922            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
923        self.faft_client.system.run_shell_command(
924            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
925        # Set kernel part highest priority.
926        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
927                (self.KERNEL_MAP[part], root_dev))
928
929    def do_blocking_sync(self, device):
930        """Run a blocking sync command."""
931        logging.info("Blocking sync for %s", device)
932        if 'mmcblk' in device:
933            # For mmc devices, use `mmc status get` command to send an
934            # empty command to wait for the disk to be available again.
935            self.faft_client.system.run_shell_command('mmc status get %s' %
936                                                      device)
937        elif 'nvme' in device:
938            # Get a list of NVMe namespace and flush them individually
939            # Assumes the output format from nvme list-ns command will
940            # be something like follows:
941            # [ 0]:0x1
942            # [ 1]:0x2
943            available_ns = self.faft_client.system.run_shell_command_get_output(
944                                                  'nvme list-ns %s -a' % device)
945            for ns in available_ns:
946                ns = ns.split(':')[-1]
947                # For NVMe devices, use `nvme flush` command to commit data
948                # and metadata to non-volatile media.
949                self.faft_client.system.run_shell_command(
950                                 'nvme flush %s -n %s' % (device, ns))
951        else:
952            # For other devices, hdparm sends TUR to check if
953            # a device is ready for transfer operation.
954            self.faft_client.system.run_shell_command('hdparm -f %s' % device)
955
956    def blocking_sync(self):
957        """Sync root device and internal device."""
958        # The double calls to sync fakes a blocking call
959        # since the first call returns before the flush
960        # is complete, but the second will wait for the
961        # first to finish.
962        self.faft_client.system.run_shell_command('sync')
963        self.faft_client.system.run_shell_command('sync')
964
965        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
966        # This function will perform a device-specific sync command.
967        root_dev = self.faft_client.system.get_root_dev()
968        self.do_blocking_sync(root_dev)
969
970        # Also sync the internal device if booted from removable media.
971        if self.faft_client.system.is_removable_device_boot():
972            internal_dev = self.faft_client.system.get_internal_device()
973            self.do_blocking_sync(internal_dev)
974
975    def sync_and_ec_reboot(self, flags=''):
976        """Request the client sync and do a EC triggered reboot.
977
978        @param flags: Optional, a space-separated string of flags passed to EC
979                      reboot command, including:
980                          default: EC soft reboot;
981                          'hard': EC cold/hard reboot.
982        """
983        self.blocking_sync()
984        self.ec.reboot(flags)
985        time.sleep(self.faft_config.ec_boot_to_console)
986        self.check_lid_and_power_on()
987
988    def reboot_and_reset_tpm(self):
989        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
990        self.switcher.reboot_to_mode(to_mode='rec')
991        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
992        self.switcher.mode_aware_reboot()
993
994    def full_power_off_and_on(self):
995        """Shutdown the device by pressing power button and power on again."""
996        boot_id = self.get_bootid()
997        # Press power button to trigger Chrome OS normal shutdown process.
998        # We use a customized delay since the normal-press 1.2s is not enough.
999        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
1000        # device can take 44-51 seconds to restart,
1001        # add buffer from the default timeout of 60 seconds.
1002        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
1003        time.sleep(self.faft_config.shutdown)
1004        # Short press power button to boot DUT again.
1005        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1006
1007    def check_lid_and_power_on(self):
1008        """
1009        On devices with EC software sync, system powers on after EC reboots if
1010        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1011        This method checks lid switch state and presses power button if
1012        necessary.
1013        """
1014        if self.servo.get("lid_open") == "no":
1015            time.sleep(self.faft_config.software_sync)
1016            self.servo.power_short_press()
1017
1018    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1019        """Modify the kernel header magic in USB stick.
1020
1021        The kernel header magic is the first 8-byte of kernel partition.
1022        We modify it to make it fail on kernel verification check.
1023
1024        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1025        @param from_magic: A string of magic which we change it from.
1026        @param to_magic: A string of magic which we change it to.
1027        @raise TestError: if failed to change magic.
1028        """
1029        assert len(from_magic) == 8
1030        assert len(to_magic) == 8
1031        # USB image only contains one kernel.
1032        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1033        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1034        current_magic = self.servo.system_output(read_cmd)
1035        if current_magic == to_magic:
1036            logging.info("The kernel magic is already %s.", current_magic)
1037            return
1038        if current_magic != from_magic:
1039            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1040
1041        logging.info('Modify the kernel magic in USB, from %s to %s.',
1042                     from_magic, to_magic)
1043        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1044                     " 2>/dev/null" % (to_magic, kernel_part))
1045        self.servo.system(write_cmd)
1046
1047        if self.servo.system_output(read_cmd) != to_magic:
1048            raise error.TestError("Failed to write new magic.")
1049
1050    def corrupt_usb_kernel(self, usb_dev):
1051        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1052
1053        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1054        """
1055        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1056                                self.CORRUPTED_MAGIC)
1057
1058    def restore_usb_kernel(self, usb_dev):
1059        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1060
1061        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1062        """
1063        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1064                                self.CHROMEOS_MAGIC)
1065
1066    def _call_action(self, action_tuple, check_status=False):
1067        """Call the action function with/without arguments.
1068
1069        @param action_tuple: A function, or a tuple (function, args, error_msg),
1070                             in which, args and error_msg are optional. args is
1071                             either a value or a tuple if multiple arguments.
1072                             This can also be a list containing multiple
1073                             function or tuple. In this case, these actions are
1074                             called in sequence.
1075        @param check_status: Check the return value of action function. If not
1076                             succeed, raises a TestFail exception.
1077        @return: The result value of the action function.
1078        @raise TestError: An error when the action function is not callable.
1079        @raise TestFail: When check_status=True, action function not succeed.
1080        """
1081        if isinstance(action_tuple, list):
1082            return all([self._call_action(action, check_status=check_status)
1083                        for action in action_tuple])
1084
1085        action = action_tuple
1086        args = ()
1087        error_msg = 'Not succeed'
1088        if isinstance(action_tuple, tuple):
1089            action = action_tuple[0]
1090            if len(action_tuple) >= 2:
1091                args = action_tuple[1]
1092                if not isinstance(args, tuple):
1093                    args = (args,)
1094            if len(action_tuple) >= 3:
1095                error_msg = action_tuple[2]
1096
1097        if action is None:
1098            return
1099
1100        if not callable(action):
1101            raise error.TestError('action is not callable!')
1102
1103        info_msg = 'calling %s' % action.__name__
1104        if args:
1105            info_msg += ' with args %s' % str(args)
1106        logging.info(info_msg)
1107        ret = action(*args)
1108
1109        if check_status and not ret:
1110            raise error.TestFail('%s: %s returning %s' %
1111                                 (error_msg, info_msg, str(ret)))
1112        return ret
1113
1114    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1115                             run_power_action=True, post_power_action=None,
1116                             shutdown_timeout=None):
1117        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1118
1119        @param shutdown_action: function which makes DUT shutdown, like
1120                                pressing power key.
1121        @param pre_power_action: function which is called before next power on.
1122        @param run_power_action: power_key press by default, set to None to skip.
1123        @param post_power_action: function which is called after next power on.
1124        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1125        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1126        """
1127        self._call_action(shutdown_action)
1128        logging.info('Wait to ensure DUT shut down...')
1129        try:
1130            if shutdown_timeout is None:
1131                shutdown_timeout = self.faft_config.shutdown_timeout
1132            self.switcher.wait_for_client(timeout=shutdown_timeout)
1133            raise error.TestFail(
1134                    'Should shut the device down after calling %s.' %
1135                    shutdown_action.__name__)
1136        except ConnectionError:
1137            if self.check_ec_capability(['x86'], suppress_warning=True):
1138                PWR_RETRIES=5
1139                if not self.wait_power_state("G3", PWR_RETRIES):
1140                    raise error.TestFail("System not shutdown properly and EC"
1141                                         "fails to enter into G3 state.")
1142                logging.info('System entered into G3 state..')
1143            logging.info(
1144                'DUT is surely shutdown. We are going to power it on again...')
1145
1146        if pre_power_action:
1147            self._call_action(pre_power_action)
1148        if run_power_action:
1149            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1150        if post_power_action:
1151            self._call_action(post_power_action)
1152
1153    def get_bootid(self, retry=3):
1154        """
1155        Return the bootid.
1156        """
1157        boot_id = None
1158        while retry:
1159            try:
1160                boot_id = self._client.get_boot_id()
1161                break
1162            except error.AutoservRunError:
1163                retry -= 1
1164                if retry:
1165                    logging.info('Retry to get boot_id...')
1166                else:
1167                    logging.warning('Failed to get boot_id.')
1168        logging.info('boot_id: %s', boot_id)
1169        return boot_id
1170
1171    def check_state(self, func):
1172        """
1173        Wrapper around _call_action with check_status set to True. This is a
1174        helper function to be used by tests and is currently implemented by
1175        calling _call_action with check_status=True.
1176
1177        TODO: This function's arguments need to be made more stringent. And
1178        its functionality should be moved over to check functions directly in
1179        the future.
1180
1181        @param func: A function, or a tuple (function, args, error_msg),
1182                             in which, args and error_msg are optional. args is
1183                             either a value or a tuple if multiple arguments.
1184                             This can also be a list containing multiple
1185                             function or tuple. In this case, these actions are
1186                             called in sequence.
1187        @return: The result value of the action function.
1188        @raise TestFail: If the function does notsucceed.
1189        """
1190        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1191        self._call_action(func, check_status=True)
1192        logging.info("-[FAFT]-[ end state_checker ]----------------")
1193
1194    def get_current_firmware_sha(self):
1195        """Get current firmware sha of body and vblock.
1196
1197        @return: Current firmware sha follows the order (
1198                 vblock_a_sha, body_a_sha, vblock_b_sha, body_b_sha)
1199        """
1200        current_firmware_sha = (self.faft_client.bios.get_sig_sha('a'),
1201                                self.faft_client.bios.get_body_sha('a'),
1202                                self.faft_client.bios.get_sig_sha('b'),
1203                                self.faft_client.bios.get_body_sha('b'))
1204        if not all(current_firmware_sha):
1205            raise error.TestError('Failed to get firmware sha.')
1206        return current_firmware_sha
1207
1208    def is_firmware_changed(self):
1209        """Check if the current firmware changed, by comparing its SHA.
1210
1211        @return: True if it is changed, otherwise Flase.
1212        """
1213        # Device may not be rebooted after test.
1214        self.faft_client.bios.reload()
1215
1216        current_sha = self.get_current_firmware_sha()
1217
1218        if current_sha == self._backup_firmware_sha:
1219            return False
1220        else:
1221            corrupt_VBOOTA = (current_sha[0] != self._backup_firmware_sha[0])
1222            corrupt_FVMAIN = (current_sha[1] != self._backup_firmware_sha[1])
1223            corrupt_VBOOTB = (current_sha[2] != self._backup_firmware_sha[2])
1224            corrupt_FVMAINB = (current_sha[3] != self._backup_firmware_sha[3])
1225            logging.info('Firmware changed:')
1226            logging.info('VBOOTA is changed: %s', corrupt_VBOOTA)
1227            logging.info('VBOOTB is changed: %s', corrupt_VBOOTB)
1228            logging.info('FVMAIN is changed: %s', corrupt_FVMAIN)
1229            logging.info('FVMAINB is changed: %s', corrupt_FVMAINB)
1230            return True
1231
1232    def backup_firmware(self, suffix='.original'):
1233        """Backup firmware to file, and then send it to host.
1234
1235        @param suffix: a string appended to backup file name
1236        """
1237        remote_temp_dir = self.faft_client.system.create_temp_dir()
1238        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1239        self.faft_client.bios.dump_whole(remote_bios_path)
1240        self._client.get_file(remote_bios_path,
1241                              os.path.join(self.resultsdir, 'bios' + suffix))
1242
1243        if self.faft_config.chrome_ec:
1244            remote_ec_path = os.path.join(remote_temp_dir, 'ec')
1245            self.faft_client.ec.dump_whole(remote_ec_path)
1246            self._client.get_file(remote_ec_path,
1247                              os.path.join(self.resultsdir, 'ec' + suffix))
1248
1249        self._client.run('rm -rf %s' % remote_temp_dir)
1250        logging.info('Backup firmware stored in %s with suffix %s',
1251            self.resultsdir, suffix)
1252
1253        self._backup_firmware_sha = self.get_current_firmware_sha()
1254
1255    def is_firmware_saved(self):
1256        """Check if a firmware saved (called backup_firmware before).
1257
1258        @return: True if the firmware is backuped; otherwise False.
1259        """
1260        return self._backup_firmware_sha != ()
1261
1262    def clear_saved_firmware(self):
1263        """Clear the firmware saved by the method backup_firmware."""
1264        self._backup_firmware_sha = ()
1265
1266    def restore_firmware(self, suffix='.original', restore_ec=True):
1267        """Restore firmware from host in resultsdir.
1268
1269        @param suffix: a string appended to backup file name
1270        @param restore_ec: True to restore the ec firmware; False not to do.
1271        """
1272        if not self.is_firmware_changed():
1273            return
1274
1275        # Backup current corrupted firmware.
1276        self.backup_firmware(suffix='.corrupt')
1277
1278        # Restore firmware.
1279        remote_temp_dir = self.faft_client.system.create_temp_dir()
1280        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1281                               os.path.join(remote_temp_dir, 'bios'))
1282
1283        self.faft_client.bios.write_whole(
1284            os.path.join(remote_temp_dir, 'bios'))
1285
1286        if self.faft_config.chrome_ec and restore_ec:
1287            self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix),
1288                os.path.join(remote_temp_dir, 'ec'))
1289            self.faft_client.ec.write_whole(
1290                os.path.join(remote_temp_dir, 'ec'))
1291
1292        self.switcher.mode_aware_reboot()
1293        logging.info('Successfully restore firmware.')
1294
1295    def setup_firmwareupdate_shellball(self, shellball=None):
1296        """Setup a shellball to use in firmware update test.
1297
1298        Check if there is a given shellball, and it is a shell script. Then,
1299        send it to the remote host. Otherwise, use the
1300        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1301        BIOS and EC images with the active firmware images.
1302
1303        @param shellball: path of a shellball or default to None.
1304        """
1305        if shellball:
1306            # Determine the firmware file is a shellball or a raw binary.
1307            is_shellball = (utils.system_output("file %s" % shellball).find(
1308                    "shell script") != -1)
1309            if is_shellball:
1310                logging.info('Device will update firmware with shellball %s',
1311                             shellball)
1312                temp_path = self.faft_client.updater.get_temp_path()
1313                working_shellball = os.path.join(temp_path,
1314                                                 'chromeos-firmwareupdate')
1315                self._client.send_file(shellball, working_shellball)
1316                self.faft_client.updater.extract_shellball()
1317            else:
1318                raise error.TestFail(
1319                    'The given shellball is not a shell script.')
1320        else:
1321            logging.info('No shellball given, use the original shellball and '
1322                         'replace its BIOS and EC images.')
1323            work_path = self.faft_client.updater.get_work_path()
1324            bios_in_work_path = os.path.join(
1325                work_path, self.faft_client.updater.get_bios_relative_path())
1326            ec_in_work_path = os.path.join(
1327                work_path, self.faft_client.updater.get_ec_relative_path())
1328            logging.info('Writing current BIOS to: %s', bios_in_work_path)
1329            self.faft_client.bios.dump_whole(bios_in_work_path)
1330            if self.faft_config.chrome_ec:
1331                logging.info('Writing current EC to: %s', ec_in_work_path)
1332                self.faft_client.ec.dump_firmware(ec_in_work_path)
1333            self.faft_client.updater.repack_shellball()
1334
1335    def is_kernel_changed(self):
1336        """Check if the current kernel is changed, by comparing its SHA1 hash.
1337
1338        @return: True if it is changed; otherwise, False.
1339        """
1340        changed = False
1341        for p in ('A', 'B'):
1342            backup_sha = self._backup_kernel_sha.get(p, None)
1343            current_sha = self.faft_client.kernel.get_sha(p)
1344            if backup_sha != current_sha:
1345                changed = True
1346                logging.info('Kernel %s is changed', p)
1347        return changed
1348
1349    def backup_kernel(self, suffix='.original'):
1350        """Backup kernel to files, and the send them to host.
1351
1352        @param suffix: a string appended to backup file name.
1353        """
1354        remote_temp_dir = self.faft_client.system.create_temp_dir()
1355        for p in ('A', 'B'):
1356            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1357            self.faft_client.kernel.dump(p, remote_path)
1358            self._client.get_file(
1359                    remote_path,
1360                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1361            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1362        logging.info('Backup kernel stored in %s with suffix %s',
1363            self.resultsdir, suffix)
1364
1365    def is_kernel_saved(self):
1366        """Check if kernel images are saved (backup_kernel called before).
1367
1368        @return: True if the kernel is saved; otherwise, False.
1369        """
1370        return len(self._backup_kernel_sha) != 0
1371
1372    def clear_saved_kernel(self):
1373        """Clear the kernel saved by backup_kernel()."""
1374        self._backup_kernel_sha = dict()
1375
1376    def restore_kernel(self, suffix='.original'):
1377        """Restore kernel from host in resultsdir.
1378
1379        @param suffix: a string appended to backup file name.
1380        """
1381        if not self.is_kernel_changed():
1382            return
1383
1384        # Backup current corrupted kernel.
1385        self.backup_kernel(suffix='.corrupt')
1386
1387        # Restore kernel.
1388        remote_temp_dir = self.faft_client.system.create_temp_dir()
1389        for p in ('A', 'B'):
1390            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1391            self._client.send_file(
1392                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1393                    remote_path)
1394            self.faft_client.kernel.write(p, remote_path)
1395
1396        self.switcher.mode_aware_reboot()
1397        logging.info('Successfully restored kernel.')
1398
1399    def backup_cgpt_attributes(self):
1400        """Backup CGPT partition table attributes."""
1401        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1402
1403    def restore_cgpt_attributes(self):
1404        """Restore CGPT partition table attributes."""
1405        current_table = self.faft_client.cgpt.get_attributes()
1406        if current_table == self._backup_cgpt_attr:
1407            return
1408        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1409                     self._backup_cgpt_attr,
1410                     current_table)
1411        self.faft_client.cgpt.set_attributes(self._backup_cgpt_attr)
1412
1413        self.switcher.mode_aware_reboot()
1414        logging.info('Successfully restored CGPT table.')
1415
1416    def try_fwb(self, count=0):
1417        """set to try booting FWB count # times
1418
1419        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1420        vboot2
1421
1422        @param count: an integer specifying value to program into
1423                      fwb_tries(vb1)/fw_try_next(vb2)
1424        """
1425        if self.fw_vboot2:
1426            self.faft_client.system.set_fw_try_next('B', count)
1427        else:
1428            # vboot1: we need to boot into fwb at least once
1429            if not count:
1430                count = count + 1
1431            self.faft_client.system.set_try_fw_b(count)
1432
1433