• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6import os
7import pprint
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
12from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper
13from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
14
15
class Cr50Test(FirmwareTest):
    """
    Base class that sets up helper objects/functions for cr50 tests.
    """
    version = 1

    # Google storage location of the cr50 images. The '%s' is replaced with
    # the image type, or with '*' to search every image type directory.
    CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
    # Debug image name pattern. The '%s' is replaced with the device id.
    CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
    # Release image name pattern. The '%s' is replaced with the rw version.
    CR50_PROD_FILE = 'cr50.%s.bin.prod'
    # Bit flags stored in self._saved_state. NONE means nothing was saved.
    NONE = 0
    # Saved the original device state during init.
    INITIAL_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    IMAGES = 1 << 1
31
32    def initialize(self, host, cmdline_args, restore_cr50_state=False,
33                   cr50_dev_path='', provision_update=False):
34        self._saved_state = self.NONE
35        self._raise_error_on_mismatch = not restore_cr50_state
36        self._provision_update = provision_update
37        super(Cr50Test, self).initialize(host, cmdline_args)
38
39        if not hasattr(self, 'cr50'):
40            raise error.TestNAError('Test can only be run on devices with '
41                                    'access to the Cr50 console')
42
43        self.host = host
44        self._save_original_state()
45        # We successfully saved the device state
46        self._saved_state |= self.INITIAL_STATE
47        try:
48            self._save_node_locked_dev_image(cr50_dev_path)
49            self._save_original_images()
50            # We successfully saved the device images
51            self._saved_state |= self.IMAGES
52        except:
53            if restore_cr50_state:
54                raise
55
56
57    def _save_node_locked_dev_image(self, cr50_dev_path):
58        """Save or download the node locked dev image.
59
60        Args:
61            cr50_dev_path: The path to the node locked cr50 image.
62        """
63        if os.path.isfile(cr50_dev_path):
64            self._node_locked_cr50_image = cr50_dev_path
65        else:
66            devid = self.servo.get('cr50_devid')
67            self._node_locked_cr50_image = self.download_cr50_debug_image(
68                devid)[0]
69
70
71    def _save_original_images(self):
72        """Use the saved state to find all of the device images.
73
74        This will download running cr50 image and the device image.
75        """
76        # Copy the prod and prepvt images from the DUT
77        _, prod_rw, prod_bid = self._original_state['device_prod_ver']
78        filename = 'prod_device_image_' + prod_rw
79        self._device_prod_image = os.path.join(self.resultsdir,
80                filename)
81        self.host.get_file(cr50_utils.CR50_PROD,
82                self._device_prod_image)
83
84        if cr50_utils.HasPrepvtImage(self.host):
85            _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver']
86            filename = 'prepvt_device_image_' + prepvt_rw
87            self._device_prepvt_image = os.path.join(self.resultsdir,
88                    filename)
89            self.host.get_file(cr50_utils.CR50_PREPVT,
90                    self._device_prepvt_image)
91            prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)
92        else:
93            self._device_prepvt_image = None
94            prepvt_rw = None
95            prepvt_bid = None
96
97        # If the running cr50 image version matches the image on the DUT use
98        # the DUT image as the original image. If the versions don't match
99        # download the image from google storage
100        _, running_rw, running_bid = self.get_saved_cr50_original_version()
101
102        # Make sure prod_bid and running_bid are in the same format
103        prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
104        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
105        if running_rw == prod_rw and running_bid == prod_bid:
106            logging.info('Using device cr50 prod image %s %s', prod_rw,
107                    prod_bid)
108            self._original_cr50_image = self._device_prod_image
109        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
110            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
111                    prepvt_bid)
112            self._original_cr50_image = self._device_prepvt_image
113        else:
114            logging.info('Downloading cr50 image %s %s', running_rw,
115                    running_bid)
116            self._original_cr50_image = self.download_cr50_release_image(
117                running_rw, running_bid)[0]
118
119
120    def _save_original_state(self):
121        """Save the cr50 related state.
122
123        Save the device's current cr50 version, cr50 board id, rlz, and image
124        at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
125        restore the state during cleanup.
126        """
127        self._original_state = self.get_cr50_device_state()
128
129
130    def get_saved_cr50_original_version(self):
131        """Return (ro ver, rw ver, bid)"""
132        if ('running_ver' not in self._original_state or 'cr50_image_bid' not in
133            self._original_state):
134            raise error.TestError('No record of original cr50 image version')
135        return (self._original_state['running_ver'][0],
136                self._original_state['running_ver'][1],
137                self._original_state['cr50_image_bid'])
138
139
140    def get_saved_cr50_original_path(self):
141        """Return the local path for the original cr50 image"""
142        if not hasattr(self, '_original_cr50_image'):
143            raise error.TestError('No record of original image')
144        return self._original_cr50_image
145
146
147    def has_saved_cr50_dev_path(self):
148        """Returns true if we saved the node locked debug image"""
149        return hasattr(self, '_node_locked_cr50_image')
150
151
152    def get_saved_cr50_dev_path(self):
153        """Return the local path for the cr50 dev image"""
154        if not self.has_saved_cr50_dev_path():
155            raise error.TestError('No record of debug image')
156        return self._node_locked_cr50_image
157
158
159    def _restore_original_image(self, chip_bid, chip_flags):
160        """Restore the cr50 image and erase the state.
161
162        Make 3 attempts to update to the original image. Use a rollback from
163        the DBG image to erase the state that can only be erased by a DBG image.
164        Set the chip board id during rollback
165
166        Args:
167            chip_bid: the integer representation of chip board id or None if the
168                      board id should be erased
169            chip_flags: the integer representation of chip board id flags or
170                        None if the board id should be erased
171        """
172        for i in range(3):
173            try:
174                # Update to the node-locked DBG image so we can erase all of
175                # the state we are trying to reset
176                self.cr50_update(self._node_locked_cr50_image)
177
178                # Rollback to the original cr50 image.
179                self.cr50_update(self._original_cr50_image, rollback=True,
180                                 chip_bid=chip_bid, chip_flags=chip_flags)
181                break
182            except Exception, e:
183                logging.warning('Failed to restore original image attempt %d: '
184                                '%r', i, e)
185
186
187    def rootfs_verification_disable(self):
188        """Remove rootfs verification"""
189        if not self._rootfs_verification_is_disabled():
190            logging.debug('Removing rootfs verification.')
191            self.rootfs_tool.enable()
192
193
194    def _rootfs_verification_is_disabled(self):
195        """Returns true if rootfs verification is enabled"""
196        # Clear the TPM owner before trying to check rootfs verification
197        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
198        self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
199        self.rootfs_tool.initialize(self.host)
200        # rootfs_tool.is_enabled is True, that means rootfs verification is
201        # disabled.
202        return self.rootfs_tool.is_enabled()
203
204
205    def _restore_original_state(self):
206        """Restore the original cr50 related device state"""
207        if not (self._saved_state & self.IMAGES):
208            logging.warning('Did not save the original images. Cannot restore '
209                            'state')
210            return
211        # Remove the prepvt image if the test installed one.
212        if (not self._original_state['has_prepvt'] and
213            cr50_utils.HasPrepvtImage(self.host)):
214            self.host.run('rm %s' % cr50_utils.CR50_PREPVT)
215        # If rootfs verification has been disabled, copy the cr50 device image
216        # back onto the DUT.
217        if self._rootfs_verification_is_disabled():
218            cr50_utils.InstallImage(self.host, self._device_prod_image,
219                    cr50_utils.CR50_PROD)
220            # Install the prepvt image if there was one.
221            if self._device_prepvt_image:
222                cr50_utils.InstallImage(self.host, self._device_prepvt_image,
223                        cr50_utils.CR50_PREPVT)
224
225        chip_bid_info = self._original_state['chip_bid']
226        bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID
227        chip_bid = None if bid_is_erased else chip_bid_info[0]
228        chip_flags = None if bid_is_erased else chip_bid_info[2]
229        # Update to the original image and erase the board id
230        self._restore_original_image(chip_bid, chip_flags)
231
232        # Set the RLZ code
233        cr50_utils.SetRLZ(self.host, self._original_state['rlz'])
234
235        # Verify everything is still the same
236        mismatch = self._check_original_state()
237        if mismatch:
238            raise error.TestError('Could not restore state: %s' % mismatch)
239
240        logging.info('Successfully restored the original cr50 state')
241
242
243    def get_cr50_device_state(self):
244        """Get a dict with the current device cr50 information
245
246        The state dict will include the platform brand, rlz code, chip board id,
247        the running cr50 image version, the running cr50 image board id, and the
248        device cr50 image version.
249        """
250        state = {}
251        state['mosys platform brand'] = self.host.run('mosys platform brand',
252            ignore_status=True).stdout.strip()
253        state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host,
254                cr50_utils.CR50_PROD)
255        state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host)
256        if state['has_prepvt']:
257            state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host,
258                    cr50_utils.CR50_PREPVT)
259        else:
260            state['device_prepvt_ver'] = None
261        state['rlz'] = cr50_utils.GetRLZ(self.host)
262        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
263        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
264        state['running_ver'] = cr50_utils.GetRunningVersion(self.host)
265        state['cr50_image_bid'] = self.cr50.get_active_board_id_str()
266
267        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
268        return state
269
270
271    def _check_original_state(self):
272        """Compare the current cr50 state to the original state
273
274        Returns:
275            A dictionary with the state that is wrong as the key and
276            the new and old state as the value
277        """
278        if not (self._saved_state & self.INITIAL_STATE):
279            logging.warning('Did not save the original state. Cannot verify it '
280                            'matches')
281            return
282        # Make sure the /var/cache/cr50* state is up to date.
283        cr50_utils.ClearUpdateStateAndReboot(self.host)
284
285        mismatch = {}
286        new_state = self.get_cr50_device_state()
287
288        for k, new_val in new_state.iteritems():
289            original_val = self._original_state[k]
290            if new_val != original_val:
291                mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)
292
293        if mismatch:
294            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
295        else:
296            logging.info('The device is in the original state')
297        return mismatch
298
299
300    def cleanup(self):
301        """Make sure the device state is the same as the start of the test"""
302        state_mismatch = self._check_original_state()
303
304        if state_mismatch and not self._provision_update:
305            self._restore_original_state()
306            if self._raise_error_on_mismatch:
307                raise error.TestError('Unexpected state mismatch during '
308                                      'cleanup %s' % state_mismatch)
309        super(Cr50Test, self).cleanup()
310
311
312    def find_cr50_gs_image(self, filename, image_type=None):
313        """Find the cr50 gs image name
314
315        Args:
316            filename: the cr50 filename to match to
317            image_type: release or debug. If it is not specified we will search
318                        both the release and debug directories
319        Returns:
320            a tuple of the gsutil bucket, filename
321        """
322        gs_url = self.CR50_GS_URL % (image_type if image_type else '*')
323        gs_filename = os.path.join(gs_url, filename)
324        bucket, gs_filename = utils.gs_ls(gs_filename)[0].rsplit('/', 1)
325        return bucket, gs_filename
326
327
328    def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
329                               image_type=None):
330        """Get the image from gs and save it in the autotest dir
331
332        Args:
333            filename: The cr50 image basename
334            image_bid: the board id info list or string. It will be added to the
335                       filename.
336            bucket: The gs bucket name
337            image_type: 'debug' or 'release'. This will be used to determine
338                        the bucket if the bucket is not given.
339        Returns:
340            A tuple with the local path and version
341        """
342        # Add the image bid string to the filename
343        if image_bid:
344            bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
345                                                       symbolic=True)
346            filename += '.' + bid_str.replace(':', '_')
347
348        if not bucket:
349            bucket, filename = self.find_cr50_gs_image(filename, image_type)
350
351        remote_temp_dir = '/tmp/'
352        src = os.path.join(remote_temp_dir, filename)
353        dest = os.path.join(self.resultsdir, filename)
354
355        # Copy the image to the dut
356        gsutil_wrapper.copy_private_bucket(host=self.host,
357                                           bucket=bucket,
358                                           filename=filename,
359                                           destination=remote_temp_dir)
360
361        self.host.get_file(src, dest)
362        ver = cr50_utils.GetBinVersion(self.host, src)
363
364        # Compare the image board id to the downloaded image to make sure we got
365        # the right file
366        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
367        if image_bid and bid_str != downloaded_bid:
368            raise error.TestError('Could not download image with matching '
369                                  'board id wanted %s got %s' % (bid_str,
370                                  downloaded_bid))
371        return dest, ver
372
373
374    def download_cr50_debug_image(self, devid, image_bid=''):
375        """download the cr50 debug file
376
377        Get the file with the matching devid and image board id info
378
379        Args:
380            devid: the cr50_devid string '${DEVID0} ${DEVID1}'
381            image_bid: the image board id info string or list
382        Returns:
383            A tuple with the debug image local path and version
384        """
385        # Debug images are node locked with the devid. Add the devid to the
386        # filename
387        filename = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'))
388
389        # Download the image
390        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
391                                                image_type='debug')
392
393        return dest, ver
394
395
396    def download_cr50_release_image(self, rw_ver, image_bid=''):
397        """download the cr50 release file
398
399        Get the file with the matching version and image board id info
400
401        Args:
402            rw_ver: the rw version string
403            image_bid: the image board id info string or list
404        Returns:
405            A tuple with the release image local path and version
406        """
407        # Release images can be found using the rw version
408        filename = self.CR50_PROD_FILE % rw_ver
409
410        # Download the image
411        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
412                                                image_type='release')
413
414        # Compare the rw version and board id info to make sure the right image
415        # was found
416        if rw_ver != ver[1]:
417            raise error.TestError('Could not download image with matching '
418                                  'rw version')
419        return dest, ver
420
421
422    def _cr50_verify_update(self, expected_ver, expect_rollback):
423        """Verify the expected version is running on cr50
424
425        Args:
426            expect_ver: The RW version string we expect to be running
427            expect_rollback: True if cr50 should have rolled back during the
428                             update
429
430        Raises:
431            TestFail if there is any unexpected update state
432        """
433        errors = []
434        running_ver = self.cr50.get_version()
435        if expected_ver != running_ver:
436            errors.append('running %s not %s' % (running_ver, expected_ver))
437
438        if expect_rollback != self.cr50.rolledback():
439            errors.append('%srollback detected' %
440                          'no ' if expect_rollback else '')
441        if len(errors):
442            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
443        logging.info('RUNNING %s after %s', expected_ver,
444                     'rollback' if expect_rollback else 'update')
445
446
447    def _cr50_run_update(self, path):
448        """Install the image at path onto cr50
449
450        Args:
451            path: the location of the image to update to
452
453        Returns:
454            the rw version of the image
455        """
456        tmp_dest = '/tmp/' + os.path.basename(path)
457
458        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
459        cr50_utils.UsbUpdater(self.host, ['-a', dest])
460        return image_ver[1]
461
462
463    def cr50_update(self, path, rollback=False, erase_nvmem=False,
464                    expect_rollback=False, chip_bid=None, chip_flags=None):
465        """Attempt to update to the given image.
466
467        If rollback is True, we assume that cr50 is already running an image
468        that can rollback.
469
470        Args:
471            path: the location of the update image
472            rollback: True if we need to force cr50 to rollback to update to
473                      the given image
474            erase_nvmem: True if we need to erase nvmem during rollback
475            expect_rollback: True if cr50 should rollback on its own
476            chip_bid: the integer representation of chip board id or None if the
477                      board id should be erased during rollback
478            chip_flags: the integer representation of chip board id flags or
479                        None if the board id should be erased during rollback
480
481        Raises:
482            TestFail if the update failed
483        """
484        original_ver = self.cr50.get_version()
485
486        # Cr50 is going to reject an update if it hasn't been up for more than
487        # 60 seconds. Wait until that passes before trying to run the update.
488        self.cr50.wait_until_update_is_allowed()
489
490        rw_ver = self._cr50_run_update(path)
491
492        # Running the update may cause cr50 to reboot. Wait for that before
493        # sending more commands. The reboot should happen quickly. Wait a
494        # maximum of 10 seconds.
495        self.cr50.wait_for_reboot(10)
496
497        if erase_nvmem and rollback:
498            self.cr50.erase_nvmem()
499
500        if rollback:
501            self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)
502
503        expected_ver = original_ver if expect_rollback else rw_ver
504        # If we expect a rollback, the version should remain unchanged
505        self._cr50_verify_update(expected_ver, rollback or expect_rollback)
506