# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import pprint
import StringIO
import subprocess
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
from autotest_lib.server.cros import filesystem_util, gsutil_wrapper
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class Cr50Test(FirmwareTest):
    """Base class that sets up helper objects/functions for cr50 tests."""
    version = 1

    RELEASE_POOLS = ['faft-cr50', 'faft-cr50-experimental']
    RESPONSE_TIMEOUT = 180
    GS_PRIVATE = 'gs://chromeos-localmirror-private/distfiles/'
    # Prod signed test images are stored in the private cr50 directory.
    GS_PRIVATE_PROD = GS_PRIVATE + 'cr50/'
    # Node locked test images are in this private debug directory.
    GS_PRIVATE_DBG = GS_PRIVATE + 'chromeos-cr50-debug-0.0.11/'
    GS_PUBLIC = 'gs://chromeos-localmirror/distfiles/'
    CR50_PROD_FILE = 'cr50.r0.0.1*.w%s%s.tbz2'
    CR50_DEBUG_FILE = '*/cr50.dbg.%s.bin.*%s'
    CR50_ERASEFLASHINFO_FILE = (
            '*/cr50_Unknown_NodeLocked-%s_cr50-accessory-mp.bin')
    CR50_QUAL_VERSION_FILE = 'chromeos-cr50-QUAL_VERSION'
    NONE = 0
    # Saved the original device state during init.
    INITIAL_IMAGE_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    DEVICE_IMAGES = 1 << 1
    DBG_IMAGE = 1 << 2
    ERASEFLASHINFO_IMAGE = 1 << 3
    # Different attributes of the device state require the test to download
    # different images. STATE_IMAGE_RESTORES is a dictionary of the state each
    # image type can restore.
    STATE_IMAGE_RESTORES = {
        DEVICE_IMAGES : ['prod_version', 'prepvt_version'],
        DBG_IMAGE : ['running_image_ver', 'running_image_bid', 'chip_bid'],
        ERASEFLASHINFO_IMAGE : ['chip_bid'],
    }
    PP_SHORT_INTERVAL = 3
    # Cr50 may have flash operation errors during the test. Here's an example
    # of one error message.
    # do_flash_op:245 errors 20 fsh_pe_control 40720004
    # The stuff after the ':' may change, but all flash operation errors
    # contain do_flash_op. do_flash_op is only ever printed if there is an
    # error during the flash operation. Just search for do_flash_op to simplify
    # the search string and make it applicable to all flash op errors.
    CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'
    # USB issues may show up with the timer sof calibration overflow interrupt.
    # Count these during cleanup.
    CR50_USB_ERROR = 'timer_sof_calibration_overflow_int'

    def initialize(self, host, cmdline_args, full_args,
                   restore_cr50_image=False, restore_cr50_board_id=False,
                   provision_update=False):
        """Record the cr50 state and save the images needed to restore it.

        @param host: the DUT host object.
        @param cmdline_args: command line args handed to
                FirmwareTest.initialize.
        @param full_args: dictionary of test args. The keys read here are
                tot_test_run, release_path, cr50_dbg_image_path,
                cr50_eraseflashinfo_image_path, is_release_qual and
                release_ver.
        @param restore_cr50_image: True if the test requires the DBG image.
                When False, cleanup raises an error if the image state does
                not match the original state.
        @param restore_cr50_board_id: True if the test requires the
                eraseflashinfo image.
        @param provision_update: True to skip restoring the device and
                running cr50 images during cleanup.
        @raises TestNAError: if there is no cr50 console, the console can't
                be locked, or a required image could not be saved.
        """
        self._saved_state = self.NONE
        self._raise_error_on_mismatch = not restore_cr50_image
        self._provision_update = provision_update
        self.tot_test_run = full_args.get('tot_test_run', '').lower() == 'true'
        super(Cr50Test, self).initialize(host, cmdline_args)

        if not hasattr(self, 'cr50'):
            raise error.TestNAError('Test can only be run on devices with '
                                    'access to the Cr50 console')
        # TODO(b/149948314): remove when dual-v4 is sorted out.
        if 'ccd_cr50' in self.servo.get_servo_version():
            self.servo.set_nocheck('watchdog_remove', 'ccd')

        logging.info('Test Args: %r', full_args)

        self._devid = self.servo.get('cr50_devid')
        self.can_set_ccd_level = (not self.servo.main_device_is_ccd() or
                                  self.cr50.testlab_is_on())
        self.original_ccd_level = self.cr50.get_ccd_level()
        self.original_ccd_settings = self.cr50.get_cap_dict(
                info=self.cr50.CAP_SETTING)

        self.host = host
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        # Clear the FWMP, so it can't disable CCD.
        self.clear_fwmp()

        if self.can_set_ccd_level:
            # Lock cr50 so the console will be restricted
            self.cr50.set_ccd_level('lock')
        elif self.original_ccd_level != 'lock':
            raise error.TestNAError('Lock the console before running cr50 test')

        self._save_original_state(full_args.get('release_path', ''))

        # Try and download all images necessary to restore cr50 state. A
        # failure is only fatal if the test said it needs that image.
        try:
            self._save_dbg_image(full_args.get('cr50_dbg_image_path', ''))
            self._saved_state |= self.DBG_IMAGE
        except Exception as e:
            logging.warning('Error saving DBG image: %s', str(e))
            if restore_cr50_image:
                raise error.TestNAError('Need DBG image: %s' % str(e))

        try:
            self._save_eraseflashinfo_image(
                    full_args.get('cr50_eraseflashinfo_image_path', ''))
            self._saved_state |= self.ERASEFLASHINFO_IMAGE
        except Exception as e:
            logging.warning('Error saving eraseflashinfo image: %s', str(e))
            if restore_cr50_board_id:
                raise error.TestNAError('Need eraseflashinfo image: %s' %
                                        str(e))

        # TODO(b/143888583): remove qual update during init once new design
        # to provision cr50 updates is in place.
        # Make sure the release image is running before starting the test.
        is_release_qual = full_args.get('is_release_qual', '').lower() == 'true'
        if is_release_qual or self.running_cr50_release_suite():
            release_ver_arg = full_args.get('release_ver', '')
            release_path_arg = full_args.get('release_path', '')
            self.ensure_qual_image_is_running(release_ver_arg, release_path_arg)


    def running_cr50_release_suite(self):
        """Return True if the DUT is in a release pool."""
        for pool in self.host.host_info_store.get().pools:
            # TODO(b/149109740): remove once the pool values are verified.
            # Change to run with faft-cr50 and faft-cr50-experimental suites.
            logging.info('Checking pool: %s', pool)
            if pool in self.RELEASE_POOLS:
                logging.info('Running a release test.')
                return True
        return False


    def ensure_qual_image_is_running(self, qual_ver_str, qual_path):
        """Update to the qualification image if it's not running.

        qual_ver_str and path are command line args that may be supplied to
        specify a local version or path. If neither are supplied, the version
        from gs will be used to determine what version cr50 should run.

        qual_ver_str and qual_path should not be supplied together. If they are,
        the path will be used. It's not a big deal as long as they agree with
        each other.

        @param qual_ver_str: qualification version string or None.
        @param qual_path: local path to the qualification image or None.
        """
        # Get the local image information.
        if qual_path:
            dest, qual_ver = cr50_utils.InstallImage(self.host, qual_path,
                                                     '/tmp/qual_cr50.bin')
            self.host.run('rm ' + dest)
            qual_bid_str = (cr50_utils.GetBoardIdInfoString(qual_ver[2], False)
                            if qual_ver[2] else '')
            qual_ver_str = '%s/%s' % (qual_ver[1], qual_bid_str)

        # Determine the qualification version from the version file in google
        # storage if it wasn't supplied or derived from the local image.
        if not qual_ver_str:
            gsurl = os.path.join(self.GS_PRIVATE, self.CR50_QUAL_VERSION_FILE)
            dut_path = self.download_cr50_gs_file(gsurl, False)[1]
            qual_ver_str = self.host.run('cat ' + dut_path).stdout.strip()

        # Download the qualification image based on the version.
        if not qual_path:
            rw, bid = qual_ver_str.split('/')
            qual_path, qual_ver = self.download_cr50_release_image(rw, bid)

        logging.info('Cr50 Qual Version: %s', qual_ver_str)
        logging.info('Cr50 Qual Path: %s', qual_path)
        qual_chip_bid = cr50_utils.GetChipBIDFromImageBID(
                qual_ver[2], self.get_device_brand())
        logging.info('Cr50 Qual Chip BID: %s', qual_chip_bid)

        # Replace only the prod or prepvt image based on the major version.
        # An odd second field in the RW version selects the prepvt image.
        if int(qual_ver[1].split('.')[1]) % 2:
            prod_ver = self._original_image_state['prod_version']
            prepvt_ver = qual_ver
            prod_path = self._device_prod_image
            prepvt_path = qual_path
        else:
            prod_ver = qual_ver
            prepvt_ver = self._original_image_state['prepvt_version']
            prod_path = qual_path
            prepvt_path = self._device_prepvt_image

        # Generate a dictionary with all of the expected state.
        qual_state = {}
        qual_state['prod_version'] = prod_ver
        qual_state['prepvt_version'] = prepvt_ver
        qual_state['chip_bid'] = qual_chip_bid
        qual_state['running_image_bid'] = qual_ver[2]
        # The test can't rollback RO. The newest RO should be running at the end
        # of the test. max_ro will be none if the versions are the same. Use the
        # running_ro in that case.
        running_ro = self.get_saved_cr50_original_version()[0]
        max_ro = cr50_utils.GetNewestVersion(running_ro, qual_ver[0])
        qual_state['running_image_ver'] = (max_ro or running_ro, qual_ver[1],
                                           None)
        mismatch = self._check_running_image_and_board_id(qual_state)
        if not mismatch:
            logging.info('Running qual image. No update needed.')
            return
        logging.info('Cr50 qual update required.')
        # TODO(b/149109740): remove once running_cr50_release_suite logic has
        # been verified.
        logging.info('Skipping until logic has been verified')
        return
        # NOTE: everything below is intentionally unreachable until the TODO
        # above is resolved.
        filesystem_util.make_rootfs_writable(self.host)
        self._update_device_images_and_running_cr50_firmware(qual_state,
                qual_path, prod_path, prepvt_path)
        logging.info("Recording qual device state as 'original' device state")
        self._save_original_state(qual_path)


    def _saved_cr50_state(self, state):
        """Returns True if the test has saved the given state

        @param state: a integer representing the state to check.
        """
        return bool(state & self._saved_state)


    def after_run_once(self):
        """Log which iteration just ran"""
        logging.info('successfully ran iteration %d', self.iteration)
        self._try_to_bring_dut_up()


    def _save_dbg_image(self, cr50_dbg_image_path):
        """Save or download the node locked dev image.

        @param cr50_dbg_image_path: The path to the node locked cr50 image.
        """
        if os.path.isfile(cr50_dbg_image_path):
            self._dbg_image_path = cr50_dbg_image_path
        else:
            self._dbg_image_path = self.download_cr50_debug_image()[0]


    def _save_eraseflashinfo_image(self, cr50_eraseflashinfo_image_path):
        """Save or download the node locked eraseflashinfo image.

        @param cr50_eraseflashinfo_image_path: The path to the node locked cr50
                                               image.
        """
        if os.path.isfile(cr50_eraseflashinfo_image_path):
            self._eraseflashinfo_image_path = cr50_eraseflashinfo_image_path
        else:
            self._eraseflashinfo_image_path = (
                    self.download_cr50_eraseflashinfo_image()[0])


    def _save_device_image(self, ext):
        """Download the .prod or .prepvt device image and get the version.

        @param ext: The Cr50 file extension: prod or prepvt.
        @returns (local_path, rw_version, bid_string) or (None, None, None) if
                 the file doesn't exist on the DUT.
        """
        version = self._original_image_state[ext + '_version']
        if not version:
            return None, None, None
        _, rw_ver, bid = version
        rw_filename = 'cr50.device.bin.%s.%s' % (ext, rw_ver)
        local_path = os.path.join(self.resultsdir, rw_filename)
        dut_path = cr50_utils.GetDevicePath(ext)
        self.host.get_file(dut_path, local_path)
        bid = cr50_utils.GetBoardIdInfoString(bid)
        return local_path, rw_ver, bid


    def _save_original_images(self, release_path):
        """Use the saved state to find all of the device images.

        This will download running cr50 image and the device image.

        @param release_path: The release path given by test args
        """
        local_path, prod_rw, prod_bid = self._save_device_image('prod')
        self._device_prod_image = local_path

        local_path, prepvt_rw, prepvt_bid = self._save_device_image('prepvt')
        self._device_prepvt_image = local_path

        if os.path.isfile(release_path):
            self._original_cr50_image = release_path
            logging.info('using supplied image')
            return
        if self.tot_test_run:
            self._original_cr50_image = self.download_cr50_tot_image()
            return

        # If the running cr50 image version matches the image on the DUT use
        # the DUT image as the original image. If the versions don't match
        # download the image from google storage
        _, running_rw, running_bid = self.get_saved_cr50_original_version()

        # Convert the running board id to the same format as the prod and
        # prepvt board ids.
        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
        if running_rw == prod_rw and running_bid == prod_bid:
            logging.info('Using device cr50 prod image %s %s', prod_rw,
                         prod_bid)
            self._original_cr50_image = self._device_prod_image
        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
                         prepvt_bid)
            self._original_cr50_image = self._device_prepvt_image
        else:
            logging.info('Downloading cr50 image %s %s', running_rw,
                         running_bid)
            self._original_cr50_image = self.download_cr50_release_image(
                    running_rw, running_bid)[0]


    def _save_original_state(self, release_path):
        """Save the cr50 related state.

        Save the device's current cr50 version, cr50 board id, the running cr50
        image, the prepvt, and prod cr50 images. These will be used to restore
        the cr50 state during cleanup.

        @param release_path: the optional command line arg of path for the local
                             cr50 image.
        """
        self._saved_state &= ~self.INITIAL_IMAGE_STATE
        self._original_image_state = self.get_image_and_bid_state()
        # We successfully saved the device state
        self._saved_state |= self.INITIAL_IMAGE_STATE
        self._saved_state &= ~self.DEVICE_IMAGES
        try:
            self._save_original_images(release_path)
            self._saved_state |= self.DEVICE_IMAGES
        except Exception as e:
            # Best effort: the DEVICE_IMAGES bit stays cleared, so cleanup
            # knows these images are not available.
            logging.warning('Error saving ChromeOS image cr50 firmware: %s',
                            str(e))


    def get_saved_cr50_original_version(self):
        """Return (ro ver, rw ver, bid)."""
        if ('running_image_ver' not in self._original_image_state or
            'running_image_bid' not in self._original_image_state):
            raise error.TestError('No record of original cr50 image version')
        return (self._original_image_state['running_image_ver'][0],
                self._original_image_state['running_image_ver'][1],
                self._original_image_state['running_image_bid'])


    def get_saved_cr50_original_path(self):
        """Return the local path for the original cr50 image."""
        if not hasattr(self, '_original_cr50_image'):
            raise error.TestError('No record of original image')
        return self._original_cr50_image


    def has_saved_dbg_image_path(self):
        """Returns true if we saved the node locked debug image."""
        return hasattr(self, '_dbg_image_path')


    def get_saved_dbg_image_path(self):
        """Return the local path for the cr50 dev image."""
        if not self.has_saved_dbg_image_path():
            raise error.TestError('No record of debug image')
        return self._dbg_image_path


    def get_saved_eraseflashinfo_image_path(self):
        """Return the local path for the cr50 eraseflashinfo image."""
        if not hasattr(self, '_eraseflashinfo_image_path'):
            raise error.TestError('No record of eraseflashinfo image')
        return self._eraseflashinfo_image_path


    def get_device_brand(self):
        """Returns the 4 character device brand."""
        return self._original_image_state['cros_config / brand-code']


    def _retry_cr50_update(self, image, retries, rollback):
        """Try to update to the given image retries amount of times.

        @param image: The image path.
        @param retries: The number of times to try to update.
        @param rollback: Run rollback after the update.
        @raises TestError if the update failed on every attempt.
        """
        for i in range(retries):
            try:
                return self.cr50_update(image, rollback=rollback)
            except Exception as e:
                logging.warning('Failed to update to %s attempt %d: %s',
                                os.path.basename(image), i, str(e))
                logging.info('Sleeping 60 seconds')
                time.sleep(60)
        raise error.TestError('Failed to update to %s' %
                              os.path.basename(image))


    def run_update_to_eraseflashinfo(self):
        """Erase flashinfo using the eraseflashinfo image.

        Update to the DBG image, rollback to the eraseflashinfo, and run
        eraseflashinfo.
        """
        self._retry_cr50_update(self._dbg_image_path, 3, False)
        self._retry_cr50_update(self._eraseflashinfo_image_path, 3, True)
        if not self.cr50.eraseflashinfo():
            raise error.TestError('Unable to erase the board id')


    def eraseflashinfo_and_restore_image(self, image=''):
        """eraseflashinfo and update to the given the image.

        @param image: the image to end on. Use the original test image if no
                      image is given.
        """
        image = image if image else self.get_saved_cr50_original_path()
        self.run_update_to_eraseflashinfo()
        self.cr50_update(image)


    def update_cr50_image_and_board_id(self, image_path, bid):
        """Set the chip board id and updating the cr50 image.

        Make 3 attempts to update to the original image. Use a rollback from
        the DBG image to erase the state that can only be erased by a DBG image.
        Set the chip board id during rollback.

        @param image_path: path of the image to update to.
        @param bid: the board id to set.
        @raises TestFail: if flashinfo needs to be erased, but the
                          eraseflashinfo image was not saved.
        """
        current_bid = cr50_utils.GetChipBoardId(self.host)
        bid_mismatch = current_bid != bid
        set_bid = bid_mismatch and bid != cr50_utils.ERASED_CHIP_BID
        bid_is_erased = current_bid == cr50_utils.ERASED_CHIP_BID
        # The board id only has to be erased if one is set and it's wrong.
        eraseflashinfo = bid_mismatch and not bid_is_erased

        if (eraseflashinfo and not
            self._saved_cr50_state(self.ERASEFLASHINFO_IMAGE)):
            raise error.TestFail('Did not save eraseflashinfo image')

        # Remove prepvt and prod images, so they don't interfere with the test
        # rolling back and updating to images that may be older than the images
        # on the device.
        if filesystem_util.is_rootfs_writable(self.host):
            self.host.run('rm %s' % cr50_utils.CR50_PREPVT, ignore_status=True)
            self.host.run('rm %s' % cr50_utils.CR50_PROD, ignore_status=True)

        if eraseflashinfo:
            self.run_update_to_eraseflashinfo()

        self._retry_cr50_update(self._dbg_image_path, 3, False)

        chip_bid = bid[0]
        chip_flags = bid[2]
        if set_bid:
            self.cr50.set_board_id(chip_bid, chip_flags)

        self._retry_cr50_update(image_path, 3, True)


    def _cleanup_required(self, state_mismatch, image_type):
        """Return True if the update can fix something in the mismatched state.

        @param state_mismatch: a dictionary of the mismatched state.
        @param image_type: The integer representing the type of image
        @raises TestError: if the image could fix the mismatch, but it was not
                           saved.
        """
        state_image_restores = set(self.STATE_IMAGE_RESTORES[image_type])
        restore = state_image_restores.intersection(state_mismatch)
        if restore and not self._saved_cr50_state(image_type):
            raise error.TestError('Did not save images to restore %s' %
                                  (', '.join(sorted(restore))))
        return bool(restore)


    def _get_image_information(self, ext):
        """Get the image information for the .prod or .prepvt image.

        @param ext: The extension string prod or prepvt
        @returns: The image version or None if the image doesn't exist.
        """
        dut_path = cr50_utils.GetDevicePath(ext)
        file_exists = self.host.path_exists(dut_path)
        if file_exists:
            return cr50_utils.GetBinVersion(self.host, dut_path)
        return None


    def get_image_and_bid_state(self):
        """Get a dict with the current device cr50 information.

        The state dict will include the platform brand, chip board id, the
        running cr50 image version, the running cr50 image board id, and the
        device cr50 image version.
        """
        state = {}
        state['cros_config / brand-code'] = self.host.run(
                'cros_config / brand-code', ignore_status=True).stdout.strip()
        state['prod_version'] = self._get_image_information('prod')
        state['prepvt_version'] = self._get_image_information('prepvt')
        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
        state['running_image_ver'] = cr50_utils.GetRunningVersion(self.host)
        state['running_image_bid'] = self.cr50.get_active_board_id_str()

        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
        return state


    def _check_running_image_and_board_id(self, expected_state):
        """Compare the current image and board id to the given state.

        @param expected_state: A dictionary of the state to compare to.
        @return: A dictionary with the state that is wrong as the key and the
                 expected and current state as the value. Returns None if the
                 original state was never saved.
        """
        if not (self._saved_state & self.INITIAL_IMAGE_STATE):
            logging.warning('Did not save the original state. Cannot verify it '
                            'matches')
            return
        # Make sure the /var/cache/cr50* state is up to date.
        cr50_utils.ClearUpdateStateAndReboot(self.host)

        mismatch = {}
        state = self.get_image_and_bid_state()

        # items() behaves the same as iteritems() here and also works on
        # python 3.
        for k, expected_val in expected_state.items():
            val = state[k]
            if val != expected_val:
                mismatch[k] = 'expected: %s, current: %s' % (expected_val, val)

        if mismatch:
            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
        return mismatch


    def _check_original_image_state(self):
        """Compare the current cr50 state to the original state.

        @return: A dictionary with the state that is wrong as the key and the
                 new and old state as the value
        """
        mismatch = self._check_running_image_and_board_id(
                self._original_image_state)
        if not mismatch:
            logging.info('The device is in the original state')
        return mismatch


    def _reset_ccd_settings(self):
        """Reset the ccd lock and capability states.

        @raises TestFail: if ccd could not be reset.
        @raises TestError: if the capabilities changed and the test can't
                           restore them.
        """
        if not self.cr50.ccd_is_reset():
            # Try to open cr50 and enable testlab mode if it isn't enabled.
            try:
                self.fast_open(True)
            except Exception:
                # Even if we can't open cr50, do our best to reset the rest of
                # the system state. Log a warning here.
                logging.warning('Unable to Open cr50', exc_info=True)
            self.cr50.send_command('ccd reset')
            if not self.cr50.ccd_is_reset():
                raise error.TestFail('Could not reset ccd')

        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
        if self.original_ccd_settings != current_settings:
            if not self.can_set_ccd_level:
                raise error.TestError("CCD state has changed, but we can't "
                                      "restore it")
            self.fast_open(True)
            self.cr50.set_caps(self.original_ccd_settings)

        # First try using testlab open to open the device
        if self.original_ccd_level == 'open':
            self.fast_open(True)
        elif self.original_ccd_level != self.cr50.get_ccd_level():
            self.cr50.set_ccd_level(self.original_ccd_level)


    def cleanup(self):
        """Attempt to cleanup the cr50 state. Then run firmware cleanup"""
        try:
            self._try_to_bring_dut_up()
            self._restore_cr50_state()
        finally:
            super(Cr50Test, self).cleanup()

        # Check the logs captured during firmware_test cleanup for cr50 errors.
        self._get_cr50_stats_from_uart_capture()


    def _get_cr50_stats_from_uart_capture(self):
        """Check cr50 uart output for errors.

        Use the logs captured during firmware_test cleanup to check for cr50
        errors. Flash operation issues aren't obvious unless you check the logs.
        All flash op errors print do_flash_op and it isn't printed during normal
        operation. Open the cr50 uart file and count the number of times this is
        printed. Log the number of errors.
        """
        if not hasattr(self, 'cr50_uart_file'):
            logging.info('There is not a cr50 uart file')
            return

        flash_error_count = 0
        usb_error_count = 0
        with open(self.cr50_uart_file, 'r') as f:
            for line in f:
                if self.CR50_FLASH_OP_ERROR_MSG in line:
                    flash_error_count += 1
                if self.CR50_USB_ERROR in line:
                    usb_error_count += 1

        # Log any flash operation errors.
        logging.info('do_flash_op count: %d', flash_error_count)
        logging.info('usb error count: %d', usb_error_count)


    def _try_to_bring_dut_up(self):
        """Try to quickly get the dut in a pingable state"""
        logging.info('checking dut state')

        self.servo.set_nocheck('cold_reset', 'off')
        self.servo.set_nocheck('warm_reset', 'off')
        time.sleep(self.cr50.SHORT_WAIT)
        if not self.cr50.ap_is_on():
            logging.info('Pressing power button to turn on AP')
            self.servo.power_short_press()

        # Keep resetting the DUT until it responds or the timeout expires.
        end_time = time.time() + self.RESPONSE_TIMEOUT
        while not self.host.ping_wait_up(
                self.faft_config.delay_reboot_to_ping * 2):
            if time.time() > end_time:
                logging.warning(
                        'DUT is unresponsive after trying to bring it up')
                return
            logging.info('DUT did not respond. Resetting it.')
            self.servo.get_power_state_controller().reset()


    def _update_device_images_and_running_cr50_firmware(
            self, state, release_path, prod_path, prepvt_path):
        """Update cr50, set the board id, and copy firmware to the DUT.

        @param state: A dictionary with the expected running version, board id,
                      device cr50 firmware versions.
        @param release_path: The image to update cr50 to
        @param prod_path: The path to the .prod image
        @param prepvt_path: The path to the .prepvt image
        @raises TestError: if setting any state failed
        """
        mismatch = self._check_running_image_and_board_id(state)
        if not mismatch:
            logging.info('Nothing to do.')
            return

        # Use the DBG image to restore the original image.
        if self._cleanup_required(mismatch, self.DBG_IMAGE):
            self.update_cr50_image_and_board_id(release_path, state['chip_bid'])

        new_mismatch = self._check_running_image_and_board_id(state)
        # Copy the original .prod and .prepvt images back onto the DUT.
        if (self._cleanup_required(new_mismatch, self.DEVICE_IMAGES) and
            filesystem_util.is_rootfs_writable(self.host)):
            # Copy the .prod file onto the DUT.
            if prod_path and 'prod_version' in new_mismatch:
                cr50_utils.InstallImage(self.host, prod_path,
                                        cr50_utils.CR50_PROD)
            # Copy the .prepvt file onto the DUT.
            if prepvt_path and 'prepvt_version' in new_mismatch:
                cr50_utils.InstallImage(self.host, prepvt_path,
                                        cr50_utils.CR50_PREPVT)

        final_mismatch = self._check_running_image_and_board_id(state)
        if final_mismatch:
            raise error.TestError('Could not update cr50 image state: %s' %
                                  final_mismatch)
        logging.info('Successfully updated all device cr50 firmware state.')


    def _restore_device_images_and_running_cr50_firmware(self):
        """Restore the images on the device and the running cr50 image.

        @raises TestError: if the state changed during a test that did not
                           ask for the image to be restored.
        """
        if self._provision_update:
            return
        mismatch = self._check_original_image_state()
        self._update_device_images_and_running_cr50_firmware(
                self._original_image_state, self.get_saved_cr50_original_path(),
                self._device_prod_image, self._device_prepvt_image)

        if self._raise_error_on_mismatch and mismatch:
            raise error.TestError('Unexpected state mismatch during '
                                  'cleanup %s' % mismatch)


    def _restore_ccd_settings(self):
        """Restore the original ccd state."""
        # Reset the password as the first thing in cleanup. It is important that
        # if some other part of cleanup fails, the password has at least been
        # reset.
        self.cr50.send_command('ccd testlab open')
        self.cr50.send_command('rddkeepalive disable')
        self.cr50.send_command('ccd reset')
        self.cr50.send_command('wp follow_batt_pres atboot')

        # Reboot cr50 if the console is accessible. This will reset most state.
        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.reboot()

        # Reenable servo v4 CCD
        self.cr50.ccd_enable()

        # reboot to normal mode if the device is in dev mode.
        self.enter_mode_after_checking_tpm_state('normal')

        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.clear_fwmp()

        # Restore the ccd privilege level
        self._reset_ccd_settings()


    def _restore_cr50_state(self):
        """Restore cr50 state, so the device can be used for further testing.

        Restore the cr50 image and board id first. Then CCD, because flashing
        dev signed images completely clears the CCD state.
        """
        try:
            self._restore_device_images_and_running_cr50_firmware()
        except Exception as e:
            logging.warning('Issue restoring Cr50 image: %s', str(e))
            raise
        finally:
            # Always restore ccd, even if restoring the image failed.
            self._restore_ccd_settings()


    def find_cr50_gs_image(self, gsurl):
        """Find the cr50 gs image name.

        @param gsurl: the cr50 image location
        @return: a list of the gsutil bucket, filename or None if the file
                 can't be found
        """
        try:
            matches = utils.gs_ls(gsurl)
        except error.CmdError:
            logging.info('%s does not exist', gsurl)
            return None
        # An empty listing also means the file can't be found.
        if not matches:
            logging.info('%s does not exist', gsurl)
            return None
        return matches[0].rsplit('/', 1)


    def _extract_cr50_image(self, archive, fn):
        """Extract the filename from the given archive

        @param archive: the archive location on the host
        @param fn: the file to extract
        @return: The location of the extracted file
        @raises TestFail: if fn was not extracted from the archive
        """
        remote_dir = os.path.dirname(archive)
        result = self.host.run('tar xfv %s -C %s' % (archive, remote_dir))
        # tar prints the path of each extracted file. Find the requested one.
        for line in result.stdout.splitlines():
            if os.path.basename(line) == fn:
                return os.path.join(remote_dir, line)
        raise error.TestFail('%s was not extracted from %s' % (fn, archive))


    def download_cr50_gs_file(self, gsurl, extract_fn):
        """Download and extract the file at gsurl.

        @param gsurl: The gs url for the cr50 image
        @param extract_fn: The name of the file to extract from the cr50 image
                           tarball. Don't extract anything if extract_fn is
                           None.
        @return: a tuple (local path, host path)
        @raises TestFail: if gsurl can't be found
        """
        file_info = self.find_cr50_gs_image(gsurl)
        if not file_info:
            raise error.TestFail('Could not find %s' % gsurl)
        bucket, fn = file_info

        remote_temp_dir = '/tmp/'
        src = os.path.join(remote_temp_dir, fn)
        dest = os.path.join(self.resultsdir, fn)

        # Copy the image to the dut
        gsutil_wrapper.copy_private_bucket(host=self.host,
                                           bucket=bucket,
                                           filename=fn,
                                           destination=remote_temp_dir)
        if extract_fn:
            src = self._extract_cr50_image(src, extract_fn)
            logging.info('extracted %s', src)
            # Remove .tbz2 from the local path.
            dest = os.path.splitext(dest)[0]

        self.host.get_file(src, dest)
        return dest, src


    def download_cr50_gs_image(self, gsurl, extract_fn, image_bid):
        """Get the image from gs and save it in the autotest dir.

        @param gsurl: The gs url for the cr50 image
        @param extract_fn: The name of the file to extract from the cr50 image
                           tarball. Don't extract anything if extract_fn is
                           None.
        @param image_bid: the image symbolic board id
        @return: A tuple with the local path and version
        @raises TestError: if the downloaded image board id doesn't match
                           image_bid
        """
        dest, src = self.download_cr50_gs_file(gsurl, extract_fn)
        ver = cr50_utils.GetBinVersion(self.host, src)

        # Compare the image board id to the downloaded image to make sure we got
        # the right file
        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
        if image_bid and image_bid != downloaded_bid:
            raise error.TestError('Could not download image with matching '
                                  'board id wanted %s got %s' % (image_bid,
                                  downloaded_bid))
        return dest, ver


    def download_cr50_eraseflashinfo_image(self):
        """download the cr50 image that allows erasing flashinfo.

        Get the file with the matching devid.

        @return: A tuple with the debug image local path and version
        """
        devid = self._devid.replace(' ', '-').replace('0x', '')
        gsurl = os.path.join(self.GS_PRIVATE_DBG,
                             self.CR50_ERASEFLASHINFO_FILE % devid)
        return self.download_cr50_gs_image(gsurl, None, None)


    def download_cr50_debug_image(self, devid='', image_bid=''):
        """download the cr50 debug file.

        Get the file with the matching devid and image board id info

        @param devid: the cr50 devid string. Use the devid saved during
                      initialize if none is given.
        @param image_bid: the image board id info string or list
        @return: A tuple with the debug image local path and version
        """
        bid_ext = ''
        # Add the image bid string to the filename
        if image_bid:
            image_bid = cr50_utils.GetBoardIdInfoString(image_bid,
                                                        symbolic=True)
            bid_ext = '.' + image_bid.replace(':', '_')

        devid = devid if devid else self._devid
        dbg_file = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'), bid_ext)
        gsurl = os.path.join(self.GS_PRIVATE_DBG, dbg_file)
        return self.download_cr50_gs_image(gsurl, None, image_bid)


    def download_cr50_tot_image(self):
        """download the cr50 TOT image.

        @return: the local path to the TOT image.
        """
        # TODO(mruthven): use logic from provision_Cr50TOT
        raise error.TestNAError('Could not download TOT image')


    def _find_release_image_gsurl(self, fn):
        """Find the gs url for the release image

        @param fn: the release image filename pattern
        @return: the first gs url where the file exists. The public bucket is
                 checked before the private prod bucket.
        @raises TestFail: if the file isn't in either bucket
        """
        for gsbucket in [self.GS_PUBLIC, self.GS_PRIVATE_PROD]:
            gsurl = os.path.join(gsbucket, fn)
            if self.find_cr50_gs_image(gsurl):
                return gsurl
        raise error.TestFail('%s is not on google storage' % fn)


    def download_cr50_release_image(self, image_rw, image_bid=''):
        """download the cr50 release file.

        Get the file with the matching version and image board id info

        @param image_rw: the rw version string
        @param image_bid: the image board id info string or list
        @return: A tuple with the release image local path and version
        @raises TestError: if the downloaded image rw version doesn't match
                           image_rw
        """
        bid_ext = ''
        # Add the image bid string to the gsurl
        if image_bid:
            image_bid = cr50_utils.GetBoardIdInfoString(image_bid,
                                                        symbolic=True)
            bid_ext = '_' + image_bid.replace(':', '_')
        release_fn = self.CR50_PROD_FILE % (image_rw, bid_ext)
        gsurl = self._find_release_image_gsurl(release_fn)

        # Release images can be found using the rw version
        # Download the image
        dest, ver = self.download_cr50_gs_image(gsurl, 'cr50.bin.prod',
                                                image_bid)

        # Compare the rw version and board id info to make sure the right image
        # was found
        if image_rw != ver[1]:
            raise error.TestError('Could not download image with matching '
                                  'rw version')
        return dest, ver


    def _cr50_verify_update(self, expected_rw, expect_rollback):
        """Verify the expected version is running on cr50.

        @param expected_rw: The RW version string we expect to be running
        @param expect_rollback: True if cr50 should have rolled back during the
                                update
        @raise TestFail: if there is any unexpected update state
        """
        errors = []
        running_rw = self.cr50.get_version()
        if expected_rw != running_rw:
            errors.append('running %s not %s' % (running_rw, expected_rw))

        if expect_rollback != self.cr50.rolledback():
            # The parentheses matter: '%' binds tighter than the conditional
            # expression, so without them an unexpected rollback would add an
            # empty error string.
            errors.append('%srollback detected' %
                          ('no ' if expect_rollback else ''))
        if errors:
            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
        logging.info('RUNNING %s after %s', expected_rw,
                     'rollback' if expect_rollback else 'update')


    def _cr50_run_update(self, path):
        """Install the image at path onto cr50.

        @param path: the location of the image to update to
        @return: the rw version of the image
        """
        tmp_dest = '/tmp/' + os.path.basename(path)

        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
        # Use the -p option to make sure the DUT does a clean reboot.
        cr50_utils.GSCTool(self.host, ['-a', dest, '-p'])
        # Reboot the DUT to finish the cr50 update.
        self.host.reboot(wait=False)
        return image_ver[1]


    def cr50_update(self, path, rollback=False, expect_rollback=False):
        """Attempt to update to the given image.

        If rollback is True, we assume that cr50 is already running an image
        that can rollback.

        @param path: the location of the update image
        @param rollback: True if we need to force cr50 to rollback to update to
                         the given image
        @param expect_rollback: True if cr50 should rollback on its own
        @raise TestFail: if the update failed
        """
        original_rw = self.cr50.get_version()

        # Cr50 is going to reject an update if it hasn't been up for more than
        # 60 seconds. Wait until that passes before trying to run the update.
        self.cr50.wait_until_update_is_allowed()

        image_rw = self._cr50_run_update(path)

        # Running the update may cause cr50 to reboot. Wait for that before
        # sending more commands. The reboot should happen quickly. Wait a
        # maximum of 10 seconds.
        self.cr50.wait_for_reboot(timeout=10)

        if rollback:
            self.cr50.rollback()

        expected_rw = original_rw if expect_rollback else image_rw
        # If we expect a rollback, the version should remain unchanged
        self._cr50_verify_update(expected_rw, rollback or expect_rollback)


    def ccd_open_from_ap(self):
        """Start the open process and press the power button."""
        # Opening CCD requires power button presses. If those presses would
        # power off the AP and prevent CCD open from completing, ignore them.
990 if self.faft_config.ec_forwards_short_pp_press: 991 self.stop_powerd() 992 993 self._ccd_open_last_len = 0 994 995 self._ccd_open_stdout = StringIO.StringIO() 996 997 ccd_open_cmd = utils.sh_escape('gsctool -a -o') 998 full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'), 999 ccd_open_cmd) 1000 # Start running the Cr50 Open process in the background. 1001 self._ccd_open_job = utils.BgJob(full_ssh_cmd, 1002 nickname='ccd_open', 1003 stdout_tee=self._ccd_open_stdout, 1004 stderr_tee=utils.TEE_TO_LOGS) 1005 if self._ccd_open_job == None: 1006 raise error.TestFail('could not start ccd open') 1007 1008 try: 1009 # Wait for the first gsctool power button prompt before starting the 1010 # open process. 1011 logging.info(self._get_ccd_open_output()) 1012 # Cr50 starts out by requesting 5 quick presses then 4 longer 1013 # power button presses. Run the quick presses without looking at the 1014 # command output, because getting the output can take some time. For 1015 # the presses that require a 1 minute wait check the output between 1016 # presses, so we can catch errors 1017 # 1018 # run quick presses for 30 seconds. It may take a couple of seconds 1019 # for open to start. 10 seconds should be enough. 30 is just used 1020 # because it will definitely be enough, and this process takes 300 1021 # seconds, so doing quick presses for 30 seconds won't matter. 1022 end_time = time.time() + 30 1023 while time.time() < end_time: 1024 self.servo.power_short_press() 1025 logging.info('short int power button press') 1026 time.sleep(self.PP_SHORT_INTERVAL) 1027 # Poll the output and press the power button for the longer presses. 
1028 utils.wait_for_value(self._check_open_and_press_power_button, 1029 expected_value=True, timeout_sec=self.cr50.PP_LONG) 1030 except Exception, e: 1031 logging.info(e) 1032 raise 1033 finally: 1034 self._close_ccd_open_job() 1035 self._try_to_bring_dut_up() 1036 logging.info(self.cr50.get_ccd_info()) 1037 1038 1039 def _check_open_and_press_power_button(self): 1040 """Check stdout and press the power button if prompted. 1041 1042 @return: True if the process is still running. 1043 """ 1044 logging.info(self._get_ccd_open_output()) 1045 self.servo.power_short_press() 1046 logging.info('long int power button press') 1047 # Give cr50 some time to complete the open process. After the last 1048 # power button press cr50 erases nvmem and resets the dut before setting 1049 # the state to open. Wait a bit so we don't check the ccd state in the 1050 # middle of this reset process. Power button requests happen once a 1051 # minute, so waiting 10 seconds isn't a big deal. 1052 time.sleep(10) 1053 return (self._ccd_open_job.sp.poll() is not None or 'Open' in 1054 self.cr50.get_ccd_info()['State']) 1055 1056 1057 def _get_ccd_open_output(self): 1058 """Read the new output.""" 1059 self._ccd_open_job.process_output() 1060 self._ccd_open_stdout.seek(self._ccd_open_last_len) 1061 output = self._ccd_open_stdout.read() 1062 self._ccd_open_last_len = self._ccd_open_stdout.len 1063 return output 1064 1065 1066 def _close_ccd_open_job(self): 1067 """Terminate the process and check the results.""" 1068 exit_status = utils.nuke_subprocess(self._ccd_open_job.sp) 1069 stdout = self._ccd_open_stdout.getvalue().strip() 1070 delattr(self, '_ccd_open_job') 1071 if stdout: 1072 logging.info('stdout of ccd open:\n%s', stdout) 1073 if exit_status: 1074 logging.info('exit status: %d', exit_status) 1075 if 'Error' in stdout: 1076 raise error.TestFail('ccd open Error %s' % 1077 stdout.split('Error')[-1]) 1078 if self.cr50.OPEN != self.cr50.get_ccd_level(): 1079 raise error.TestFail('unable to 
open cr50: %s' % stdout) 1080 else: 1081 logging.info('Opened Cr50') 1082 1083 1084 def fast_open(self, enable_testlab=False): 1085 """Try to use testlab open. If that fails, do regular ap open. 1086 1087 @param enable_testlab: If True, enable testlab mode after cr50 is open. 1088 """ 1089 if not self.faft_config.has_powerbutton: 1090 logging.warning('No power button', exc_info=True) 1091 enable_testlab = False 1092 # Try to use testlab open first, so we don't have to wait for the 1093 # physical presence check. 1094 self.cr50.send_command('ccd testlab open') 1095 if self.cr50.get_ccd_level() == 'open': 1096 return 1097 1098 if self.servo.has_control('chassis_open'): 1099 self.servo.set('chassis_open','yes') 1100 # Use the console to open cr50 without entering dev mode if possible. It 1101 # takes longer and relies on more systems to enter dev mode and ssh into 1102 # the AP. Skip the steps that aren't required. 1103 if not self.cr50.get_cap('OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]: 1104 self.enter_mode_after_checking_tpm_state('dev') 1105 1106 if self.cr50.get_cap('OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]: 1107 self.cr50.set_ccd_level(self.cr50.OPEN) 1108 else: 1109 self.ccd_open_from_ap() 1110 1111 if self.servo.has_control('chassis_open'): 1112 self.servo.set('chassis_open','no') 1113 1114 if enable_testlab: 1115 self.cr50.set_ccd_testlab('on') 1116 1117 # Make sure the device is in normal mode. After opening cr50, the TPM 1118 # should be cleared and the device should automatically reset to normal 1119 # mode. Just check to be consistent. It's possible capabilitiy settings 1120 # are set to skip wiping the TPM. 
1121 self.enter_mode_after_checking_tpm_state('normal') 1122 1123 1124 def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error): 1125 """Run a gsctool command and input the password 1126 1127 @param password: The cr50 password string 1128 @param cmd: The gsctool command 1129 @param name: The name to give the job 1130 @param expect_error: True if the command should fail 1131 """ 1132 set_pwd_cmd = utils.sh_escape(cmd) 1133 full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'), 1134 set_pwd_cmd) 1135 stdout = StringIO.StringIO() 1136 # Start running the gsctool Command in the background. 1137 gsctool_job = utils.BgJob(full_ssh_command, 1138 nickname='%s_with_password' % name, 1139 stdout_tee=stdout, 1140 stderr_tee=utils.TEE_TO_LOGS, 1141 stdin=subprocess.PIPE) 1142 if gsctool_job == None: 1143 raise error.TestFail('could not start gsctool command %r', cmd) 1144 1145 try: 1146 # Wait for enter prompt 1147 gsctool_job.process_output() 1148 logging.info(stdout.getvalue().strip()) 1149 # Enter the password 1150 gsctool_job.sp.stdin.write(password + '\n') 1151 1152 # Wait for re-enter prompt 1153 gsctool_job.process_output() 1154 logging.info(stdout.getvalue().strip()) 1155 # Re-enter the password 1156 gsctool_job.sp.stdin.write(password + '\n') 1157 time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT) 1158 gsctool_job.process_output() 1159 finally: 1160 exit_status = utils.nuke_subprocess(gsctool_job.sp) 1161 output = stdout.getvalue().strip() 1162 logging.info('%s stdout: %s', name, output) 1163 logging.info('%s exit status: %s', name, exit_status) 1164 if exit_status: 1165 message = ('gsctool %s failed using %r: %s %s' % 1166 (name, password, exit_status, output)) 1167 if expect_error: 1168 logging.info(message) 1169 else: 1170 raise error.TestFail(message) 1171 elif expect_error: 1172 raise error.TestFail('%s with %r did not fail when expected' % 1173 (name, password)) 1174 1175 1176 def set_ccd_password(self, password, expect_error=False): 
1177 """Set the ccd password""" 1178 # Testlab mode can't be enabled if there is no power button, so we 1179 # shouldn't allow setting the password. 1180 if not self.faft_config.has_powerbutton: 1181 raise error.TestError('No power button') 1182 1183 # If for some reason the test sets a password and is interrupted before 1184 # we can clear it, we want testlab mode to be enabled, so it's possible 1185 # to clear the password without knowing it. 1186 if not self.cr50.testlab_is_on(): 1187 raise error.TestError('Will not set password unless testlab mode ' 1188 'is enabled.') 1189 self.run_gsctool_cmd_with_password( 1190 password, 'gsctool -a -P', 'set_password', expect_error) 1191 1192 1193 def ccd_unlock_from_ap(self, password=None, expect_error=False): 1194 """Unlock cr50""" 1195 if not password: 1196 self.host.run('gsctool -a -U') 1197 return 1198 self.run_gsctool_cmd_with_password( 1199 password, 'gsctool -a -U', 'unlock', expect_error) 1200 1201 1202 def enter_mode_after_checking_tpm_state(self, mode): 1203 """Reboot to mode if cr50 doesn't already match the state""" 1204 # If the device is already in the correct mode, don't do anything 1205 if (mode == 'dev') == self.cr50.in_dev_mode(): 1206 logging.info('already in %r mode', mode) 1207 return 1208 1209 self.switcher.reboot_to_mode(to_mode=mode) 1210 1211 if (mode == 'dev') != self.cr50.in_dev_mode(): 1212 raise error.TestError('Unable to enter %r mode' % mode) 1213 1214 def tpm_is_responsive(self): 1215 """Check TPM responsiveness by running tpm_version.""" 1216 result = self.host.run('tpm_version', ignore_status=True) 1217 logging.debug(result.stdout.strip()) 1218 return not result.exit_status 1219