1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7import pprint 8 9from autotest_lib.client.bin import utils 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils 12from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper 13from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 14 15 16class Cr50Test(FirmwareTest): 17 """ 18 Base class that sets up helper objects/functions for cr50 tests. 19 """ 20 version = 1 21 22 CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/' 23 CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin' 24 CR50_PROD_FILE = 'cr50.%s.bin.prod' 25 NONE = 0 26 # Saved the original device state during init. 27 INITIAL_STATE = 1 << 0 28 # Saved the original image, the device image, and the debug image. These 29 # images are needed to be able to restore the original image and board id. 30 IMAGES = 1 << 1 31 32 def initialize(self, host, cmdline_args, restore_cr50_state=False, 33 cr50_dev_path='', provision_update=False): 34 self._saved_state = self.NONE 35 self._raise_error_on_mismatch = not restore_cr50_state 36 self._provision_update = provision_update 37 super(Cr50Test, self).initialize(host, cmdline_args) 38 39 if not hasattr(self, 'cr50'): 40 raise error.TestNAError('Test can only be run on devices with ' 41 'access to the Cr50 console') 42 43 self.host = host 44 self._save_original_state() 45 # We successfully saved the device state 46 self._saved_state |= self.INITIAL_STATE 47 try: 48 self._save_node_locked_dev_image(cr50_dev_path) 49 self._save_original_images() 50 # We successfully saved the device images 51 self._saved_state |= self.IMAGES 52 except: 53 if restore_cr50_state: 54 raise 55 56 57 def _save_node_locked_dev_image(self, cr50_dev_path): 58 """Save or download the node locked dev image. 59 60 Args: 61 cr50_dev_path: The path to the node locked cr50 image. 62 """ 63 if os.path.isfile(cr50_dev_path): 64 self._node_locked_cr50_image = cr50_dev_path 65 else: 66 devid = self.servo.get('cr50_devid') 67 self._node_locked_cr50_image = self.download_cr50_debug_image( 68 devid)[0] 69 70 71 def _save_original_images(self): 72 """Use the saved state to find all of the device images. 73 74 This will download running cr50 image and the device image. 75 """ 76 # Copy the prod and prepvt images from the DUT 77 _, prod_rw, prod_bid = self._original_state['device_prod_ver'] 78 filename = 'prod_device_image_' + prod_rw 79 self._device_prod_image = os.path.join(self.resultsdir, 80 filename) 81 self.host.get_file(cr50_utils.CR50_PROD, 82 self._device_prod_image) 83 84 if cr50_utils.HasPrepvtImage(self.host): 85 _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver'] 86 filename = 'prepvt_device_image_' + prepvt_rw 87 self._device_prepvt_image = os.path.join(self.resultsdir, 88 filename) 89 self.host.get_file(cr50_utils.CR50_PREPVT, 90 self._device_prepvt_image) 91 prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid) 92 else: 93 self._device_prepvt_image = None 94 prepvt_rw = None 95 prepvt_bid = None 96 97 # If the running cr50 image version matches the image on the DUT use 98 # the DUT image as the original image. If the versions don't match 99 # download the image from google storage 100 _, running_rw, running_bid = self.get_saved_cr50_original_version() 101 102 # Make sure prod_bid and running_bid are in the same format 103 prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid) 104 running_bid = cr50_utils.GetBoardIdInfoString(running_bid) 105 if running_rw == prod_rw and running_bid == prod_bid: 106 logging.info('Using device cr50 prod image %s %s', prod_rw, 107 prod_bid) 108 self._original_cr50_image = self._device_prod_image 109 elif running_rw == prepvt_rw and running_bid == prepvt_bid: 110 logging.info('Using device cr50 prepvt image %s %s', prepvt_rw, 111 prepvt_bid) 112 self._original_cr50_image = self._device_prepvt_image 113 else: 114 logging.info('Downloading cr50 image %s %s', running_rw, 115 running_bid) 116 self._original_cr50_image = self.download_cr50_release_image( 117 running_rw, running_bid)[0] 118 119 120 def _save_original_state(self): 121 """Save the cr50 related state. 122 123 Save the device's current cr50 version, cr50 board id, rlz, and image 124 at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to 125 restore the state during cleanup. 126 """ 127 self._original_state = self.get_cr50_device_state() 128 129 130 def get_saved_cr50_original_version(self): 131 """Return (ro ver, rw ver, bid)""" 132 if ('running_ver' not in self._original_state or 'cr50_image_bid' not in 133 self._original_state): 134 raise error.TestError('No record of original cr50 image version') 135 return (self._original_state['running_ver'][0], 136 self._original_state['running_ver'][1], 137 self._original_state['cr50_image_bid']) 138 139 140 def get_saved_cr50_original_path(self): 141 """Return the local path for the original cr50 image""" 142 if not hasattr(self, '_original_cr50_image'): 143 raise error.TestError('No record of original image') 144 return self._original_cr50_image 145 146 147 def has_saved_cr50_dev_path(self): 148 """Returns true if we saved the node locked debug image""" 149 return hasattr(self, '_node_locked_cr50_image') 150 151 152 def get_saved_cr50_dev_path(self): 153 """Return the local path for the cr50 dev image""" 154 if not self.has_saved_cr50_dev_path(): 155 raise error.TestError('No record of debug image') 156 return self._node_locked_cr50_image 157 158 159 def _restore_original_image(self, chip_bid, chip_flags): 160 """Restore the cr50 image and erase the state. 161 162 Make 3 attempts to update to the original image. Use a rollback from 163 the DBG image to erase the state that can only be erased by a DBG image. 164 Set the chip board id during rollback 165 166 Args: 167 chip_bid: the integer representation of chip board id or None if the 168 board id should be erased 169 chip_flags: the integer representation of chip board id flags or 170 None if the board id should be erased 171 """ 172 for i in range(3): 173 try: 174 # Update to the node-locked DBG image so we can erase all of 175 # the state we are trying to reset 176 self.cr50_update(self._node_locked_cr50_image) 177 178 # Rollback to the original cr50 image. 179 self.cr50_update(self._original_cr50_image, rollback=True, 180 chip_bid=chip_bid, chip_flags=chip_flags) 181 break 182 except Exception, e: 183 logging.warning('Failed to restore original image attempt %d: ' 184 '%r', i, e) 185 186 187 def rootfs_verification_disable(self): 188 """Remove rootfs verification""" 189 if not self._rootfs_verification_is_disabled(): 190 logging.debug('Removing rootfs verification.') 191 self.rootfs_tool.enable() 192 193 194 def _rootfs_verification_is_disabled(self): 195 """Returns true if rootfs verification is enabled""" 196 # Clear the TPM owner before trying to check rootfs verification 197 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 198 self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool() 199 self.rootfs_tool.initialize(self.host) 200 # rootfs_tool.is_enabled is True, that means rootfs verification is 201 # disabled. 202 return self.rootfs_tool.is_enabled() 203 204 205 def _restore_original_state(self): 206 """Restore the original cr50 related device state""" 207 if not (self._saved_state & self.IMAGES): 208 logging.warning('Did not save the original images. Cannot restore ' 209 'state') 210 return 211 # Remove the prepvt image if the test installed one. 212 if (not self._original_state['has_prepvt'] and 213 cr50_utils.HasPrepvtImage(self.host)): 214 self.host.run('rm %s' % cr50_utils.CR50_PREPVT) 215 # If rootfs verification has been disabled, copy the cr50 device image 216 # back onto the DUT. 217 if self._rootfs_verification_is_disabled(): 218 cr50_utils.InstallImage(self.host, self._device_prod_image, 219 cr50_utils.CR50_PROD) 220 # Install the prepvt image if there was one. 221 if self._device_prepvt_image: 222 cr50_utils.InstallImage(self.host, self._device_prepvt_image, 223 cr50_utils.CR50_PREPVT) 224 225 chip_bid_info = self._original_state['chip_bid'] 226 bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID 227 chip_bid = None if bid_is_erased else chip_bid_info[0] 228 chip_flags = None if bid_is_erased else chip_bid_info[2] 229 # Update to the original image and erase the board id 230 self._restore_original_image(chip_bid, chip_flags) 231 232 # Set the RLZ code 233 cr50_utils.SetRLZ(self.host, self._original_state['rlz']) 234 235 # Verify everything is still the same 236 mismatch = self._check_original_state() 237 if mismatch: 238 raise error.TestError('Could not restore state: %s' % mismatch) 239 240 logging.info('Successfully restored the original cr50 state') 241 242 243 def get_cr50_device_state(self): 244 """Get a dict with the current device cr50 information 245 246 The state dict will include the platform brand, rlz code, chip board id, 247 the running cr50 image version, the running cr50 image board id, and the 248 device cr50 image version. 249 """ 250 state = {} 251 state['mosys platform brand'] = self.host.run('mosys platform brand', 252 ignore_status=True).stdout.strip() 253 state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host, 254 cr50_utils.CR50_PROD) 255 state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host) 256 if state['has_prepvt']: 257 state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host, 258 cr50_utils.CR50_PREPVT) 259 else: 260 state['device_prepvt_ver'] = None 261 state['rlz'] = cr50_utils.GetRLZ(self.host) 262 state['chip_bid'] = cr50_utils.GetChipBoardId(self.host) 263 state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid'] 264 state['running_ver'] = cr50_utils.GetRunningVersion(self.host) 265 state['cr50_image_bid'] = self.cr50.get_active_board_id_str() 266 267 logging.debug('Current Cr50 state:\n%s', pprint.pformat(state)) 268 return state 269 270 271 def _check_original_state(self): 272 """Compare the current cr50 state to the original state 273 274 Returns: 275 A dictionary with the state that is wrong as the key and 276 the new and old state as the value 277 """ 278 if not (self._saved_state & self.INITIAL_STATE): 279 logging.warning('Did not save the original state. Cannot verify it ' 280 'matches') 281 return 282 # Make sure the /var/cache/cr50* state is up to date. 283 cr50_utils.ClearUpdateStateAndReboot(self.host) 284 285 mismatch = {} 286 new_state = self.get_cr50_device_state() 287 288 for k, new_val in new_state.iteritems(): 289 original_val = self._original_state[k] 290 if new_val != original_val: 291 mismatch[k] = 'old: %s, new: %s' % (original_val, new_val) 292 293 if mismatch: 294 logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch)) 295 else: 296 logging.info('The device is in the original state') 297 return mismatch 298 299 300 def cleanup(self): 301 """Make sure the device state is the same as the start of the test""" 302 state_mismatch = self._check_original_state() 303 304 if state_mismatch and not self._provision_update: 305 self._restore_original_state() 306 if self._raise_error_on_mismatch: 307 raise error.TestError('Unexpected state mismatch during ' 308 'cleanup %s' % state_mismatch) 309 super(Cr50Test, self).cleanup() 310 311 312 def find_cr50_gs_image(self, filename, image_type=None): 313 """Find the cr50 gs image name 314 315 Args: 316 filename: the cr50 filename to match to 317 image_type: release or debug. If it is not specified we will search 318 both the release and debug directories 319 Returns: 320 a tuple of the gsutil bucket, filename 321 """ 322 gs_url = self.CR50_GS_URL % (image_type if image_type else '*') 323 gs_filename = os.path.join(gs_url, filename) 324 bucket, gs_filename = utils.gs_ls(gs_filename)[0].rsplit('/', 1) 325 return bucket, gs_filename 326 327 328 def download_cr50_gs_image(self, filename, image_bid='', bucket=None, 329 image_type=None): 330 """Get the image from gs and save it in the autotest dir 331 332 Args: 333 filename: The cr50 image basename 334 image_bid: the board id info list or string. It will be added to the 335 filename. 336 bucket: The gs bucket name 337 image_type: 'debug' or 'release'. This will be used to determine 338 the bucket if the bucket is not given. 339 Returns: 340 A tuple with the local path and version 341 """ 342 # Add the image bid string to the filename 343 if image_bid: 344 bid_str = cr50_utils.GetBoardIdInfoString(image_bid, 345 symbolic=True) 346 filename += '.' + bid_str.replace(':', '_') 347 348 if not bucket: 349 bucket, filename = self.find_cr50_gs_image(filename, image_type) 350 351 remote_temp_dir = '/tmp/' 352 src = os.path.join(remote_temp_dir, filename) 353 dest = os.path.join(self.resultsdir, filename) 354 355 # Copy the image to the dut 356 gsutil_wrapper.copy_private_bucket(host=self.host, 357 bucket=bucket, 358 filename=filename, 359 destination=remote_temp_dir) 360 361 self.host.get_file(src, dest) 362 ver = cr50_utils.GetBinVersion(self.host, src) 363 364 # Compare the image board id to the downloaded image to make sure we got 365 # the right file 366 downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True) 367 if image_bid and bid_str != downloaded_bid: 368 raise error.TestError('Could not download image with matching ' 369 'board id wanted %s got %s' % (bid_str, 370 downloaded_bid)) 371 return dest, ver 372 373 374 def download_cr50_debug_image(self, devid, image_bid=''): 375 """download the cr50 debug file 376 377 Get the file with the matching devid and image board id info 378 379 Args: 380 devid: the cr50_devid string '${DEVID0} ${DEVID1}' 381 image_bid: the image board id info string or list 382 Returns: 383 A tuple with the debug image local path and version 384 """ 385 # Debug images are node locked with the devid. Add the devid to the 386 # filename 387 filename = self.CR50_DEBUG_FILE % (devid.replace(' ', '_')) 388 389 # Download the image 390 dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid, 391 image_type='debug') 392 393 return dest, ver 394 395 396 def download_cr50_release_image(self, rw_ver, image_bid=''): 397 """download the cr50 release file 398 399 Get the file with the matching version and image board id info 400 401 Args: 402 rw_ver: the rw version string 403 image_bid: the image board id info string or list 404 Returns: 405 A tuple with the release image local path and version 406 """ 407 # Release images can be found using the rw version 408 filename = self.CR50_PROD_FILE % rw_ver 409 410 # Download the image 411 dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid, 412 image_type='release') 413 414 # Compare the rw version and board id info to make sure the right image 415 # was found 416 if rw_ver != ver[1]: 417 raise error.TestError('Could not download image with matching ' 418 'rw version') 419 return dest, ver 420 421 422 def _cr50_verify_update(self, expected_ver, expect_rollback): 423 """Verify the expected version is running on cr50 424 425 Args: 426 expect_ver: The RW version string we expect to be running 427 expect_rollback: True if cr50 should have rolled back during the 428 update 429 430 Raises: 431 TestFail if there is any unexpected update state 432 """ 433 errors = [] 434 running_ver = self.cr50.get_version() 435 if expected_ver != running_ver: 436 errors.append('running %s not %s' % (running_ver, expected_ver)) 437 438 if expect_rollback != self.cr50.rolledback(): 439 errors.append('%srollback detected' % 440 'no ' if expect_rollback else '') 441 if len(errors): 442 raise error.TestFail('cr50_update failed: %s' % ', '.join(errors)) 443 logging.info('RUNNING %s after %s', expected_ver, 444 'rollback' if expect_rollback else 'update') 445 446 447 def _cr50_run_update(self, path): 448 """Install the image at path onto cr50 449 450 Args: 451 path: the location of the image to update to 452 453 Returns: 454 the rw version of the image 455 """ 456 tmp_dest = '/tmp/' + os.path.basename(path) 457 458 dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest) 459 cr50_utils.UsbUpdater(self.host, ['-a', dest]) 460 return image_ver[1] 461 462 463 def cr50_update(self, path, rollback=False, erase_nvmem=False, 464 expect_rollback=False, chip_bid=None, chip_flags=None): 465 """Attempt to update to the given image. 466 467 If rollback is True, we assume that cr50 is already running an image 468 that can rollback. 469 470 Args: 471 path: the location of the update image 472 rollback: True if we need to force cr50 to rollback to update to 473 the given image 474 erase_nvmem: True if we need to erase nvmem during rollback 475 expect_rollback: True if cr50 should rollback on its own 476 chip_bid: the integer representation of chip board id or None if the 477 board id should be erased during rollback 478 chip_flags: the integer representation of chip board id flags or 479 None if the board id should be erased during rollback 480 481 Raises: 482 TestFail if the update failed 483 """ 484 original_ver = self.cr50.get_version() 485 486 # Cr50 is going to reject an update if it hasn't been up for more than 487 # 60 seconds. Wait until that passes before trying to run the update. 488 self.cr50.wait_until_update_is_allowed() 489 490 rw_ver = self._cr50_run_update(path) 491 492 # Running the update may cause cr50 to reboot. Wait for that before 493 # sending more commands. The reboot should happen quickly. Wait a 494 # maximum of 10 seconds. 495 self.cr50.wait_for_reboot(10) 496 497 if erase_nvmem and rollback: 498 self.cr50.erase_nvmem() 499 500 if rollback: 501 self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags) 502 503 expected_ver = original_ver if expect_rollback else rw_ver 504 # If we expect a rollback, the version should remain unchanged 505 self._cr50_verify_update(expected_ver, rollback or expect_rollback) 506