• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import time
8
9from autotest_lib.server import test
10from autotest_lib.server.cros import filesystem_util
11from autotest_lib.client.common_lib import error, utils
12
13
14class FingerprintTest(test.test):
15    """Base class that sets up helpers for fingerprint tests."""
16    version = 1
17
18    # Location of firmware from the build on the DUT
19    _FINGERPRINT_BUILD_FW_DIR = '/opt/google/biod/fw'
20
21    _DISABLE_FP_UPDATER_FILE = '.disable_fp_updater'
22
23    _UPSTART_DIR = '/etc/init'
24    _BIOD_UPSTART_JOB_FILE = 'biod.conf'
25    _STATEFUL_PARTITION_DIR = '/mnt/stateful_partition'
26
27    _GENIMAGES_SCRIPT_NAME = 'gen_test_images.sh'
28    _GENIMAGES_OUTPUT_DIR_NAME = 'images'
29
30    _TEST_IMAGE_FORMAT_MAP = {
31        'TEST_IMAGE_ORIGINAL': '%s.bin',
32        'TEST_IMAGE_DEV': '%s.dev',
33        'TEST_IMAGE_CORRUPT_FIRST_BYTE': '%s_corrupt_first_byte.bin',
34        'TEST_IMAGE_CORRUPT_LAST_BYTE': '%s_corrupt_last_byte.bin',
35        'TEST_IMAGE_DEV_RB_ZERO': '%s.dev.rb0',
36        'TEST_IMAGE_DEV_RB_ONE': '%s.dev.rb1',
37        'TEST_IMAGE_DEV_RB_NINE': '%s.dev.rb9'
38    }
39
40    _ROLLBACK_ZERO_BLOCK_ID = '0'
41    _ROLLBACK_INITIAL_BLOCK_ID = '1'
42    _ROLLBACK_INITIAL_MIN_VERSION = '0'
43    _ROLLBACK_INITIAL_RW_VERSION = '0'
44
45    _SERVER_GENERATED_FW_DIR_NAME = 'generated_fw'
46
47    _DUT_TMP_PATH_BASE = '/tmp/fp_test'
48
    # Name of the key in "futility show" output that corresponds to the
    # signing key ID
50    _FUTILITY_KEY_ID_KEY_NAME = 'ID'
51
52    # Types of firmware
53    _FIRMWARE_TYPE_RO = 'RO'
54    _FIRMWARE_TYPE_RW = 'RW'
55
56    # Types of signing keys
57    _KEY_TYPE_DEV = 'dev'
58    _KEY_TYPE_PRE_MP = 'premp'
59    _KEY_TYPE_MP = 'mp'
60
61    # EC board names for FPMCUs
62    _FP_BOARD_NAME_BLOONCHIPPER = 'bloonchipper'
63    _FP_BOARD_NAME_DARTMONKEY = 'dartmonkey'
64    _FP_BOARD_NAME_NOCTURNE = 'nocturne_fp'
65    _FP_BOARD_NAME_NAMI = 'nami_fp'
66
67    # Map from signing key ID to type of signing key
68    _KEY_ID_MAP_ = {
69        # bloonchipper
70        '61382804da86b4156d666cc9a976088f8b647d44': _KEY_TYPE_DEV,
71        '07b1af57220c196e363e68d73a5966047c77011e': _KEY_TYPE_PRE_MP,
72        '1c590ef36399f6a2b2ef87079c135b69ef89eb60': _KEY_TYPE_MP,
73
74        # dartmonkey
75        '257a0aa3ac9e81aa4bc3aabdb6d3d079117c5799': _KEY_TYPE_MP,
76
77        # nocturne
78        '8a8fc039a9463271995392f079b83ce33832d07d': _KEY_TYPE_DEV,
79        '6f38c866182bd9bf7a4462c06ac04fa6a0074351': _KEY_TYPE_MP,
80        'f6f7d96c48bd154dbae7e3fe3a3b4c6268a10934': _KEY_TYPE_PRE_MP,
81
82        # nami
83        '754aea623d69975a22998f7b97315dd53115d723': _KEY_TYPE_PRE_MP,
84        '35486c0090ca390408f1fbbf2a182966084fe2f8': _KEY_TYPE_MP
85
86    }
87
88    # RO versions that are flashed in the factory
89    # (for eternity for a given board)
90    _GOLDEN_RO_FIRMWARE_VERSION_MAP = {
91            _FP_BOARD_NAME_BLOONCHIPPER: {
92                    'hatch': 'bloonchipper_v2.0.4277-9f652bb3',
93                    'zork': 'bloonchipper_v2.0.5938-197506c1',
94            },
95            _FP_BOARD_NAME_DARTMONKEY: 'dartmonkey_v2.0.2887-311310808',
96            _FP_BOARD_NAME_NOCTURNE: 'nocturne_fp_v2.2.64-58cf5974e',
97            _FP_BOARD_NAME_NAMI: 'nami_fp_v2.2.144-7a08e07eb',
98    }
99
100    _FIRMWARE_VERSION_SHA256SUM = 'sha256sum'
101    _FIRMWARE_VERSION_RO_VERSION = 'ro_version'
102    _FIRMWARE_VERSION_RW_VERSION = 'rw_version'
103    _FIRMWARE_VERSION_KEY_ID = 'key_id'
104
105    # Map of attributes for a given board's various firmware file releases
106    #
107    # Two purposes:
108    #   1) Documents the exact versions and keys used for a given firmware file.
    #   2) Used to verify that the files that end up in the build (and
    #      therefore what we release) are exactly what we expect.
111    _FIRMWARE_VERSION_MAP = {
112        _FP_BOARD_NAME_BLOONCHIPPER: {
113            'bloonchipper_v2.0.4277-9f652bb3.bin': {
114                _FIRMWARE_VERSION_SHA256SUM: '7d9b788a908bee5c83e27450258b2bbf110d7253d49faa4804562ae27e42cb3b',
115                _FIRMWARE_VERSION_RO_VERSION: 'bloonchipper_v2.0.4277-9f652bb3',
116                _FIRMWARE_VERSION_RW_VERSION: 'bloonchipper_v2.0.4277-9f652bb3',
117                _FIRMWARE_VERSION_KEY_ID: '1c590ef36399f6a2b2ef87079c135b69ef89eb60',
118            },
119            'bloonchipper_v2.0.5938-197506c1.bin': {
120                _FIRMWARE_VERSION_SHA256SUM: 'dc62e4b05eaf4fa8ab5546dcf18abdb30c8e64e9bf0fbf377ebc85155c7c3a47',
121                _FIRMWARE_VERSION_RO_VERSION: 'bloonchipper_v2.0.5938-197506c1',
122                _FIRMWARE_VERSION_RW_VERSION: 'bloonchipper_v2.0.5938-197506c1',
123                _FIRMWARE_VERSION_KEY_ID: '1c590ef36399f6a2b2ef87079c135b69ef89eb60',
124            },
125        },
126        _FP_BOARD_NAME_NOCTURNE: {
127            'nocturne_fp_v2.2.64-58cf5974e-RO_v2.0.4017-9c45fb4b3-RW.bin': {
128                _FIRMWARE_VERSION_SHA256SUM: '16c405eeaff75dcbc76dbc9f368f66e3fabc47e2ebcf13bd2b64b8b133bbff97',
129                _FIRMWARE_VERSION_RO_VERSION: 'nocturne_fp_v2.2.64-58cf5974e',
130                _FIRMWARE_VERSION_RW_VERSION: 'nocturne_fp_v2.0.4017-9c45fb4b3',
131                _FIRMWARE_VERSION_KEY_ID: '6f38c866182bd9bf7a4462c06ac04fa6a0074351',
132            },
133        },
134        _FP_BOARD_NAME_NAMI: {
135            'nami_fp_v2.2.144-7a08e07eb-RO_v2.0.4017-9c45fb4b3-RW.bin': {
136                _FIRMWARE_VERSION_SHA256SUM: '7965ea4c4371ee6d21dc462b9ed7c99078d17f4b772bec51441ca9af7d8f3a80',
137                _FIRMWARE_VERSION_RO_VERSION: 'nami_fp_v2.2.144-7a08e07eb',
138                _FIRMWARE_VERSION_RW_VERSION: 'nami_fp_v2.0.4017-9c45fb4b3',
139                _FIRMWARE_VERSION_KEY_ID: '35486c0090ca390408f1fbbf2a182966084fe2f8',
140            },
141        },
142        _FP_BOARD_NAME_DARTMONKEY: {
143            'dartmonkey_v2.0.2887-311310808-RO_v2.0.4017-9c45fb4b3-RW.bin': {
144                _FIRMWARE_VERSION_SHA256SUM: 'b84914c70e93c28e2221f48be338dbf0ad0cfb12b7877baaf6b47f7bfd2aa958',
145                _FIRMWARE_VERSION_RO_VERSION: 'dartmonkey_v2.0.2887-311310808',
146                _FIRMWARE_VERSION_RW_VERSION: 'dartmonkey_v2.0.4017-9c45fb4b3',
147                _FIRMWARE_VERSION_KEY_ID: '257a0aa3ac9e81aa4bc3aabdb6d3d079117c5799',
148            }
149        }
150    }
151
152    _BIOD_UPSTART_JOB_NAME = 'biod'
153    # TODO(crbug.com/925545)
154    _TIMBERSLIDE_UPSTART_JOB_NAME = \
155        'timberslide LOG_PATH=/sys/kernel/debug/cros_fp/console_log'
156
157    _INIT_ENTROPY_CMD = 'bio_wash --factory_init'
158
159    _CROS_FP_ARG = '--name=cros_fp'
160    _CROS_CONFIG_FINGERPRINT_PATH = '/fingerprint'
161    _ECTOOL_RO_VERSION = 'RO version'
162    _ECTOOL_RW_VERSION = 'RW version'
163    _ECTOOL_FIRMWARE_COPY = 'Firmware copy'
164    _ECTOOL_ROLLBACK_BLOCK_ID = 'Rollback block id'
165    _ECTOOL_ROLLBACK_MIN_VERSION = 'Rollback min version'
166    _ECTOOL_ROLLBACK_RW_VERSION = 'RW rollback version'
167
168    @staticmethod
169    def _parse_colon_delimited_output(ectool_output):
170        """
171        Converts ectool's (or any other tool with similar output) colon
172        delimited output into python dict. Ignores any lines that do not have
173        colons.
174
175        Example:
176        RO version:    nocturne_fp_v2.2.64-58cf5974e
177        RW version:    nocturne_fp_v2.2.110-b936c0a3c
178
179        becomes:
180        {
181          'RO version': 'nocturne_fp_v2.2.64-58cf5974e',
182          'RW version': 'nocturne_fp_v2.2.110-b936c0a3c'
183        }
184        """
185        ret = {}
186        try:
187            for line in ectool_output.strip().split('\n'):
188                splits = line.split(':', 1)
189                if len(splits) != 2:
190                    continue
191                key = splits[0].strip()
192                val = splits[1].strip()
193                ret[key] = val
194        except:
195            raise error.TestFail('Unable to parse ectool output: %s'
196                                 % ectool_output)
197        return ret
198
199    def initialize(self, host):
200        """Perform minimal initialization, to avoid AttributeError in cleanup"""
201        self.host = host
202        self.servo = host.servo
203
204        self._validate_compatible_servo_version()
205
206        self.servo.initialize_dut()
207
208        self.fp_board = self.get_fp_board()
209        self._build_fw_file = self.get_build_fw_file()
210
211    def setup_test(self, test_dir, use_dev_signed_fw=False,
212                   enable_hardware_write_protect=True,
213                   enable_software_write_protect=True,
214                   force_firmware_flashing=False, init_entropy=True):
215        """Perform more complete initialization, including copying test files"""
216        logging.info('HW write protect enabled: %s',
217                     self.is_hardware_write_protect_enabled())
218
219        # TODO(crbug.com/925545): stop timberslide so /var/log/cros_fp.log
220        # continues to update after flashing.
221        self._timberslide_running = self.host.upstart_status(
222            self._TIMBERSLIDE_UPSTART_JOB_NAME)
223        if self._timberslide_running:
224            logging.info('Stopping %s', self._TIMBERSLIDE_UPSTART_JOB_NAME)
225            self.host.upstart_stop(self._TIMBERSLIDE_UPSTART_JOB_NAME)
226
227        self._biod_running = self.host.upstart_status(
228            self._BIOD_UPSTART_JOB_NAME)
229        if self._biod_running:
230            logging.info('Stopping %s', self._BIOD_UPSTART_JOB_NAME)
231            self.host.upstart_stop(self._BIOD_UPSTART_JOB_NAME)
232
233        # On some platforms an AP reboot is needed after flashing firmware to
234        # rebind the driver.
235        self._dut_needs_reboot = self.get_host_board() == 'zork'
236
237        if filesystem_util.is_rootfs_writable(self.host):
238            if self._dut_needs_reboot:
239                logging.warning('rootfs is writable')
240            else:
241                raise error.TestFail('rootfs is writable')
242
243        if not self.biod_upstart_job_enabled():
244            raise error.TestFail(
245                    'Biod upstart job is disabled at the beginning of test')
246        if not self.fp_updater_is_enabled():
247            raise error.TestFail(
248                    'Fingerprint firmware updater is disabled at the beginning of test'
249            )
250
251        # Disable biod and updater so that they won't interfere after reboot.
252        if self._dut_needs_reboot:
253            self.disable_biod_upstart_job()
254            self.disable_fp_updater()
255
256        # create tmp working directory on device (automatically cleaned up)
257        self._dut_working_dir = self.host.get_tmp_dir(
258            parent=self._DUT_TMP_PATH_BASE)
259        logging.info('Created dut_working_dir: %s', self._dut_working_dir)
260        self.copy_files_to_dut(test_dir, self._dut_working_dir)
261
262        self.validate_build_fw_file()
263
264        gen_script = os.path.abspath(os.path.join(self.autodir,
265                                                  'server', 'cros', 'faft',
266                                                  self._GENIMAGES_SCRIPT_NAME))
267        self._dut_firmware_test_images_dir = \
268            self._generate_test_firmware_images(gen_script,
269                                                self._build_fw_file,
270                                                self._dut_working_dir)
271        logging.info('dut_firmware_test_images_dir: %s',
272                     self._dut_firmware_test_images_dir)
273
274        self._initialize_test_firmware_image_attrs(
275            self._dut_firmware_test_images_dir)
276
277        self._initialize_running_fw_version(use_dev_signed_fw,
278                                            force_firmware_flashing)
279
280        if init_entropy:
281            self._initialize_fw_entropy()
282
283        self._initialize_hw_and_sw_write_protect(enable_hardware_write_protect,
284                                                 enable_software_write_protect)
285
286    def cleanup(self):
287        """Restores original state."""
288        # Once the tests complete we need to make sure we're running the
289        # original firmware (not dev version) and potentially reset rollback.
290        self._initialize_running_fw_version(use_dev_signed_fw=False,
291                                            force_firmware_flashing=False)
292        self._initialize_fw_entropy()
293        # Re-enable biod and updater after flashing and initializing entropy so
294        # that they don't interfere if there was a reboot.
295        if hasattr(self, '_dut_needs_reboot') and self._dut_needs_reboot:
296            if not self.biod_upstart_job_enabled():
297                self.enable_biod_upstart_job()
298            if not self.fp_updater_is_enabled():
299                self.enable_fp_updater()
300        self._initialize_hw_and_sw_write_protect(
301            enable_hardware_write_protect=True,
302            enable_software_write_protect=True)
303        if hasattr(self, '_biod_running') and self._biod_running:
304            logging.info('Restarting biod')
305            self.host.upstart_restart(self._BIOD_UPSTART_JOB_NAME)
306        # TODO(crbug.com/925545)
307        if hasattr(self, '_timberslide_running') and self._timberslide_running:
308            logging.info('Restarting timberslide')
309            self.host.upstart_restart(self._TIMBERSLIDE_UPSTART_JOB_NAME)
310
311        super(FingerprintTest, self).cleanup()
312
313    def after_run_once(self):
314        """Logs which iteration just ran."""
315        logging.info('successfully ran iteration %d', self.iteration)
316
317    def _validate_compatible_servo_version(self):
318        """Asserts if a compatible servo version is not attached."""
319        servo_version = self.servo.get_servo_version()
320        logging.info('servo version: %s', servo_version)
321
322    def _generate_test_firmware_images(self, gen_script, build_fw_file,
323                                       dut_working_dir):
324        """
325        Copies the fingerprint firmware from the DUT to the server running
326        the tests, which runs a script to generate various test versions of
327        the firmware.
328
329        @return full path to location of test images on DUT
330        """
331        # create subdirectory under existing tmp dir
332        server_tmp_dir = os.path.join(self.tmpdir,
333                                      self._SERVER_GENERATED_FW_DIR_NAME)
334        os.mkdir(server_tmp_dir)
335        logging.info('server_tmp_dir: %s', server_tmp_dir)
336
337        # Copy firmware from device to server
338        self.get_files_from_dut(build_fw_file, server_tmp_dir)
339
340        # Run the test image generation script on server
341        pushd = os.getcwd()
342        os.chdir(server_tmp_dir)
343        cmd = ' '.join([gen_script,
344                        self.get_fp_board(),
345                        os.path.basename(build_fw_file)])
346        result = self.run_server_cmd(cmd)
347        if result.exit_status != 0:
348            raise error.TestFail('Failed to run test image generation script')
349
350        os.chdir(pushd)
351
352        # Copy resulting files to DUT tmp dir
353        server_generated_images_dir = \
354            os.path.join(server_tmp_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
355        self.copy_files_to_dut(server_generated_images_dir, dut_working_dir)
356
357        return os.path.join(dut_working_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
358
359    def _initialize_test_firmware_image_attrs(self, dut_fw_test_images_dir):
360        """Sets attributes with full path to test images on DUT.
361
362        Example: self.TEST_IMAGE_DEV = /some/path/images/nocturne_fp.dev
363        """
364        for key, val in self._TEST_IMAGE_FORMAT_MAP.iteritems():
365            full_path = os.path.join(dut_fw_test_images_dir,
366                                     val % self.get_fp_board())
367            setattr(self, key, full_path)
368
369    def _initialize_running_fw_version(self, use_dev_signed_fw,
370                                       force_firmware_flashing):
371        """
372        Ensures that the running firmware version matches build version
373        and factory rollback settings; flashes to correct version if either
374        fails to match is requested to force flashing.
375
376        RO firmware: original version released at factory
377        RW firmware: firmware from current build
378        """
379        build_rw_firmware_version = \
380            self.get_build_rw_firmware_version(use_dev_signed_fw)
381        golden_ro_firmware_version = \
382            self.get_golden_ro_firmware_version(use_dev_signed_fw)
383        logging.info('Build RW firmware version: %s', build_rw_firmware_version)
384        logging.info('Golden RO firmware version: %s',
385                     golden_ro_firmware_version)
386
387        running_rw_firmware = self.ensure_running_rw_firmware()
388
389        fw_versions_match = self.running_fw_version_matches_given_version(
390            build_rw_firmware_version, golden_ro_firmware_version)
391
392        if not running_rw_firmware or not fw_versions_match \
393            or not self.is_rollback_set_to_initial_val() \
394            or force_firmware_flashing:
395            fw_file = self._build_fw_file
396            if use_dev_signed_fw:
397                fw_file = self.TEST_IMAGE_DEV
398            self.flash_rw_ro_firmware(fw_file)
399            if not self.running_fw_version_matches_given_version(
400                build_rw_firmware_version, golden_ro_firmware_version):
401                raise error.TestFail(
402                    'Running firmware version does not match expected version')
403
404    def _initialize_fw_entropy(self):
405        """Sets the entropy (key) in FPMCU flash (if not set)."""
406        result = self.run_cmd(self._INIT_ENTROPY_CMD)
407        if result.exit_status != 0:
408            raise error.TestFail('Unable to initialize entropy')
409
410    def _initialize_hw_and_sw_write_protect(self, enable_hardware_write_protect,
411                                            enable_software_write_protect):
412        """Enables/disables hardware/software write protect."""
413        # sw: 0, hw: 0 => initial_hw(0) -> sw(0) -> hw(0)
414        # sw: 0, hw: 1 => initial_hw(0) -> sw(0) -> hw(1)
415        # sw: 1, hw: 0 => initial_hw(1) -> sw(1) -> hw(0)
416        # sw: 1, hw: 1 => initial_hw(1) -> sw(1) -> hw(1)
417        hardware_write_protect_initial_enabled = True
418        if not enable_software_write_protect:
419            hardware_write_protect_initial_enabled = False
420
421        self.set_hardware_write_protect(hardware_write_protect_initial_enabled)
422
423        self.set_software_write_protect(enable_software_write_protect)
424        self.set_hardware_write_protect(enable_hardware_write_protect)
425
426    def get_fp_board(self):
427        """Returns name of fingerprint EC.
428
429        nocturne and nami are special cases and have "_fp" appended. Newer
430        FPMCUs have unique names.
431        See go/cros-fingerprint-firmware-branching-and-signing.
432        """
433        # Use cros_config to get fingerprint board.
434        # Due to b/160271883, we will try running the cmd via cat instead.
435        result = self._run_cros_config_cmd_cat('fingerprint/board')
436        if result.exit_status != 0:
437            raise error.TestFail(
438                'Unable to get fingerprint board with cros_config')
439        return result.stdout.rstrip()
440
441    def get_host_board(self):
442        """Returns name of the host board."""
443        return self.host.get_board().split(':')[-1]
444
445    def get_build_fw_file(self):
446        """Returns full path to build FW file on DUT."""
447        ls_cmd = 'ls %s/%s*.bin' % (
448            self._FINGERPRINT_BUILD_FW_DIR, self.fp_board)
449        result = self.run_cmd(ls_cmd)
450        if result.exit_status != 0:
451            raise error.TestFail(
452                'Unable to find firmware file on device:'
453                ' command failed (rc=%s): %s'
454                % (result.exit_status, result.stderr.strip() or ls_cmd))
455        ret = result.stdout.rstrip()
456        logging.info('Build firmware file: %s', ret)
457        return ret
458
459    def check_equal(self, a, b):
460        """Raises exception if "a" does not equal "b"."""
461        if a != b:
462            raise error.TestFail('"%s" does not match expected "%s" for board '
463                                 '%s' % (a, b, self.get_fp_board()))
464
465    def validate_build_fw_file(self,
466                               allowed_types=(_KEY_TYPE_PRE_MP, _KEY_TYPE_MP)):
467        """
468        Checks that all attributes in the given firmware file match their
469        expected values.
470
471        @param allowed_types: If key type is something else, raise TestFail.
472                              Default: pre-MP or MP.
473        @type allowed_types: tuple | list
474        """
475        build_fw_file = self._build_fw_file
476        # check hash
477        actual_hash = self._calculate_sha256sum(build_fw_file)
478        expected_hash = self._get_expected_firmware_hash(build_fw_file)
479        self.check_equal(actual_hash, expected_hash)
480
481        # check signing key_id
482        actual_key_id = self._read_firmware_key_id(build_fw_file)
483        expected_key_id = self._get_expected_firmware_key_id(build_fw_file)
484        self.check_equal(actual_key_id, expected_key_id)
485
486        # check that the signing key for firmware in the build
487        # is "pre mass production" (pre-mp) or "mass production" (MP)
488        key_type = self._get_key_type(actual_key_id)
489        if key_type not in allowed_types:
490            raise error.TestFail(
491                'Firmware key type must be %s for board %s; got %s (%s)' %
492                (' or '.join(allowed_types), self.fp_board, key_type,
493                 actual_key_id))
494
495        # check ro_version
496        actual_ro_version = self._read_firmware_ro_version(build_fw_file)
497        expected_ro_version = \
498            self._get_expected_firmware_ro_version(build_fw_file)
499        self.check_equal(actual_ro_version, expected_ro_version)
500
501        # check rw_version
502        actual_rw_version = self._read_firmware_rw_version(build_fw_file)
503        expected_rw_version = \
504            self._get_expected_firmware_rw_version(build_fw_file)
505        self.check_equal(actual_rw_version, expected_rw_version)
506
507        logging.info("Validated build firmware metadata.")
508
509    def _get_key_type(self, key_id):
510        """Returns the key "type" for a given "key id"."""
511        key_type = self._KEY_ID_MAP_.get(key_id)
512        if key_type is None:
513            raise error.TestFail('Unable to get key type for key id: %s'
514                                 % key_id)
515        return key_type
516
517    def _get_expected_firmware_info(self, build_fw_file, info_type):
518        """
519        Returns expected firmware info for a given firmware file name.
520        """
521        build_fw_file_name = os.path.basename(build_fw_file)
522
523        board = self.get_fp_board()
524        board_expected_fw_info = self._FIRMWARE_VERSION_MAP.get(board)
525        if board_expected_fw_info is None:
526            raise error.TestFail('Unable to get firmware info for board: %s'
527                                 % board)
528
529        expected_fw_info = board_expected_fw_info.get(build_fw_file_name)
530        if expected_fw_info is None:
531            raise error.TestFail('Unable to get firmware info for file: %s'
532                                 % build_fw_file_name)
533
534        ret = expected_fw_info.get(info_type)
535        if ret is None:
536            raise error.TestFail('Unable to get firmware info type: %s'
537                                 % info_type)
538
539        return ret
540
541    def _get_expected_firmware_hash(self, build_fw_file):
542        """Returns expected hash of firmware file."""
543        return self._get_expected_firmware_info(
544            build_fw_file, self._FIRMWARE_VERSION_SHA256SUM)
545
546    def _get_expected_firmware_key_id(self, build_fw_file):
547        """Returns expected "key id" for firmware file."""
548        return self._get_expected_firmware_info(
549            build_fw_file, self._FIRMWARE_VERSION_KEY_ID)
550
551    def _get_expected_firmware_ro_version(self, build_fw_file):
552        """Returns expected RO version for firmware file."""
553        return self._get_expected_firmware_info(
554            build_fw_file, self._FIRMWARE_VERSION_RO_VERSION)
555
556    def _get_expected_firmware_rw_version(self, build_fw_file):
557        """Returns expected RW version for firmware file."""
558        return self._get_expected_firmware_info(
559            build_fw_file, self._FIRMWARE_VERSION_RW_VERSION)
560
561    def _read_firmware_key_id(self, file_name):
562        """Returns "key id" as read from the given file."""
563        result = self._run_futility_show_cmd(file_name)
564        parsed = self._parse_colon_delimited_output(result)
565        key_id = parsed.get(self._FUTILITY_KEY_ID_KEY_NAME)
566        if key_id is None:
567            raise error.TestFail('Failed to get key ID for file: %s'
568                                 % file_name)
569        return key_id
570
571    def _read_firmware_ro_version(self, file_name):
572        """Returns RO firmware version as read from the given file."""
573        return self._run_dump_fmap_cmd(file_name, 'RO_FRID')
574
575    def _read_firmware_rw_version(self, file_name):
576        """Returns RW firmware version as read from the given file."""
577        return self._run_dump_fmap_cmd(file_name, 'RW_FWID')
578
579    def _calculate_sha256sum(self, file_name):
580        """Returns SHA256 hash of the given file contents."""
581        result = self._run_sha256sum_cmd(file_name)
582        return result.stdout.split()[0]
583
584    def _get_running_firmware_info(self, key):
585        """
586        Returns requested firmware info (RW version, RO version, or firmware
587        type).
588        """
589        result = self._run_ectool_cmd('version')
590        parsed = self._parse_colon_delimited_output(result.stdout)
591        if result.exit_status != 0:
592            raise error.TestFail('Failed to get running firmware info')
593        info = parsed.get(key)
594        if info is None:
595            raise error.TestFail(
596                'Failed to get running firmware info: %s' % key)
597        return info
598
599    def get_running_rw_firmware_version(self):
600        """Returns running RW firmware version."""
601        return self._get_running_firmware_info(self._ECTOOL_RW_VERSION)
602
603    def get_running_ro_firmware_version(self):
604        """Returns running RO firmware version."""
605        return self._get_running_firmware_info(self._ECTOOL_RO_VERSION)
606
607    def get_running_firmware_type(self):
608        """Returns type of firmware we are running (RW or RO)."""
609        return self._get_running_firmware_info(self._ECTOOL_FIRMWARE_COPY)
610
611    def _get_rollback_info(self, info_type):
612        """Returns requested type of rollback info."""
613        result = self._run_ectool_cmd('rollbackinfo')
614        parsed = self._parse_colon_delimited_output(result.stdout)
615        if result.exit_status != 0:
616            raise error.TestFail('Failed to get rollback info')
617        info = parsed.get(info_type)
618        if info is None:
619            raise error.TestFail('Failed to get rollback info: %s' % info_type)
620        return info
621
622    def get_rollback_id(self):
623        """Returns rollback ID."""
624        return self._get_rollback_info(self._ECTOOL_ROLLBACK_BLOCK_ID)
625
626    def get_rollback_min_version(self):
627        """Returns rollback min version."""
628        return self._get_rollback_info(self._ECTOOL_ROLLBACK_MIN_VERSION)
629
630    def get_rollback_rw_version(self):
631        """Returns RW rollback version."""
632        return self._get_rollback_info(self._ECTOOL_ROLLBACK_RW_VERSION)
633
634    def _construct_dev_version(self, orig_version):
635        """
636        Given a "regular" version string from a signed build, returns the
637        special "dev" version that we use when creating the test images.
638        """
639        fw_version = orig_version
640        if len(fw_version) + len('.dev') > 31:
641            fw_version = fw_version[:27]
642        fw_version = fw_version + '.dev'
643        return fw_version
644
645    def get_golden_ro_firmware_version(self, use_dev_signed_fw):
646        """Returns RO firmware version used in factory."""
647        board = self.get_fp_board()
648        golden_version = self._GOLDEN_RO_FIRMWARE_VERSION_MAP.get(board)
649        if isinstance(golden_version, dict):
650            golden_version = golden_version.get(self.get_host_board())
651        if golden_version is None:
652            raise error.TestFail('Unable to get golden RO version for board: %s'
653                                 % board)
654        if use_dev_signed_fw:
655            golden_version = self._construct_dev_version(golden_version)
656        return golden_version
657
658    def get_build_rw_firmware_version(self, use_dev_signed_fw):
659        """Returns RW firmware version from build."""
660        fw_version = self._read_firmware_rw_version(self._build_fw_file)
661        if use_dev_signed_fw:
662            fw_version = self._construct_dev_version(fw_version)
663        return fw_version
664
665    def ensure_running_rw_firmware(self):
666        """
667        Check whether the device is running RW firmware. If not, try rebooting
668        to RW.
669
670        @return true if successfully verified running RW firmware, false
671        otherwise.
672        """
673        try:
674            if self.get_running_firmware_type() != self._FIRMWARE_TYPE_RW:
675                self._reboot_ec()
676                if self.get_running_firmware_type() != self._FIRMWARE_TYPE_RW:
677                    # RW may be corrupted.
678                    return False
679        except:
680            # We may not always be able to read the firmware version.
681            # For example, if the firmware is erased due to RDP1, running any
682            # commands (such as getting the version) won't work.
683            return False
684        return True
685
686    def running_fw_version_matches_given_version(self, rw_version, ro_version):
687        """
688        Returns True if the running RO and RW firmware versions match the
689        provided versions.
690        """
691        try:
692            running_rw_firmware_version = self.get_running_rw_firmware_version()
693            running_ro_firmware_version = self.get_running_ro_firmware_version()
694
695            logging.info('RW firmware, running: %s, expected: %s',
696                         running_rw_firmware_version, rw_version)
697            logging.info('RO firmware, running: %s, expected: %s',
698                         running_ro_firmware_version, ro_version)
699
700            return (running_rw_firmware_version == rw_version and
701                    running_ro_firmware_version == ro_version)
702        except:
703            # We may not always be able to read the firmware version.
704            # For example, if the firmware is erased due to RDP1, running any
705            # commands (such as getting the version) won't work.
706            return False
707
708    def is_rollback_set_to_initial_val(self):
709        """
710        Returns True if rollbackinfo matches the initial value that it
711        should have coming from the factory.
712        """
713        return (self.get_rollback_id() ==
714                self._ROLLBACK_INITIAL_BLOCK_ID
715                and
716                self.get_rollback_min_version() ==
717                self._ROLLBACK_INITIAL_MIN_VERSION
718                and
719                self.get_rollback_rw_version() ==
720                self._ROLLBACK_INITIAL_RW_VERSION)
721
722    def is_rollback_unset(self):
723        """
724        Returns True if rollbackinfo matches the uninitialized value that it
725        should have after flashing the entire flash.
726        """
727        return (self.get_rollback_id() == self._ROLLBACK_ZERO_BLOCK_ID
728                and self.get_rollback_min_version() ==
729                self._ROLLBACK_INITIAL_MIN_VERSION
730                and self.get_rollback_rw_version() ==
731                self._ROLLBACK_INITIAL_RW_VERSION)
732
733    def biod_upstart_job_enabled(self):
734        """Returns whether biod's upstart job file is at original location."""
735        return self.host.is_file_exists(
736                os.path.join(self._UPSTART_DIR, self._BIOD_UPSTART_JOB_FILE))
737
738    def disable_biod_upstart_job(self):
739        """
740        Disable biod's upstart job so that biod will not run after a reboot.
741        """
742        logging.info('Disabling biod\'s upstart job')
743        filesystem_util.make_rootfs_writable(self.host)
744        cmd = 'mv %s %s' % (os.path.join(
745                self._UPSTART_DIR,
746                self._BIOD_UPSTART_JOB_FILE), self._STATEFUL_PARTITION_DIR)
747        result = self.run_cmd(cmd)
748        if result.exit_status != 0:
749            raise error.TestFail('Unable to disable biod upstart job: %s' %
750                                 result.stderr.strip())
751
752    def enable_biod_upstart_job(self):
753        """
754        Enable biod's upstart job so that biod will run after a reboot.
755        """
756        logging.info('Enabling biod\'s upstart job')
757        filesystem_util.make_rootfs_writable(self.host)
758        cmd = 'mv %s %s' % (os.path.join(
759                self._STATEFUL_PARTITION_DIR,
760                self._BIOD_UPSTART_JOB_FILE), self._UPSTART_DIR)
761        result = self.run_cmd(cmd)
762        if result.exit_status != 0:
763            raise error.TestFail('Unable to enable biod upstart job: %s' %
764                                 result.stderr.strip())
765
766    def fp_updater_is_enabled(self):
767        """Returns whether the fingerprint firmware updater is disabled."""
768        return not self.host.is_file_exists(
769                os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
770                             self._DISABLE_FP_UPDATER_FILE))
771
772    def disable_fp_updater(self):
773        """Disable the fingerprint firmware updater."""
774        filesystem_util.make_rootfs_writable(self.host)
775        touch_cmd = 'touch %s' % os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
776                                              self._DISABLE_FP_UPDATER_FILE)
777        logging.info('Disabling fp firmware updater')
778        result = self.run_cmd(touch_cmd)
779        if result.exit_status != 0:
780            raise error.TestFail(
781                    'Unable to write file to disable fp updater:'
782                    ' command failed (rc=%s): %s' %
783                    (result.exit_status, result.stderr.strip() or touch_cmd))
784        self.run_cmd('sync')
785
786    def enable_fp_updater(self):
787        """
788        Enable the fingerprint firmware updater. Must be called only after
789        disable_fp_updater().
790        """
791        filesystem_util.make_rootfs_writable(self.host)
792        rm_cmd = 'rm %s' % os.path.join(self._FINGERPRINT_BUILD_FW_DIR,
793                                        self._DISABLE_FP_UPDATER_FILE)
794        logging.info('Enabling fp firmware updater')
795        result = self.run_cmd(rm_cmd)
796        if result.exit_status != 0:
797            raise error.TestFail(
798                    'Unable to rm .disable_fp_updater:'
799                    ' command failed (rc=%s): %s' %
800                    (result.exit_status, result.stderr.strip() or rm_cmd))
801        self.run_cmd('sync')
802
803    def flash_rw_ro_firmware(self, fw_path):
804        """Flashes *all* firmware (both RO and RW)."""
805        self.set_hardware_write_protect(False)
806        flash_cmd = 'flash_fp_mcu' + ' ' + fw_path
807        logging.info('Running flash cmd: %s', flash_cmd)
808        flash_result = self.run_cmd(flash_cmd)
809        self.set_hardware_write_protect(True)
810
811        # Zork cannot rebind cros-ec-uart after flashing, so an AP reboot is
812        # needed to talk to FPMCU. See b/170213489.
813        # We have to do this even if flashing failed.
814        if hasattr(self, '_dut_needs_reboot') and self._dut_needs_reboot:
815            self.host.reboot()
816            if self.fp_updater_is_enabled():
817                raise error.TestFail(
818                        'Fp updater was not disabled when firmware is flashed')
819            # If we just re-enable fp updater, it can still update (race
820            # condition), so do it later in cleanup.
821
822        if flash_result.exit_status != 0:
823            raise error.TestFail('Flashing RW/RO firmware failed')
824
825    def is_hardware_write_protect_enabled(self):
826        """Returns state of hardware write protect."""
827        fw_wp_state = self.servo.get('fw_wp_state')
828        return fw_wp_state == 'on' or fw_wp_state == 'force_on'
829
830    def set_hardware_write_protect(self, enable):
831        """Enables or disables hardware write protect."""
832        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
833
834    def set_software_write_protect(self, enable):
835        """Enables or disables software write protect."""
836        arg  = 'enable' if enable else 'disable'
837        self._run_ectool_cmd('flashprotect ' + arg)
838        # TODO(b/116396469): The flashprotect command returns an error even on
839        # success.
840        # if result.exit_status != 0:
841        #    raise error.TestFail('Failed to modify software write protect')
842
843        # TODO(b/116396469): "flashprotect enable" command is slow, so wait for
844        # it to complete before attempting to reboot.
845        time.sleep(2)
846        self._reboot_ec()
847
848    def _reboot_ec(self):
849        """Reboots the fingerprint MCU (FPMCU)."""
850        self._run_ectool_cmd('reboot_ec')
851        # TODO(b/116396469): The reboot_ec command returns an error even on
852        # success.
853        # if result.exit_status != 0:
854        #    raise error.TestFail('Failed to reboot ec')
855        time.sleep(2)
856
857    def get_files_from_dut(self, src, dst):
858        """Copes files from DUT to server."""
859        logging.info('Copying files from (%s) to (%s).', src, dst)
860        self.host.get_file(src, dst, delete_dest=True)
861
862    def copy_files_to_dut(self, src_dir, dst_dir):
863        """Copies files from server to DUT."""
864        logging.info('Copying files from (%s) to (%s).', src_dir, dst_dir)
865        self.host.send_file(src_dir, dst_dir, delete_dest=True)
866        # Sync the filesystem in case we need to reboot the AP soon.
867        self.run_cmd('sync')
868
869    def run_server_cmd(self, command, timeout=60):
870        """Runs command on server; return result with output and exit code."""
871        logging.info('Server execute: %s', command)
872        result = utils.run(command, timeout=timeout, ignore_status=True)
873        logging.info('exit_code: %d', result.exit_status)
874        logging.info('stdout:\n%s', result.stdout)
875        logging.info('stderr:\n%s', result.stderr)
876        return result
877
878    def run_cmd(self, command, timeout=300):
879        """Runs command on the DUT; return result with output and exit code."""
880        logging.debug('DUT Execute: %s', command)
881        result = self.host.run(command, timeout=timeout, ignore_status=True)
882        logging.info('exit_code: %d', result.exit_status)
883        logging.info('stdout:\n%s', result.stdout)
884        logging.info('stderr:\n%s', result.stderr)
885        return result
886
887    def _run_ectool_cmd(self, command):
888        """Runs ectool on DUT; return result with output and exit code."""
889        cmd = 'ectool ' + self._CROS_FP_ARG + ' ' + command
890        result = self.run_cmd(cmd)
891        return result
892
893    def _run_cros_config_cmd(self, command):
894        """Runs cros_config on DUT; return result with output and exit code."""
895        cmd = 'cros_config ' + self._CROS_CONFIG_FINGERPRINT_PATH + ' ' \
896              + command
897        result = self.run_cmd(cmd)
898        return result
899
900    def _run_cros_config_cmd_cat(self, command):
901        """Runs cat /run/chromeos-config/v1 on DUT; return result."""
902        cmd = "cat /run/chromeos-config/v1/{}".format(command)
903        return self.run_cmd(cmd)
904
905    def _run_dump_fmap_cmd(self, fw_file, section):
906        """
907        Runs "dump_fmap" on DUT for given file.
908        Returns value of given section.
909        """
910        # Write result to stderr while redirecting stderr to stdout
911        # and dropping stdout. This is done because dump_map only writes the
912        # value read from a section to a file (will not just print it to
913        # stdout).
914        cmd = 'dump_fmap -x ' + fw_file + ' ' + section +\
915              ':/dev/stderr /dev/stderr >& /dev/stdout > /dev/null'
916        result = self.run_cmd(cmd)
917        if result.exit_status != 0:
918            raise error.TestFail('Failed to read section: %s' % section)
919        return result.stdout.rstrip('\0')
920
921    def _run_futility_show_cmd(self, fw_file):
922        """
923        Runs "futility show" on DUT for given file.
924        Returns stdout on success.
925        """
926        futility_cmd = 'futility show ' + fw_file
927        result = self.run_cmd(futility_cmd)
928        if result.exit_status != 0:
929            raise error.TestFail('Unable to run futility on device')
930        return result.stdout
931
932    def _run_sha256sum_cmd(self, file_name):
933        """
934        Runs "sha256sum" on DUT for given file.
935        Returns stdout on success.
936        """
937        sha_cmd = 'sha256sum ' + file_name
938        result = self.run_cmd(sha_cmd)
939        if result.exit_status != 0:
940            raise error.TestFail('Unable to calculate sha256sum on device')
941        return result
942
943    def run_test(self, test_name, *args):
944        """Runs test on DUT."""
945        logging.info('Running %s', test_name)
946        # Redirecting stderr to stdout since some commands intentionally fail
947        # and it's easier to read when everything ordered in the same output
948        test_cmd = ' '.join([os.path.join(self._dut_working_dir, test_name)] +
949                            list(args) + ['2>&1'])
950        # Change the working dir so we can write files from within the test
951        # (otherwise defaults to $HOME (/root), which is not usually writable)
952        # Note that dut_working_dir is automatically cleaned up so tests don't
953        # need to worry about files from previous invocations or other tests.
954        test_cmd = '(cd ' + self._dut_working_dir + ' && ' + test_cmd + ')'
955        logging.info('Test command: %s', test_cmd)
956        result = self.run_cmd(test_cmd)
957        if result.exit_status != 0:
958            raise error.TestFail(test_name + ' failed')
959