• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import print_function
7
8import logging
9import os
10import pprint
11import six
12import time
13
14from autotest_lib.client.bin import utils
15from autotest_lib.client.common_lib import error, utils
16from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
17from autotest_lib.server.cros import filesystem_util, gsutil_wrapper
18from autotest_lib.server.cros.faft.firmware_test import FirmwareTest
19
20
21class Cr50Test(FirmwareTest):
22    """Base class that sets up helper objects/functions for cr50 tests."""
    version = 1

    # Pool names that running_cr50_release_suite() treats as release pools.
    RELEASE_POOLS = ['faft-cr50-experimental', 'faft-cr50']
    RESPONSE_TIMEOUT = 180
    # Base google storage directory for private files. The qual version file
    # is read from here.
    GS_PRIVATE = 'gs://chromeos-localmirror-private/distfiles/'
    # Prod signed test images are stored in the private cr50 directory.
    GS_PRIVATE_PROD = GS_PRIVATE + 'cr50/'
    # Node locked test images are in this private debug directory.
    GS_PRIVATE_DBG = GS_PRIVATE + 'chromeos-cr50-debug-0.0.11/'
    GS_PUBLIC = 'gs://chromeos-localmirror/distfiles/'
    # Filename patterns. The %s fields are filled in by the code that uses
    # them, which is not part of this section of the class.
    CR50_PROD_FILE = 'cr50.r0.0.1*.w%s%s.tbz2'
    CR50_DEBUG_FILE = '*/cr50.dbg.%s.bin.*%s'
    CR50_ERASEFLASHINFO_FILE = (
            '*/cr50_Unknown_NodeLocked-%s_cr50-accessory-mp.bin')
    # Name of the file in GS_PRIVATE that holds the qual version string.
    CR50_QUAL_VERSION_FILE = 'chromeos-cr50-QUAL_VERSION'
    # The values below are bit flags that get combined in self._saved_state.
    # NONE means the test has not saved anything yet.
    NONE = 0
    # Saved the original device state during init.
    INITIAL_IMAGE_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    DEVICE_IMAGES = 1 << 1
    DBG_IMAGE = 1 << 2
    ERASEFLASHINFO_IMAGE = 1 << 3
    # Different attributes of the device state require the test to download
    # different images. STATE_IMAGE_RESTORES is a dictionary of the state each
    # image type can restore. The keys are the image flags above and the values
    # are keys of the dictionary returned by get_image_and_bid_state().
    STATE_IMAGE_RESTORES = {
            DEVICE_IMAGES: ['prod_version', 'prepvt_version'],
            DBG_IMAGE: ['running_image_ver', 'running_image_bid', 'chip_bid'],
            ERASEFLASHINFO_IMAGE: ['chip_bid'],
    }
    PP_SHORT_INTERVAL = 3
55
    def initialize(self,
                   host,
                   cmdline_args,
                   full_args,
                   restore_cr50_image=False,
                   restore_cr50_board_id=False,
                   provision_update=False):
        """Record the original cr50 state and get the DUT ready for the test.

        Run the FirmwareTest initialization, record the ccd level and
        capability settings, clear the TPM owner and the FWMP, lock ccd if the
        test is able to, save the device image state, and try to save the DBG
        and eraseflashinfo images. If the DUT is being used for release
        qualification, make sure the qual image is running.

        @param host: the host object for the DUT.
        @param cmdline_args: the command line args given to
                             FirmwareTest.initialize.
        @param full_args: a dictionary of test args. The keys read here are
                          tot_test_run, release_path, release_ver,
                          is_release_qual, cr50_dbg_image_path, and
                          cr50_eraseflashinfo_image_path.
        @param restore_cr50_image: True if the test requires the DBG image.
                                   TestNAError is raised if it can't be saved.
        @param restore_cr50_board_id: True if the test requires the
                                      eraseflashinfo image. TestNAError is
                                      raised if it can't be saved or the board
                                      can't boot it.
        @param provision_update: stored in self._provision_update.
        @raises TestNAError: if the test has no cr50 console object, if the
                             console isn't locked and the test can't lock it,
                             or if a required image could not be saved.
        """
        self._saved_state = self.NONE
        self._raise_error_on_mismatch = not restore_cr50_image
        self._provision_update = provision_update
        self.tot_test_run = full_args.get('tot_test_run', '').lower() == 'true'
        super(Cr50Test, self).initialize(host, cmdline_args)

        if not hasattr(self, 'cr50'):
            raise error.TestNAError('Test can only be run on devices with '
                                    'access to the Cr50 console')
        # TODO(b/149948314): remove when dual-v4 is sorted out.
        if 'ccd' in self.servo.get_servo_version():
            self.servo.disable_ccd_watchdog_for_test()

        logging.info('Test Args: %r', full_args)

        self._devid = self.cr50.get_devid()
        # The ccd level can be changed if servo isn't relying on ccd as its
        # main device or if testlab mode is on.
        self.can_set_ccd_level = (not self.servo.main_device_is_ccd()
                                  or self.cr50.testlab_is_on())
        # Record the ccd level and capability settings before the test changes
        # anything, so _reset_ccd_settings can compare against them.
        self.original_ccd_level = self.cr50.get_ccd_level()
        self.original_ccd_settings = self.cr50.get_cap_dict(
                info=self.cr50.CAP_SETTING)

        self.host = host
        # SSH commands should complete within 3 minutes. Change the default, so
        # it doesn't take half an hour for commands to timeout when the DUT is
        # down.
        self.host.set_default_run_timeout(180)
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        # Clear the FWMP, so it can't disable CCD.
        self.clear_fwmp()

        # TODO(b/218492933) : find better way to disable rddkeepalive
        # Disable rddkeepalive, so the test can disable ccd.
        self.cr50.send_command('ccd testlab open')
        self.cr50.send_command('rddkeepalive disable')
        # faft-cr50 locks and reopens ccd. This will restrict some capabilities
        # c2d2 uses to control the duts. Set the capabilities to Always, so
        # individual tests don't need to care that much.
        self.cr50.enable_servo_control_caps()

        if self.can_set_ccd_level:
            # Lock cr50 so the console will be restricted
            self.cr50.set_ccd_level('lock')
        elif self.original_ccd_level != 'lock':
            raise error.TestNAError(
                    'Lock the console before running cr50 test')

        self._save_original_state(full_args.get('release_path', ''))

        # Try and download all images necessary to restore cr50 state. A
        # failure is only fatal if the test asked for that kind of restore.
        try:
            self._save_dbg_image(full_args.get('cr50_dbg_image_path', ''))
            self._saved_state |= self.DBG_IMAGE
        except Exception as e:
            logging.warning('Error saving DBG image: %s', str(e))
            if restore_cr50_image:
                raise error.TestNAError('Need DBG image: %s' % str(e))

        try:
            self._save_eraseflashinfo_image(
                    full_args.get('cr50_eraseflashinfo_image_path', ''))
            # This error is raised inside the try block on purpose, so the
            # handler below treats the board like the image wasn't saved.
            if self.cr50.uses_board_property('BOARD_EC_CR50_COMM_SUPPORT'):
                raise error.TestError('Board cannot boot EFI image')
            self._saved_state |= self.ERASEFLASHINFO_IMAGE
        except Exception as e:
            logging.warning('Error saving eraseflashinfo image: %s', str(e))
            if restore_cr50_board_id:
                raise error.TestNAError(
                        'Need eraseflashinfo image: %s' % str(e))

        # TODO(b/143888583): remove qual update during init once new design
        # to provision cr50 updates is in place.
        # Make sure the release image is running before starting the test.
        is_release_qual = full_args.get('is_release_qual',
                                        '').lower() == 'true'
        if is_release_qual or self.running_cr50_release_suite():
            release_ver_arg = full_args.get('release_ver', '')
            release_path_arg = full_args.get('release_path', '')
            self.ensure_qual_image_is_running(release_ver_arg,
                                              release_path_arg)
143
144    def running_cr50_release_suite(self):
145        """Return True if the DUT is in a release pool."""
146        for pool in self.host.host_info_store.get().pools:
147            # TODO(b/149109740): remove once the pool values are verified.
148            # Change to run with faft-cr50 and faft-cr50-experimental suites.
149            logging.info('Checking pool: %s', pool)
150            if pool in self.RELEASE_POOLS:
151                logging.info('Running a release test.')
152                return True
153        return False
154
    def ensure_qual_image_is_running(self, qual_ver_str, qual_path):
        """Update to the qualification image if it's not running.

        qual_ver_str and path are command line args that may be supplied to
        specify a local version or path. If neither are supplied, the version
        from gs will be used to determine what version cr50 should run.

        qual_ver_str and qual_path should not be supplied together. If they are,
        the path will be used. It's not a big deal as long as they agree with
        each other.

        If the device state already matches the qual image, nothing is
        updated. After an update, the qual state is saved as the 'original'
        state of the device.

        @param qual_ver_str: qualification version string or None. It's split
                             on '/' into the rw version and the board id.
        @param qual_path: local path to the qualification image or None.
        """
        # Get the local image information.
        if qual_path:
            # The image is only copied to the DUT to read its version. Remove
            # the copy after that.
            dest, qual_ver = cr50_utils.InstallImage(self.host, qual_path,
                                                     '/tmp/qual_cr50.bin')
            self.host.run('rm ' + dest)
            qual_bid_str = (cr50_utils.GetBoardIdInfoString(
                    qual_ver[2], False) if qual_ver[2] else '')
            qual_ver_str = '%s/%s' % (qual_ver[1], qual_bid_str)

        # Determine the qualification version from the version file in google
        # storage.
        if not qual_ver_str:
            gsurl = os.path.join(self.GS_PRIVATE, self.CR50_QUAL_VERSION_FILE)
            dut_path = self.download_cr50_gs_file(gsurl, False)[1]
            qual_ver_str = self.host.run('cat ' + dut_path).stdout.strip()

        # Download the qualification image based on the version.
        if not qual_path:
            rw, bid = qual_ver_str.split('/')
            qual_path, qual_ver = self.download_cr50_release_image(rw, bid)

        logging.info('Cr50 Qual Version: %s', qual_ver_str)
        logging.info('Cr50 Qual Path: %s', qual_path)
        qual_chip_bid = cr50_utils.GetChipBIDFromImageBID(
                qual_ver[2], self.get_device_brand())
        logging.info('Cr50 Qual Chip BID: %s', qual_chip_bid)

        # Replace only the prod or prepvt image based on the major version.
        # The qual image is used as the prod image if the second field of
        # qual_ver[1] is odd. It's used as the prepvt image if it's even.
        if int(qual_ver[1].split('.')[1]) % 2:
            prod_ver = qual_ver
            prepvt_ver = self._original_image_state['prepvt_version']
            prod_path = qual_path
            prepvt_path = self._device_prepvt_image
        else:
            prod_ver = self._original_image_state['prod_version']
            prepvt_ver = qual_ver
            prod_path = self._device_prod_image
            prepvt_path = qual_path

        # Generate a dictionary with all of the expected state.
        qual_state = {}
        qual_state['prod_version'] = prod_ver
        qual_state['prepvt_version'] = prepvt_ver
        qual_state['chip_bid'] = qual_chip_bid
        qual_state['running_image_bid'] = qual_ver[2]
        # The test can't rollback RO. The newest RO should be running at the end
        # of the test. max_ro will be none if the versions are the same. Use the
        # running_ro in that case.
        running_ro = self.get_saved_cr50_original_version()[0]
        max_ro = cr50_utils.GetNewestVersion(running_ro, qual_ver[0])
        qual_state['running_image_ver'] = (max_ro or running_ro, qual_ver[1],
                                           None)
        mismatch = self._check_running_image_and_board_id(qual_state)
        if not mismatch:
            logging.info('Running qual image. No update needed.')
            return
        logging.info('Cr50 qual update required.')
        self.make_rootfs_writable()
        self._update_device_images_and_running_cr50_firmware(
                qual_state, qual_path, prod_path, prepvt_path)
        logging.info("Recording qual device state as 'original' device state")
        self._save_original_state(qual_path)
230
231    def make_rootfs_writable(self):
232        """Make rootfs writeable. Recover the dut if necessary."""
233        path = None
234        try:
235            filesystem_util.make_rootfs_writable(self.host)
236            return
237        except error.AutoservRunError as e:
238            if 'cannot remount' not in e.result_obj.stderr:
239                raise
240            path = e.result_obj.stderr.partition(
241                    'cannot remount')[2].split()[0]
242        # This shouldn't be possible.
243        if not path:
244            raise error.TestError('Need path to repair filesystem')
245        logging.info('repair %s', path)
246        # Repair the block. Assume yes to all questions. The exit status will be
247        # 3, so ignore errors. make_rootfs_writable will fail if something
248        # actually went wrong.
249        self.host.run('e2fsck -y %s' % path, ignore_status=True)
250        self.host.reboot()
251        filesystem_util.make_rootfs_writable(self.host)
252
253    def _saved_cr50_state(self, state):
254        """Returns True if the test has saved the given state
255
256        @param state: a integer representing the state to check.
257        """
258        return state & self._saved_state
259
260    def after_run_once(self):
261        """Log which iteration just ran"""
262        logging.info('successfully ran iteration %d', self.iteration)
263        self._try_to_bring_dut_up()
264
265    def _save_dbg_image(self, cr50_dbg_image_path):
266        """Save or download the node locked dev image.
267
268        @param cr50_dbg_image_path: The path to the node locked cr50 image.
269        """
270        if os.path.isfile(cr50_dbg_image_path):
271            self._dbg_image_path = cr50_dbg_image_path
272        else:
273            self._dbg_image_path = self.download_cr50_debug_image()[0]
274
275    def _save_eraseflashinfo_image(self, cr50_eraseflashinfo_image_path):
276        """Save or download the node locked eraseflashinfo image.
277
278        @param cr50_eraseflashinfo_image_path: The path to the node locked cr50
279                                               image.
280        """
281        if os.path.isfile(cr50_eraseflashinfo_image_path):
282            self._eraseflashinfo_image_path = cr50_eraseflashinfo_image_path
283        else:
284            self._eraseflashinfo_image_path = (
285                    self.download_cr50_eraseflashinfo_image()[0])
286
287    def _save_device_image(self, ext):
288        """Download the .prod or .prepvt device image and get the version.
289
290        @param ext: The Cr50 file extension: prod or prepvt.
291        @returns (local_path, rw_version, bid_string) or (None, None, None) if
292                 the file doesn't exist on the DUT.
293        """
294        version = self._original_image_state[ext + '_version']
295        if not version:
296            return None, None, None
297        _, rw_ver, bid = version
298        rw_filename = 'cr50.device.bin.%s.%s' % (ext, rw_ver)
299        local_path = os.path.join(self.resultsdir, rw_filename)
300        dut_path = cr50_utils.GetDevicePath(ext)
301        self.host.get_file(dut_path, local_path)
302        bid = cr50_utils.GetBoardIdInfoString(bid)
303        return local_path, rw_ver, bid
304
305    def _save_original_images(self, release_path):
306        """Use the saved state to find all of the device images.
307
308        This will download running cr50 image and the device image.
309
310        @param release_path: The release path given by test args
311        """
312        local_path, prod_rw, prod_bid = self._save_device_image('prod')
313        self._device_prod_image = local_path
314
315        local_path, prepvt_rw, prepvt_bid = self._save_device_image('prepvt')
316        self._device_prepvt_image = local_path
317
318        if os.path.isfile(release_path):
319            self._original_cr50_image = release_path
320            logging.info('using supplied image')
321            return
322        if self.tot_test_run:
323            self._original_cr50_image = self.download_cr50_tot_image()
324            return
325
326        # If the running cr50 image version matches the image on the DUT use
327        # the DUT image as the original image. If the versions don't match
328        # download the image from google storage
329        _, running_rw, running_bid = self.get_saved_cr50_original_version()
330
331        # Convert the running board id to the same format as the prod and
332        # prepvt board ids.
333        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
334        if running_rw == prod_rw and running_bid == prod_bid:
335            logging.info('Using device cr50 prod image %s %s', prod_rw,
336                         prod_bid)
337            self._original_cr50_image = self._device_prod_image
338        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
339            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
340                         prepvt_bid)
341            self._original_cr50_image = self._device_prepvt_image
342        else:
343            logging.info('Downloading cr50 image %s %s', running_rw,
344                         running_bid)
345            self._original_cr50_image = self.download_cr50_release_image(
346                    running_rw, running_bid)[0]
347
    def _save_original_state(self, release_path):
        """Save the cr50 related state.

        Save the device's current cr50 version, cr50 board id, the running cr50
        image, the prepvt, and prod cr50 images. These will be used to restore
        the cr50 state during cleanup.

        An error while reading the device state is raised to the caller. An
        error while saving the images is only logged, and the DEVICE_IMAGES
        flag stays cleared in self._saved_state.

        @param release_path: the optional command line arg of path for the local
                             cr50 image.
        """
        # Clear the flag before reading the state, so the flag isn't left set
        # if get_image_and_bid_state raises an error.
        self._saved_state &= ~self.INITIAL_IMAGE_STATE
        self._original_image_state = self.get_image_and_bid_state()
        # We successfully saved the device state
        self._saved_state |= self.INITIAL_IMAGE_STATE
        # The images from an earlier call are out of date. Only set the flag
        # again if the new images are saved.
        self._saved_state &= ~self.DEVICE_IMAGES
        try:
            self._save_original_images(release_path)
            self._saved_state |= self.DEVICE_IMAGES
        except Exception as e:
            logging.warning('Error saving ChromeOS image cr50 firmware: %s',
                            str(e))
369
370    def get_saved_cr50_original_version(self):
371        """Return (ro ver, rw ver, bid)."""
372        if ('running_image_ver' not in self._original_image_state
373                    or 'running_image_bid' not in self._original_image_state):
374            raise error.TestError('No record of original cr50 image version')
375        return (self._original_image_state['running_image_ver'][0],
376                self._original_image_state['running_image_ver'][1],
377                self._original_image_state['running_image_bid'])
378
379    def get_saved_cr50_original_path(self):
380        """Return the local path for the original cr50 image."""
381        if not hasattr(self, '_original_cr50_image'):
382            raise error.TestError('No record of original image')
383        return self._original_cr50_image
384
385    def has_saved_dbg_image_path(self):
386        """Returns true if we saved the node locked debug image."""
387        return hasattr(self, '_dbg_image_path')
388
389    def get_saved_dbg_image_path(self):
390        """Return the local path for the cr50 dev image."""
391        if not self.has_saved_dbg_image_path():
392            raise error.TestError('No record of debug image')
393        return self._dbg_image_path
394
395    def get_saved_eraseflashinfo_image_path(self):
396        """Return the local path for the cr50 eraseflashinfo image."""
397        if not hasattr(self, '_eraseflashinfo_image_path'):
398            raise error.TestError('No record of eraseflashinfo image')
399        return self._eraseflashinfo_image_path
400
401    def get_device_brand(self):
402        """Returns the 4 character device brand."""
403        return self._original_image_state['cros_config / brand-code']
404
405    def _retry_cr50_update(self, image, retries, rollback):
406        """Try to update to the given image retries amount of times.
407
408        @param image: The image path.
409        @param retries: The number of times to try to update.
410        @param rollback: Run rollback after the update.
411        @raises TestFail if the update failed.
412        """
413        for i in range(retries):
414            try:
415                return self.cr50_update(image, rollback=rollback)
416            except Exception as e:
417                logging.warning('Failed to update to %s attempt %d: %s',
418                                os.path.basename(image), i, str(e))
419                logging.info('Sleeping 60 seconds')
420                time.sleep(60)
421        raise error.TestError(
422                'Failed to update to %s' % os.path.basename(image))
423
424    def run_update_to_eraseflashinfo(self):
425        """Erase flashinfo using the eraseflashinfo image.
426
427        Update to the DBG image, rollback to the eraseflashinfo, and run
428        eraseflashinfo.
429        """
430        self._retry_cr50_update(self._dbg_image_path, 3, False)
431        self._retry_cr50_update(self._eraseflashinfo_image_path, 3, True)
432        if not self.cr50.eraseflashinfo():
433            raise error.TestError('Unable to erase the board id')
434
435    def eraseflashinfo_and_restore_image(self, image=''):
436        """eraseflashinfo and update to the given the image.
437
438        @param image: the image to end on. Use the original test image if no
439                      image is given.
440        """
441        image = image if image else self.get_saved_cr50_original_path()
442        self.run_update_to_eraseflashinfo()
443        self.cr50_update(image)
444
    def update_cr50_image_and_board_id(self, image_path, bid):
        """Set the chip board id and update the cr50 image.

        Make 3 attempts to update to the original image. Use a rollback from
        the DBG image to erase the state that can only be erased by a DBG image.
        Set the chip board id during rollback.

        @param image_path: path of the image to update to.
        @param bid: the board id to set. bid[0] is used as the board id and
                    bid[2] is used as the flags.
        @raises error.TestFail: if the board id needs to be erased and the
                eraseflashinfo image was not saved.
        """
        current_bid = cr50_utils.GetChipBoardId(self.host)
        bid_mismatch = current_bid != bid
        # Only set the board id if it's wrong and the target isn't erased.
        set_bid = bid_mismatch and bid != cr50_utils.ERASED_CHIP_BID
        bid_is_erased = current_bid == cr50_utils.ERASED_CHIP_BID
        # Only erase the board id if it's wrong and it isn't already erased.
        eraseflashinfo = bid_mismatch and not bid_is_erased

        if (eraseflashinfo
                    and not self._saved_cr50_state(self.ERASEFLASHINFO_IMAGE)):
            raise error.TestFail('Did not save eraseflashinfo image')

        # Remove prepvt and prod images, so they don't interfere with the test
        # rolling back and updating to images that may be older than the images
        # on the device.
        if filesystem_util.is_rootfs_writable(self.host):
            self.host.run('rm %s' % cr50_utils.CR50_PREPVT, ignore_status=True)
            self.host.run('rm %s' % cr50_utils.CR50_PROD, ignore_status=True)

        if eraseflashinfo:
            self.run_update_to_eraseflashinfo()

        # Get to the DBG image, so the final update can be a rollback.
        self._retry_cr50_update(self._dbg_image_path, 3, False)

        chip_bid = bid[0]
        chip_flags = bid[2]
        if set_bid:
            self.cr50.set_board_id(chip_bid, chip_flags)

        self._retry_cr50_update(image_path, 3, True)
483
    def _discharging_factory_mode_cleanup(self):
        """Try to get the dut back into charging mode.

        Shutdown the DUT, fake disconnect AC, and then turn on the DUT to
        try to recover the EC.

        When Cr50 enters factory mode on Wilco, the EC disables charging.
        Try to run the sequence to get the Wilco EC out of the factory mode
        state, so it reenables charging.

        Nothing is done on devices with a chrome ec or if the battery isn't
        discharging. If the DUT is still discharging after the recovery, it is
        only logged.

        @raises error.TestError: if the DUT is discharging and the servo is
                not a Type C servo v4.
        """
        # Only devices without a chrome ec need this cleanup.
        if self.faft_config.chrome_ec:
            return
        charge_state = self.host.get_power_supply_info()['Battery']['state']
        logging.info('Charge state: %r', charge_state)
        if 'Discharging' not in charge_state:
            logging.info('Charge state is ok')
            return

        if not self.servo.is_servo_v4_type_c():
            raise error.TestError(
                    'Cannot recover charging without Type C servo')
        # Disconnect the charger and reset the dut to recover charging.
        logging.info('Recovering charging')
        self.faft_client.system.run_shell_command('poweroff')
        time.sleep(self.cr50.SHORT_WAIT)
        # NOTE(review): the meaning of the two fakedisconnect numbers isn't
        # visible in this file. Confirm against the servo v4 documentation.
        self.servo.set_nocheck('servo_v4_uart_cmd', 'fakedisconnect 100 20000')
        time.sleep(self.cr50.SHORT_WAIT)
        self._try_to_bring_dut_up()
        charge_state = self.host.get_power_supply_info()['Battery']['state']
        logging.info('Charge state: %r', charge_state)
        if 'Discharging' in charge_state:
            logging.warning('DUT still discharging')
516
517    def _cleanup_required(self, state_mismatch, image_type):
518        """Return True if the update can fix something in the mismatched state.
519
520        @param state_mismatch: a dictionary of the mismatched state.
521        @param image_type: The integer representing the type of image
522        """
523        state_image_restores = set(self.STATE_IMAGE_RESTORES[image_type])
524        restore = state_image_restores.intersection(state_mismatch.keys())
525        if restore and not self._saved_cr50_state(image_type):
526            raise error.TestError(
527                    'Did not save images to restore %s' % (', '.join(restore)))
528        return not not restore
529
530    def _get_image_information(self, ext):
531        """Get the image information for the .prod or .prepvt image.
532
533        @param ext: The extension string prod or prepvt
534        @param returns: The image version or None if the image doesn't exist.
535        """
536        dut_path = cr50_utils.GetDevicePath(ext)
537        file_exists = self.host.path_exists(dut_path)
538        if file_exists:
539            return cr50_utils.GetBinVersion(self.host, dut_path)
540        return None
541
542    def get_image_and_bid_state(self):
543        """Get a dict with the current device cr50 information.
544
545        The state dict will include the platform brand, chip board id, the
546        running cr50 image version, the running cr50 image board id, and the
547        device cr50 image version.
548        """
549        state = {}
550        state['cros_config / brand-code'] = self.host.run(
551                'cros_config / brand-code', ignore_status=True).stdout.strip()
552        state['prod_version'] = self._get_image_information('prod')
553        state['prepvt_version'] = self._get_image_information('prepvt')
554        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
555        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
556        state['running_image_ver'] = cr50_utils.GetRunningVersion(self.host)
557        state['running_image_bid'] = self.cr50.get_active_board_id_str()
558
559        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
560        return state
561
562    def _check_running_image_and_board_id(self, expected_state):
563        """Compare the current image and board id to the given state.
564
565        @param expected_state: A dictionary of the state to compare to.
566        @return: A dictionary with the state that is wrong as the key and the
567                 expected and current state as the value.
568        """
569        if not (self._saved_state & self.INITIAL_IMAGE_STATE):
570            logging.warning(
571                    'Did not save the original state. Cannot verify it '
572                    'matches')
573            return
574        # Make sure the /var/cache/cr50* state is up to date.
575        cr50_utils.ClearUpdateStateAndReboot(self.host)
576
577        mismatch = {}
578        state = self.get_image_and_bid_state()
579
580        for k, expected_val in six.iteritems(expected_state):
581            val = state[k]
582            if val != expected_val:
583                mismatch[k] = 'expected: %s, current: %s' % (expected_val, val)
584
585        if mismatch:
586            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
587        return mismatch
588
589    def _check_original_image_state(self):
590        """Compare the current cr50 state to the original state.
591
592        @return: A dictionary with the state that is wrong as the key and the
593                 new and old state as the value
594        """
595        mismatch = self._check_running_image_and_board_id(
596                self._original_image_state)
597        if not mismatch:
598            logging.info('The device is in the original state')
599        return mismatch
600
601    def _reset_ccd_settings(self):
602        """Reset the ccd lock and capability states."""
603        if not self.cr50.ccd_is_reset():
604            # Try to open cr50 and enable testlab mode if it isn't enabled.
605            try:
606                self.fast_ccd_open(True)
607            except:
608                # Even if we can't open cr50, do our best to reset the rest of
609                # the system state. Log a warning here.
610                logging.warning('Unable to Open cr50', exc_info=True)
611            self.cr50.ccd_reset(servo_en=False)
612            if not self.cr50.ccd_is_reset():
613                raise error.TestFail('Could not reset ccd')
614
615        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
616        if self.original_ccd_settings != current_settings:
617            if not self.can_set_ccd_level:
618                raise error.TestError("CCD state has changed, but we can't "
619                                      "restore it")
620            self.fast_ccd_open(True)
621            self.cr50.set_caps(self.original_ccd_settings)
622
623        # First try using testlab open to open the device
624        if self.original_ccd_level == 'open':
625            self.fast_ccd_open(True)
626        elif self.original_ccd_level != self.cr50.get_ccd_level():
627            self.cr50.set_ccd_level(self.original_ccd_level)
628
629    def fast_ccd_open(self,
630                      enable_testlab=False,
631                      reset_ccd=True,
632                      dev_mode=False):
633        """Check for watchdog resets after opening ccd.
634
635        Args:
636            enable_testlab: If True, enable testlab mode after cr50 is open.
637            reset_ccd: If True, reset ccd after open.
638            dev_mode: True if the device should be in dev mode after ccd is
639                      is opened.
640        """
641        try:
642            super(Cr50Test, self).fast_ccd_open(enable_testlab, reset_ccd,
643                                                dev_mode)
644        except Exception as e:
645            # Check for cr50 watchdog resets.
646            self.cr50.check_for_console_errors('Fast ccd open')
647            raise
648
    def cleanup(self):
        """Attempt to cleanup the cr50 state. Then run firmware cleanup

        The FirmwareTest cleanup always runs, even if restoring the cr50 state
        raises an error. The console error check and reenabling the ccd
        watchdog only happen if none of the earlier steps raised an error.
        """
        try:
            # Reset the password as the first thing in cleanup. It is important
            # that if some other part of cleanup fails, the password has at
            # least been reset.
            # DO NOT PUT ANYTHING BEFORE THIS.
            self._try_quick_ccd_cleanup()

            self.servo.enable_main_servo_device()

            self._try_to_bring_dut_up()
            self._restore_cr50_state()

            # Make sure the sarien EC isn't stuck in factory mode.
            self._discharging_factory_mode_cleanup()
        finally:
            super(Cr50Test, self).cleanup()

        # Check the logs captured during firmware_test cleanup for cr50 errors.
        self.cr50.check_for_console_errors('Test Cleanup')
        self.servo.allow_ccd_watchdog_for_test()
671
    def _update_device_images_and_running_cr50_firmware(
            self, state, release_path, prod_path, prepvt_path):
        """Update cr50, set the board id, and copy firmware to the DUT.

        Nothing is done if the device already matches the given state. The
        state is checked again after each step, and the .prod and .prepvt
        images are only copied if rootfs is writable.

        @param state: A dictionary with the expected running version, board id,
                      device cr50 firmware versions.
        @param release_path: The image to update cr50 to
        @param prod_path: The path to the .prod image
        @param prepvt_path: The path to the .prepvt image
        @raises TestError: if setting any state failed
        """
        mismatch = self._check_running_image_and_board_id(state)
        if not mismatch:
            logging.info('Nothing to do.')
            return

        # Use the DBG image to restore the original image.
        if self._cleanup_required(mismatch, self.DBG_IMAGE):
            self.update_cr50_image_and_board_id(release_path,
                                                state['chip_bid'])

        self._try_to_bring_dut_up()
        # Check the state again to see what the update didn't fix.
        new_mismatch = self._check_running_image_and_board_id(state)
        # Copy the original .prod and .prepvt images back onto the DUT.
        if (self._cleanup_required(new_mismatch, self.DEVICE_IMAGES)
                    and filesystem_util.is_rootfs_writable(self.host)):
            # Copy the .prod file onto the DUT.
            if prod_path and 'prod_version' in new_mismatch:
                cr50_utils.InstallImage(self.host, prod_path,
                                        cr50_utils.CR50_PROD)
            # Copy the .prepvt file onto the DUT.
            if prepvt_path and 'prepvt_version' in new_mismatch:
                cr50_utils.InstallImage(self.host, prepvt_path,
                                        cr50_utils.CR50_PREPVT)

        # Anything that is still wrong could not be restored.
        final_mismatch = self._check_running_image_and_board_id(state)
        if final_mismatch:
            raise error.TestError(
                    'Could not update cr50 image state: %s' % final_mismatch)
        logging.info('Successfully updated all device cr50 firmware state.')
712
713    def _restore_device_images_and_running_cr50_firmware(self):
714        """Restore the images on the device and the running cr50 image."""
715        if self._provision_update:
716            return
717        mismatch = self._check_original_image_state()
718        if not mismatch:
719            return
720        self._update_device_images_and_running_cr50_firmware(
721                self._original_image_state,
722                self.get_saved_cr50_original_path(), self._device_prod_image,
723                self._device_prepvt_image)
724
725        if self._raise_error_on_mismatch and mismatch:
726            raise error.TestError('Unexpected state mismatch during '
727                                  'cleanup %s' % mismatch)
728
729    def _try_quick_ccd_cleanup(self):
730        """Try to clear all ccd state."""
731        # This is just a first pass at cleanup. Don't raise any errors.
732        try:
733            self.cr50.ccd_enable()
734        except Exception as e:
735            logging.warning('Ignored exception enabling ccd %r', str(e))
736        self.cr50.send_command('ccd testlab open')
737        self.cr50.send_command('rddkeepalive disable')
738        self.cr50.ccd_reset()
739        self.cr50.send_command('wp follow_batt_pres atboot')
740
741    def _restore_ccd_settings(self):
742        """Restore the original ccd state."""
743        self._try_quick_ccd_cleanup()
744
745        # Reboot cr50 if the console is accessible. This will reset most state.
746        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
747            self.cr50.reboot()
748
749        # Reenable servo v4 CCD
750        self.cr50.ccd_enable()
751
752        # reboot to normal mode if the device is in dev mode.
753        self.enter_mode_after_checking_cr50_state('normal')
754
755        self._try_to_bring_dut_up()
756        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
757        self.clear_fwmp()
758
759        # Restore the ccd privilege level
760        self._reset_ccd_settings()
761
762    def _restore_cr50_state(self):
763        """Restore cr50 state, so the device can be used for further testing.
764
765        Restore the cr50 image and board id first. Then CCD, because flashing
766        dev signed images completely clears the CCD state.
767        """
768        try:
769            self._restore_device_images_and_running_cr50_firmware()
770        except Exception as e:
771            logging.warning('Issue restoring Cr50 image: %s', str(e))
772            raise
773        finally:
774            self._restore_ccd_settings()
775
776    def find_cr50_gs_image(self, gsurl):
777        """Find the cr50 gs image name.
778
779        @param gsurl: the cr50 image location
780        @return: a list of the gsutil bucket, filename or None if the file
781                 can't be found
782        """
783        try:
784            return utils.gs_ls(gsurl)[0].rsplit('/', 1)
785        except error.CmdError:
786            logging.info('%s does not exist', gsurl)
787            return None
788
789    def _extract_cr50_image(self, archive, fn):
790        """Extract the filename from the given archive
791        Aargs:
792            archive: the archive location on the host
793            fn: the file to extract
794
795        Returns:
796            The location of the extracted file
797        """
798        remote_dir = os.path.dirname(archive)
799        result = self.host.run('tar xfv %s -C %s' % (archive, remote_dir))
800        for line in result.stdout.splitlines():
801            if os.path.basename(line) == fn:
802                return os.path.join(remote_dir, line)
803        raise error.TestFail('%s was not extracted from %s' % (fn, archive))
804
805    def download_cr50_gs_file(self, gsurl, extract_fn):
806        """Download and extract the file at gsurl.
807
808        @param gsurl: The gs url for the cr50 image
809        @param extract_fn: The name of the file to extract from the cr50 image
810                        tarball. Don't extract anything if extract_fn is None.
811        @return: a tuple (local path, host path)
812        """
813        file_info = self.find_cr50_gs_image(gsurl)
814        if not file_info:
815            raise error.TestFail('Could not find %s' % gsurl)
816        bucket, fn = file_info
817
818        remote_temp_dir = '/tmp/'
819        src = os.path.join(remote_temp_dir, fn)
820        dest = os.path.join(self.resultsdir, fn)
821
822        # Copy the image to the dut
823        gsutil_wrapper.copy_private_bucket(
824                host=self.host,
825                bucket=bucket,
826                filename=fn,
827                destination=remote_temp_dir)
828        if extract_fn:
829            src = self._extract_cr50_image(src, extract_fn)
830            logging.info('extracted %s', src)
831            # Remove .tbz2 from the local path.
832            dest = os.path.splitext(dest)[0]
833
834        self.host.get_file(src, dest)
835        return dest, src
836
837    def download_cr50_gs_image(self, gsurl, extract_fn, image_bid):
838        """Get the image from gs and save it in the autotest dir.
839
840        @param gsurl: The gs url for the cr50 image
841        @param extract_fn: The name of the file to extract from the cr50 image
842                        tarball. Don't extract anything if extract_fn is None.
843        @param image_bid: the image symbolic board id
844        @return: A tuple with the local path and version
845        """
846        dest, src = self.download_cr50_gs_file(gsurl, extract_fn)
847        ver = cr50_utils.GetBinVersion(self.host, src)
848
849        # Compare the image board id to the downloaded image to make sure we got
850        # the right file
851        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
852        if image_bid and image_bid != downloaded_bid:
853            raise error.TestError(
854                    'Could not download image with matching '
855                    'board id wanted %s got %s' % (image_bid, downloaded_bid))
856        return dest, ver
857
858    def download_cr50_eraseflashinfo_image(self):
859        """download the cr50 image that allows erasing flashinfo.
860
861        Get the file with the matching devid.
862
863        @return: A tuple with the debug image local path and version
864        """
865        devid = self._devid.replace(' ', '-').replace('0x', '')
866        gsurl = os.path.join(self.GS_PRIVATE_DBG,
867                             self.CR50_ERASEFLASHINFO_FILE % devid)
868        return self.download_cr50_gs_image(gsurl, None, None)
869
870    def download_cr50_debug_image(self, devid='', image_bid=''):
871        """download the cr50 debug file.
872
873        Get the file with the matching devid and image board id info
874
875        @param image_bid: the image board id info string or list
876        @return: A tuple with the debug image local path and version
877        """
878        bid_ext = ''
879        # Add the image bid string to the filename
880        if image_bid:
881            image_bid = cr50_utils.GetBoardIdInfoString(
882                    image_bid, symbolic=True)
883            bid_ext = '.' + image_bid.replace(':', '_')
884
885        devid = devid if devid else self._devid
886        dbg_file = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'), bid_ext)
887        gsurl = os.path.join(self.GS_PRIVATE_DBG, dbg_file)
888        return self.download_cr50_gs_image(gsurl, None, image_bid)
889
890    def download_cr50_tot_image(self):
891        """download the cr50 TOT image.
892
893        @return: the local path to the TOT image.
894        """
895        # TODO(mruthven): use logic from provision_Cr50TOT
896        raise error.TestNAError('Could not download TOT image')
897
898    def _find_release_image_gsurl(self, fn):
899        """Find the gs url for the release image"""
900        for gsbucket in [self.GS_PUBLIC, self.GS_PRIVATE_PROD]:
901            gsurl = os.path.join(gsbucket, fn)
902            if self.find_cr50_gs_image(gsurl):
903                return gsurl
904        raise error.TestFail('%s is not on google storage' % fn)
905
906    def download_cr50_release_image(self, image_rw, image_bid=''):
907        """download the cr50 release file.
908
909        Get the file with the matching version and image board id info
910
911        @param image_rw: the rw version string
912        @param image_bid: the image board id info string or list
913        @return: A tuple with the release image local path and version
914        """
915        bid_ext = ''
916        # Add the image bid string to the gsurl
917        if image_bid:
918            image_bid = cr50_utils.GetBoardIdInfoString(
919                    image_bid, symbolic=True)
920            bid_ext = '_' + image_bid.replace(':', '_')
921        release_fn = self.CR50_PROD_FILE % (image_rw, bid_ext)
922        gsurl = self._find_release_image_gsurl(release_fn)
923
924        # Release images can be found using the rw version
925        # Download the image
926        dest, ver = self.download_cr50_gs_image(gsurl, 'cr50.bin.prod',
927                                                image_bid)
928
929        # Compare the rw version and board id info to make sure the right image
930        # was found
931        if image_rw != ver[1]:
932            raise error.TestError('Could not download image with matching '
933                                  'rw version')
934        return dest, ver
935
936    def _cr50_verify_update(self, expected_rw, expect_rollback):
937        """Verify the expected version is running on cr50.
938
939        @param expected_rw: The RW version string we expect to be running
940        @param expect_rollback: True if cr50 should have rolled back during the
941                                update
942        @raise TestFail: if there is any unexpected update state
943        """
944        errors = []
945        running_rw = self.cr50.get_version()
946        if expected_rw != running_rw:
947            errors.append('running %s not %s' % (running_rw, expected_rw))
948
949        if expect_rollback != self.cr50.rolledback():
950            errors.append('%srollback detected' %
951                          'no ' if expect_rollback else '')
952        if len(errors):
953            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
954        logging.info('RUNNING %s after %s', expected_rw,
955                     'rollback' if expect_rollback else 'update')
956
957    def _cr50_run_update(self, path):
958        """Install the image at path onto cr50.
959
960        @param path: the location of the image to update to
961        @return: the rw version of the image
962        """
963        tmp_dest = '/tmp/' + os.path.basename(path)
964
965        # Make sure the dut is sshable before installing the image.
966        self._try_to_bring_dut_up()
967
968        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
969        # Use the -p option to make sure the DUT does a clean reboot.
970        cr50_utils.GSCTool(self.host, ['-a', dest, '-p'])
971        # Reboot the DUT to finish the cr50 update.
972        self.host.reboot(wait=False)
973        return image_ver[1]
974
975    def cr50_update(self, path, rollback=False, expect_rollback=False):
976        """Attempt to update to the given image.
977
978        If rollback is True, we assume that cr50 is already running an image
979        that can rollback.
980
981        @param path: the location of the update image
982        @param rollback: True if we need to force cr50 to rollback to update to
983                         the given image
984        @param expect_rollback: True if cr50 should rollback on its own
985        @raise TestFail: if the update failed
986        """
987        original_rw = self.cr50.get_version()
988
989        # Cr50 is going to reject an update if it hasn't been up for more than
990        # 60 seconds. Wait until that passes before trying to run the update.
991        self.cr50.wait_until_update_is_allowed()
992
993        image_rw = self._cr50_run_update(path)
994
995        # Running the update may cause cr50 to reboot. Wait for that before
996        # sending more commands. The reboot should happen quickly.
997        self.cr50.wait_for_reboot(
998                timeout=self.faft_config.gsc_update_wait_for_reboot)
999
1000        if rollback:
1001            self.cr50.rollback()
1002
1003        expected_rw = original_rw if expect_rollback else image_rw
1004        # If we expect a rollback, the version should remain unchanged
1005        self._cr50_verify_update(expected_rw, rollback or expect_rollback)
1006
1007    def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
1008        """Run a gsctool command and input the password
1009
1010        @param password: The cr50 password string
1011        @param cmd: The gsctool command
1012        @param name: The name to give the job
1013        @param expect_error: True if the command should fail
1014        """
1015        logging.info('Running: %s', cmd)
1016        logging.info('Password: %s', password)
1017        # Make sure the test waits long enough to avoid ccd rate limiting.
1018        time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT)
1019        full_cmd = "echo -e '%s\n%s\n' | %s" % (password, password, cmd)
1020        result = self.host.run(full_cmd, ignore_status=expect_error)
1021        if result.exit_status:
1022            message = ('gsctool %s failed using %r: %s %s' %
1023                       (name, password, result.exit_status, result.stderr))
1024            if expect_error:
1025                logging.info(message)
1026            else:
1027                raise error.TestFail(message)
1028        elif expect_error:
1029            raise error.TestFail('%s with %r did not fail when expected' %
1030                                 (name, password))
1031        else:
1032            logging.info('ran %s password command: %r', name, result.stdout)
1033
1034    def set_ccd_password(self, password, expect_error=False):
1035        """Set the ccd password"""
1036        # Testlab mode can't be enabled if there is no power button, so we
1037        # shouldn't allow setting the password.
1038        if not self.faft_config.has_powerbutton:
1039            raise error.TestError('No power button')
1040
1041        # If for some reason the test sets a password and is interrupted before
1042        # we can clear it, we want testlab mode to be enabled, so it's possible
1043        # to clear the password without knowing it.
1044        if not self.cr50.testlab_is_on():
1045            raise error.TestError('Will not set password unless testlab mode '
1046                                  'is enabled.')
1047        try:
1048            self.run_gsctool_cmd_with_password(password, 'gsctool -a -P',
1049                                               'set_password', expect_error)
1050        finally:
1051            logging.info('Cr50 password is %s',
1052                         'cleared' if self.cr50.password_is_reset() else 'set')
1053
1054    def ccd_unlock_from_ap(self, password=None, expect_error=False):
1055        """Unlock cr50"""
1056        if not password:
1057            self.host.run('gsctool -a -U')
1058            return
1059        self.run_gsctool_cmd_with_password(password, 'gsctool -a -U', 'unlock',
1060                                           expect_error)
1061
1062    def tpm_is_responsive(self):
1063        """Check TPM responsiveness by running tpm_version."""
1064        result = self.host.run('tpm_version', ignore_status=True)
1065        logging.debug(result.stdout.strip())
1066        return not result.exit_status
1067