1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import print_function 6 7import logging 8import os 9import pprint 10import StringIO 11import subprocess 12import time 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error, utils 16from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils 17from autotest_lib.server.cros import filesystem_util, gsutil_wrapper 18from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 19 20 21class Cr50Test(FirmwareTest): 22 """Base class that sets up helper objects/functions for cr50 tests.""" 23 version = 1 24 25 RELEASE_POOLS = ['faft-cr50-experimental'] 26 RESPONSE_TIMEOUT = 180 27 GS_PRIVATE = 'gs://chromeos-localmirror-private/distfiles/' 28 # Prod signed test images are stored in the private cr50 directory. 29 GS_PRIVATE_PROD = GS_PRIVATE + 'cr50/' 30 # Node locked test images are in this private debug directory. 31 GS_PRIVATE_DBG = GS_PRIVATE + 'chromeos-cr50-debug-0.0.11/' 32 GS_PUBLIC = 'gs://chromeos-localmirror/distfiles/' 33 CR50_PROD_FILE = 'cr50.r0.0.1*.w%s%s.tbz2' 34 CR50_DEBUG_FILE = '*/cr50.dbg.%s.bin.*%s' 35 CR50_ERASEFLASHINFO_FILE = ( 36 '*/cr50_Unknown_NodeLocked-%s_cr50-accessory-mp.bin') 37 CR50_QUAL_VERSION_FILE = 'chromeos-cr50-QUAL_VERSION' 38 NONE = 0 39 # Saved the original device state during init. 40 INITIAL_IMAGE_STATE = 1 << 0 41 # Saved the original image, the device image, and the debug image. These 42 # images are needed to be able to restore the original image and board id. 43 DEVICE_IMAGES = 1 << 1 44 DBG_IMAGE = 1 << 2 45 ERASEFLASHINFO_IMAGE = 1 << 3 46 # Different attributes of the device state require the test to download 47 # different images. STATE_IMAGE_RESTORES is a dictionary of the state each 48 # image type can restore. 49 STATE_IMAGE_RESTORES = { 50 DEVICE_IMAGES: ['prod_version', 'prepvt_version'], 51 DBG_IMAGE: ['running_image_ver', 'running_image_bid', 'chip_bid'], 52 ERASEFLASHINFO_IMAGE: ['chip_bid'], 53 } 54 PP_SHORT_INTERVAL = 3 55 # Cr50 may have flash operation errors during the test. Here's an example 56 # of one error message. 57 # do_flash_op:245 errors 20 fsh_pe_control 40720004 58 # The stuff after the ':' may change, but all flash operation errors 59 # contain do_flash_op. do_flash_op is only ever printed if there is an 60 # error during the flash operation. Just search for do_flash_op to simplify 61 # the search string and make it applicable to all flash op errors. 62 CR50_FLASH_OP_ERROR_MSG = 'do_flash_op' 63 # USB issues may show up with the timer sof calibration overflow interrupt. 64 # Count these during cleanup. 65 CR50_USB_ERROR = 'timer_sof_calibration_overflow_int' 66 67 def initialize(self, 68 host, 69 cmdline_args, 70 full_args, 71 restore_cr50_image=False, 72 restore_cr50_board_id=False, 73 provision_update=False): 74 self._saved_state = self.NONE 75 self._raise_error_on_mismatch = not restore_cr50_image 76 self._provision_update = provision_update 77 self.tot_test_run = full_args.get('tot_test_run', '').lower() == 'true' 78 super(Cr50Test, self).initialize(host, cmdline_args) 79 80 if not hasattr(self, 'cr50'): 81 raise error.TestNAError('Test can only be run on devices with ' 82 'access to the Cr50 console') 83 # TODO(b/149948314): remove when dual-v4 is sorted out. 84 if 'ccd_cr50' in self.servo.get_servo_version(): 85 self.servo.disable_ccd_watchdog_for_test() 86 87 logging.info('Test Args: %r', full_args) 88 89 self._devid = self.cr50.get_devid() 90 self.can_set_ccd_level = (not self.servo.main_device_is_ccd() 91 or self.cr50.testlab_is_on()) 92 self.original_ccd_level = self.cr50.get_ccd_level() 93 self.original_ccd_settings = self.cr50.get_cap_dict( 94 info=self.cr50.CAP_SETTING) 95 96 self.host = host 97 # SSH commands should complete within 3 minutes. Change the default, so 98 # it doesn't take half an hour for commands to timeout when the DUT is 99 # down. 100 self.host.set_default_run_timeout(180) 101 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 102 # Clear the FWMP, so it can't disable CCD. 103 self.clear_fwmp() 104 105 if self.can_set_ccd_level: 106 # Lock cr50 so the console will be restricted 107 self.cr50.set_ccd_level('lock') 108 elif self.original_ccd_level != 'lock': 109 raise error.TestNAError( 110 'Lock the console before running cr50 test') 111 112 self._save_original_state(full_args.get('release_path', '')) 113 114 # Try and download all images necessary to restore cr50 state. 115 try: 116 self._save_dbg_image(full_args.get('cr50_dbg_image_path', '')) 117 self._saved_state |= self.DBG_IMAGE 118 except Exception as e: 119 logging.warning('Error saving DBG image: %s', str(e)) 120 if restore_cr50_image: 121 raise error.TestNAError('Need DBG image: %s' % str(e)) 122 123 try: 124 self._save_eraseflashinfo_image( 125 full_args.get('cr50_eraseflashinfo_image_path', '')) 126 if self.cr50.uses_board_property('BOARD_EC_CR50_COMM_SUPPORT'): 127 raise error.TestError('Board cannot boot EFI image') 128 self._saved_state |= self.ERASEFLASHINFO_IMAGE 129 except Exception as e: 130 logging.warning('Error saving eraseflashinfo image: %s', str(e)) 131 if restore_cr50_board_id: 132 raise error.TestNAError( 133 'Need eraseflashinfo image: %s' % str(e)) 134 135 # TODO(b/143888583): remove qual update during init once new design to 136 # to provision cr50 updates is in place. 137 # Make sure the release image is running before starting the test. 138 is_release_qual = full_args.get('is_release_qual', 139 '').lower() == 'true' 140 if is_release_qual or self.running_cr50_release_suite(): 141 release_ver_arg = full_args.get('release_ver', '') 142 release_path_arg = full_args.get('release_path', '') 143 self.ensure_qual_image_is_running(release_ver_arg, 144 release_path_arg) 145 146 def running_cr50_release_suite(self): 147 """Return True if the DUT is in a release pool.""" 148 for pool in self.host.host_info_store.get().pools: 149 # TODO(b/149109740): remove once the pool values are verified. 150 # Change to run with faft-cr50 and faft-cr50-experimental suites. 151 logging.info('Checking pool: %s', pool) 152 if pool in self.RELEASE_POOLS: 153 logging.info('Running a release test.') 154 return True 155 return False 156 157 def ensure_qual_image_is_running(self, qual_ver_str, qual_path): 158 """Update to the qualification image if it's not running. 159 160 qual_ver_str and path are command line args that may be supplied to 161 specify a local version or path. If neither are supplied, the version 162 from gs will be used to determine what version cr50 should run. 163 164 qual_ver_str and qual_path should not be supplied together. If they are, 165 the path will be used. It's not a big deal as long as they agree with 166 each other. 167 168 @param qual_ver_str: qualification version string or None. 169 @param qual_path: local path to the qualification image or None. 170 """ 171 # Get the local image information. 172 if qual_path: 173 dest, qual_ver = cr50_utils.InstallImage(self.host, qual_path, 174 '/tmp/qual_cr50.bin') 175 self.host.run('rm ' + dest) 176 qual_bid_str = (cr50_utils.GetBoardIdInfoString( 177 qual_ver[2], False) if qual_ver[2] else '') 178 qual_ver_str = '%s/%s' % (qual_ver[1], qual_bid_str) 179 180 # Determine the qualification version from. 181 if not qual_ver_str: 182 gsurl = os.path.join(self.GS_PRIVATE, self.CR50_QUAL_VERSION_FILE) 183 dut_path = self.download_cr50_gs_file(gsurl, False)[1] 184 qual_ver_str = self.host.run('cat ' + dut_path).stdout.strip() 185 186 # Download the qualification image based on the version. 187 if not qual_path: 188 rw, bid = qual_ver_str.split('/') 189 qual_path, qual_ver = self.download_cr50_release_image(rw, bid) 190 191 logging.info('Cr50 Qual Version: %s', qual_ver_str) 192 logging.info('Cr50 Qual Path: %s', qual_path) 193 qual_chip_bid = cr50_utils.GetChipBIDFromImageBID( 194 qual_ver[2], self.get_device_brand()) 195 logging.info('Cr50 Qual Chip BID: %s', qual_chip_bid) 196 197 # Replace only the prod or prepvt image based on the major version. 198 if int(qual_ver[1].split('.')[1]) % 2: 199 prod_ver = qual_ver 200 prepvt_ver = self._original_image_state['prepvt_version'] 201 prod_path = qual_path 202 prepvt_path = self._device_prepvt_image 203 else: 204 prod_ver = self._original_image_state['prod_version'] 205 prepvt_ver = qual_ver 206 prod_path = self._device_prod_image 207 prepvt_path = qual_path 208 209 # Generate a dictionary with all of the expected state. 210 qual_state = {} 211 qual_state['prod_version'] = prod_ver 212 qual_state['prepvt_version'] = prepvt_ver 213 qual_state['chip_bid'] = qual_chip_bid 214 qual_state['running_image_bid'] = qual_ver[2] 215 # The test can't rollback RO. The newest RO should be running at the end 216 # of the test. max_ro will be none if the versions are the same. Use the 217 # running_ro in that case. 218 running_ro = self.get_saved_cr50_original_version()[0] 219 max_ro = cr50_utils.GetNewestVersion(running_ro, qual_ver[0]) 220 qual_state['running_image_ver'] = (max_ro or running_ro, qual_ver[1], 221 None) 222 mismatch = self._check_running_image_and_board_id(qual_state) 223 if not mismatch: 224 logging.info('Running qual image. No update needed.') 225 return 226 logging.info('Cr50 qual update required.') 227 filesystem_util.make_rootfs_writable(self.host) 228 self._update_device_images_and_running_cr50_firmware( 229 qual_state, qual_path, prod_path, prepvt_path) 230 logging.info("Recording qual device state as 'original' device state") 231 self._save_original_state(qual_path) 232 233 def _saved_cr50_state(self, state): 234 """Returns True if the test has saved the given state 235 236 @param state: a integer representing the state to check. 237 """ 238 return state & self._saved_state 239 240 def after_run_once(self): 241 """Log which iteration just ran""" 242 logging.info('successfully ran iteration %d', self.iteration) 243 self._try_to_bring_dut_up() 244 245 def _save_dbg_image(self, cr50_dbg_image_path): 246 """Save or download the node locked dev image. 247 248 @param cr50_dbg_image_path: The path to the node locked cr50 image. 249 """ 250 if os.path.isfile(cr50_dbg_image_path): 251 self._dbg_image_path = cr50_dbg_image_path 252 else: 253 self._dbg_image_path = self.download_cr50_debug_image()[0] 254 255 def _save_eraseflashinfo_image(self, cr50_eraseflashinfo_image_path): 256 """Save or download the node locked eraseflashinfo image. 257 258 @param cr50_eraseflashinfo_image_path: The path to the node locked cr50 259 image. 260 """ 261 if os.path.isfile(cr50_eraseflashinfo_image_path): 262 self._eraseflashinfo_image_path = cr50_eraseflashinfo_image_path 263 else: 264 self._eraseflashinfo_image_path = ( 265 self.download_cr50_eraseflashinfo_image()[0]) 266 267 def _save_device_image(self, ext): 268 """Download the .prod or .prepvt device image and get the version. 269 270 @param ext: The Cr50 file extension: prod or prepvt. 271 @returns (local_path, rw_version, bid_string) or (None, None, None) if 272 the file doesn't exist on the DUT. 273 """ 274 version = self._original_image_state[ext + '_version'] 275 if not version: 276 return None, None, None 277 _, rw_ver, bid = version 278 rw_filename = 'cr50.device.bin.%s.%s' % (ext, rw_ver) 279 local_path = os.path.join(self.resultsdir, rw_filename) 280 dut_path = cr50_utils.GetDevicePath(ext) 281 self.host.get_file(dut_path, local_path) 282 bid = cr50_utils.GetBoardIdInfoString(bid) 283 return local_path, rw_ver, bid 284 285 def _save_original_images(self, release_path): 286 """Use the saved state to find all of the device images. 287 288 This will download running cr50 image and the device image. 289 290 @param release_path: The release path given by test args 291 """ 292 local_path, prod_rw, prod_bid = self._save_device_image('prod') 293 self._device_prod_image = local_path 294 295 local_path, prepvt_rw, prepvt_bid = self._save_device_image('prepvt') 296 self._device_prepvt_image = local_path 297 298 if os.path.isfile(release_path): 299 self._original_cr50_image = release_path 300 logging.info('using supplied image') 301 return 302 if self.tot_test_run: 303 self._original_cr50_image = self.download_cr50_tot_image() 304 return 305 306 # If the running cr50 image version matches the image on the DUT use 307 # the DUT image as the original image. If the versions don't match 308 # download the image from google storage 309 _, running_rw, running_bid = self.get_saved_cr50_original_version() 310 311 # Convert the running board id to the same format as the prod and 312 # prepvt board ids. 313 running_bid = cr50_utils.GetBoardIdInfoString(running_bid) 314 if running_rw == prod_rw and running_bid == prod_bid: 315 logging.info('Using device cr50 prod image %s %s', prod_rw, 316 prod_bid) 317 self._original_cr50_image = self._device_prod_image 318 elif running_rw == prepvt_rw and running_bid == prepvt_bid: 319 logging.info('Using device cr50 prepvt image %s %s', prepvt_rw, 320 prepvt_bid) 321 self._original_cr50_image = self._device_prepvt_image 322 else: 323 logging.info('Downloading cr50 image %s %s', running_rw, 324 running_bid) 325 self._original_cr50_image = self.download_cr50_release_image( 326 running_rw, running_bid)[0] 327 328 def _save_original_state(self, release_path): 329 """Save the cr50 related state. 330 331 Save the device's current cr50 version, cr50 board id, the running cr50 332 image, the prepvt, and prod cr50 images. These will be used to restore 333 the cr50 state during cleanup. 334 335 @param release_path: the optional command line arg of path for the local 336 cr50 image. 337 """ 338 self._saved_state &= ~self.INITIAL_IMAGE_STATE 339 self._original_image_state = self.get_image_and_bid_state() 340 # We successfully saved the device state 341 self._saved_state |= self.INITIAL_IMAGE_STATE 342 self._saved_state &= ~self.DEVICE_IMAGES 343 try: 344 self._save_original_images(release_path) 345 self._saved_state |= self.DEVICE_IMAGES 346 except Exception as e: 347 logging.warning('Error saving ChromeOS image cr50 firmware: %s', 348 str(e)) 349 350 def get_saved_cr50_original_version(self): 351 """Return (ro ver, rw ver, bid).""" 352 if ('running_image_ver' not in self._original_image_state 353 or 'running_image_bid' not in self._original_image_state): 354 raise error.TestError('No record of original cr50 image version') 355 return (self._original_image_state['running_image_ver'][0], 356 self._original_image_state['running_image_ver'][1], 357 self._original_image_state['running_image_bid']) 358 359 def get_saved_cr50_original_path(self): 360 """Return the local path for the original cr50 image.""" 361 if not hasattr(self, '_original_cr50_image'): 362 raise error.TestError('No record of original image') 363 return self._original_cr50_image 364 365 def has_saved_dbg_image_path(self): 366 """Returns true if we saved the node locked debug image.""" 367 return hasattr(self, '_dbg_image_path') 368 369 def get_saved_dbg_image_path(self): 370 """Return the local path for the cr50 dev image.""" 371 if not self.has_saved_dbg_image_path(): 372 raise error.TestError('No record of debug image') 373 return self._dbg_image_path 374 375 def get_saved_eraseflashinfo_image_path(self): 376 """Return the local path for the cr50 eraseflashinfo image.""" 377 if not hasattr(self, '_eraseflashinfo_image_path'): 378 raise error.TestError('No record of eraseflashinfo image') 379 return self._eraseflashinfo_image_path 380 381 def get_device_brand(self): 382 """Returns the 4 character device brand.""" 383 return self._original_image_state['cros_config / brand-code'] 384 385 def _retry_cr50_update(self, image, retries, rollback): 386 """Try to update to the given image retries amount of times. 387 388 @param image: The image path. 389 @param retries: The number of times to try to update. 390 @param rollback: Run rollback after the update. 391 @raises TestFail if the update failed. 392 """ 393 for i in range(retries): 394 try: 395 return self.cr50_update(image, rollback=rollback) 396 except Exception as e: 397 logging.warning('Failed to update to %s attempt %d: %s', 398 os.path.basename(image), i, str(e)) 399 logging.info('Sleeping 60 seconds') 400 time.sleep(60) 401 raise error.TestError( 402 'Failed to update to %s' % os.path.basename(image)) 403 404 def run_update_to_eraseflashinfo(self): 405 """Erase flashinfo using the eraseflashinfo image. 406 407 Update to the DBG image, rollback to the eraseflashinfo, and run 408 eraseflashinfo. 409 """ 410 self._retry_cr50_update(self._dbg_image_path, 3, False) 411 self._retry_cr50_update(self._eraseflashinfo_image_path, 3, True) 412 if not self.cr50.eraseflashinfo(): 413 raise error.TestError('Unable to erase the board id') 414 415 def eraseflashinfo_and_restore_image(self, image=''): 416 """eraseflashinfo and update to the given the image. 417 418 @param image: the image to end on. Use the original test image if no 419 image is given. 420 """ 421 image = image if image else self.get_saved_cr50_original_path() 422 self.run_update_to_eraseflashinfo() 423 self.cr50_update(image) 424 425 def update_cr50_image_and_board_id(self, image_path, bid): 426 """Set the chip board id and updating the cr50 image. 427 428 Make 3 attempts to update to the original image. Use a rollback from 429 the DBG image to erase the state that can only be erased by a DBG image. 430 Set the chip board id during rollback. 431 432 @param image_path: path of the image to update to. 433 @param bid: the board id to set. 434 """ 435 current_bid = cr50_utils.GetChipBoardId(self.host) 436 bid_mismatch = current_bid != bid 437 set_bid = bid_mismatch and bid != cr50_utils.ERASED_CHIP_BID 438 bid_is_erased = current_bid == cr50_utils.ERASED_CHIP_BID 439 eraseflashinfo = bid_mismatch and not bid_is_erased 440 441 if (eraseflashinfo 442 and not self._saved_cr50_state(self.ERASEFLASHINFO_IMAGE)): 443 raise error.TestFail('Did not save eraseflashinfo image') 444 445 # Remove prepvt and prod iamges, so they don't interfere with the test 446 # rolling back and updating to images that my be older than the images 447 # on the device. 448 if filesystem_util.is_rootfs_writable(self.host): 449 self.host.run('rm %s' % cr50_utils.CR50_PREPVT, ignore_status=True) 450 self.host.run('rm %s' % cr50_utils.CR50_PROD, ignore_status=True) 451 452 if eraseflashinfo: 453 self.run_update_to_eraseflashinfo() 454 455 self._retry_cr50_update(self._dbg_image_path, 3, False) 456 457 chip_bid = bid[0] 458 chip_flags = bid[2] 459 if set_bid: 460 self.cr50.set_board_id(chip_bid, chip_flags) 461 462 self._retry_cr50_update(image_path, 3, True) 463 464 def _cleanup_required(self, state_mismatch, image_type): 465 """Return True if the update can fix something in the mismatched state. 466 467 @param state_mismatch: a dictionary of the mismatched state. 468 @param image_type: The integer representing the type of image 469 """ 470 state_image_restores = set(self.STATE_IMAGE_RESTORES[image_type]) 471 restore = state_image_restores.intersection(state_mismatch.keys()) 472 if restore and not self._saved_cr50_state(image_type): 473 raise error.TestError( 474 'Did not save images to restore %s' % (', '.join(restore))) 475 return not not restore 476 477 def _get_image_information(self, ext): 478 """Get the image information for the .prod or .prepvt image. 479 480 @param ext: The extension string prod or prepvt 481 @param returns: The image version or None if the image doesn't exist. 482 """ 483 dut_path = cr50_utils.GetDevicePath(ext) 484 file_exists = self.host.path_exists(dut_path) 485 if file_exists: 486 return cr50_utils.GetBinVersion(self.host, dut_path) 487 return None 488 489 def get_image_and_bid_state(self): 490 """Get a dict with the current device cr50 information. 491 492 The state dict will include the platform brand, chip board id, the 493 running cr50 image version, the running cr50 image board id, and the 494 device cr50 image version. 495 """ 496 state = {} 497 state['cros_config / brand-code'] = self.host.run( 498 'cros_config / brand-code', ignore_status=True).stdout.strip() 499 state['prod_version'] = self._get_image_information('prod') 500 state['prepvt_version'] = self._get_image_information('prepvt') 501 state['chip_bid'] = cr50_utils.GetChipBoardId(self.host) 502 state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid'] 503 state['running_image_ver'] = cr50_utils.GetRunningVersion(self.host) 504 state['running_image_bid'] = self.cr50.get_active_board_id_str() 505 506 logging.debug('Current Cr50 state:\n%s', pprint.pformat(state)) 507 return state 508 509 def _check_running_image_and_board_id(self, expected_state): 510 """Compare the current image and board id to the given state. 511 512 @param expected_state: A dictionary of the state to compare to. 513 @return: A dictionary with the state that is wrong as the key and the 514 expected and current state as the value. 515 """ 516 if not (self._saved_state & self.INITIAL_IMAGE_STATE): 517 logging.warning( 518 'Did not save the original state. Cannot verify it ' 519 'matches') 520 return 521 # Make sure the /var/cache/cr50* state is up to date. 522 cr50_utils.ClearUpdateStateAndReboot(self.host) 523 524 mismatch = {} 525 state = self.get_image_and_bid_state() 526 527 for k, expected_val in expected_state.iteritems(): 528 val = state[k] 529 if val != expected_val: 530 mismatch[k] = 'expected: %s, current: %s' % (expected_val, val) 531 532 if mismatch: 533 logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch)) 534 return mismatch 535 536 def _check_original_image_state(self): 537 """Compare the current cr50 state to the original state. 538 539 @return: A dictionary with the state that is wrong as the key and the 540 new and old state as the value 541 """ 542 mismatch = self._check_running_image_and_board_id( 543 self._original_image_state) 544 if not mismatch: 545 logging.info('The device is in the original state') 546 return mismatch 547 548 def _reset_ccd_settings(self): 549 """Reset the ccd lock and capability states.""" 550 if not self.cr50.ccd_is_reset(): 551 # Try to open cr50 and enable testlab mode if it isn't enabled. 552 try: 553 self.fast_ccd_open(True) 554 except: 555 # Even if we can't open cr50, do our best to reset the rest of 556 # the system state. Log a warning here. 557 logging.warning('Unable to Open cr50', exc_info=True) 558 self.cr50.send_command('ccd reset') 559 if not self.cr50.ccd_is_reset(): 560 raise error.TestFail('Could not reset ccd') 561 562 current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING) 563 if self.original_ccd_settings != current_settings: 564 if not self.can_set_ccd_level: 565 raise error.TestError("CCD state has changed, but we can't " 566 "restore it") 567 self.fast_ccd_open(True) 568 self.cr50.set_caps(self.original_ccd_settings) 569 570 # First try using testlab open to open the device 571 if self.original_ccd_level == 'open': 572 self.fast_ccd_open(True) 573 elif self.original_ccd_level != self.cr50.get_ccd_level(): 574 self.cr50.set_ccd_level(self.original_ccd_level) 575 576 def cleanup(self): 577 """Attempt to cleanup the cr50 state. Then run firmware cleanup""" 578 try: 579 # Reset the password as the first thing in cleanup. It is important 580 # that if some other part of cleanup fails, the password has at 581 # least been reset. 582 # DO NOT PUT ANYTHING BEFORE THIS. 583 self._try_quick_ccd_cleanup() 584 585 self.servo.enable_main_servo_device() 586 587 self._try_to_bring_dut_up() 588 self._restore_cr50_state() 589 finally: 590 super(Cr50Test, self).cleanup() 591 592 # Check the logs captured during firmware_test cleanup for cr50 errors. 593 self._get_cr50_stats_from_uart_capture() 594 self.servo.allow_ccd_watchdog_for_test() 595 596 def _get_cr50_stats_from_uart_capture(self): 597 """Check cr50 uart output for errors. 598 599 Use the logs captured during firmware_test cleanup to check for cr50 600 errors. Flash operation issues aren't obvious unless you check the logs. 601 All flash op errors print do_flash_op and it isn't printed during normal 602 operation. Open the cr50 uart file and count the number of times this is 603 printed. Log the number of errors. 604 """ 605 cr50_uart_file = self.servo.get_uart_logfile('cr50') 606 if not cr50_uart_file: 607 logging.info('There is not a cr50 uart file') 608 return 609 610 flash_error_count = 0 611 usb_error_count = 0 612 with open(cr50_uart_file, 'r') as f: 613 for line in f: 614 if self.CR50_FLASH_OP_ERROR_MSG in line: 615 flash_error_count += 1 616 if self.CR50_USB_ERROR in line: 617 usb_error_count += 1 618 619 # Log any flash operation errors. 620 logging.info('do_flash_op count: %d', flash_error_count) 621 logging.info('usb error count: %d', usb_error_count) 622 623 def _update_device_images_and_running_cr50_firmware( 624 self, state, release_path, prod_path, prepvt_path): 625 """Update cr50, set the board id, and copy firmware to the DUT. 626 627 @param state: A dictionary with the expected running version, board id, 628 device cr50 firmware versions. 629 @param release_path: The image to update cr50 to 630 @param prod_path: The path to the .prod image 631 @param prepvt_path: The path to the .prepvt image 632 @raises TestError: if setting any state failed 633 """ 634 mismatch = self._check_running_image_and_board_id(state) 635 if not mismatch: 636 logging.info('Nothing to do.') 637 return 638 639 # Use the DBG image to restore the original image. 640 if self._cleanup_required(mismatch, self.DBG_IMAGE): 641 self.update_cr50_image_and_board_id(release_path, 642 state['chip_bid']) 643 644 new_mismatch = self._check_running_image_and_board_id(state) 645 # Copy the original .prod and .prepvt images back onto the DUT. 646 if (self._cleanup_required(new_mismatch, self.DEVICE_IMAGES) 647 and filesystem_util.is_rootfs_writable(self.host)): 648 # Copy the .prod file onto the DUT. 649 if prod_path and 'prod_version' in new_mismatch: 650 cr50_utils.InstallImage(self.host, prod_path, 651 cr50_utils.CR50_PROD) 652 # Copy the .prepvt file onto the DUT. 653 if prepvt_path and 'prepvt_version' in new_mismatch: 654 cr50_utils.InstallImage(self.host, prepvt_path, 655 cr50_utils.CR50_PREPVT) 656 657 final_mismatch = self._check_running_image_and_board_id(state) 658 if final_mismatch: 659 raise error.TestError( 660 'Could not update cr50 image state: %s' % final_mismatch) 661 logging.info('Successfully updated all device cr50 firmware state.') 662 663 def _restore_device_images_and_running_cr50_firmware(self): 664 """Restore the images on the device and the running cr50 image.""" 665 if self._provision_update: 666 return 667 mismatch = self._check_original_image_state() 668 if not mismatch: 669 return 670 self._update_device_images_and_running_cr50_firmware( 671 self._original_image_state, 672 self.get_saved_cr50_original_path(), self._device_prod_image, 673 self._device_prepvt_image) 674 675 if self._raise_error_on_mismatch and mismatch: 676 raise error.TestError('Unexpected state mismatch during ' 677 'cleanup %s' % mismatch) 678 679 def _try_quick_ccd_cleanup(self): 680 """Try to clear all ccd state.""" 681 # This is just a first pass at cleanup. Don't raise any errors. 682 try: 683 self.cr50.ccd_enable() 684 except Exception as e: 685 logging.warn('Ignored exception enabling ccd %r', str(e)) 686 self.cr50.send_command('ccd testlab open') 687 self.cr50.send_command('rddkeepalive disable') 688 self.cr50.send_command('ccd reset') 689 self.cr50.send_command('wp follow_batt_pres atboot') 690 691 def _restore_ccd_settings(self): 692 """Restore the original ccd state.""" 693 self._try_quick_ccd_cleanup() 694 695 # Reboot cr50 if the console is accessible. This will reset most state. 696 if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]: 697 self.cr50.reboot() 698 699 # Reenable servo v4 CCD 700 self.cr50.ccd_enable() 701 702 # reboot to normal mode if the device is in dev mode. 703 self.enter_mode_after_checking_cr50_state('normal') 704 705 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 706 self.clear_fwmp() 707 708 # Restore the ccd privilege level 709 self._reset_ccd_settings() 710 711 def _restore_cr50_state(self): 712 """Restore cr50 state, so the device can be used for further testing. 713 714 Restore the cr50 image and board id first. Then CCD, because flashing 715 dev signed images completely clears the CCD state. 716 """ 717 try: 718 self._restore_device_images_and_running_cr50_firmware() 719 except Exception as e: 720 logging.warning('Issue restoring Cr50 image: %s', str(e)) 721 raise 722 finally: 723 self._restore_ccd_settings() 724 725 def find_cr50_gs_image(self, gsurl): 726 """Find the cr50 gs image name. 727 728 @param gsurl: the cr50 image location 729 @return: a list of the gsutil bucket, filename or None if the file 730 can't be found 731 """ 732 try: 733 return utils.gs_ls(gsurl)[0].rsplit('/', 1) 734 except error.CmdError: 735 logging.info('%s does not exist', gsurl) 736 return None 737 738 def _extract_cr50_image(self, archive, fn): 739 """Extract the filename from the given archive 740 Aargs: 741 archive: the archive location on the host 742 fn: the file to extract 743 744 Returns: 745 The location of the extracted file 746 """ 747 remote_dir = os.path.dirname(archive) 748 result = self.host.run('tar xfv %s -C %s' % (archive, remote_dir)) 749 for line in result.stdout.splitlines(): 750 if os.path.basename(line) == fn: 751 return os.path.join(remote_dir, line) 752 raise error.TestFail('%s was not extracted from %s' % (fn, archive)) 753 754 def download_cr50_gs_file(self, gsurl, extract_fn): 755 """Download and extract the file at gsurl. 756 757 @param gsurl: The gs url for the cr50 image 758 @param extract_fn: The name of the file to extract from the cr50 image 759 tarball. Don't extract anything if extract_fn is None. 760 @return: a tuple (local path, host path) 761 """ 762 file_info = self.find_cr50_gs_image(gsurl) 763 if not file_info: 764 raise error.TestFail('Could not find %s' % gsurl) 765 bucket, fn = file_info 766 767 remote_temp_dir = '/tmp/' 768 src = os.path.join(remote_temp_dir, fn) 769 dest = os.path.join(self.resultsdir, fn) 770 771 # Copy the image to the dut 772 gsutil_wrapper.copy_private_bucket( 773 host=self.host, 774 bucket=bucket, 775 filename=fn, 776 destination=remote_temp_dir) 777 if extract_fn: 778 src = self._extract_cr50_image(src, extract_fn) 779 logging.info('extracted %s', src) 780 # Remove .tbz2 from the local path. 781 dest = os.path.splitext(dest)[0] 782 783 self.host.get_file(src, dest) 784 return dest, src 785 786 def download_cr50_gs_image(self, gsurl, extract_fn, image_bid): 787 """Get the image from gs and save it in the autotest dir. 788 789 @param gsurl: The gs url for the cr50 image 790 @param extract_fn: The name of the file to extract from the cr50 image 791 tarball. Don't extract anything if extract_fn is None. 792 @param image_bid: the image symbolic board id 793 @return: A tuple with the local path and version 794 """ 795 dest, src = self.download_cr50_gs_file(gsurl, extract_fn) 796 ver = cr50_utils.GetBinVersion(self.host, src) 797 798 # Compare the image board id to the downloaded image to make sure we got 799 # the right file 800 downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True) 801 if image_bid and image_bid != downloaded_bid: 802 raise error.TestError( 803 'Could not download image with matching ' 804 'board id wanted %s got %s' % (image_bid, downloaded_bid)) 805 return dest, ver 806 807 def download_cr50_eraseflashinfo_image(self): 808 """download the cr50 image that allows erasing flashinfo. 809 810 Get the file with the matching devid. 811 812 @return: A tuple with the debug image local path and version 813 """ 814 devid = self._devid.replace(' ', '-').replace('0x', '') 815 gsurl = os.path.join(self.GS_PRIVATE_DBG, 816 self.CR50_ERASEFLASHINFO_FILE % devid) 817 return self.download_cr50_gs_image(gsurl, None, None) 818 819 def download_cr50_debug_image(self, devid='', image_bid=''): 820 """download the cr50 debug file. 821 822 Get the file with the matching devid and image board id info 823 824 @param image_bid: the image board id info string or list 825 @return: A tuple with the debug image local path and version 826 """ 827 bid_ext = '' 828 # Add the image bid string to the filename 829 if image_bid: 830 image_bid = cr50_utils.GetBoardIdInfoString( 831 image_bid, symbolic=True) 832 bid_ext = '.' + image_bid.replace(':', '_') 833 834 devid = devid if devid else self._devid 835 dbg_file = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'), bid_ext) 836 gsurl = os.path.join(self.GS_PRIVATE_DBG, dbg_file) 837 return self.download_cr50_gs_image(gsurl, None, image_bid) 838 839 def download_cr50_tot_image(self): 840 """download the cr50 TOT image. 841 842 @return: the local path to the TOT image. 843 """ 844 # TODO(mruthven): use logic from provision_Cr50TOT 845 raise error.TestNAError('Could not download TOT image') 846 847 def _find_release_image_gsurl(self, fn): 848 """Find the gs url for the release image""" 849 for gsbucket in [self.GS_PUBLIC, self.GS_PRIVATE_PROD]: 850 gsurl = os.path.join(gsbucket, fn) 851 if self.find_cr50_gs_image(gsurl): 852 return gsurl 853 raise error.TestFail('%s is not on google storage' % fn) 854 855 def download_cr50_release_image(self, image_rw, image_bid=''): 856 """download the cr50 release file. 857 858 Get the file with the matching version and image board id info 859 860 @param image_rw: the rw version string 861 @param image_bid: the image board id info string or list 862 @return: A tuple with the release image local path and version 863 """ 864 bid_ext = '' 865 # Add the image bid string to the gsurl 866 if image_bid: 867 image_bid = cr50_utils.GetBoardIdInfoString( 868 image_bid, symbolic=True) 869 bid_ext = '_' + image_bid.replace(':', '_') 870 release_fn = self.CR50_PROD_FILE % (image_rw, bid_ext) 871 gsurl = self._find_release_image_gsurl(release_fn) 872 873 # Release images can be found using the rw version 874 # Download the image 875 dest, ver = self.download_cr50_gs_image(gsurl, 'cr50.bin.prod', 876 image_bid) 877 878 # Compare the rw version and board id info to make sure the right image 879 # was found 880 if image_rw != ver[1]: 881 raise error.TestError('Could not download image with matching ' 882 'rw version') 883 return dest, ver 884 885 def _cr50_verify_update(self, expected_rw, expect_rollback): 886 """Verify the expected version is running on cr50. 887 888 @param expected_rw: The RW version string we expect to be running 889 @param expect_rollback: True if cr50 should have rolled back during the 890 update 891 @raise TestFail: if there is any unexpected update state 892 """ 893 errors = [] 894 running_rw = self.cr50.get_version() 895 if expected_rw != running_rw: 896 errors.append('running %s not %s' % (running_rw, expected_rw)) 897 898 if expect_rollback != self.cr50.rolledback(): 899 errors.append('%srollback detected' % 900 'no ' if expect_rollback else '') 901 if len(errors): 902 raise error.TestFail('cr50_update failed: %s' % ', '.join(errors)) 903 logging.info('RUNNING %s after %s', expected_rw, 904 'rollback' if expect_rollback else 'update') 905 906 def _cr50_run_update(self, path): 907 """Install the image at path onto cr50. 908 909 @param path: the location of the image to update to 910 @return: the rw version of the image 911 """ 912 tmp_dest = '/tmp/' + os.path.basename(path) 913 914 dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest) 915 # Use the -p option to make sure the DUT does a clean reboot. 916 cr50_utils.GSCTool(self.host, ['-a', dest, '-p']) 917 # Reboot the DUT to finish the cr50 update. 918 self.host.reboot(wait=False) 919 return image_ver[1] 920 921 def cr50_update(self, path, rollback=False, expect_rollback=False): 922 """Attempt to update to the given image. 923 924 If rollback is True, we assume that cr50 is already running an image 925 that can rollback. 926 927 @param path: the location of the update image 928 @param rollback: True if we need to force cr50 to rollback to update to 929 the given image 930 @param expect_rollback: True if cr50 should rollback on its own 931 @raise TestFail: if the update failed 932 """ 933 original_rw = self.cr50.get_version() 934 935 # Cr50 is going to reject an update if it hasn't been up for more than 936 # 60 seconds. Wait until that passes before trying to run the update. 937 self.cr50.wait_until_update_is_allowed() 938 939 image_rw = self._cr50_run_update(path) 940 941 # Running the update may cause cr50 to reboot. Wait for that before 942 # sending more commands. The reboot should happen quickly. 943 self.cr50.wait_for_reboot( 944 timeout=self.faft_config.gsc_update_wait_for_reboot) 945 946 if rollback: 947 self.cr50.rollback() 948 949 expected_rw = original_rw if expect_rollback else image_rw 950 # If we expect a rollback, the version should remain unchanged 951 self._cr50_verify_update(expected_rw, rollback or expect_rollback) 952 953 def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error): 954 """Run a gsctool command and input the password 955 956 @param password: The cr50 password string 957 @param cmd: The gsctool command 958 @param name: The name to give the job 959 @param expect_error: True if the command should fail 960 """ 961 set_pwd_cmd = utils.sh_escape(cmd) 962 full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'), 963 set_pwd_cmd) 964 logging.info('Running: %s', cmd) 965 logging.info('Password: %s', password) 966 967 # Make sure the test waits long enough to avoid ccd rate limiting. 968 time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT) 969 970 stdout = StringIO.StringIO() 971 # Start running the gsctool Command in the background. 972 gsctool_job = utils.BgJob( 973 full_ssh_command, 974 nickname='%s_with_password' % name, 975 stdout_tee=stdout, 976 stderr_tee=utils.TEE_TO_LOGS, 977 stdin=subprocess.PIPE) 978 if gsctool_job == None: 979 raise error.TestFail('could not start gsctool command %r' % cmd) 980 981 try: 982 # Wait for enter prompt 983 gsctool_job.process_output() 984 logging.info(stdout.getvalue().strip()) 985 # Enter the password 986 gsctool_job.sp.stdin.write(password + '\n') 987 988 # Wait for re-enter prompt 989 gsctool_job.process_output() 990 logging.info(stdout.getvalue().strip()) 991 # Re-enter the password 992 gsctool_job.sp.stdin.write(password + '\n') 993 time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT) 994 gsctool_job.process_output() 995 finally: 996 exit_status = utils.nuke_subprocess(gsctool_job.sp) 997 output = stdout.getvalue().strip() 998 logging.info('%s stdout: %s', name, output) 999 logging.info('%s exit status: %s', name, exit_status) 1000 if exit_status: 1001 message = ('gsctool %s failed using %r: %s %s' % 1002 (name, password, exit_status, output)) 1003 if expect_error: 1004 logging.info(message) 1005 else: 1006 raise error.TestFail(message) 1007 elif expect_error: 1008 raise error.TestFail('%s with %r did not fail when expected' % 1009 (name, password)) 1010 1011 def set_ccd_password(self, password, expect_error=False): 1012 """Set the ccd password""" 1013 # Testlab mode can't be enabled if there is no power button, so we 1014 # shouldn't allow setting the password. 1015 if not self.faft_config.has_powerbutton: 1016 raise error.TestError('No power button') 1017 1018 # If for some reason the test sets a password and is interrupted before 1019 # we can clear it, we want testlab mode to be enabled, so it's possible 1020 # to clear the password without knowing it. 1021 if not self.cr50.testlab_is_on(): 1022 raise error.TestError('Will not set password unless testlab mode ' 1023 'is enabled.') 1024 try: 1025 self.run_gsctool_cmd_with_password(password, 'gsctool -a -P', 1026 'set_password', expect_error) 1027 finally: 1028 logging.info('Cr50 password is %s', 1029 'cleared' if self.cr50.password_is_reset() else 'set') 1030 1031 def ccd_unlock_from_ap(self, password=None, expect_error=False): 1032 """Unlock cr50""" 1033 if not password: 1034 self.host.run('gsctool -a -U') 1035 return 1036 self.run_gsctool_cmd_with_password(password, 'gsctool -a -U', 'unlock', 1037 expect_error) 1038 1039 def tpm_is_responsive(self): 1040 """Check TPM responsiveness by running tpm_version.""" 1041 result = self.host.run('tpm_version', ignore_status=True) 1042 logging.debug(result.stdout.strip()) 1043 return not result.exit_status 1044