# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import pprint
import StringIO
import subprocess
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error, utils
from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils
from autotest_lib.server.cros import debugd_dev_tools, gsutil_wrapper
from autotest_lib.server.cros.faft.firmware_test import FirmwareTest


class Cr50Test(FirmwareTest):
    """Base class that sets up helper objects/functions for cr50 tests."""
    version = 1

    CR50_GS_URL = 'gs://chromeos-localmirror-private/distfiles/chromeos-cr50-%s/'
    CR50_DEBUG_FILE = '*/cr50_dbg_%s.bin'
    CR50_PROD_FILE = 'cr50.%s.bin.prod'
    NONE = 0
    # Saved the original device state during init.
    INITIAL_STATE = 1 << 0
    # Saved the original image, the device image, and the debug image. These
    # images are needed to be able to restore the original image and board id.
    IMAGES = 1 << 1
    PP_SHORT_INTERVAL = 3
    CLEARED_FWMP_EXIT_STATUS = 1
    CLEARED_FWMP_ERROR_MSG = ('CRYPTOHOME_ERROR_FIRMWARE_MANAGEMENT_PARAMETERS'
                              '_INVALID')
    # Cr50 may have flash operation errors during the test. Here's an example
    # of one error message.
    # do_flash_op:245 errors 20 fsh_pe_control 40720004
    # The stuff after the ':' may change, but all flash operation errors
    # contain do_flash_op. do_flash_op is only ever printed if there is an
    # error during the flash operation. Just search for do_flash_op to simplify
    # the search string and make it applicable to all flash op errors.
    CR50_FLASH_OP_ERROR_MSG = 'do_flash_op'

    def initialize(self, host, cmdline_args, full_args,
                   restore_cr50_state=False, cr50_dev_path='',
                   provision_update=False):
        """Save the original cr50 state and lock the console.

        Args:
            host: the host object. It is passed to FirmwareTest.initialize and
                  saved as self.host.
            cmdline_args: the args passed to FirmwareTest.initialize.
            full_args: dict of test args. 'cr50_qual_version' (formatted as
                       '<rw version>/<board id>') and 'release_path' are read
                       from it.
            restore_cr50_state: If True, a failure to save the device images
                                is raised and a state mismatch found during
                                cleanup does not raise an error. If False, the
                                image save failure is only logged.
            cr50_dev_path: local path of the node locked dev image. If it
                           isn't a file, the image is downloaded.
            provision_update: If True, cleanup does not try to restore the
                              original state when it finds a mismatch.

        Raises:
            TestNAError if there is no cr50 console access, or if the console
            isn't locked and the test can't lock it.
            TestError if cr50 isn't running the cr50_qual_version image.
        """
        self._saved_state = self.NONE
        self._raise_error_on_mismatch = not restore_cr50_state
        self._provision_update = provision_update
        super(Cr50Test, self).initialize(host, cmdline_args)

        if not hasattr(self, 'cr50'):
            raise error.TestNAError('Test can only be run on devices with '
                                    'access to the Cr50 console')

        logging.info('Test Args: %r', full_args)

        self.can_set_ccd_level = (not self.cr50.using_ccd() or
                                  self.cr50.testlab_is_on())
        self.original_ccd_level = self.cr50.get_ccd_level()
        self.original_ccd_settings = self.cr50.get_cap_dict(
                info=self.cr50.CAP_SETTING)

        self.host = host
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        # Clear the FWMP, so it can't disable CCD.
        self.clear_fwmp()

        if self.can_set_ccd_level:
            # Lock cr50 so the console will be restricted
            self.cr50.set_ccd_level('lock')
        elif self.original_ccd_level != 'lock':
            raise error.TestNAError('Lock the console before running cr50 test')

        self._save_original_state()

        # Verify cr50 is still running the correct version
        cr50_qual_version = full_args.get('cr50_qual_version', '').strip()
        if cr50_qual_version:
            _, running_rw, running_bid = self.get_saved_cr50_original_version()
            expected_rw, expected_bid_sym = cr50_qual_version.split('/')
            expected_bid = cr50_utils.GetBoardIdInfoString(expected_bid_sym,
                                                           symbolic=False)
            logging.debug('Running %s %s Expect %s %s', running_rw, running_bid,
                          expected_rw, expected_bid)
            if running_rw != expected_rw or expected_bid != running_bid:
                raise error.TestError('Not running %s' % cr50_qual_version)

        # We successfully saved the device state
        self._saved_state |= self.INITIAL_STATE
        try:
            self._save_node_locked_dev_image(cr50_dev_path)
            self._save_original_images(full_args.get('release_path', ''))
            # We successfully saved the device images
            self._saved_state |= self.IMAGES
        except Exception:
            if restore_cr50_state:
                raise
            # The images are only needed to restore the state. Keep running
            # the test, but leave a record of why IMAGES isn't set.
            logging.warning('Unable to save the cr50 images', exc_info=True)


    def after_run_once(self):
        """Log which iteration just ran"""
        logging.info('successfully ran iteration %d', self.iteration)


    def _save_node_locked_dev_image(self, cr50_dev_path):
        """Save or download the node locked dev image.

        Args:
            cr50_dev_path: The path to the node locked cr50 image.
        """
        if os.path.isfile(cr50_dev_path):
            self._node_locked_cr50_image = cr50_dev_path
        else:
            devid = self.servo.get('cr50_devid')
            self._node_locked_cr50_image = self.download_cr50_debug_image(
                    devid)[0]


    def _save_original_images(self, release_path):
        """Use the saved state to find all of the device images.

        This will download running cr50 image and the device image.

        Args:
            release_path: The release path given by test args
        """
        # Copy the prod and prepvt images from the DUT
        _, prod_rw, prod_bid = self._original_state['device_prod_ver']
        filename = 'prod_device_image_' + prod_rw
        self._device_prod_image = os.path.join(self.resultsdir,
                                               filename)
        self.host.get_file(cr50_utils.CR50_PROD,
                           self._device_prod_image)

        if cr50_utils.HasPrepvtImage(self.host):
            _, prepvt_rw, prepvt_bid = self._original_state['device_prepvt_ver']
            filename = 'prepvt_device_image_' + prepvt_rw
            self._device_prepvt_image = os.path.join(self.resultsdir,
                                                     filename)
            self.host.get_file(cr50_utils.CR50_PREPVT,
                               self._device_prepvt_image)
            prepvt_bid = cr50_utils.GetBoardIdInfoString(prepvt_bid)
        else:
            self._device_prepvt_image = None
            prepvt_rw = None
            prepvt_bid = None

        if os.path.isfile(release_path):
            self._original_cr50_image = release_path
            logging.info('using supplied image')
            return
        # If the running cr50 image version matches the image on the DUT use
        # the DUT image as the original image. If the versions don't match
        # download the image from google storage
        _, running_rw, running_bid = self.get_saved_cr50_original_version()

        # Make sure prod_bid and running_bid are in the same format
        prod_bid = cr50_utils.GetBoardIdInfoString(prod_bid)
        running_bid = cr50_utils.GetBoardIdInfoString(running_bid)
        if running_rw == prod_rw and running_bid == prod_bid:
            logging.info('Using device cr50 prod image %s %s', prod_rw,
                         prod_bid)
            self._original_cr50_image = self._device_prod_image
        elif running_rw == prepvt_rw and running_bid == prepvt_bid:
            logging.info('Using device cr50 prepvt image %s %s', prepvt_rw,
                         prepvt_bid)
            self._original_cr50_image = self._device_prepvt_image
        else:
            logging.info('Downloading cr50 image %s %s', running_rw,
                         running_bid)
            self._original_cr50_image = self.download_cr50_release_image(
                    running_rw, running_bid)[0]


    def _save_original_state(self):
        """Save the cr50 related state.

        Save the device's current cr50 version, cr50 board id, rlz, and image
        at /opt/google/cr50/firmware/cr50.bin.prod. These will be used to
        restore the state during cleanup.
        """
        self._original_state = self.get_cr50_device_state()


    def get_saved_cr50_original_version(self):
        """Return (ro ver, rw ver, bid).

        Raises:
            TestError if the running version or image board id wasn't saved.
        """
        if ('running_ver' not in self._original_state or 'cr50_image_bid' not in
            self._original_state):
            raise error.TestError('No record of original cr50 image version')
        return (self._original_state['running_ver'][0],
                self._original_state['running_ver'][1],
                self._original_state['cr50_image_bid'])


    def get_saved_cr50_original_path(self):
        """Return the local path for the original cr50 image."""
        if not hasattr(self, '_original_cr50_image'):
            raise error.TestError('No record of original image')
        return self._original_cr50_image


    def has_saved_cr50_dev_path(self):
        """Returns true if we saved the node locked debug image."""
        return hasattr(self, '_node_locked_cr50_image')


    def get_saved_cr50_dev_path(self):
        """Return the local path for the cr50 dev image."""
        if not self.has_saved_cr50_dev_path():
            raise error.TestError('No record of debug image')
        return self._node_locked_cr50_image


    def _restore_original_image(self, chip_bid, chip_flags):
        """Restore the cr50 image and erase the state.

        Make 3 attempts to update to the original image. Use a rollback from
        the DBG image to erase the state that can only be erased by a DBG image.
        Set the chip board id during rollback

        A failed attempt is only logged. Nothing is raised here if all of the
        attempts fail.

        Args:
            chip_bid: the integer representation of chip board id or None if the
                      board id should be erased
            chip_flags: the integer representation of chip board id flags or
                        None if the board id should be erased
        """
        for i in range(3):
            try:
                # Update to the node-locked DBG image so we can erase all of
                # the state we are trying to reset
                self.cr50_update(self._node_locked_cr50_image)

                # Rollback to the original cr50 image.
                self.cr50_update(self._original_cr50_image, rollback=True,
                                 chip_bid=chip_bid, chip_flags=chip_flags)
                break
            except Exception as e:
                logging.warning('Failed to restore original image attempt %d: '
                                '%r', i, e)


    def rootfs_verification_disable(self):
        """Remove rootfs verification."""
        if not self._rootfs_verification_is_disabled():
            logging.debug('Removing rootfs verification.')
            # Enabling the rootfs tool is what removes rootfs verification.
            self.rootfs_tool.enable()


    def _rootfs_verification_is_disabled(self):
        """Returns true if rootfs verification is disabled.

        This also clears the TPM owner and sets self.rootfs_tool.
        """
        # Clear the TPM owner before trying to check rootfs verification
        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.rootfs_tool = debugd_dev_tools.RootfsVerificationTool()
        self.rootfs_tool.initialize(self.host)
        # rootfs_tool.is_enabled is True, that means rootfs verification is
        # disabled.
        return self.rootfs_tool.is_enabled()


    def _restore_original_state(self):
        """Restore the original cr50 related device state.

        Raises:
            TestError if the state doesn't match the original state after the
            restore.
        """
        if not (self._saved_state & self.IMAGES):
            logging.warning('Did not save the original images. Cannot restore '
                            'state')
            return
        # Remove the prepvt image if the test installed one.
        if (not self._original_state['has_prepvt'] and
            cr50_utils.HasPrepvtImage(self.host)):
            self.host.run('rm %s' % cr50_utils.CR50_PREPVT)
        # If rootfs verification has been disabled, copy the cr50 device image
        # back onto the DUT.
        if self._rootfs_verification_is_disabled():
            cr50_utils.InstallImage(self.host, self._device_prod_image,
                                    cr50_utils.CR50_PROD)
            # Install the prepvt image if there was one.
            if self._device_prepvt_image:
                cr50_utils.InstallImage(self.host, self._device_prepvt_image,
                                        cr50_utils.CR50_PREPVT)

        chip_bid_info = self._original_state['chip_bid']
        bid_is_erased = chip_bid_info == cr50_utils.ERASED_CHIP_BID
        chip_bid = None if bid_is_erased else chip_bid_info[0]
        chip_flags = None if bid_is_erased else chip_bid_info[2]
        # Update to the original image and erase the board id
        self._restore_original_image(chip_bid, chip_flags)

        # Set the RLZ code
        cr50_utils.SetRLZ(self.host, self._original_state['rlz'])

        # Verify everything is still the same
        mismatch = self._check_original_state()
        if mismatch:
            raise error.TestError('Could not restore state: %s' % mismatch)

        logging.info('Successfully restored the original cr50 state')


    def get_cr50_device_state(self):
        """Get a dict with the current device cr50 information.

        The state dict will include the platform brand, rlz code, chip board id,
        the running cr50 image version, the running cr50 image board id, and the
        device cr50 image version.
        """
        state = {}
        state['mosys platform brand'] = self.host.run('mosys platform brand',
                ignore_status=True).stdout.strip()
        state['device_prod_ver'] = cr50_utils.GetBinVersion(self.host,
                cr50_utils.CR50_PROD)
        state['has_prepvt'] = cr50_utils.HasPrepvtImage(self.host)
        if state['has_prepvt']:
            state['device_prepvt_ver'] = cr50_utils.GetBinVersion(self.host,
                    cr50_utils.CR50_PREPVT)
        else:
            state['device_prepvt_ver'] = None
        state['rlz'] = cr50_utils.GetRLZ(self.host)
        state['chip_bid'] = cr50_utils.GetChipBoardId(self.host)
        state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid']
        state['running_ver'] = cr50_utils.GetRunningVersion(self.host)
        state['cr50_image_bid'] = self.cr50.get_active_board_id_str()

        logging.debug('Current Cr50 state:\n%s', pprint.pformat(state))
        return state


    def _check_original_state(self):
        """Compare the current cr50 state to the original state.

        Returns:
            A dictionary with the state that is wrong as the key and
            the new and old state as the value. None is returned if the
            original state was never saved.
        """
        if not (self._saved_state & self.INITIAL_STATE):
            logging.warning('Did not save the original state. Cannot verify it '
                            'matches')
            return
        # Make sure the /var/cache/cr50* state is up to date.
        cr50_utils.ClearUpdateStateAndReboot(self.host)

        mismatch = {}
        new_state = self.get_cr50_device_state()

        for k, new_val in new_state.iteritems():
            original_val = self._original_state[k]
            if new_val != original_val:
                mismatch[k] = 'old: %s, new: %s' % (original_val, new_val)

        if mismatch:
            logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch))
        else:
            logging.info('The device is in the original state')
        return mismatch


    def _reset_ccd_settings(self):
        """Reset the ccd lock and capability states.

        Raises:
            TestFail if the ccd password can't be cleared.
            TestError if the capabilities changed and can't be restored.
        """
        # Clear the password if one was set.
        if self.cr50.get_ccd_info()['Password'] != 'none':
            self.servo.set_nocheck('cr50_testlab', 'open')
            self.cr50.send_command('ccd reset')
            if self.cr50.get_ccd_info()['Password'] != 'none':
                raise error.TestFail('Could not clear password')

        current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING)
        if self.original_ccd_settings != current_settings:
            if not self.can_set_ccd_level:
                raise error.TestError("CCD state has changed, but we can't "
                                      "restore it")
            self.fast_open(True)
            self.cr50.set_caps(self.original_ccd_settings)

        # First try using testlab open to open the device
        if self.cr50.testlab_is_on() and self.original_ccd_level == 'open':
            self.servo.set_nocheck('cr50_testlab', 'open')
        if self.original_ccd_level != self.cr50.get_ccd_level():
            self.cr50.set_ccd_level(self.original_ccd_level)



    def cleanup(self):
        """Attempt to cleanup the cr50 state. Then run firmware cleanup"""
        try:
            self._restore_cr50_state()
        finally:
            super(Cr50Test, self).cleanup()

        # Check the logs captured during firmware_test cleanup for cr50 errors.
        self._get_cr50_stats_from_uart_capture()


    def _get_cr50_stats_from_uart_capture(self):
        """Check cr50 uart output for errors.

        Use the logs captured during firmware_test cleanup to check for cr50
        errors. Flash operation issues aren't obvious unless you check the logs.
        All flash op errors print do_flash_op and it isn't printed during normal
        operation. Open the cr50 uart file and count the number of times this is
        printed. Log the number of errors.
        """
        if not hasattr(self, 'cr50_uart_file'):
            logging.info('There is not a cr50 uart file')
            return

        # Count the lines that contain the flash operation error string.
        with open(self.cr50_uart_file, 'r') as f:
            flash_error_count = sum(
                    1 for line in f if self.CR50_FLASH_OP_ERROR_MSG in line)

        # Log any flash operation errors.
        logging.info('do_flash_op count: %d', flash_error_count)


    def _restore_cr50_state(self):
        """Restore cr50 state, so the device can be used for further testing

        Raises:
            TestError if the state changed during the test and the test was
            initialized with restore_cr50_state=False.
        """
        state_mismatch = self._check_original_state()
        if state_mismatch and not self._provision_update:
            self._restore_original_state()
            if self._raise_error_on_mismatch:
                raise error.TestError('Unexpected state mismatch during '
                                      'cleanup %s' % state_mismatch)

        # Try to open cr50 and enable testlab mode if it isn't enabled.
        try:
            self.fast_open(True)
        except Exception:
            # Even if we can't open cr50, do our best to reset the rest of the
            # system state. Log a warning here.
            logging.warning('Unable to Open cr50', exc_info=True)
        # Reset the password as the first thing in cleanup. It is important that
        # if some other part of cleanup fails, the password has at least been
        # reset.
        self.cr50.send_command('rddkeepalive disable')
        self.cr50.send_command('ccd reset')
        self.cr50.send_command('wp follow_batt_pres atboot')

        # Reboot cr50 if the console is accessible. This will reset most state.
        if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.reboot()

        # Reenable servo v4 CCD
        self.cr50.ccd_enable()

        # reboot to normal mode if the device is in dev mode.
        self.enter_mode_after_checking_tpm_state('normal')

        tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True)
        self.clear_fwmp()

        # Restore the ccd privilege level
        if hasattr(self, 'original_ccd_level'):
            self._reset_ccd_settings()


    def find_cr50_gs_image(self, filename, image_type=None):
        """Find the cr50 gs image name.

        Args:
            filename: the cr50 filename to match to
            image_type: release or debug. If it is not specified we will search
                        both the release and debug directories
        Returns:
            a tuple of the gsutil bucket, filename
        """
        gs_url = self.CR50_GS_URL % (image_type if image_type else '*')
        gs_filename = os.path.join(gs_url, filename)
        # Use the first match and split it into the bucket and the basename.
        bucket, gs_filename = utils.gs_ls(gs_filename)[0].rsplit('/', 1)
        return bucket, gs_filename


    def download_cr50_gs_image(self, filename, image_bid='', bucket=None,
                               image_type=None):
        """Get the image from gs and save it in the autotest dir.

        Args:
            filename: The cr50 image basename
            image_bid: the board id info list or string. It will be added to the
                       filename.
            bucket: The gs bucket name
            image_type: 'debug' or 'release'. This will be used to determine
                        the bucket if the bucket is not given.
        Returns:
            A tuple with the local path and version
        Raises:
            TestError if image_bid was given and the downloaded image has a
            different board id.
        """
        # Add the image bid string to the filename
        if image_bid:
            bid_str = cr50_utils.GetBoardIdInfoString(image_bid,
                                                      symbolic=True)
            filename += '.' + bid_str.replace(':', '_')

        if not bucket:
            bucket, filename = self.find_cr50_gs_image(filename, image_type)

        remote_temp_dir = '/tmp/'
        src = os.path.join(remote_temp_dir, filename)
        dest = os.path.join(self.resultsdir, filename)

        # Copy the image to the dut
        gsutil_wrapper.copy_private_bucket(host=self.host,
                                           bucket=bucket,
                                           filename=filename,
                                           destination=remote_temp_dir)

        self.host.get_file(src, dest)
        ver = cr50_utils.GetBinVersion(self.host, src)

        # Compare the image board id to the downloaded image to make sure we got
        # the right file
        downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True)
        if image_bid and bid_str != downloaded_bid:
            raise error.TestError('Could not download image with matching '
                                  'board id wanted %s got %s' % (bid_str,
                                  downloaded_bid))
        return dest, ver


    def download_cr50_debug_image(self, devid, image_bid=''):
        """download the cr50 debug file.

        Get the file with the matching devid and image board id info

        Args:
            devid: the cr50_devid string '${DEVID0} ${DEVID1}'
            image_bid: the image board id info string or list
        Returns:
            A tuple with the debug image local path and version
        """
        # Debug images are node locked with the devid. Add the devid to the
        # filename
        filename = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'))

        # Download the image
        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
                                                image_type='debug')

        return dest, ver


    def download_cr50_release_image(self, image_rw, image_bid=''):
        """download the cr50 release file.

        Get the file with the matching version and image board id info

        Args:
            image_rw: the rw version string
            image_bid: the image board id info string or list
        Returns:
            A tuple with the release image local path and version
        Raises:
            TestError if the downloaded image has a different rw version.
        """
        # Release images can be found using the rw version
        filename = self.CR50_PROD_FILE % image_rw

        # Download the image
        dest, ver = self.download_cr50_gs_image(filename, image_bid=image_bid,
                                                image_type='release')

        # Compare the rw version and board id info to make sure the right image
        # was found
        if image_rw != ver[1]:
            raise error.TestError('Could not download image with matching '
                                  'rw version')
        return dest, ver


    def _cr50_verify_update(self, expected_rw, expect_rollback):
        """Verify the expected version is running on cr50.

        Args:
            expected_rw: The RW version string we expect to be running
            expect_rollback: True if cr50 should have rolled back during the
                             update

        Raises:
            TestFail if there is any unexpected update state
        """
        errors = []
        running_rw = self.cr50.get_version()
        if expected_rw != running_rw:
            errors.append('running %s not %s' % (running_rw, expected_rw))

        if expect_rollback != self.cr50.rolledback():
            # The conditional has to be in parentheses. '%' binds tighter than
            # 'if/else', so without them an unexpected rollback would add an
            # empty error string.
            errors.append('%srollback detected' %
                          ('no ' if expect_rollback else ''))
        if errors:
            raise error.TestFail('cr50_update failed: %s' % ', '.join(errors))
        logging.info('RUNNING %s after %s', expected_rw,
                     'rollback' if expect_rollback else 'update')


    def _cr50_run_update(self, path):
        """Install the image at path onto cr50.

        Args:
            path: the location of the image to update to

        Returns:
            the rw version of the image
        """
        tmp_dest = '/tmp/' + os.path.basename(path)

        dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest)
        cr50_utils.GSCTool(self.host, ['-a', dest])
        return image_ver[1]


    def cr50_update(self, path, rollback=False, erase_nvmem=False,
                    expect_rollback=False, chip_bid=None, chip_flags=None):
        """Attempt to update to the given image.

        If rollback is True, we assume that cr50 is already running an image
        that can rollback.

        Args:
            path: the location of the update image
            rollback: True if we need to force cr50 to rollback to update to
                      the given image
            erase_nvmem: True if we need to erase nvmem during rollback
            expect_rollback: True if cr50 should rollback on its own
            chip_bid: the integer representation of chip board id or None if the
                      board id should be erased during rollback
            chip_flags: the integer representation of chip board id flags or
                        None if the board id should be erased during rollback

        Raises:
            TestFail if the update failed
        """
        original_rw = self.cr50.get_version()

        # Cr50 is going to reject an update if it hasn't been up for more than
        # 60 seconds. Wait until that passes before trying to run the update.
        self.cr50.wait_until_update_is_allowed()

        image_rw = self._cr50_run_update(path)

        # Running the update may cause cr50 to reboot. Wait for that before
        # sending more commands. The reboot should happen quickly. Wait a
        # maximum of 10 seconds.
        self.cr50.wait_for_reboot(timeout=10)

        if erase_nvmem and rollback:
            self.cr50.erase_nvmem()

        if rollback:
            self.cr50.rollback(chip_bid=chip_bid, chip_flags=chip_flags)

        expected_rw = original_rw if expect_rollback else image_rw
        # If we expect a rollback, the version should remain unchanged
        self._cr50_verify_update(expected_rw, rollback or expect_rollback)


    def ccd_open_from_ap(self):
        """Start the open process and press the power button.

        Raises:
            TestFail if the open process can't be started or cr50 isn't open
            when it finishes.
        """
        self._ccd_open_last_len = 0

        self._ccd_open_stdout = StringIO.StringIO()

        ccd_open_cmd = utils.sh_escape('gsctool -a -o')
        full_ssh_cmd = '%s "%s"' % (self.host.ssh_command(options='-tt'),
            ccd_open_cmd)
        # Start running the Cr50 Open process in the background.
        self._ccd_open_job = utils.BgJob(full_ssh_cmd,
                                         nickname='ccd_open',
                                         stdout_tee=self._ccd_open_stdout,
                                         stderr_tee=utils.TEE_TO_LOGS)

        if self._ccd_open_job is None:
            raise error.TestFail('could not start ccd open')

        try:
            # Wait for the first gsctool power button prompt before starting the
            # open process.
            logging.info(self._get_ccd_open_output())
            # Cr50 starts out by requesting 5 quick presses then 4 longer
            # power button presses. Run the quick presses without looking at the
            # command output, because getting the output can take some time. For
            # the presses that require a 1 minute wait check the output between
            # presses, so we can catch errors
            #
            # run quick presses for 30 seconds. It may take a couple of seconds
            # for open to start. 10 seconds should be enough. 30 is just used
            # because it will definitely be enough, and this process takes 300
            # seconds, so doing quick presses for 30 seconds won't matter.
            end_time = time.time() + 30
            while time.time() < end_time:
                self.servo.power_short_press()
                logging.info('short int power button press')
                time.sleep(self.PP_SHORT_INTERVAL)
            # Poll the output and press the power button for the longer presses.
            utils.wait_for_value(self._check_open_and_press_power_button,
                                 expected_value=True,
                                 timeout_sec=self.cr50.PP_LONG)
        except Exception as e:
            logging.info(e)
            raise
        finally:
            self._close_ccd_open_job()
        logging.info(self.cr50.get_ccd_info())


    def _check_open_and_press_power_button(self):
        """Check stdout and press the power button if prompted.

        Returns:
            True if the open process has exited or the ccd state is Open.
        """
        logging.info(self._get_ccd_open_output())
        self.servo.power_short_press()
        logging.info('long int power button press')
        # Give cr50 some time to complete the open process. After the last
        # power button press cr50 erases nvmem and resets the dut before setting
        # the state to open. Wait a bit so we don't check the ccd state in the
        # middle of this reset process. Power button requests happen once a
        # minute, so waiting 10 seconds isn't a big deal.
        time.sleep(10)
        return (self._ccd_open_job.sp.poll() is not None or 'Open' in
                self.cr50.get_ccd_info()['State'])


    def _get_ccd_open_output(self):
        """Read the new output."""
        self._ccd_open_job.process_output()
        # Only return what was written since the last read.
        self._ccd_open_stdout.seek(self._ccd_open_last_len)
        output = self._ccd_open_stdout.read()
        self._ccd_open_last_len = self._ccd_open_stdout.len
        return output


    def _close_ccd_open_job(self):
        """Terminate the process and check the results.

        Raises:
            TestFail if the output has an error or cr50 isn't open.
        """
        exit_status = utils.nuke_subprocess(self._ccd_open_job.sp)
        stdout = self._ccd_open_stdout.getvalue().strip()
        delattr(self, '_ccd_open_job')
        if stdout:
            logging.info('stdout of ccd open:\n%s', stdout)
        if exit_status:
            logging.info('exit status: %d', exit_status)
        if 'Error' in stdout:
            raise error.TestFail('ccd open Error %s' %
                                 stdout.split('Error')[-1])
        if self.cr50.OPEN != self.cr50.get_ccd_level():
            raise error.TestFail('unable to open cr50: %s' % stdout)
        else:
            logging.info('Opened Cr50')


    def fast_open(self, enable_testlab=False):
        """Try to use testlab open. If that fails, do regular ap open.

        Args:
            enable_testlab: If True, enable testlab mode after cr50 is open.
        """
        # Try to use testlab open first, so we don't have to wait for the
        # physical presence check.
        self.cr50.send_command('ccd testlab open')
        if self.cr50.get_ccd_level() == 'open':
            return

        # Use the console to open cr50 without entering dev mode if possible. It
        # takes longer and relies on more systems to enter dev mode and ssh into
        # the AP. Skip the steps that aren't required.
        if not self.cr50.get_cap('OpenNoDevMode')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.enter_mode_after_checking_tpm_state('dev')

        if self.cr50.get_cap('OpenFromUSB')[self.cr50.CAP_IS_ACCESSIBLE]:
            self.cr50.set_ccd_level(self.cr50.OPEN)
        else:
            self.ccd_open_from_ap()

        if enable_testlab:
            self.cr50.set_ccd_testlab('on')

        # Make sure the device is in normal mode. After opening cr50, the TPM
        # should be cleared and the device should automatically reset to normal
        # mode. Just check to be consistent. It's possible capability settings
        # are set to skip wiping the TPM.
        self.enter_mode_after_checking_tpm_state('normal')


    def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error):
        """Run a gsctool command and input the password

        Args:
            password: The cr50 password string
            cmd: The gsctool command
            name: The name to give the job
            expect_error: True if the command should fail

        Raises:
            TestFail if the command can't be started, if it fails when
            expect_error is False, or if it passes when expect_error is True.
        """
        set_pwd_cmd = utils.sh_escape(cmd)
        full_ssh_command = '%s "%s"' % (self.host.ssh_command(options='-tt'),
            set_pwd_cmd)
        stdout = StringIO.StringIO()
        # Start running the gsctool Command in the background.
        gsctool_job = utils.BgJob(full_ssh_command,
                                  nickname='%s_with_password' % name,
                                  stdout_tee=stdout,
                                  stderr_tee=utils.TEE_TO_LOGS,
                                  stdin=subprocess.PIPE)
        if gsctool_job is None:
            raise error.TestFail('could not start gsctool command %r' % cmd)

        try:
            # Wait for enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Enter the password
            gsctool_job.sp.stdin.write(password + '\n')

            # Wait for re-enter prompt
            gsctool_job.process_output()
            logging.info(stdout.getvalue().strip())
            # Re-enter the password
            gsctool_job.sp.stdin.write(password + '\n')
            time.sleep(self.cr50.CONSERVATIVE_CCD_WAIT)
            gsctool_job.process_output()
        finally:
            exit_status = utils.nuke_subprocess(gsctool_job.sp)
            output = stdout.getvalue().strip()
            logging.info('%s stdout: %s', name, output)
            logging.info('%s exit status: %s', name, exit_status)
            if exit_status:
                message = ('gsctool %s failed using %r: %s %s' %
                           (name, password, exit_status, output))
                # An expected failure is only logged.
                if expect_error:
                    logging.info(message)
                else:
                    raise error.TestFail(message)
            elif expect_error:
                raise error.TestFail('%s with %r did not fail when expected' %
                                     (name, password))


    def set_ccd_password(self, password, expect_error=False):
        """Set the ccd password

        Args:
            password: The cr50 password string
            expect_error: True if setting the password should fail

        Raises:
            TestError if testlab mode isn't enabled.
        """
        # If for some reason the test sets a password and is interrupted before
        # we can clear it, we want testlab mode to be enabled, so it's possible
        # to clear the password without knowing it.
        if not self.cr50.testlab_is_on():
            raise error.TestError('Will not set password unless testlab mode '
                                  'is enabled.')
        self.run_gsctool_cmd_with_password(
                password, 'gsctool -a -P', 'set_password', expect_error)


    def ccd_unlock_from_ap(self, password=None, expect_error=False):
        """Unlock cr50

        Args:
            password: The cr50 password string. If it isn't given, the unlock
                      command is run without entering a password.
            expect_error: True if the unlock should fail. This is only used
                          when a password is given.
        """
        if not password:
            self.host.run('gsctool -a -U')
            return
        self.run_gsctool_cmd_with_password(
                password, 'gsctool -a -U', 'unlock', expect_error)


    def enter_mode_after_checking_tpm_state(self, mode):
        """Reboot to mode if cr50 doesn't already match the state

        Args:
            mode: 'dev' to enter dev mode. Anything else is treated as not
                  dev mode when checking the cr50 state.

        Raises:
            TestError if cr50 doesn't report the requested mode after reboot.
        """
        # If the device is already in the correct mode, don't do anything
        if (mode == 'dev') == self.cr50.in_dev_mode():
            logging.info('already in %r mode', mode)
            return

        self.switcher.reboot_to_mode(to_mode=mode)

        if (mode == 'dev') != self.cr50.in_dev_mode():
            raise error.TestError('Unable to enter %r mode' % mode)


    def _fwmp_is_cleared(self):
        """Return True if the FWMP is cleared

        The FWMP is considered cleared if cryptohome prints the invalid FWMP
        error message.

        Raises:
            TestError if cryptohome exits with an unexpected status.
        """
        res = self.host.run('cryptohome '
                            '--action=get_firmware_management_parameters',
                            ignore_status=True)
        if res.exit_status and res.exit_status != self.CLEARED_FWMP_EXIT_STATUS:
            raise error.TestError('Could not run cryptohome command %r' % res)
        return self.CLEARED_FWMP_ERROR_MSG in res.stdout


    def clear_fwmp(self):
        """Clear the FWMP"""
        if self._fwmp_is_cleared():
            return
        status = self.host.run('cryptohome --action=tpm_status').stdout
        logging.debug(status)
        # Take ownership of the TPM before removing the FWMP if it isn't owned.
        if 'TPM Owned: true' not in status:
            self.host.run('cryptohome --action=tpm_take_ownership')
            self.host.run('cryptohome --action=tpm_wait_ownership')
        self.host.run('cryptohome '
                      '--action=remove_firmware_management_parameters')

    def tpm_is_responsive(self):
        """Check TPM responsiveness by running tpm_version.

        Returns:
            True if tpm_version exits with status 0.
        """
        result = self.host.run('tpm_version', ignore_status=True)
        logging.debug(result.stdout.strip())
        return not result.exit_status