• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import ctypes
7import logging
8import os
9import pprint
10import re
11import time
12import uuid
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error
16from autotest_lib.client.common_lib.cros import tpm_utils
17from autotest_lib.server import test
18from autotest_lib.server.cros import vboot_constants as vboot
19from autotest_lib.server.cros.faft.utils.config import Config as FAFTConfig
20from autotest_lib.server.cros.faft.rpc_proxy import RPCProxy
21from autotest_lib.server.cros.faft.utils import mode_switcher
22from autotest_lib.server.cros.faft.utils.faft_checkers import FAFTCheckers
23from autotest_lib.server.cros.servo import chrome_base_ec
24from autotest_lib.server.cros.servo import chrome_cr50
25from autotest_lib.server.cros.servo import chrome_ec
26from autotest_lib.server.cros.servo import servo
27
# Module-level alias for the mode switcher's ConnectionError. The except
# clauses in this file catch this alias; on Python 3 the name also shadows
# the builtin ConnectionError inside this module.
ConnectionError = mode_switcher.ConnectionError
29
30
class FAFTBase(test.test):
    """The base class of FAFT classes.

    It sets up the FAFTClient RPC proxy for the DUT, such that the test can
    access its firmware functions and interfaces. It also provides some
    methods to handle the reboot mechanism, in order to ensure FAFTClient is
    still connected after reboot.
    @type servo: servo.Servo
    """
    def initialize(self, host):
        """Prepare the servo and create the RPC proxy for the given host.

        @param host: The host object of the DUT; its servo attribute is used.
        """
        dut_servo = host.servo
        self.servo = dut_servo

        # Discard the servod logs written before this test, so that they do
        # not add noise to the logs collected for it.
        dut_servo.rotate_servod_logs(filename=None)
        dut_servo.initialize_dut()

        self._client = host
        self.faft_client = RPCProxy(host)
        self.lockfile = '/usr/local/tmp/faft/lock'
53
class FirmwareTest(FAFTBase):
    """
    Base class that sets up helper objects/functions for firmware tests.

    TODO: add documentation as the FAFT rework progresses.
    """
    version = 1

    # Kernel and rootfs partition numbers, keyed by slot letter ('a'/'b') or
    # by a kernel/rootfs partition number of that slot.
    KERNEL_MAP = {
        'a': '2', 'b': '4', '2': '2', '4': '4', '3': '2', '5': '4',
    }
    ROOTFS_MAP = {
        'a': '3', 'b': '5', '2': '3', '4': '5', '3': '3', '5': '5',
    }
    # Same keys as above, but mapping to the partitions of the other slot.
    OTHER_KERNEL_MAP = {
        'a': '4', 'b': '2', '2': '4', '4': '2', '3': '4', '5': '2',
    }
    OTHER_ROOTFS_MAP = {
        'a': '5', 'b': '3', '2': '5', '4': '3', '3': '5', '5': '3',
    }

    CHROMEOS_MAGIC = "CHROMEOS"
    CORRUPTED_MAGIC = "CORRUPTD"

    # How long to wait for the client to return before EC suspend.
    EC_SUSPEND_DELAY = 5

    # How long to wait between EC suspend and wake.
    WAKE_DELAY = 10

    # How long to wait between closing and opening the lid.
    LID_DELAY = 1

    # FWMP space constants
    FWMP_CLEARED_EXIT_STATUS = 1
    FWMP_CLEARED_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
                              '_INVALID')

    # UARTs that may be captured
    UARTS = ('cpu', 'cr50', 'ec', 'servo_micro', 'servo_v4', 'usbpd')

    _ROOTFS_PARTITION_NUMBER = 3

    # Class-level defaults for the backup state.
    _backup_firmware_identity = {}
    _backup_kernel_sha = {}
    _backup_cgpt_attr = {}
    _backup_gbb_flags = None
    _backup_dev_mode = None

    # Class-level record of which one-time setup steps have been done.
    # It is shared by, and preserved across, the tests inheriting this class.
    _global_setup_done = {
        'gbb_flags': False,
        'reimage': False,
        'usb_check': False,
    }
103
104    @classmethod
105    def check_setup_done(cls, label):
106        """Check if the given setup is done.
107
108        @param label: The label of the setup.
109        """
110        return cls._global_setup_done[label]
111
112    @classmethod
113    def mark_setup_done(cls, label):
114        """Mark the given setup done.
115
116        @param label: The label of the setup.
117        """
118        cls._global_setup_done[label] = True
119
120    @classmethod
121    def unmark_setup_done(cls, label):
122        """Mark the given setup not done.
123
124        @param label: The label of the setup.
125        """
126        cls._global_setup_done[label] = False
127
128    def initialize(self, host, cmdline_args, ec_wp=None):
129        super(FirmwareTest, self).initialize(host)
130        self.run_id = str(uuid.uuid4())
131        logging.info('FirmwareTest initialize begin (id=%s)', self.run_id)
132        # Parse arguments from command line
133        args = {}
134        self.power_control = host.POWER_CONTROL_RPM
135        for arg in cmdline_args:
136            match = re.search("^(\w+)=(.+)", arg)
137            if match:
138                args[match.group(1)] = match.group(2)
139
140        self._no_ec_sync = False
141        if 'no_ec_sync' in args:
142            if 'true' in args['no_ec_sync'].lower():
143                self._no_ec_sync = True
144
145        self.faft_config = FAFTConfig(
146                self.faft_client.system.get_platform_name(),
147                self.faft_client.system.get_model_name())
148        self.checkers = FAFTCheckers(self)
149        self.switcher = mode_switcher.create_mode_switcher(self)
150
151        if self.faft_config.chrome_ec:
152            self.ec = chrome_ec.ChromeEC(self.servo)
153        # Check for presence of a USBPD console
154        if self.faft_config.chrome_usbpd:
155            self.usbpd = chrome_ec.ChromeUSBPD(self.servo)
156        elif self.faft_config.chrome_ec:
157            # If no separate USBPD console, then PD exists on EC console
158            self.usbpd = self.ec
159        # Get pdtester console
160        self.pdtester = host.pdtester
161        self.pdtester_host = host._pdtester_host
162
163        if 'power_control' in args:
164            self.power_control = args['power_control']
165            if self.power_control not in host.POWER_CONTROL_VALID_ARGS:
166                raise error.TestError('Valid values for --args=power_control '
167                                      'are %s. But you entered wrong argument '
168                                      'as "%s".'
169                                      % (host.POWER_CONTROL_VALID_ARGS,
170                                         self.power_control))
171
172        if not self.faft_client.system.dev_tpm_present():
173            raise error.TestError('/dev/tpm0 does not exist on the client')
174
175        # Create the BaseEC object. None if not available.
176        self.base_ec = chrome_base_ec.create_base_ec(self.servo)
177
178        self._setup_uart_capture()
179        self._record_system_info()
180        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
181        logging.info('vboot version: %d', 2 if self.fw_vboot2 else 1)
182        if self.fw_vboot2:
183            self.faft_client.system.set_fw_try_next('A')
184            if self.faft_client.system.get_crossystem_value(
185                    'mainfw_act') == 'B':
186                logging.info('mainfw_act is B. rebooting to set it A')
187                # TODO(crbug.com/1018322): remove try/catch once that bug is
188                # marked as fixed and verified. In that case the overlay for
189                # the board itself will map warm_reset to cold_reset.
190                try:
191                    self.switcher.mode_aware_reboot()
192                except ConnectionError as e:
193                    if 'DUT is still up unexpectedly' in str(e):
194                        # In this case, try doing a cold_reset instead
195                        self.switcher.mode_aware_reboot(reboot_type='cold')
196                    else:
197                      raise
198
199        # Check flashrom before first use, to avoid xmlrpclib.Fault.
200        if not self.faft_client.bios.is_available():
201            raise error.TestError(
202                    "flashrom is broken; check 'flashrom -p host'"
203                    "and rpc server log.")
204
205        self._setup_gbb_flags()
206        self.faft_client.updater.stop_daemon()
207        self._create_faft_lockfile()
208        self._create_old_faft_lockfile()
209        self._setup_ec_write_protect(ec_wp)
210        # See chromium:239034 regarding needing this sync.
211        self.blocking_sync()
212        self.servo.rotate_servod_logs('servod.init', self.resultsdir)
213        logging.info('FirmwareTest initialize done (id=%s)', self.run_id)
214
215    def cleanup(self):
216        """Autotest cleanup function."""
217        # Unset state checker in case it's set by subclass
218        logging.info('FirmwareTest cleaning up (id=%s)', self.run_id)
219
220        # capture servod logs for body of test
221        self.servo.rotate_servod_logs('servod', self.resultsdir)
222
223        # Capture UART before doing anything else, so we can guarantee we get
224        # some uart results.
225        try:
226            self._record_uart_capture()
227        except:
228            logging.warn('Failed initial uart capture during cleanup')
229
230        # Discard redundant log messages containing the captured uart text:
231        # ... Servod - DEBUG - servo_server.py:765:get - ec_uart_stream = '...'
232        self.servo.rotate_servod_logs(filename=None)
233
234        try:
235            self.faft_client.system.is_available()
236        except:
237            # Remote is not responding. Revive DUT so that subsequent tests
238            # don't fail.
239            self._restore_routine_from_timeout()
240        self.switcher.restore_mode()
241        self._restore_ec_write_protect()
242        self._restore_servo_v4_role()
243        self._restore_gbb_flags()
244        self.faft_client.updater.start_daemon()
245        self.faft_client.updater.cleanup()
246        self._remove_faft_lockfile()
247        self._remove_old_faft_lockfile()
248        self._record_faft_client_log()
249        self.servo.rotate_servod_logs('servod.cleanup', self.resultsdir)
250
251        # Capture any new uart output, then discard log messages again.
252        self._cleanup_uart_capture()
253        self.servo.rotate_servod_logs(filename=None)
254
255        super(FirmwareTest, self).cleanup()
256        logging.info('FirmwareTest cleanup done (id=%s)', self.run_id)
257
258    def _record_system_info(self):
259        """Record some critical system info to the attr keyval.
260
261        This info is used by generate_test_report later.
262        """
263        system_info = {
264            'hwid': self.faft_client.system.get_crossystem_value('hwid'),
265            'ec_version': self.faft_client.ec.get_version(),
266            'ro_fwid': self.faft_client.system.get_crossystem_value('ro_fwid'),
267            'rw_fwid': self.faft_client.system.get_crossystem_value('fwid'),
268            'servo_host_os_version' : self.servo.get_os_version(),
269            'servod_version': self.servo.get_servod_version(),
270            'os_version': self._client.get_release_builder_path(),
271            'servo_type': self.servo.get_servo_version()
272        }
273
274        # Record the servo v4 and servo micro versions when possible
275        system_info.update(self.servo.get_servo_fw_versions())
276
277        if hasattr(self, 'cr50'):
278            system_info['cr50_version'] = self.servo.get('cr50_version')
279
280        logging.info('System info:\n%s', pprint.pformat(system_info))
281        self.write_attr_keyval(system_info)
282
283    def invalidate_firmware_setup(self):
284        """Invalidate all firmware related setup state.
285
286        This method is called when the firmware is re-flashed. It resets all
287        firmware related setup states so that the next test setup properly
288        again.
289        """
290        self.unmark_setup_done('gbb_flags')
291
292    def _retrieve_recovery_reason_from_trap(self):
293        """Try to retrieve the recovery reason from a trapped recovery screen.
294
295        @return: The recovery_reason, 0 if any error.
296        """
297        recovery_reason = 0
298        logging.info('Try to retrieve recovery reason...')
299        if self.servo.get_usbkey_direction() == 'dut':
300            self.switcher.bypass_rec_mode()
301        else:
302            self.servo.switch_usbkey('dut')
303
304        try:
305            self.switcher.wait_for_client()
306            lines = self.faft_client.system.run_shell_command_get_output(
307                        'crossystem recovery_reason')
308            recovery_reason = int(lines[0])
309            logging.info('Got the recovery reason %d.', recovery_reason)
310        except ConnectionError:
311            logging.error('Failed to get the recovery reason due to connection '
312                          'error.')
313        return recovery_reason
314
315    def _reset_client(self):
316        """Reset client to a workable state.
317
318        This method is called when the client is not responsive. It may be
319        caused by the following cases:
320          - halt on a firmware screen without timeout, e.g. REC_INSERT screen;
321          - corrupted firmware;
322          - corrutped OS image.
323        """
324        # DUT may halt on a firmware screen. Try cold reboot.
325        logging.info('Try cold reboot...')
326        self.switcher.mode_aware_reboot(reboot_type='cold',
327                                        sync_before_boot=False,
328                                        wait_for_dut_up=False)
329        self.switcher.wait_for_client_offline()
330        self.switcher.bypass_dev_mode()
331        try:
332            self.switcher.wait_for_client()
333            return
334        except ConnectionError:
335            logging.warn('Cold reboot doesn\'t help, still connection error.')
336
337        # DUT may be broken by a corrupted firmware. Restore firmware.
338        # We assume the recovery boot still works fine. Since the recovery
339        # code is in RO region and all FAFT tests don't change the RO region
340        # except GBB.
341        if self.is_firmware_saved():
342            self._ensure_client_in_recovery()
343            logging.info('Try restore the original firmware...')
344            if self.is_firmware_changed():
345                try:
346                    self.restore_firmware()
347                    return
348                except ConnectionError:
349                    logging.warn('Restoring firmware doesn\'t help, still '
350                                 'connection error.')
351
352        # Perhaps it's kernel that's broken. Let's try restoring it.
353        if self.is_kernel_saved():
354            self._ensure_client_in_recovery()
355            logging.info('Try restore the original kernel...')
356            if self.is_kernel_changed():
357                try:
358                    self.restore_kernel()
359                    return
360                except ConnectionError:
361                    logging.warn('Restoring kernel doesn\'t help, still '
362                                 'connection error.')
363
364        # DUT may be broken by a corrupted OS image. Restore OS image.
365        self._ensure_client_in_recovery()
366        logging.info('Try restore the OS image...')
367        self.faft_client.system.run_shell_command('chromeos-install --yes')
368        self.switcher.mode_aware_reboot(wait_for_dut_up=False)
369        self.switcher.wait_for_client_offline()
370        self.switcher.bypass_dev_mode()
371        try:
372            self.switcher.wait_for_client()
373            logging.info('Successfully restore OS image.')
374            return
375        except ConnectionError:
376            logging.warn('Restoring OS image doesn\'t help, still connection '
377                         'error.')
378
379    def _ensure_client_in_recovery(self):
380        """Ensure client in recovery boot; reboot into it if necessary.
381
382        @raise TestError: if failed to boot the USB image.
383        """
384        logging.info('Try boot into USB image...')
385        self.switcher.reboot_to_mode(to_mode='rec', sync_before_boot=False,
386                                     wait_for_dut_up=False)
387        self.servo.switch_usbkey('host')
388        self.switcher.bypass_rec_mode()
389        try:
390            self.switcher.wait_for_client()
391        except ConnectionError:
392            raise error.TestError('Failed to boot the USB image.')
393
394    def _restore_routine_from_timeout(self):
395        """A routine to try to restore the system from a timeout error.
396
397        This method is called when FAFT failed to connect DUT after reboot.
398
399        @raise TestFail: This exception is already raised, with a decription
400                         why it failed.
401        """
402        # DUT is disconnected. Capture the UART output for debug.
403        self._record_uart_capture()
404
405        # TODO(waihong@chromium.org): Implement replugging the Ethernet to
406        # identify if it is a network flaky.
407
408        recovery_reason = self._retrieve_recovery_reason_from_trap()
409
410        # Reset client to a workable state.
411        self._reset_client()
412
413        # Raise the proper TestFail exception.
414        if recovery_reason:
415            raise error.TestFail('Trapped in the recovery screen (reason: %d) '
416                                 'and timed out' % recovery_reason)
417        else:
418            raise error.TestFail('Timed out waiting for DUT reboot')
419
420    def assert_test_image_in_usb_disk(self, usb_dev=None):
421        """Assert an USB disk plugged-in on servo and a test image inside.
422
423        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
424                        If None, it is detected automatically.
425        @raise TestError: if USB disk not detected or not a test image.
426        """
427        if self.check_setup_done('usb_check'):
428            return
429        if usb_dev:
430            assert self.servo.get_usbkey_direction() == 'host'
431        else:
432            self.servo.switch_usbkey('host')
433            usb_dev = self.servo.probe_host_usb_dev()
434            if not usb_dev:
435                raise error.TestError(
436                        'An USB disk should be plugged in the servo board.')
437
438        rootfs = '%s%s' % (usb_dev, self._ROOTFS_PARTITION_NUMBER)
439        logging.info('usb dev is %s', usb_dev)
440        tmpd = self.servo.system_output('mktemp -d -t usbcheck.XXXX')
441        self.servo.system('mount -o ro %s %s' % (rootfs, tmpd))
442
443        try:
444            usb_lsb = self.servo.system_output('cat %s' %
445                os.path.join(tmpd, 'etc/lsb-release'))
446            logging.debug('Dumping lsb-release on USB stick:\n%s', usb_lsb)
447            dut_lsb = '\n'.join(self.faft_client.system.
448                run_shell_command_get_output('cat /etc/lsb-release'))
449            logging.debug('Dumping lsb-release on DUT:\n%s', dut_lsb)
450            if not re.search(r'RELEASE_TRACK=.*test', usb_lsb):
451                raise error.TestError('USB stick in servo is no test image')
452            usb_board = re.search(r'BOARD=(.*)', usb_lsb).group(1)
453            dut_board = re.search(r'BOARD=(.*)', dut_lsb).group(1)
454            if usb_board != dut_board:
455                raise error.TestError('USB stick in servo contains a %s '
456                    'image, but DUT is a %s' % (usb_board, dut_board))
457        finally:
458            for cmd in ('umount -l %s' % tmpd, 'sync', 'rm -rf %s' % tmpd):
459                self.servo.system(cmd)
460
461        self.mark_setup_done('usb_check')
462
463    def setup_pdtester(self, flip_cc=False):
464        """Setup the PDTester to a given state.
465
466        @param flip_cc: True to flip CC polarity; False to not flip it.
467        @raise TestError: If Servo v4 not setup properly.
468        """
469        # Servo v4 by default has dts_mode enabled. Enabling dts_mode affects
470        # the behaviors of what PD FAFT tests. So we want it disabled.
471        if 'servo_v4' in self.pdtester.servo_type:
472            self.pdtester.set('servo_v4_dts_mode', 'off')
473
474        self.pdtester.set('usbc_polarity', 'cc2' if flip_cc else 'cc1')
475        # Make it sourcing max voltage.
476        self.pdtester.charge(self.pdtester.USBC_MAX_VOLTAGE)
477
478        # Servo v4 requires an external charger to source power. Make sure
479        # this setup is correct.
480        if ('servo_v4' in self.pdtester.servo_type and
481            self.pdtester.get('servo_v4_role') != 'src'):
482            raise error.TestError('Servo v4 failed sourcing power! Check '
483                    'the "DUT POWER" port connecting a valid charger.')
484
485    def setup_usbkey(self, usbkey, host=None, used_for_recovery=None):
486        """Setup the USB disk for the test.
487
488        It checks the setup of USB disk and a valid ChromeOS test image inside.
489        It also muxes the USB disk to either the host or DUT by request.
490
491        @param usbkey: True if the USB disk is required for the test, False if
492                       not required.
493        @param host: Optional, True to mux the USB disk to host, False to mux it
494                    to DUT, default to do nothing.
495        @param used_for_recovery: Optional, True if the USB disk is used for
496                                  recovery boot; False if the USB disk is not
497                                  used for recovery boot, like Ctrl-U USB boot.
498        """
499        if usbkey:
500            self.assert_test_image_in_usb_disk()
501        elif host is None:
502            # USB disk is not required for the test. Better to mux it to host.
503            host = True
504
505        if host is True:
506            self.servo.switch_usbkey('host')
507        elif host is False:
508            self.servo.switch_usbkey('dut')
509
510        if used_for_recovery is None:
511            # Default value is True if usbkey == True.
512            # As the common usecase of USB disk is for recovery boot. Tests
513            # can define it explicitly if not.
514            used_for_recovery = usbkey
515
516        if used_for_recovery:
517            # In recovery boot, the locked EC RO doesn't support PD for most
518            # of the CrOS devices. The default servo v4 power role is a SRC.
519            # The DUT becomes a SNK. Lack of PD makes CrOS unable to do the
520            # data role swap from UFP to DFP; as a result, DUT can't see the
521            # USB disk and the Ethernet dongle on servo v4.
522            #
523            # This is a workaround to set servo v4 as a SNK, for every FAFT
524            # test which boots into the USB disk in the recovery mode.
525            #
526            # TODO(waihong): Add a check to see if the battery level is too
527            # low and sleep for a while for charging.
528            self.set_servo_v4_role_to_snk()
529
530    def set_servo_v4_role_to_snk(self):
531        """Set the servo v4 role to SNK."""
532        self._needed_restore_servo_v4_role = True
533        self.servo.set_servo_v4_role('snk')
534
535    def _restore_servo_v4_role(self):
536        """Restore the servo v4 role to default SRC."""
537        if not hasattr(self, '_needed_restore_servo_v4_role'):
538            return
539        if self._needed_restore_servo_v4_role:
540            self.servo.set_servo_v4_role('src')
541
542    def get_usbdisk_path_on_dut(self):
543        """Get the path of the USB disk device plugged-in the servo on DUT.
544
545        Returns:
546          A string representing USB disk path, like '/dev/sdb', or None if
547          no USB disk is found.
548        """
549        cmd = 'ls -d /dev/s*[a-z]'
550        original_value = self.servo.get_usbkey_direction()
551
552        # Make the dut unable to see the USB disk.
553        self.servo.switch_usbkey('off')
554        no_usb_set = set(
555            self.faft_client.system.run_shell_command_get_output(cmd))
556
557        # Make the dut able to see the USB disk.
558        self.servo.switch_usbkey('dut')
559        time.sleep(self.faft_config.usb_plug)
560        has_usb_set = set(
561            self.faft_client.system.run_shell_command_get_output(cmd))
562
563        # Back to its original value.
564        if original_value != self.servo.get_usbkey_direction():
565            self.servo.switch_usbkey(original_value)
566
567        diff_set = has_usb_set - no_usb_set
568        if len(diff_set) == 1:
569            return diff_set.pop()
570        else:
571            return None
572
573    def _create_faft_lockfile(self):
574        """Creates the FAFT lockfile."""
575        logging.info('Creating FAFT lockfile...')
576        command = 'touch %s' % (self.lockfile)
577        self.faft_client.system.run_shell_command(command)
578
579    def _create_old_faft_lockfile(self):
580        """
581        Creates the FAFT lockfile in its legacy location.
582
583        TODO (once M83 is stable, approx. June 9 2020):
584        Delete this function, as platform/installer/chromeos-setgoodkernel
585        will look for the lockfile in the new location
586        (/usr/local/tmp/faft/lock)
587        """
588        logging.info('Creating legacy FAFT lockfile...')
589        self.faft_client.system.run_shell_command('mkdir -p /var/tmp/faft')
590        self.faft_client.system.run_shell_command('touch /var/tmp/faft/lock')
591
592    def _remove_faft_lockfile(self):
593        """Removes the FAFT lockfile."""
594        logging.info('Removing FAFT lockfile...')
595        command = 'rm -f %s' % (self.lockfile)
596        self.faft_client.system.run_shell_command(command)
597
598    def _remove_old_faft_lockfile(self):
599        """
600        Removes the FAFT lockfile from its legacy location.
601
602        TODO (once M83 is stable, approx. June 9 2020):
603        Delete this function, as platform/installer/chromeos-setgoodkernel
604        will look for the lockfile in the new location
605        (/usr/local/tmp/faft/lock)
606        """
607        logging.info('Removing legacy FAFT lockfile...')
608        self.faft_client.system.run_shell_command('rm -rf /var/tmp/faft')
609
610    def clear_set_gbb_flags(self, clear_mask, set_mask):
611        """Clear and set the GBB flags in the current flashrom.
612
613        @param clear_mask: A mask of flags to be cleared.
614        @param set_mask: A mask of flags to be set.
615        """
616        gbb_flags = self.faft_client.bios.get_gbb_flags()
617        new_flags = gbb_flags & ctypes.c_uint32(~clear_mask).value | set_mask
618        self.gbb_flags = new_flags
619        if new_flags != gbb_flags:
620            self._backup_gbb_flags = gbb_flags
621            logging.info('Changing GBB flags from 0x%x to 0x%x.',
622                         gbb_flags, new_flags)
623            self.faft_client.bios.set_gbb_flags(new_flags)
624            # If changing FORCE_DEV_SWITCH_ON or DISABLE_EC_SOFTWARE_SYNC flag,
625            # reboot to get a clear state
626            if ((gbb_flags ^ new_flags) &
627                (vboot.GBB_FLAG_FORCE_DEV_SWITCH_ON |
628                 vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC)):
629                self.switcher.mode_aware_reboot()
630        else:
631            logging.info('Current GBB flags look good for test: 0x%x.',
632                         gbb_flags)
633
634
635    def _check_capability(self, target, required_cap, suppress_warning):
636        """Check if current platform has required capabilities for the target.
637
638        @param required_cap: A list containing required capabilities.
639        @param suppress_warning: True to suppress any warning messages.
640        @return: True if requirements are met. Otherwise, False.
641        """
642        if not required_cap:
643            return True
644
645        if target not in ['ec', 'cr50']:
646            raise error.TestError('Invalid capability target %r' % target)
647
648        for cap in required_cap:
649            if cap not in getattr(self.faft_config, target + '_capability'):
650                if not suppress_warning:
651                    logging.warn('Requires %s capability "%s" to run this '
652                                 'test.', target, cap)
653                return False
654
655        return True
656
657
658    def check_ec_capability(self, required_cap=None, suppress_warning=False):
659        """Check if current platform has required EC capabilities.
660
661        @param required_cap: A list containing required EC capabilities. Pass in
662                             None to only check for presence of Chrome EC.
663        @param suppress_warning: True to suppress any warning messages.
664        @return: True if requirements are met. Otherwise, False.
665        """
666        if not self.faft_config.chrome_ec:
667            if not suppress_warning:
668                logging.warn('Requires Chrome EC to run this test.')
669            return False
670        return self._check_capability('ec', required_cap, suppress_warning)
671
672
673    def check_cr50_capability(self, required_cap=None, suppress_warning=False):
674        """Check if current platform has required Cr50 capabilities.
675
676        @param required_cap: A list containing required Cr50 capabilities. Pass
677                             in None to only check for presence of cr50 uart.
678        @param suppress_warning: True to suppress any warning messages.
679        @return: True if requirements are met. Otherwise, False.
680        """
681        if not hasattr(self, 'cr50'):
682            if not suppress_warning:
683                logging.warn('Requires Chrome Cr50 to run this test.')
684            return False
685        return self._check_capability('cr50', required_cap, suppress_warning)
686
687
688    def check_root_part_on_non_recovery(self, part):
689        """Check the partition number of root device and on normal/dev boot.
690
691        @param part: A string of partition number, e.g.'3'.
692        @return: True if the root device matched and on normal/dev boot;
693                 otherwise, False.
694        """
695        return self.checkers.root_part_checker(part) and \
696                self.checkers.crossystem_checker({
697                    'mainfw_type': ('normal', 'developer'),
698                })
699
700    def _join_part(self, dev, part):
701        """Return a concatenated string of device and partition number.
702
703        @param dev: A string of device, e.g.'/dev/sda'.
704        @param part: A string of partition number, e.g.'3'.
705        @return: A concatenated string of device and partition number,
706                 e.g.'/dev/sda3'.
707
708        >>> seq = FirmwareTest()
709        >>> seq._join_part('/dev/sda', '3')
710        '/dev/sda3'
711        >>> seq._join_part('/dev/mmcblk0', '2')
712        '/dev/mmcblk0p2'
713        """
714        if 'mmcblk' in dev:
715            return dev + 'p' + part
716        elif 'nvme' in dev:
717            return dev + 'p' + part
718        else:
719            return dev + part
720
721    def copy_kernel_and_rootfs(self, from_part, to_part):
722        """Copy kernel and rootfs from from_part to to_part.
723
724        @param from_part: A string of partition number to be copied from.
725        @param to_part: A string of partition number to be copied to.
726        """
727        root_dev = self.faft_client.system.get_root_dev()
728        logging.info('Copying kernel from %s to %s. Please wait...',
729                     from_part, to_part)
730        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
731                (self._join_part(root_dev, self.KERNEL_MAP[from_part]),
732                 self._join_part(root_dev, self.KERNEL_MAP[to_part])))
733        logging.info('Copying rootfs from %s to %s. Please wait...',
734                     from_part, to_part)
735        self.faft_client.system.run_shell_command('dd if=%s of=%s bs=4M' %
736                (self._join_part(root_dev, self.ROOTFS_MAP[from_part]),
737                 self._join_part(root_dev, self.ROOTFS_MAP[to_part])))
738
739    def ensure_kernel_boot(self, part):
740        """Ensure the request kernel boot.
741
742        If not, it duplicates the current kernel to the requested kernel
743        and sets the requested higher priority to ensure it boot.
744
745        @param part: A string of kernel partition number or 'a'/'b'.
746        """
747        if not self.checkers.root_part_checker(part):
748            if self.faft_client.kernel.diff_a_b():
749                self.copy_kernel_and_rootfs(
750                        from_part=self.OTHER_KERNEL_MAP[part],
751                        to_part=part)
752            self.reset_and_prioritize_kernel(part)
753            self.switcher.mode_aware_reboot()
754
755    def ensure_dev_internal_boot(self, original_dev_boot_usb):
756        """Ensure internal device boot in developer mode.
757
758        If not internal device boot, it will try to reboot the device and
759        bypass dev mode to boot into internal device.
760
761        @param original_dev_boot_usb: Original dev_boot_usb value.
762        """
763        logging.info('Checking internal device boot.')
764        if self.faft_client.system.is_removable_device_boot():
765            logging.info('Reboot into internal disk...')
766            self.faft_client.system.set_dev_boot_usb(original_dev_boot_usb)
767            self.switcher.mode_aware_reboot()
768        self.check_state((self.checkers.dev_boot_usb_checker, False,
769                          'Device not booted from internal disk properly.'))
770
771    def set_hardware_write_protect(self, enable):
772        """Set hardware write protect pin.
773
774        @param enable: True if asserting write protect pin. Otherwise, False.
775        """
776        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
777
778    def set_ec_write_protect_and_reboot(self, enable):
779        """Set EC write protect status and reboot to take effect.
780
781        The write protect state is only activated if both hardware write
782        protect pin is asserted and software write protect flag is set.
783        This method asserts/deasserts hardware write protect pin first, and
784        set corresponding EC software write protect flag.
785
786        If the device uses non-Chrome EC, set the software write protect via
787        flashrom.
788
789        If the device uses Chrome EC, a reboot is required for write protect
790        to take effect. Since the software write protect flag cannot be unset
791        if hardware write protect pin is asserted, we need to deasserted the
792        pin first if we are deactivating write protect. Similarly, a reboot
793        is required before we can modify the software flag.
794
795        @param enable: True if activating EC write protect. Otherwise, False.
796        """
797        self.set_hardware_write_protect(enable)
798        if self.faft_config.chrome_ec:
799            self.set_chrome_ec_write_protect_and_reboot(enable)
800        else:
801            self.faft_client.ec.set_write_protect(enable)
802            self.switcher.mode_aware_reboot()
803
804    def set_chrome_ec_write_protect_and_reboot(self, enable):
805        """Set Chrome EC write protect status and reboot to take effect.
806
807        @param enable: True if activating EC write protect. Otherwise, False.
808        """
809        if enable:
810            # Set write protect flag and reboot to take effect.
811            self.ec.set_flash_write_protect(enable)
812            self.sync_and_ec_reboot()
813        else:
814            # Reboot after deasserting hardware write protect pin to deactivate
815            # write protect. And then remove software write protect flag.
816            self.sync_and_ec_reboot()
817            self.ec.set_flash_write_protect(enable)
818
819    def _setup_ec_write_protect(self, ec_wp):
820        """Setup for EC write-protection.
821
822        It makes sure the EC in the requested write-protection state. If not, it
823        flips the state. Flipping the write-protection requires DUT reboot.
824
825        @param ec_wp: True to request EC write-protected; False to request EC
826                      not write-protected; None to do nothing.
827        """
828        if ec_wp is None:
829            self._old_wpsw_boot = None
830            return
831        self._old_wpsw_cur = self.checkers.crossystem_checker(
832                                    {'wpsw_cur': '1'}, suppress_logging=True)
833        self._old_wpsw_boot = self.checkers.crossystem_checker(
834                                   {'wpsw_boot': '1'}, suppress_logging=True)
835        if not (ec_wp == self._old_wpsw_cur == self._old_wpsw_boot):
836            if not self.faft_config.ap_access_ec_flash:
837                raise error.TestNAError(
838                        "Cannot change EC write-protect for this device")
839
840            logging.info('The test required EC is %swrite-protected. Reboot '
841                         'and flip the state.', '' if ec_wp else 'not ')
842            self.switcher.mode_aware_reboot(
843                    'custom',
844                     lambda:self.set_ec_write_protect_and_reboot(ec_wp))
845        wpsw_boot = wpsw_cur = '1' if ec_wp == True else '0'
846        self.check_state((self.checkers.crossystem_checker, {
847                               'wpsw_boot': wpsw_boot, 'wpsw_cur': wpsw_cur}))
848
849    def _restore_ec_write_protect(self):
850        """Restore the original EC write-protection."""
851        if (not hasattr(self, '_old_wpsw_boot')) or (self._old_wpsw_boot is
852                                                     None):
853            return
854        if not self.checkers.crossystem_checker({'wpsw_boot': '1' if
855                       self._old_wpsw_boot else '0'}, suppress_logging=True):
856            logging.info('Restore original EC write protection and reboot.')
857            self.switcher.mode_aware_reboot(
858                    'custom',
859                    lambda:self.set_ec_write_protect_and_reboot(
860                            self._old_wpsw_boot))
861        self.check_state((self.checkers.crossystem_checker, {
862                           'wpsw_boot': '1' if self._old_wpsw_boot else '0'}))
863
864    def _setup_uart_capture(self):
865        """Set up the CPU/EC/PD UART capture."""
866
867        # If adding another capture, make sure to update the UARTS constant.
868        self.cpu_uart_file = os.path.join(self.resultsdir, 'cpu_uart.txt')
869        self.servo.set('cpu_uart_capture', 'on')
870        self.cr50_uart_file = None
871        self.ec_uart_file = None
872        self.servo_micro_uart_file = None
873        self.servo_v4_uart_file = None
874        self.usbpd_uart_file = None
875
876        try:
877            # Check that the console works before declaring the cr50 console
878            # connection exists and enabling uart capture.
879            self.servo.get('cr50_version')
880            self.servo.set('cr50_uart_capture', 'on')
881            self.cr50_uart_file = os.path.join(self.resultsdir, 'cr50_uart.txt')
882            self.cr50 = chrome_cr50.ChromeCr50(self.servo, self.faft_config)
883        except servo.ControlUnavailableError:
884            logging.warn('cr50 console not supported.')
885        except error.TestFail as e:
886            logging.warn('Unknown cr50 uart capture error: %s', str(e))
887        if (self.faft_config.chrome_ec and
888            self.servo.has_control('ec_uart_capture')):
889            self.servo.set('ec_uart_capture', 'on')
890            self.ec_uart_file = os.path.join(self.resultsdir, 'ec_uart.txt')
891            # Log separate PD console if supported
892            if (self.check_ec_capability(['usbpd_uart'], suppress_warning=True)
893                and self.servo.has_control('usb_pd_uart_capture')):
894                self.servo.set('usbpd_uart_capture', 'on')
895                self.usbpd_uart_file = os.path.join(self.resultsdir,
896                                                    'usbpd_uart.txt')
897        else:
898            logging.info('Not a Google EC, cannot capture ec console output.')
899
900        for servo_console in ['servo_micro', 'servo_v4']:
901            capture_cmd = '%s_uart_capture' % servo_console
902            uart_file_attr = '%s_uart_file' % servo_console
903            if self.servo.has_control(capture_cmd):
904                self.servo.set(capture_cmd, 'on')
905                outfile = '%s_uart.txt' % servo_console
906                setattr(self, uart_file_attr, os.path.join(self.resultsdir,
907                                                           outfile))
908
909    def _record_uart_capture(self):
910        """Record the CPU/EC/PD UART output stream to files."""
911        for uart in self.UARTS:
912            # Attribute will be nonexistent or empty if capture wasn't set up.
913            uart_file = getattr(self, '%s_uart_file' % uart, None)
914            if uart_file:
915                with open(uart_file, 'a') as f:
916                    f.write(ast.literal_eval(
917                        self.servo.get('%s_uart_stream' % uart)))
918
919    def _cleanup_uart_capture(self):
920        """Cleanup the CPU/EC/PD UART capture."""
921        # Flush the remaining UART output first.
922        self._record_uart_capture()
923        for uart in self.UARTS:
924            # Attribute will be nonexistent or empty if capture wasn't set up.
925            uart_file = getattr(self, '%s_uart_file' % uart, None)
926            if uart_file:
927                self.servo.set('%s_uart_capture' % uart, 'off')
928
929    def _get_power_state(self, power_state):
930        """
931        Return the current power state of the AP
932        """
933        return self.ec.send_command_get_output("powerinfo", [power_state])
934
935    def wait_power_state(self, power_state, retries):
936        """
937        Wait for certain power state.
938
939        @param power_state: power state you are expecting
940        @param retries: retries.  This is necessary if AP is powering down
941        and transitioning through different states.
942        """
943        logging.info('Checking power state "%s" maximum %d times.',
944                     power_state, retries)
945        while retries > 0:
946            logging.info("try count: %d", retries)
947            try:
948                retries = retries - 1
949                ret = self._get_power_state(power_state)
950                return True
951            except error.TestFail:
952                pass
953        return False
954
955    def suspend(self):
956        """Suspends the DUT."""
957        cmd = '(sleep %d; powerd_dbus_suspend) &' % self.EC_SUSPEND_DELAY
958        self.faft_client.system.run_shell_command(cmd)
959        time.sleep(self.EC_SUSPEND_DELAY)
960
961    def _record_faft_client_log(self):
962        """Record the faft client log to the results directory."""
963        client_log = self.faft_client.system.dump_log(True)
964        client_log_file = os.path.join(self.resultsdir, 'faft_client.log')
965        with open(client_log_file, 'w') as f:
966            f.write(client_log)
967
968    def _setup_gbb_flags(self):
969        """Setup the GBB flags for FAFT test."""
970        if self.check_setup_done('gbb_flags'):
971            return
972
973        logging.info('Set proper GBB flags for test.')
974        # Ensure that GBB flags are set to 0x140.
975        flags_to_set = (vboot.GBB_FLAG_FAFT_KEY_OVERIDE |
976                        vboot.GBB_FLAG_ENTER_TRIGGERS_TONORM)
977        # And if the "no_ec_sync" argument is set, then disable EC software
978        # sync.
979        if self._no_ec_sync:
980            flags_to_set |= vboot.GBB_FLAG_DISABLE_EC_SOFTWARE_SYNC
981
982        self.clear_set_gbb_flags(0xffffffff, flags_to_set)
983        self.mark_setup_done('gbb_flags')
984
985    def drop_backup_gbb_flags(self):
986        """Drops the backup GBB flags.
987
988        This can be used when a test intends to permanently change GBB flags.
989        """
990        self._backup_gbb_flags = None
991
992    def _restore_gbb_flags(self):
993        """Restore GBB flags to their original state."""
994        if self._backup_gbb_flags is None:
995            return
996        # Setting up and restoring the GBB flags take a lot of time. For
997        # speed-up purpose, don't restore it.
998        logging.info('***')
999        logging.info('*** Please manually restore the original GBB flags to: '
1000                     '0x%x ***', self._backup_gbb_flags)
1001        logging.info('***')
1002        self.unmark_setup_done('gbb_flags')
1003
1004    def setup_tried_fwb(self, tried_fwb):
1005        """Setup for fw B tried state.
1006
1007        It makes sure the system in the requested fw B tried state. If not, it
1008        tries to do so.
1009
1010        @param tried_fwb: True if requested in tried_fwb=1;
1011                          False if tried_fwb=0.
1012        """
1013        if tried_fwb:
1014            if not self.checkers.crossystem_checker({'tried_fwb': '1'}):
1015                logging.info(
1016                    'Firmware is not booted with tried_fwb. Reboot into it.')
1017                self.faft_client.system.set_try_fw_b()
1018        else:
1019            if not self.checkers.crossystem_checker({'tried_fwb': '0'}):
1020                logging.info(
1021                    'Firmware is booted with tried_fwb. Reboot to clear.')
1022
1023    def power_on(self):
1024        """Switch DUT AC power on."""
1025        self._client.power_on(self.power_control)
1026
1027    def power_off(self):
1028        """Switch DUT AC power off."""
1029        self._client.power_off(self.power_control)
1030
1031    def power_cycle(self):
1032        """Power cycle DUT AC power."""
1033        self._client.power_cycle(self.power_control)
1034
1035    def setup_rw_boot(self, section='a'):
1036        """Make sure firmware is in RW-boot mode.
1037
1038        If the given firmware section is in RO-boot mode, turn off the RO-boot
1039        flag and reboot DUT into RW-boot mode.
1040
1041        @param section: A firmware section, either 'a' or 'b'.
1042        """
1043        flags = self.faft_client.bios.get_preamble_flags(section)
1044        if flags & vboot.PREAMBLE_USE_RO_NORMAL:
1045            flags = flags ^ vboot.PREAMBLE_USE_RO_NORMAL
1046            self.faft_client.bios.set_preamble_flags(section, flags)
1047            self.switcher.mode_aware_reboot()
1048
1049    def setup_kernel(self, part):
1050        """Setup for kernel test.
1051
1052        It makes sure both kernel A and B bootable and the current boot is
1053        the requested kernel part.
1054
1055        @param part: A string of kernel partition number or 'a'/'b'.
1056        """
1057        self.ensure_kernel_boot(part)
1058        logging.info('Checking the integrity of kernel B and rootfs B...')
1059        if (self.faft_client.kernel.diff_a_b() or
1060                not self.faft_client.rootfs.verify_rootfs('B')):
1061            logging.info('Copying kernel and rootfs from A to B...')
1062            self.copy_kernel_and_rootfs(from_part=part,
1063                                        to_part=self.OTHER_KERNEL_MAP[part])
1064        self.reset_and_prioritize_kernel(part)
1065
1066    def reset_and_prioritize_kernel(self, part):
1067        """Make the requested partition highest priority.
1068
1069        This function also reset kerenl A and B to bootable.
1070
1071        @param part: A string of partition number to be prioritized.
1072        """
1073        root_dev = self.faft_client.system.get_root_dev()
1074        # Reset kernel A and B to bootable.
1075        self.faft_client.system.run_shell_command(
1076            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['a'], root_dev))
1077        self.faft_client.system.run_shell_command(
1078            'cgpt add -i%s -P1 -S1 -T0 %s' % (self.KERNEL_MAP['b'], root_dev))
1079        # Set kernel part highest priority.
1080        self.faft_client.system.run_shell_command('cgpt prioritize -i%s %s' %
1081                (self.KERNEL_MAP[part], root_dev))
1082
1083    def do_blocking_sync(self, device):
1084        """Run a blocking sync command."""
1085        logging.info("Blocking sync for %s", device)
1086
1087        if 'mmcblk' in device:
1088            # For mmc devices, use `mmc status get` command to send an
1089            # empty command to wait for the disk to be available again.
1090            self.faft_client.system.run_shell_command('mmc status get %s' %
1091                                                      device)
1092        elif 'nvme' in device:
1093            # For NVMe devices, use `nvme flush` command to commit data
1094            # and metadata to non-volatile media.
1095
1096            # Get a list of NVMe namespaces, and flush them individually.
1097            # The output is assumed to be in the following format:
1098            # [ 0]:0x1
1099            # [ 1]:0x2
1100            list_ns_cmd = "nvme list-ns %s" % device
1101            available_ns = self.faft_client.system.run_shell_command_get_output(
1102                list_ns_cmd)
1103
1104            if not available_ns:
1105                raise error.TestError(
1106                    "Listing namespaces failed (empty output): %s"
1107                    % list_ns_cmd)
1108
1109            for ns in available_ns:
1110                ns = ns.split(':')[-1]
1111                flush_cmd = 'nvme flush %s -n %s' % (device, ns)
1112                flush_rc = self.faft_client.system.run_shell_command_get_status(
1113                    flush_cmd)
1114                if flush_rc != 0:
1115                    raise error.TestError(
1116                        "Flushing namespace %s failed (rc=%s): %s"
1117                        % (ns, flush_rc, flush_cmd))
1118        else:
1119            # For other devices, hdparm sends TUR to check if
1120            # a device is ready for transfer operation.
1121            self.faft_client.system.run_shell_command('hdparm -f %s' % device)
1122
1123    def blocking_sync(self):
1124        """Sync root device and internal device."""
1125        # The double calls to sync fakes a blocking call
1126        # since the first call returns before the flush
1127        # is complete, but the second will wait for the
1128        # first to finish.
1129        self.faft_client.system.run_shell_command('sync')
1130        self.faft_client.system.run_shell_command('sync')
1131
1132        # sync only sends SYNCHRONIZE_CACHE but doesn't check the status.
1133        # This function will perform a device-specific sync command.
1134        root_dev = self.faft_client.system.get_root_dev()
1135        self.do_blocking_sync(root_dev)
1136
1137        # Also sync the internal device if booted from removable media.
1138        if self.faft_client.system.is_removable_device_boot():
1139            internal_dev = self.faft_client.system.get_internal_device()
1140            self.do_blocking_sync(internal_dev)
1141
1142    def sync_and_ec_reboot(self, flags=''):
1143        """Request the client sync and do a EC triggered reboot.
1144
1145        @param flags: Optional, a space-separated string of flags passed to EC
1146                      reboot command, including:
1147                          default: EC soft reboot;
1148                          'hard': EC cold/hard reboot.
1149        """
1150        self.blocking_sync()
1151        self.ec.reboot(flags)
1152        time.sleep(self.faft_config.ec_boot_to_console)
1153        self.check_lid_and_power_on()
1154
1155    def reboot_and_reset_tpm(self):
1156        """Reboot into recovery mode, reset TPM, then reboot back to disk."""
1157        self.switcher.reboot_to_mode(to_mode='rec')
1158        self.faft_client.system.run_shell_command('chromeos-tpm-recovery')
1159        self.switcher.mode_aware_reboot()
1160
1161    def full_power_off_and_on(self):
1162        """Shutdown the device by pressing power button and power on again."""
1163        boot_id = self.get_bootid()
1164        # Press power button to trigger Chrome OS normal shutdown process.
1165        # We use a customized delay since the normal-press 1.2s is not enough.
1166        self.servo.power_key(self.faft_config.hold_pwr_button_poweroff)
1167        # device can take 44-51 seconds to restart,
1168        # add buffer from the default timeout of 60 seconds.
1169        self.switcher.wait_for_client_offline(timeout=100, orig_boot_id=boot_id)
1170        time.sleep(self.faft_config.shutdown)
1171        if self.check_ec_capability(['x86'], suppress_warning=True):
1172            self.check_shutdown_power_state("G3", pwr_retries=5)
1173        # Short press power button to boot DUT again.
1174        self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1175
1176    def check_shutdown_power_state(self, power_state, pwr_retries):
1177        """Check whether the device entered into requested EC power state
1178        after shutdown.
1179
1180        @param power_state: EC power state has to be checked. Either S5 or G3.
1181        @param pwr_retries: Times to check if the DUT in expected power state.
1182        @raise TestFail: If device failed to enter into requested power state.
1183        """
1184        if not self.wait_power_state(power_state, pwr_retries):
1185            raise error.TestFail('System not shutdown properly and EC fails '
1186                                 'to enter into %s state.' % power_state)
1187        logging.info('System entered into %s state..', power_state)
1188
1189    def check_lid_and_power_on(self):
1190        """
1191        On devices with EC software sync, system powers on after EC reboots if
1192        lid is open. Otherwise, the EC shuts down CPU after about 3 seconds.
1193        This method checks lid switch state and presses power button if
1194        necessary.
1195        """
1196        if self.servo.get("lid_open") == "no":
1197            time.sleep(self.faft_config.software_sync)
1198            self.servo.power_short_press()
1199
1200    def stop_powerd(self):
1201        """Stop the powerd daemon on the AP.
1202
1203        This will cause the AP to ignore power button presses sent by the EC.
1204        """
1205        powerd_running = self.faft_client.system.run_shell_command_check_output(
1206                'status powerd', 'start/running')
1207        if powerd_running:
1208            logging.debug('Stopping powerd')
1209            self.faft_client.system.run_shell_command("stop powerd")
1210
1211    def _modify_usb_kernel(self, usb_dev, from_magic, to_magic):
1212        """Modify the kernel header magic in USB stick.
1213
1214        The kernel header magic is the first 8-byte of kernel partition.
1215        We modify it to make it fail on kernel verification check.
1216
1217        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1218        @param from_magic: A string of magic which we change it from.
1219        @param to_magic: A string of magic which we change it to.
1220        @raise TestError: if failed to change magic.
1221        """
1222        assert len(from_magic) == 8
1223        assert len(to_magic) == 8
1224        # USB image only contains one kernel.
1225        kernel_part = self._join_part(usb_dev, self.KERNEL_MAP['a'])
1226        read_cmd = "sudo dd if=%s bs=8 count=1 2>/dev/null" % kernel_part
1227        current_magic = self.servo.system_output(read_cmd)
1228        if current_magic == to_magic:
1229            logging.info("The kernel magic is already %s.", current_magic)
1230            return
1231        if current_magic != from_magic:
1232            raise error.TestError("Invalid kernel image on USB: wrong magic.")
1233
1234        logging.info('Modify the kernel magic in USB, from %s to %s.',
1235                     from_magic, to_magic)
1236        write_cmd = ("echo -n '%s' | sudo dd of=%s oflag=sync conv=notrunc "
1237                     " 2>/dev/null" % (to_magic, kernel_part))
1238        self.servo.system(write_cmd)
1239
1240        if self.servo.system_output(read_cmd) != to_magic:
1241            raise error.TestError("Failed to write new magic.")
1242
1243    def corrupt_usb_kernel(self, usb_dev):
1244        """Corrupt USB kernel by modifying its magic from CHROMEOS to CORRUPTD.
1245
1246        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1247        """
1248        self._modify_usb_kernel(usb_dev, self.CHROMEOS_MAGIC,
1249                                self.CORRUPTED_MAGIC)
1250
1251    def restore_usb_kernel(self, usb_dev):
1252        """Restore USB kernel by modifying its magic from CORRUPTD to CHROMEOS.
1253
1254        @param usb_dev: A string of USB stick path on the host, like '/dev/sdc'.
1255        """
1256        self._modify_usb_kernel(usb_dev, self.CORRUPTED_MAGIC,
1257                                self.CHROMEOS_MAGIC)
1258
1259    def _call_action(self, action_tuple, check_status=False):
1260        """Call the action function with/without arguments.
1261
1262        @param action_tuple: A function, or a tuple (function, args, error_msg),
1263                             in which, args and error_msg are optional. args is
1264                             either a value or a tuple if multiple arguments.
1265                             This can also be a list containing multiple
1266                             function or tuple. In this case, these actions are
1267                             called in sequence.
1268        @param check_status: Check the return value of action function. If not
1269                             succeed, raises a TestFail exception.
1270        @return: The result value of the action function.
1271        @raise TestError: An error when the action function is not callable.
1272        @raise TestFail: When check_status=True, action function not succeed.
1273        """
1274        if isinstance(action_tuple, list):
1275            return all([self._call_action(action, check_status=check_status)
1276                        for action in action_tuple])
1277
1278        action = action_tuple
1279        args = ()
1280        error_msg = 'Not succeed'
1281        if isinstance(action_tuple, tuple):
1282            action = action_tuple[0]
1283            if len(action_tuple) >= 2:
1284                args = action_tuple[1]
1285                if not isinstance(args, tuple):
1286                    args = (args,)
1287            if len(action_tuple) >= 3:
1288                error_msg = action_tuple[2]
1289
1290        if action is None:
1291            return
1292
1293        if not callable(action):
1294            raise error.TestError('action is not callable!')
1295
1296        info_msg = 'calling %s' % action.__name__
1297        if args:
1298            info_msg += ' with args %s' % str(args)
1299        logging.info(info_msg)
1300        ret = action(*args)
1301
1302        if check_status and not ret:
1303            raise error.TestFail('%s: %s returning %s' %
1304                                 (error_msg, info_msg, str(ret)))
1305        return ret
1306
1307    def run_shutdown_process(self, shutdown_action, pre_power_action=None,
1308                             run_power_action=True, post_power_action=None,
1309                             shutdown_timeout=None):
1310        """Run shutdown_action(), which makes DUT shutdown, and power it on.
1311
1312        @param shutdown_action: function which makes DUT shutdown, like
1313                                pressing power key.
1314        @param pre_power_action: function which is called before next power on.
1315        @param run_power_action: power_key press by default, set to None to skip.
1316        @param post_power_action: function which is called after next power on.
1317        @param shutdown_timeout: a timeout to confirm DUT shutdown.
1318        @raise TestFail: if the shutdown_action() failed to turn DUT off.
1319        """
1320        self._call_action(shutdown_action)
1321        logging.info('Wait to ensure DUT shut down...')
1322        try:
1323            if shutdown_timeout is None:
1324                shutdown_timeout = self.faft_config.shutdown_timeout
1325            self.switcher.wait_for_client(timeout=shutdown_timeout)
1326            raise error.TestFail(
1327                    'Should shut the device down after calling %s.' %
1328                    shutdown_action.__name__)
1329        except ConnectionError:
1330            if self.check_ec_capability(['x86'], suppress_warning=True):
1331                self.check_shutdown_power_state("G3", pwr_retries=5)
1332            logging.info(
1333                'DUT is surely shutdown. We are going to power it on again...')
1334
1335        if pre_power_action:
1336            self._call_action(pre_power_action)
1337        if run_power_action:
1338            self.servo.power_key(self.faft_config.hold_pwr_button_poweron)
1339        if post_power_action:
1340            self._call_action(post_power_action)
1341
1342    def get_bootid(self, retry=3):
1343        """
1344        Return the bootid.
1345        """
1346        boot_id = None
1347        while retry:
1348            try:
1349                boot_id = self._client.get_boot_id()
1350                break
1351            except error.AutoservRunError:
1352                retry -= 1
1353                if retry:
1354                    logging.info('Retry to get boot_id...')
1355                else:
1356                    logging.warning('Failed to get boot_id.')
1357        logging.info('boot_id: %s', boot_id)
1358        return boot_id
1359
1360    def check_state(self, func):
1361        """
1362        Wrapper around _call_action with check_status set to True. This is a
1363        helper function to be used by tests and is currently implemented by
1364        calling _call_action with check_status=True.
1365
1366        TODO: This function's arguments need to be made more stringent. And
1367        its functionality should be moved over to check functions directly in
1368        the future.
1369
1370        @param func: A function, or a tuple (function, args, error_msg),
1371                             in which, args and error_msg are optional. args is
1372                             either a value or a tuple if multiple arguments.
1373                             This can also be a list containing multiple
1374                             function or tuple. In this case, these actions are
1375                             called in sequence.
1376        @return: The result value of the action function.
1377        @raise TestFail: If the function does notsucceed.
1378        """
1379        logging.info("-[FAFT]-[ start stepstate_checker ]----------")
1380        self._call_action(func, check_status=True)
1381        logging.info("-[FAFT]-[ end state_checker ]----------------")
1382
1383    def get_current_firmware_identity(self):
1384        """Get current firmware sha and fwids of body and vblock.
1385
1386        @return: Current firmware checksums and fwids, as a dict
1387        """
1388
1389        # TODO(dgoyette): add a way to avoid hardcoding the keys (section names)
1390        current_checksums = {
1391            'VBOOTA': self.faft_client.bios.get_sig_sha('a'),
1392            'FVMAINA': self.faft_client.bios.get_body_sha('a'),
1393            'VBOOTB': self.faft_client.bios.get_sig_sha('b'),
1394            'FVMAINB': self.faft_client.bios.get_body_sha('b'),
1395        }
1396        if not all(current_checksums.values()):
1397            raise error.TestError(
1398                    'Failed to get firmware sha: %s', current_checksums)
1399
1400        current_fwids = {
1401            'RO_FRID': self.faft_client.bios.get_section_fwid('ro'),
1402            'RW_FWID_A': self.faft_client.bios.get_section_fwid('a'),
1403            'RW_FWID_B': self.faft_client.bios.get_section_fwid('b'),
1404        }
1405        if not all(current_fwids.values()):
1406            raise error.TestError(
1407                    'Failed to get firmware fwid(s): %s', current_fwids)
1408
1409        identifying_info = dict(current_fwids)
1410        identifying_info.update(current_checksums)
1411        return identifying_info
1412
1413    def is_firmware_changed(self):
1414        """Check if the current firmware changed, by comparing its SHA and fwid.
1415
1416        @return: True if it is changed, otherwise False.
1417        """
1418        # Device may not be rebooted after test.
1419        self.faft_client.bios.reload()
1420
1421        current_info = self.get_current_firmware_identity()
1422        prev_info = self._backup_firmware_identity
1423
1424        if current_info == prev_info:
1425            return False
1426        else:
1427            changed = set()
1428            for section in set(current_info.keys()) | set(prev_info.keys()):
1429                if current_info.get(section) != prev_info.get(section):
1430                    changed.add(section)
1431
1432            logging.info('Firmware changed: %s', ', '.join(sorted(changed)))
1433            return True
1434
1435    def backup_firmware(self, suffix='.original'):
1436        """Backup firmware to file, and then send it to host.
1437
1438        @param suffix: a string appended to backup file name
1439        """
1440        remote_temp_dir = self.faft_client.system.create_temp_dir()
1441        remote_bios_path = os.path.join(remote_temp_dir, 'bios')
1442        self.faft_client.bios.dump_whole(remote_bios_path)
1443        self._client.get_file(remote_bios_path,
1444                              os.path.join(self.resultsdir, 'bios' + suffix))
1445
1446        if self.faft_config.chrome_ec:
1447            remote_ec_path = os.path.join(remote_temp_dir, 'ec')
1448            self.faft_client.ec.dump_whole(remote_ec_path)
1449            self._client.get_file(remote_ec_path,
1450                              os.path.join(self.resultsdir, 'ec' + suffix))
1451
1452        self._client.run('rm -rf %s' % remote_temp_dir)
1453        logging.info('Backup firmware stored in %s with suffix %s',
1454            self.resultsdir, suffix)
1455
1456        self._backup_firmware_identity = self.get_current_firmware_identity()
1457
1458    def is_firmware_saved(self):
1459        """Check if a firmware saved (called backup_firmware before).
1460
1461        @return: True if the firmware is backed up; otherwise False.
1462        """
1463        return bool(self._backup_firmware_identity)
1464
1465    def clear_saved_firmware(self):
1466        """Clear the firmware saved by the method backup_firmware."""
1467        self._backup_firmware_identity = {}
1468
1469    def restore_firmware(self, suffix='.original', restore_ec=True):
1470        """Restore firmware from host in resultsdir.
1471
1472        @param suffix: a string appended to backup file name
1473        @param restore_ec: True to restore the ec firmware; False not to do.
1474        @return: True if firmware needed to be restored
1475        """
1476        if not self.is_firmware_changed():
1477            return False
1478
1479        # Backup current corrupted firmware.
1480        self.backup_firmware(suffix='.corrupt')
1481
1482        # Restore firmware.
1483        remote_temp_dir = self.faft_client.system.create_temp_dir()
1484        self._client.send_file(os.path.join(self.resultsdir, 'bios' + suffix),
1485                               os.path.join(remote_temp_dir, 'bios'))
1486
1487        self.faft_client.bios.write_whole(
1488            os.path.join(remote_temp_dir, 'bios'))
1489
1490        if self.faft_config.chrome_ec and restore_ec:
1491            self._client.send_file(os.path.join(self.resultsdir, 'ec' + suffix),
1492                os.path.join(remote_temp_dir, 'ec'))
1493            self.faft_client.ec.write_whole(
1494                os.path.join(remote_temp_dir, 'ec'))
1495
1496        self.switcher.mode_aware_reboot()
1497        logging.info('Successfully restored firmware.')
1498        return True
1499
1500    def setup_firmwareupdate_shellball(self, shellball=None):
1501        """Setup a shellball to use in firmware update test.
1502
1503        Check if there is a given shellball, and it is a shell script. Then,
1504        send it to the remote host. Otherwise, use the
1505        /usr/sbin/chromeos-firmwareupdate in the image and replace its inside
1506        BIOS and EC images with the active firmware images.
1507
1508        @param shellball: path of a shellball or default to None.
1509        """
1510        if shellball:
1511            # Determine the firmware file is a shellball or a raw binary.
1512            is_shellball = (utils.system_output("file %s" % shellball).find(
1513                    "shell script") != -1)
1514            if is_shellball:
1515                logging.info('Device will update firmware with shellball %s',
1516                             shellball)
1517                temp_path = self.faft_client.updater.get_temp_path()
1518                working_shellball = os.path.join(temp_path,
1519                                                 'chromeos-firmwareupdate')
1520                self._client.send_file(shellball, working_shellball)
1521                self.faft_client.updater.extract_shellball()
1522            else:
1523                raise error.TestFail(
1524                    'The given shellball is not a shell script.')
1525        else:
1526            logging.info('No shellball given, use the original shellball and '
1527                         'replace its BIOS and EC images.')
1528            work_path = self.faft_client.updater.get_work_path()
1529            bios_in_work_path = os.path.join(
1530                work_path, self.faft_client.updater.get_bios_relative_path())
1531            ec_in_work_path = os.path.join(
1532                work_path, self.faft_client.updater.get_ec_relative_path())
1533            logging.info('Writing current BIOS to: %s', bios_in_work_path)
1534            self.faft_client.bios.dump_whole(bios_in_work_path)
1535            if self.faft_config.chrome_ec:
1536                logging.info('Writing current EC to: %s', ec_in_work_path)
1537                self.faft_client.ec.dump_firmware(ec_in_work_path)
1538            self.faft_client.updater.repack_shellball()
1539
1540    def is_kernel_changed(self):
1541        """Check if the current kernel is changed, by comparing its SHA1 hash.
1542
1543        @return: True if it is changed; otherwise, False.
1544        """
1545        changed = False
1546        for p in ('A', 'B'):
1547            backup_sha = self._backup_kernel_sha.get(p, None)
1548            current_sha = self.faft_client.kernel.get_sha(p)
1549            if backup_sha != current_sha:
1550                changed = True
1551                logging.info('Kernel %s is changed', p)
1552        return changed
1553
1554    def backup_kernel(self, suffix='.original'):
1555        """Backup kernel to files, and the send them to host.
1556
1557        @param suffix: a string appended to backup file name.
1558        """
1559        remote_temp_dir = self.faft_client.system.create_temp_dir()
1560        for p in ('A', 'B'):
1561            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1562            self.faft_client.kernel.dump(p, remote_path)
1563            self._client.get_file(
1564                    remote_path,
1565                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)))
1566            self._backup_kernel_sha[p] = self.faft_client.kernel.get_sha(p)
1567        logging.info('Backup kernel stored in %s with suffix %s',
1568            self.resultsdir, suffix)
1569
1570    def is_kernel_saved(self):
1571        """Check if kernel images are saved (backup_kernel called before).
1572
1573        @return: True if the kernel is saved; otherwise, False.
1574        """
1575        return len(self._backup_kernel_sha) != 0
1576
1577    def clear_saved_kernel(self):
1578        """Clear the kernel saved by backup_kernel()."""
1579        self._backup_kernel_sha = dict()
1580
1581    def restore_kernel(self, suffix='.original'):
1582        """Restore kernel from host in resultsdir.
1583
1584        @param suffix: a string appended to backup file name.
1585        """
1586        if not self.is_kernel_changed():
1587            return
1588
1589        # Backup current corrupted kernel.
1590        self.backup_kernel(suffix='.corrupt')
1591
1592        # Restore kernel.
1593        remote_temp_dir = self.faft_client.system.create_temp_dir()
1594        for p in ('A', 'B'):
1595            remote_path = os.path.join(remote_temp_dir, 'kernel_%s' % p)
1596            self._client.send_file(
1597                    os.path.join(self.resultsdir, 'kernel_%s%s' % (p, suffix)),
1598                    remote_path)
1599            self.faft_client.kernel.write(p, remote_path)
1600
1601        self.switcher.mode_aware_reboot()
1602        logging.info('Successfully restored kernel.')
1603
1604    def backup_cgpt_attributes(self):
1605        """Backup CGPT partition table attributes."""
1606        self._backup_cgpt_attr = self.faft_client.cgpt.get_attributes()
1607
1608    def restore_cgpt_attributes(self):
1609        """Restore CGPT partition table attributes."""
1610        current_table = self.faft_client.cgpt.get_attributes()
1611        if current_table == self._backup_cgpt_attr:
1612            return
1613        logging.info('CGPT table is changed. Original: %r. Current: %r.',
1614                     self._backup_cgpt_attr,
1615                     current_table)
1616        self.faft_client.cgpt.set_attributes(
1617                self._backup_cgpt_attr['A'], self._backup_cgpt_attr['B'])
1618
1619        self.switcher.mode_aware_reboot()
1620        logging.info('Successfully restored CGPT table.')
1621
1622    def try_fwb(self, count=0):
1623        """set to try booting FWB count # times
1624
1625        Wrapper to set fwb_tries for vboot1 and fw_try_count,fw_try_next for
1626        vboot2
1627
1628        @param count: an integer specifying value to program into
1629                      fwb_tries(vb1)/fw_try_next(vb2)
1630        """
1631        if self.fw_vboot2:
1632            self.faft_client.system.set_fw_try_next('B', count)
1633        else:
1634            # vboot1: we need to boot into fwb at least once
1635            if not count:
1636                count = count + 1
1637            self.faft_client.system.set_try_fw_b(count)
1638
1639    def identify_shellball(self, include_ec=None):
1640        """Get the FWIDs of all targets and sections in the shellball
1641
1642        @param include_ec: if True, get EC fwids.
1643                           If None (default), assume True if board has an EC
1644        @return: the dict of versions in the shellball
1645        """
1646        fwids = dict()
1647        fwids['bios'] = self.faft_client.updater.get_all_fwids('bios')
1648
1649        if include_ec is None:
1650            if self.faft_config.platform == 'Samus':
1651                include_ec = False  # no ec.bin in shellball
1652            else:
1653                include_ec = self.faft_config.chrome_ec
1654
1655        if include_ec:
1656            fwids['ec'] = self.faft_client.updater.get_all_fwids('ec')
1657        return fwids
1658
1659    def modify_shellball(self, append, modify_ro=True, modify_ec=False):
1660        """Modify the FWIDs of targets and sections in the shellball
1661
1662        @return: the full path of the shellball
1663        """
1664
1665        if modify_ro:
1666            self.faft_client.updater.modify_fwids('bios', ['ro', 'a', 'b'])
1667        else:
1668            self.faft_client.updater.modify_fwids('bios', ['a', 'b'])
1669
1670        if modify_ec:
1671            if modify_ro:
1672                self.faft_client.updater.modify_fwids('ec', ['ro', 'rw'])
1673            else:
1674                self.faft_client.updater.modify_fwids('ec', ['rw'])
1675
1676        modded_shellball = self.faft_client.updater.repack_shellball(append)
1677
1678        return modded_shellball
1679
1680    @staticmethod
1681    def check_fwids_written(before_fwids, image_fwids, after_fwids,
1682                            expected_written):
1683        """Check the dicts of fwids for correctness after an update is applied.
1684
1685        The targets checked come from the keys of expected_written.
1686        The sections checked come from the inner dicts of the fwids parameters.
1687
1688        The fwids should be keyed by target (flash type), then by section:
1689        {'bios': {'ro': '<fwid>', 'a': '<fwid>', 'b': '<fwid>'},
1690         'ec': {'ro': '<fwid>', 'rw': '<fwid>'}
1691
1692        For expected_written, the dict should be keyed by flash type only:
1693        {'bios': ['ro'], 'ec': ['ro', 'rw']}
1694
1695        @param before_fwids: dict of versions from before the update
1696        @param image_fwids: dict of versions in the update
1697        @param after_fwids: dict of actual versions after the update
1698        @param expected_written: dict indicating which ones should have changed
1699        @return: list of error lines for mismatches
1700
1701        @type before_fwids: dict
1702        @type image_fwids: dict
1703        @type after_fwids: dict
1704        @type expected_written: dict
1705        @rtype: list
1706        """
1707        errors = []
1708
1709        for target in sorted(expected_written.keys()):
1710            # target is BIOS or EC
1711
1712            before_missing = (target not in before_fwids)
1713            after_missing = (target not in after_fwids)
1714            if before_missing or after_missing:
1715                if before_missing:
1716                    errors.append("...no before_fwids[%s]" % target)
1717                if after_missing:
1718                    errors.append("...no after_fwids[%s]" % target)
1719                continue
1720
1721            written_sections = expected_written.get(target) or list()
1722            written_sections = set(written_sections)
1723
1724            before_sections = set(before_fwids.get(target) or dict())
1725            image_sections = set(image_fwids.get(target) or dict())
1726            after_sections = set(after_fwids.get(target) or dict())
1727
1728            for section in before_sections | image_sections | after_sections:
1729                # section is RO, RW, A, or B
1730
1731                before_fwid = before_fwids[target][section]
1732                image_fwid = image_fwids[target][section]
1733                actual_fwid = after_fwids[target][section]
1734
1735                if section in written_sections:
1736                    expected_fwid = image_fwid
1737                    expected_desc = 'rewritten fwid (%s)' % expected_fwid
1738                    if image_fwid == before_fwid:
1739                        expected_desc = ('rewritten (no changes) fwid (%s)' %
1740                                         expected_fwid)
1741                else:
1742                    expected_fwid = before_fwid
1743                    expected_desc = 'original fwid (%s)' % expected_fwid
1744
1745                if actual_fwid == expected_fwid:
1746                    actual_desc = 'correct value'
1747
1748                elif actual_fwid == image_fwid:
1749                    actual_desc = 'rewritten fwid (%s)' % actual_fwid
1750                    if image_fwid == before_fwid:
1751                        # The flash could have been rewritten with the same fwid
1752                        actual_desc = 'possibly written fwid (%s)' % actual_fwid
1753
1754                elif actual_fwid == before_fwid:
1755                    actual_desc = 'original fwid (%s)' % actual_fwid
1756
1757                else:
1758                    actual_desc = 'unknown fwid (%s)' % actual_fwid
1759
1760                msg = ("...FWID (%s %s): expected %s, got %s" %
1761                       (target.upper(), section.upper(),
1762                        expected_desc, actual_desc))
1763
1764                if actual_fwid != expected_fwid:
1765                    errors.append(msg)
1766        return errors
1767
1768
1769    def fwmp_is_cleared(self):
1770        """Return True if the FWMP has been created"""
1771        res = self.host.run('cryptohome '
1772                            '--action=get_firmware_management_parameters',
1773                            ignore_status=True)
1774        if res.exit_status and res.exit_status != self.FWMP_CLEARED_EXIT_STATUS:
1775            raise error.TestError('Could not run cryptohome command %r' % res)
1776        return self.FWMP_CLEARED_ERROR_MSG in res.stdout
1777
1778
1779    def _tpm_is_owned(self):
1780        """Returns True if the tpm is owned"""
1781        result = self.host.run('cryptohome --action=tpm_more_status',
1782                               ignore_status=True)
1783        logging.debug(result)
1784        return result.exit_status == 0 and 'owned: true' in result.stdout
1785
    def clear_fwmp(self):
        """Clear the FWMP (firmware management parameters).

        Returns immediately when fwmp_is_cleared() already reports the FWMP
        as cleared. Otherwise it calls tpm_utils.ClearTPMOwnerRequest on the
        host, asks cryptohome to take TPM ownership, waits for
        _tpm_is_owned() to report True, and finally runs the cryptohome
        remove_firmware_management_parameters action.

        @raise TestError: If utils.wait_for_value() on _tpm_is_owned gives a
                          falsy result.
        """
        if self.fwmp_is_cleared():
            return
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.host.run('cryptohome --action=tpm_take_ownership')
        # Taking ownership is only requested above; wait until it shows up.
        if not utils.wait_for_value(self._tpm_is_owned, expected_value=True):
            raise error.TestError('Unable to own tpm while clearing fwmp.')
        self.host.run('cryptohome '
                      '--action=remove_firmware_management_parameters')
1796