• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7
8from autotest_lib.server import test
9from autotest_lib.client.common_lib import error, utils
10from autotest_lib.server.cros import gsutil_wrapper
11from autotest_lib.server.cros.dynamic_suite import constants as ds_constants
12
13
14class FingerprintTest(test.test):
15    """Base class that sets up helpers for fingerprint tests."""
16    version = 1
17
    # Suffix appended to the DUT board name to form the fingerprint board
    # name (see get_fp_board()).
    _FINGERPRINT_BOARD_NAME_SUFFIX = '_fp'

    # Location of firmware from the build on the DUT
    _FINGERPRINT_BUILD_FW_GLOB = '/opt/google/biod/fw/*_fp*.bin'

    # Name of the image generation script (looked up under server/cros/faft
    # in initialize()) and of the directory its output is copied from.
    _GENIMAGES_SCRIPT_NAME = 'gen_test_images.sh'
    _GENIMAGES_OUTPUT_DIR_NAME = 'images'

    # Maps the instance attribute name set by
    # _initialize_test_firmware_image_attrs() to the filename format of the
    # matching test image; '%s' is filled with the fingerprint board name.
    _TEST_IMAGE_FORMAT_MAP = {
        'TEST_IMAGE_ORIGINAL': '%s.bin',
        'TEST_IMAGE_DEV': '%s.dev',
        'TEST_IMAGE_CORRUPT_FIRST_BYTE': '%s_corrupt_first_byte.bin',
        'TEST_IMAGE_CORRUPT_LAST_BYTE': '%s_corrupt_last_byte.bin',
        'TEST_IMAGE_DEV_RB_ZERO': '%s.dev.rb0',
        'TEST_IMAGE_DEV_RB_ONE': '%s.dev.rb1',
        'TEST_IMAGE_DEV_RB_NINE': '%s.dev.rb9'
    }

    # Values is_rollback_set_to_initial_val() expects 'ectool rollbackinfo'
    # to report. They are strings because they are compared against the
    # parsed text output of ectool.
    _ROLLBACK_INITIAL_BLOCK_ID = '1'
    _ROLLBACK_INITIAL_MIN_VERSION = '0'
    _ROLLBACK_INITIAL_RW_VERSION = '0'

    # Subdirectory of self.tmpdir on the server that receives the firmware
    # copied from the DUT (see _generate_test_firmware_images()).
    _SERVER_GENERATED_FW_DIR_NAME = 'generated_fw'

    # Parent directory on the DUT for the temporary working directory
    # created in initialize().
    _DUT_TMP_PATH_BASE = '/tmp/fp_test'

    # Expected RO firmware version, keyed by fingerprint board name
    # (see get_golden_ro_firmware_version()).
    _GOLDEN_RO_FIRMWARE_VERSION_MAP = {
        'nocturne_fp': 'nocturne_fp_v2.2.64-58cf5974e'
    }

    # Upstart jobs that initialize() stops (if running) and cleanup()
    # restarts.
    _BIOD_UPSTART_JOB_NAME = 'biod'
    # TODO(crbug.com/925545)
    _TIMBERSLIDE_UPSTART_JOB_NAME = \
        'timberslide LOG_PATH=/sys/kernel/debug/cros_fp/console_log'

    # DUT command run by _initialize_fw_entropy().
    _INIT_ENTROPY_CMD = 'bio_wash --factory_init'

    # Argument added to every ectool invocation (see _run_ectool_cmd()).
    _CROS_FP_ARG = '--name=cros_fp'
    # Keys looked up in the parsed output of 'ectool version' and
    # 'ectool rollbackinfo'.
    _ECTOOL_RO_VERSION = 'RO version'
    _ECTOOL_RW_VERSION = 'RW version'
    _ECTOOL_ROLLBACK_BLOCK_ID = 'Rollback block id'
    _ECTOOL_ROLLBACK_MIN_VERSION = 'Rollback min version'
    _ECTOOL_ROLLBACK_RW_VERSION = 'RW rollback version'
61
62    @staticmethod
63    def _parse_ectool_output(ectool_output):
64        """Converts ectool colon delimited output into python dict.
65
66        Example:
67        RO version:    nocturne_fp_v2.2.64-58cf5974e
68        RW version:    nocturne_fp_v2.2.110-b936c0a3c
69
70        becomes:
71        {
72          'RO version': 'nocturne_fp_v2.2.64-58cf5974e',
73          'RW version': 'nocturne_fp_v2.2.110-b936c0a3c'
74        }
75        """
76        ret = {}
77        try:
78            for line in ectool_output.strip().split('\n'):
79                key = line.split(':', 1)[0].strip()
80                val = line.split(':', 1)[1].strip()
81                ret[key] = val
82        except:
83            raise error.TestFail('Unable to parse ectool output: %s'
84                                 % ectool_output)
85        return ret
86
87    def initialize(self, host, test_dir, use_dev_signed_fw=False):
88        """Performs initialization."""
89        self.host = host
90        self.servo = host.servo
91
92        self._validate_compatible_servo_version()
93
94        self.servo.initialize_dut()
95
96        logging.info('HW write protect enabled: %s',
97                     self.is_hardware_write_protect_enabled())
98
99        # TODO(crbug.com/925545): stop timberslide so /var/log/cros_fp.log
100        # continues to update after flashing.
101        self._timberslide_running = self.host.upstart_status(
102            self._TIMBERSLIDE_UPSTART_JOB_NAME)
103        if self._timberslide_running:
104            logging.info('Stopping %s', self._TIMBERSLIDE_UPSTART_JOB_NAME)
105            self.host.upstart_stop(self._TIMBERSLIDE_UPSTART_JOB_NAME)
106
107        self._biod_running = self.host.upstart_status(
108            self._BIOD_UPSTART_JOB_NAME)
109        if self._biod_running:
110            logging.info('Stopping %s', self._BIOD_UPSTART_JOB_NAME)
111            self.host.upstart_stop(self._BIOD_UPSTART_JOB_NAME)
112
113        # create tmp working directory on device (automatically cleaned up)
114        self._dut_working_dir = self.host.get_tmp_dir(
115            parent=self._DUT_TMP_PATH_BASE)
116        logging.info('Created dut_working_dir: %s', self._dut_working_dir)
117        self.copy_files_to_dut(test_dir, self._dut_working_dir)
118
119        self._build_fw_file = self.get_build_fw_file()
120
121        gen_script = os.path.abspath(os.path.join(self.autodir,
122                                                  'server', 'cros', 'faft',
123                                                  self._GENIMAGES_SCRIPT_NAME))
124        self._dut_firmware_test_images_dir = \
125            self._generate_test_firmware_images(gen_script,
126                                                self._build_fw_file,
127                                                self._dut_working_dir)
128        logging.info('dut_firmware_test_images_dir: %s',
129                     self._dut_firmware_test_images_dir)
130
131        self._initialize_test_firmware_image_attrs(
132            self._dut_firmware_test_images_dir)
133
134        self._initialize_running_fw_version(use_dev_signed_fw)
135        self._initialize_fw_entropy()
136
137    def cleanup(self):
138        """Restores original state."""
139        # Once the tests complete we need to make sure we're running the
140        # original firmware (not dev version) and potentially reset rollback.
141        self._initialize_running_fw_version(False)
142        self._initialize_fw_entropy()
143        if hasattr(self, '_biod_running') and self._biod_running:
144            logging.info('Restarting biod')
145            self.host.upstart_restart(self._BIOD_UPSTART_JOB_NAME)
146        # TODO(crbug.com/925545)
147        if hasattr(self, '_timberslide_running') and self._timberslide_running:
148            logging.info('Restarting timberslide')
149            self.host.upstart_restart(self._TIMBERSLIDE_UPSTART_JOB_NAME)
150
151        super(FingerprintTest, self).cleanup()
152
153    def after_run_once(self):
154        """Logs which iteration just ran."""
155        logging.info('successfully ran iteration %d', self.iteration)
156
157    def _validate_compatible_servo_version(self):
158        """Asserts if a compatible servo version is not attached."""
159        servo_version = self.servo.get_servo_version()
160        logging.info('servo version: %s', servo_version)
161        if not servo_version.startswith('servo_v4'):
162            raise error.TestFail(
163                'These tests have only been tested while using servo v4')
164
165    def _generate_test_firmware_images(self, gen_script, build_fw_file,
166                                       dut_working_dir):
167        """
168        Copies the fingerprint firmware from the DUT to the server running
169        the tests, which runs a script to generate various test versions of
170        the firmware.
171
172        @return full path to location of test images on DUT
173        """
174        # create subdirectory under existing tmp dir
175        server_tmp_dir = os.path.join(self.tmpdir,
176                                      self._SERVER_GENERATED_FW_DIR_NAME)
177        os.mkdir(server_tmp_dir)
178        logging.info('server_tmp_dir: %s', server_tmp_dir)
179
180        # Copy firmware from device to server
181        self.get_files_from_dut(build_fw_file, server_tmp_dir)
182
183        # Run the test image generation script on server
184        pushd = os.getcwd()
185        os.chdir(server_tmp_dir)
186        cmd = ' '.join([gen_script,
187                        self.get_fp_board(),
188                        os.path.basename(build_fw_file)])
189        self.run_server_cmd(cmd)
190        os.chdir(pushd)
191
192        # Copy resulting files to DUT tmp dir
193        server_generated_images_dir = \
194            os.path.join(server_tmp_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
195        self.copy_files_to_dut(server_generated_images_dir, dut_working_dir)
196
197        return os.path.join(dut_working_dir, self._GENIMAGES_OUTPUT_DIR_NAME)
198
199    def _initialize_test_firmware_image_attrs(self, dut_fw_test_images_dir):
200        """Sets attributes with full path to test images on DUT.
201
202        Example: self.TEST_IMAGE_DEV = /some/path/images/nocturne_fp.dev
203        """
204        for key, val in self._TEST_IMAGE_FORMAT_MAP.iteritems():
205            full_path = os.path.join(dut_fw_test_images_dir,
206                                     val % self.get_fp_board())
207            setattr(self, key, full_path)
208
209    def _initialize_running_fw_version(self, use_dev_signed_fw):
210        """
211        Ensures that the running firmware version matches build version
212        and factory rollback settings; flashes to correct version if either
213        fails to match.
214
215        RO firmware: original version released at factory
216        RW firmware: firmware from current build
217        """
218        build_rw_firmware_version = \
219            self.get_build_rw_firmware_version(use_dev_signed_fw)
220        golden_ro_firmware_version = \
221            self.get_golden_ro_firmware_version(use_dev_signed_fw)
222        logging.info('Build RW firmware version: %s', build_rw_firmware_version)
223        logging.info('Golden RO firmware version: %s',
224                     golden_ro_firmware_version)
225
226        fw_versions_match = self.running_fw_version_matches_given_version(
227            build_rw_firmware_version, golden_ro_firmware_version)
228
229        if not fw_versions_match or not self.is_rollback_set_to_initial_val():
230            fw_file = self._build_fw_file
231            if use_dev_signed_fw:
232                fw_file = self.TEST_IMAGE_DEV
233            self.flash_rw_ro_firmware(fw_file)
234            if not self.running_fw_version_matches_given_version(
235                build_rw_firmware_version, golden_ro_firmware_version):
236                raise error.TestFail(
237                    'Running firmware version does not match expected version')
238
239    def _initialize_fw_entropy(self):
240        """Sets the entropy (key) in FPMCU flash (if not set)."""
241        result = self.run_cmd(self._INIT_ENTROPY_CMD)
242        if result.exit_status != 0:
243            raise error.TestFail('Unable to initialize entropy')
244
245    def get_fp_board(self):
246        """Returns name of fingerprint EC."""
247        board = self.host.get_board().replace(ds_constants.BOARD_PREFIX, '')
248        return board + self._FINGERPRINT_BOARD_NAME_SUFFIX
249
250    def get_build_fw_file(self):
251        """Returns full path to build FW file on DUT."""
252        ls_cmd = 'ls ' + self._FINGERPRINT_BUILD_FW_GLOB
253        result = self.run_cmd(ls_cmd)
254        if result.exit_status != 0:
255            raise error.TestFail('Unable to find firmware from build on device')
256        ret = result.stdout.rstrip()
257        logging.info('Build firmware file: %s', ret)
258        return ret
259
260    def _get_running_firmware_version(self, fw_type):
261        """Returns requested firmware version (RW or RO)."""
262        result = self._run_ectool_cmd('version')
263        parsed = self._parse_ectool_output(result.stdout)
264        if result.exit_status != 0:
265            raise error.TestFail('Failed to get firmware version')
266        version = parsed.get(fw_type)
267        if version is None:
268            raise error.TestFail('Failed to get firmware version: %s' % fw_type)
269        return version
270
271    def get_running_rw_firmware_version(self):
272        """Returns running RW firmware version."""
273        return self._get_running_firmware_version(self._ECTOOL_RW_VERSION)
274
275    def get_running_ro_firmware_version(self):
276        """Returns running RO firmware version."""
277        return self._get_running_firmware_version(self._ECTOOL_RO_VERSION)
278
279    def _get_rollback_info(self, info_type):
280        """Returns requested type of rollback info."""
281        result = self._run_ectool_cmd('rollbackinfo')
282        parsed = self._parse_ectool_output(result.stdout)
283        # TODO(crbug.com/924283): rollbackinfo always returns an error
284        # if result.exit_status != 0:
285        #    raise error.TestFail('Failed to get rollback info')
286        info = parsed.get(info_type)
287        if info is None:
288            raise error.TestFail('Failed to get rollback info: %s' % info_type)
289        return info
290
291    def get_rollback_id(self):
292        """Returns rollback ID."""
293        return self._get_rollback_info(self._ECTOOL_ROLLBACK_BLOCK_ID)
294
295    def get_rollback_min_version(self):
296        """Returns rollback min version."""
297        return self._get_rollback_info(self._ECTOOL_ROLLBACK_MIN_VERSION)
298
299    def get_rollback_rw_version(self):
300        """Returns RW rollback version."""
301        return self._get_rollback_info(self._ECTOOL_ROLLBACK_RW_VERSION)
302
303    def _construct_dev_version(self, orig_version):
304        """
305        Given a "regular" version string from a signed build, returns the
306        special "dev" version that we use when creating the test images.
307        """
308        fw_version = orig_version
309        if len(fw_version) + len('.dev') > 31:
310            fw_version = fw_version[:27]
311        fw_version = fw_version + '.dev'
312        return fw_version
313
314    def get_golden_ro_firmware_version(self, use_dev_signed_fw):
315        """Returns RO firmware version used in factory."""
316        board = self.get_fp_board()
317        golden_version = self._GOLDEN_RO_FIRMWARE_VERSION_MAP.get(board)
318        if golden_version is None:
319            raise error.TestFail('Unable to get golden RO version for board: '
320                                 % board)
321        if use_dev_signed_fw:
322            golden_version = self._construct_dev_version(golden_version)
323        return golden_version
324
325    def get_build_rw_firmware_version(self, use_dev_signed_fw):
326        """Returns RW firmware version from build (based on filename)."""
327        fw_file = os.path.basename(self._build_fw_file)
328        if not fw_file.endswith('.bin'):
329            raise error.TestFail('Unexpected filename for RW firmware: '
330                                 % fw_file)
331        fw_version = fw_file[:-4]
332        if use_dev_signed_fw:
333            fw_version = self._construct_dev_version(fw_version)
334        return fw_version
335
336    def running_fw_version_matches_given_version(self, rw_version, ro_version):
337        """
338        Returns True if the running RO and RW firmware versions match the
339        provided versions.
340        """
341        running_rw_firmware_version = self.get_running_rw_firmware_version()
342        running_ro_firmware_version = self.get_running_ro_firmware_version()
343
344        logging.info('RW firmware, running: %s, expected: %s',
345                     running_rw_firmware_version, rw_version)
346        logging.info('RO firmware, running: %s, expected: %s',
347                     running_ro_firmware_version, ro_version)
348
349        return (running_rw_firmware_version == rw_version and
350                running_ro_firmware_version == ro_version)
351
352    def is_rollback_set_to_initial_val(self):
353        """
354        Returns True if rollbackinfo matches the initial value that it
355        should have coming from the factory.
356        """
357        return (self.get_rollback_id() ==
358                self._ROLLBACK_INITIAL_BLOCK_ID
359                and
360                self.get_rollback_min_version() ==
361                self._ROLLBACK_INITIAL_MIN_VERSION
362                and
363                self.get_rollback_rw_version() ==
364                self._ROLLBACK_INITIAL_RW_VERSION)
365
366    def _download_firmware(self, gs_path, dut_file_path):
367        """Downloads firmware from Google Storage bucket."""
368        bucket = os.path.dirname(gs_path)
369        filename = os.path.basename(gs_path)
370        logging.info('Downloading firmware, '
371                     'bucket: %s, filename: %s, dest: %s',
372                     bucket, filename, dut_file_path)
373        gsutil_wrapper.copy_private_bucket(host=self.host,
374                                           bucket=bucket,
375                                           filename=filename,
376                                           destination=dut_file_path)
377        return os.path.join(dut_file_path, filename)
378
379    def flash_rw_firmware(self, fw_path):
380        """Flashes the RW (read-write) firmware."""
381        flash_cmd = os.path.join(self._dut_working_dir,
382                                 'flash_fp_rw.sh' + ' ' + fw_path)
383        result = self.run_cmd(flash_cmd)
384        if result.exit_status != 0:
385            raise error.TestFail('Flashing RW firmware failed')
386
387    def flash_rw_ro_firmware(self, fw_path):
388        """Flashes *all* firmware (both RO and RW)."""
389        self.set_hardware_write_protect(False)
390        flash_cmd = 'flash_fp_mcu' + ' ' + fw_path
391        logging.info('Running flash cmd: %s', flash_cmd)
392        result = self.run_cmd(flash_cmd)
393        self.set_hardware_write_protect(True)
394        if result.exit_status != 0:
395            raise error.TestFail('Flashing RW/RO firmware failed')
396
397    def is_hardware_write_protect_enabled(self):
398        """Returns state of hardware write protect."""
399        fw_wp_state = self.servo.get('fw_wp_state')
400        return fw_wp_state == 'on' or fw_wp_state == 'force_on'
401
402    def set_hardware_write_protect(self, enable):
403        """Enables or disables hardware write protect."""
404        self.servo.set('fw_wp_state', 'force_on' if enable else 'force_off')
405
406    def get_files_from_dut(self, src, dst):
407        """Copes files from DUT to server."""
408        logging.info('Copying files from (%s) to (%s).', src, dst)
409        self.host.get_file(src, dst, delete_dest=True)
410
411    def copy_files_to_dut(self, src_dir, dst_dir):
412        """Copies files from server to DUT."""
413        logging.info('Copying files from (%s) to (%s).', src_dir, dst_dir)
414        self.host.send_file(src_dir, dst_dir, delete_dest=True)
415
416    def run_server_cmd(self, command, timeout=60):
417        """Runs command on server; return result with output and exit code."""
418        logging.info('Server execute: %s', command)
419        result = utils.run(command, timeout=timeout, ignore_status=True)
420        logging.info('exit_code: %d', result.exit_status)
421        logging.info('stdout:\n%s', result.stdout)
422        logging.info('stderr:\n%s', result.stderr)
423        return result
424
425    def run_cmd(self, command, timeout=300):
426        """Runs command on the DUT; return result with output and exit code."""
427        logging.debug('DUT Execute: %s', command)
428        result = self.host.run(command, timeout=timeout, ignore_status=True)
429        logging.info('exit_code: %d', result.exit_status)
430        logging.info('stdout:\n%s', result.stdout)
431        logging.info('stderr:\n%s', result.stderr)
432        return result
433
434    def _run_ectool_cmd(self, command):
435        """Runs ectool on DUT; return result with output and exit code."""
436        cmd = 'ectool ' + self._CROS_FP_ARG + ' ' + command
437        result = self.run_cmd(cmd)
438        return result
439
440    def run_test(self, test_name, *args):
441        """Runs test on DUT."""
442        logging.info('Running %s', test_name)
443        # Redirecting stderr to stdout since some commands intentionally fail
444        # and it's easier to read when everything ordered in the same output
445        test_cmd = ' '.join([os.path.join(self._dut_working_dir, test_name)] +
446                            list(args) + ['2>&1'])
447        logging.info('Test command: %s', test_cmd)
448        result = self.run_cmd(test_cmd)
449        if result.exit_status != 0:
450            raise error.TestFail(test_name + ' failed')
451