1# Lint as: python2, python3 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import print_function 7 8import logging 9import os 10import pprint 11import six 12import time 13 14from autotest_lib.client.bin import utils 15from autotest_lib.client.common_lib import error, utils 16from autotest_lib.client.common_lib.cros import cr50_utils, tpm_utils 17from autotest_lib.server.cros import filesystem_util, gsutil_wrapper 18from autotest_lib.server.cros.faft.firmware_test import FirmwareTest 19 20 21class Cr50Test(FirmwareTest): 22 """Base class that sets up helper objects/functions for cr50 tests.""" 23 version = 1 24 25 RELEASE_POOLS = ['faft-cr50-experimental', 'faft-cr50'] 26 RESPONSE_TIMEOUT = 180 27 GS_PRIVATE = 'gs://chromeos-localmirror-private/distfiles/' 28 # Prod signed test images are stored in the private cr50 directory. 29 GS_PRIVATE_PROD = GS_PRIVATE + 'cr50/' 30 # Node locked test images are in this private debug directory. 31 GS_PRIVATE_DBG = GS_PRIVATE + 'chromeos-cr50-debug-0.0.11/' 32 GS_PUBLIC = 'gs://chromeos-localmirror/distfiles/' 33 CR50_PROD_FILE = 'cr50.r0.0.1*.w%s%s.tbz2' 34 CR50_DEBUG_FILE = '*/cr50.dbg.%s.bin.*%s' 35 CR50_ERASEFLASHINFO_FILE = ( 36 '*/cr50_Unknown_NodeLocked-%s_cr50-accessory-mp.bin') 37 CR50_QUAL_VERSION_FILE = 'chromeos-cr50-QUAL_VERSION' 38 NONE = 0 39 # Saved the original device state during init. 40 INITIAL_IMAGE_STATE = 1 << 0 41 # Saved the original image, the device image, and the debug image. These 42 # images are needed to be able to restore the original image and board id. 43 DEVICE_IMAGES = 1 << 1 44 DBG_IMAGE = 1 << 2 45 ERASEFLASHINFO_IMAGE = 1 << 3 46 # Different attributes of the device state require the test to download 47 # different images. STATE_IMAGE_RESTORES is a dictionary of the state each 48 # image type can restore. 49 STATE_IMAGE_RESTORES = { 50 DEVICE_IMAGES: ['prod_version', 'prepvt_version'], 51 DBG_IMAGE: ['running_image_ver', 'running_image_bid', 'chip_bid'], 52 ERASEFLASHINFO_IMAGE: ['chip_bid'], 53 } 54 PP_SHORT_INTERVAL = 3 55 56 def initialize(self, 57 host, 58 cmdline_args, 59 full_args, 60 restore_cr50_image=False, 61 restore_cr50_board_id=False, 62 provision_update=False): 63 self._saved_state = self.NONE 64 self._raise_error_on_mismatch = not restore_cr50_image 65 self._provision_update = provision_update 66 self.tot_test_run = full_args.get('tot_test_run', '').lower() == 'true' 67 super(Cr50Test, self).initialize(host, cmdline_args) 68 69 if not hasattr(self, 'cr50'): 70 raise error.TestNAError('Test can only be run on devices with ' 71 'access to the Cr50 console') 72 # TODO(b/149948314): remove when dual-v4 is sorted out. 73 if 'ccd' in self.servo.get_servo_version(): 74 self.servo.disable_ccd_watchdog_for_test() 75 76 logging.info('Test Args: %r', full_args) 77 78 self._devid = self.cr50.get_devid() 79 self.can_set_ccd_level = (not self.servo.main_device_is_ccd() 80 or self.cr50.testlab_is_on()) 81 self.original_ccd_level = self.cr50.get_ccd_level() 82 self.original_ccd_settings = self.cr50.get_cap_dict( 83 info=self.cr50.CAP_SETTING) 84 85 self.host = host 86 # SSH commands should complete within 3 minutes. Change the default, so 87 # it doesn't take half an hour for commands to timeout when the DUT is 88 # down. 89 self.host.set_default_run_timeout(180) 90 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 91 # Clear the FWMP, so it can't disable CCD. 92 self.clear_fwmp() 93 94 # TODO(b/218492933) : find better way to disable rddkeepalive 95 # Disable rddkeepalive, so the test can disable ccd. 96 self.cr50.send_command('ccd testlab open') 97 self.cr50.send_command('rddkeepalive disable') 98 # faft-cr50 locks and reopens ccd. This will restrict some capabilities 99 # c2d2 uses to control the duts. Set the capabilities to Always, so 100 # individiual tests don't need to care that much. 101 self.cr50.enable_servo_control_caps() 102 103 if self.can_set_ccd_level: 104 # Lock cr50 so the console will be restricted 105 self.cr50.set_ccd_level('lock') 106 elif self.original_ccd_level != 'lock': 107 raise error.TestNAError( 108 'Lock the console before running cr50 test') 109 110 self._save_original_state(full_args.get('release_path', '')) 111 112 # Try and download all images necessary to restore cr50 state. 113 try: 114 self._save_dbg_image(full_args.get('cr50_dbg_image_path', '')) 115 self._saved_state |= self.DBG_IMAGE 116 except Exception as e: 117 logging.warning('Error saving DBG image: %s', str(e)) 118 if restore_cr50_image: 119 raise error.TestNAError('Need DBG image: %s' % str(e)) 120 121 try: 122 self._save_eraseflashinfo_image( 123 full_args.get('cr50_eraseflashinfo_image_path', '')) 124 if self.cr50.uses_board_property('BOARD_EC_CR50_COMM_SUPPORT'): 125 raise error.TestError('Board cannot boot EFI image') 126 self._saved_state |= self.ERASEFLASHINFO_IMAGE 127 except Exception as e: 128 logging.warning('Error saving eraseflashinfo image: %s', str(e)) 129 if restore_cr50_board_id: 130 raise error.TestNAError( 131 'Need eraseflashinfo image: %s' % str(e)) 132 133 # TODO(b/143888583): remove qual update during init once new design to 134 # to provision cr50 updates is in place. 135 # Make sure the release image is running before starting the test. 136 is_release_qual = full_args.get('is_release_qual', 137 '').lower() == 'true' 138 if is_release_qual or self.running_cr50_release_suite(): 139 release_ver_arg = full_args.get('release_ver', '') 140 release_path_arg = full_args.get('release_path', '') 141 self.ensure_qual_image_is_running(release_ver_arg, 142 release_path_arg) 143 144 def running_cr50_release_suite(self): 145 """Return True if the DUT is in a release pool.""" 146 for pool in self.host.host_info_store.get().pools: 147 # TODO(b/149109740): remove once the pool values are verified. 148 # Change to run with faft-cr50 and faft-cr50-experimental suites. 149 logging.info('Checking pool: %s', pool) 150 if pool in self.RELEASE_POOLS: 151 logging.info('Running a release test.') 152 return True 153 return False 154 155 def ensure_qual_image_is_running(self, qual_ver_str, qual_path): 156 """Update to the qualification image if it's not running. 157 158 qual_ver_str and path are command line args that may be supplied to 159 specify a local version or path. If neither are supplied, the version 160 from gs will be used to determine what version cr50 should run. 161 162 qual_ver_str and qual_path should not be supplied together. If they are, 163 the path will be used. It's not a big deal as long as they agree with 164 each other. 165 166 @param qual_ver_str: qualification version string or None. 167 @param qual_path: local path to the qualification image or None. 168 """ 169 # Get the local image information. 170 if qual_path: 171 dest, qual_ver = cr50_utils.InstallImage(self.host, qual_path, 172 '/tmp/qual_cr50.bin') 173 self.host.run('rm ' + dest) 174 qual_bid_str = (cr50_utils.GetBoardIdInfoString( 175 qual_ver[2], False) if qual_ver[2] else '') 176 qual_ver_str = '%s/%s' % (qual_ver[1], qual_bid_str) 177 178 # Determine the qualification version from. 179 if not qual_ver_str: 180 gsurl = os.path.join(self.GS_PRIVATE, self.CR50_QUAL_VERSION_FILE) 181 dut_path = self.download_cr50_gs_file(gsurl, False)[1] 182 qual_ver_str = self.host.run('cat ' + dut_path).stdout.strip() 183 184 # Download the qualification image based on the version. 185 if not qual_path: 186 rw, bid = qual_ver_str.split('/') 187 qual_path, qual_ver = self.download_cr50_release_image(rw, bid) 188 189 logging.info('Cr50 Qual Version: %s', qual_ver_str) 190 logging.info('Cr50 Qual Path: %s', qual_path) 191 qual_chip_bid = cr50_utils.GetChipBIDFromImageBID( 192 qual_ver[2], self.get_device_brand()) 193 logging.info('Cr50 Qual Chip BID: %s', qual_chip_bid) 194 195 # Replace only the prod or prepvt image based on the major version. 196 if int(qual_ver[1].split('.')[1]) % 2: 197 prod_ver = qual_ver 198 prepvt_ver = self._original_image_state['prepvt_version'] 199 prod_path = qual_path 200 prepvt_path = self._device_prepvt_image 201 else: 202 prod_ver = self._original_image_state['prod_version'] 203 prepvt_ver = qual_ver 204 prod_path = self._device_prod_image 205 prepvt_path = qual_path 206 207 # Generate a dictionary with all of the expected state. 208 qual_state = {} 209 qual_state['prod_version'] = prod_ver 210 qual_state['prepvt_version'] = prepvt_ver 211 qual_state['chip_bid'] = qual_chip_bid 212 qual_state['running_image_bid'] = qual_ver[2] 213 # The test can't rollback RO. The newest RO should be running at the end 214 # of the test. max_ro will be none if the versions are the same. Use the 215 # running_ro in that case. 216 running_ro = self.get_saved_cr50_original_version()[0] 217 max_ro = cr50_utils.GetNewestVersion(running_ro, qual_ver[0]) 218 qual_state['running_image_ver'] = (max_ro or running_ro, qual_ver[1], 219 None) 220 mismatch = self._check_running_image_and_board_id(qual_state) 221 if not mismatch: 222 logging.info('Running qual image. No update needed.') 223 return 224 logging.info('Cr50 qual update required.') 225 self.make_rootfs_writable() 226 self._update_device_images_and_running_cr50_firmware( 227 qual_state, qual_path, prod_path, prepvt_path) 228 logging.info("Recording qual device state as 'original' device state") 229 self._save_original_state(qual_path) 230 231 def make_rootfs_writable(self): 232 """Make rootfs writeable. Recover the dut if necessary.""" 233 path = None 234 try: 235 filesystem_util.make_rootfs_writable(self.host) 236 return 237 except error.AutoservRunError as e: 238 if 'cannot remount' not in e.result_obj.stderr: 239 raise 240 path = e.result_obj.stderr.partition( 241 'cannot remount')[2].split()[0] 242 # This shouldn't be possible. 243 if not path: 244 raise error.TestError('Need path to repair filesystem') 245 logging.info('repair %s', path) 246 # Repair the block. Assume yes to all questions. The exit status will be 247 # 3, so ignore errors. make_rootfs_writable will fail if something 248 # actually went wrong. 249 self.host.run('e2fsck -y %s' % path, ignore_status=True) 250 self.host.reboot() 251 filesystem_util.make_rootfs_writable(self.host) 252 253 def _saved_cr50_state(self, state): 254 """Returns True if the test has saved the given state 255 256 @param state: a integer representing the state to check. 257 """ 258 return state & self._saved_state 259 260 def after_run_once(self): 261 """Log which iteration just ran""" 262 logging.info('successfully ran iteration %d', self.iteration) 263 self._try_to_bring_dut_up() 264 265 def _save_dbg_image(self, cr50_dbg_image_path): 266 """Save or download the node locked dev image. 267 268 @param cr50_dbg_image_path: The path to the node locked cr50 image. 269 """ 270 if os.path.isfile(cr50_dbg_image_path): 271 self._dbg_image_path = cr50_dbg_image_path 272 else: 273 self._dbg_image_path = self.download_cr50_debug_image()[0] 274 275 def _save_eraseflashinfo_image(self, cr50_eraseflashinfo_image_path): 276 """Save or download the node locked eraseflashinfo image. 277 278 @param cr50_eraseflashinfo_image_path: The path to the node locked cr50 279 image. 280 """ 281 if os.path.isfile(cr50_eraseflashinfo_image_path): 282 self._eraseflashinfo_image_path = cr50_eraseflashinfo_image_path 283 else: 284 self._eraseflashinfo_image_path = ( 285 self.download_cr50_eraseflashinfo_image()[0]) 286 287 def _save_device_image(self, ext): 288 """Download the .prod or .prepvt device image and get the version. 289 290 @param ext: The Cr50 file extension: prod or prepvt. 291 @returns (local_path, rw_version, bid_string) or (None, None, None) if 292 the file doesn't exist on the DUT. 293 """ 294 version = self._original_image_state[ext + '_version'] 295 if not version: 296 return None, None, None 297 _, rw_ver, bid = version 298 rw_filename = 'cr50.device.bin.%s.%s' % (ext, rw_ver) 299 local_path = os.path.join(self.resultsdir, rw_filename) 300 dut_path = cr50_utils.GetDevicePath(ext) 301 self.host.get_file(dut_path, local_path) 302 bid = cr50_utils.GetBoardIdInfoString(bid) 303 return local_path, rw_ver, bid 304 305 def _save_original_images(self, release_path): 306 """Use the saved state to find all of the device images. 307 308 This will download running cr50 image and the device image. 309 310 @param release_path: The release path given by test args 311 """ 312 local_path, prod_rw, prod_bid = self._save_device_image('prod') 313 self._device_prod_image = local_path 314 315 local_path, prepvt_rw, prepvt_bid = self._save_device_image('prepvt') 316 self._device_prepvt_image = local_path 317 318 if os.path.isfile(release_path): 319 self._original_cr50_image = release_path 320 logging.info('using supplied image') 321 return 322 if self.tot_test_run: 323 self._original_cr50_image = self.download_cr50_tot_image() 324 return 325 326 # If the running cr50 image version matches the image on the DUT use 327 # the DUT image as the original image. If the versions don't match 328 # download the image from google storage 329 _, running_rw, running_bid = self.get_saved_cr50_original_version() 330 331 # Convert the running board id to the same format as the prod and 332 # prepvt board ids. 333 running_bid = cr50_utils.GetBoardIdInfoString(running_bid) 334 if running_rw == prod_rw and running_bid == prod_bid: 335 logging.info('Using device cr50 prod image %s %s', prod_rw, 336 prod_bid) 337 self._original_cr50_image = self._device_prod_image 338 elif running_rw == prepvt_rw and running_bid == prepvt_bid: 339 logging.info('Using device cr50 prepvt image %s %s', prepvt_rw, 340 prepvt_bid) 341 self._original_cr50_image = self._device_prepvt_image 342 else: 343 logging.info('Downloading cr50 image %s %s', running_rw, 344 running_bid) 345 self._original_cr50_image = self.download_cr50_release_image( 346 running_rw, running_bid)[0] 347 348 def _save_original_state(self, release_path): 349 """Save the cr50 related state. 350 351 Save the device's current cr50 version, cr50 board id, the running cr50 352 image, the prepvt, and prod cr50 images. These will be used to restore 353 the cr50 state during cleanup. 354 355 @param release_path: the optional command line arg of path for the local 356 cr50 image. 357 """ 358 self._saved_state &= ~self.INITIAL_IMAGE_STATE 359 self._original_image_state = self.get_image_and_bid_state() 360 # We successfully saved the device state 361 self._saved_state |= self.INITIAL_IMAGE_STATE 362 self._saved_state &= ~self.DEVICE_IMAGES 363 try: 364 self._save_original_images(release_path) 365 self._saved_state |= self.DEVICE_IMAGES 366 except Exception as e: 367 logging.warning('Error saving ChromeOS image cr50 firmware: %s', 368 str(e)) 369 370 def get_saved_cr50_original_version(self): 371 """Return (ro ver, rw ver, bid).""" 372 if ('running_image_ver' not in self._original_image_state 373 or 'running_image_bid' not in self._original_image_state): 374 raise error.TestError('No record of original cr50 image version') 375 return (self._original_image_state['running_image_ver'][0], 376 self._original_image_state['running_image_ver'][1], 377 self._original_image_state['running_image_bid']) 378 379 def get_saved_cr50_original_path(self): 380 """Return the local path for the original cr50 image.""" 381 if not hasattr(self, '_original_cr50_image'): 382 raise error.TestError('No record of original image') 383 return self._original_cr50_image 384 385 def has_saved_dbg_image_path(self): 386 """Returns true if we saved the node locked debug image.""" 387 return hasattr(self, '_dbg_image_path') 388 389 def get_saved_dbg_image_path(self): 390 """Return the local path for the cr50 dev image.""" 391 if not self.has_saved_dbg_image_path(): 392 raise error.TestError('No record of debug image') 393 return self._dbg_image_path 394 395 def get_saved_eraseflashinfo_image_path(self): 396 """Return the local path for the cr50 eraseflashinfo image.""" 397 if not hasattr(self, '_eraseflashinfo_image_path'): 398 raise error.TestError('No record of eraseflashinfo image') 399 return self._eraseflashinfo_image_path 400 401 def get_device_brand(self): 402 """Returns the 4 character device brand.""" 403 return self._original_image_state['cros_config / brand-code'] 404 405 def _retry_cr50_update(self, image, retries, rollback): 406 """Try to update to the given image retries amount of times. 407 408 @param image: The image path. 409 @param retries: The number of times to try to update. 410 @param rollback: Run rollback after the update. 411 @raises TestFail if the update failed. 412 """ 413 for i in range(retries): 414 try: 415 return self.cr50_update(image, rollback=rollback) 416 except Exception as e: 417 logging.warning('Failed to update to %s attempt %d: %s', 418 os.path.basename(image), i, str(e)) 419 logging.info('Sleeping 60 seconds') 420 time.sleep(60) 421 raise error.TestError( 422 'Failed to update to %s' % os.path.basename(image)) 423 424 def run_update_to_eraseflashinfo(self): 425 """Erase flashinfo using the eraseflashinfo image. 426 427 Update to the DBG image, rollback to the eraseflashinfo, and run 428 eraseflashinfo. 429 """ 430 self._retry_cr50_update(self._dbg_image_path, 3, False) 431 self._retry_cr50_update(self._eraseflashinfo_image_path, 3, True) 432 if not self.cr50.eraseflashinfo(): 433 raise error.TestError('Unable to erase the board id') 434 435 def eraseflashinfo_and_restore_image(self, image=''): 436 """eraseflashinfo and update to the given the image. 437 438 @param image: the image to end on. Use the original test image if no 439 image is given. 440 """ 441 image = image if image else self.get_saved_cr50_original_path() 442 self.run_update_to_eraseflashinfo() 443 self.cr50_update(image) 444 445 def update_cr50_image_and_board_id(self, image_path, bid): 446 """Set the chip board id and updating the cr50 image. 447 448 Make 3 attempts to update to the original image. Use a rollback from 449 the DBG image to erase the state that can only be erased by a DBG image. 450 Set the chip board id during rollback. 451 452 @param image_path: path of the image to update to. 453 @param bid: the board id to set. 454 """ 455 current_bid = cr50_utils.GetChipBoardId(self.host) 456 bid_mismatch = current_bid != bid 457 set_bid = bid_mismatch and bid != cr50_utils.ERASED_CHIP_BID 458 bid_is_erased = current_bid == cr50_utils.ERASED_CHIP_BID 459 eraseflashinfo = bid_mismatch and not bid_is_erased 460 461 if (eraseflashinfo 462 and not self._saved_cr50_state(self.ERASEFLASHINFO_IMAGE)): 463 raise error.TestFail('Did not save eraseflashinfo image') 464 465 # Remove prepvt and prod iamges, so they don't interfere with the test 466 # rolling back and updating to images that my be older than the images 467 # on the device. 468 if filesystem_util.is_rootfs_writable(self.host): 469 self.host.run('rm %s' % cr50_utils.CR50_PREPVT, ignore_status=True) 470 self.host.run('rm %s' % cr50_utils.CR50_PROD, ignore_status=True) 471 472 if eraseflashinfo: 473 self.run_update_to_eraseflashinfo() 474 475 self._retry_cr50_update(self._dbg_image_path, 3, False) 476 477 chip_bid = bid[0] 478 chip_flags = bid[2] 479 if set_bid: 480 self.cr50.set_board_id(chip_bid, chip_flags) 481 482 self._retry_cr50_update(image_path, 3, True) 483 484 def _discharging_factory_mode_cleanup(self): 485 """Try to get the dut back into charging mode. 486 487 Shutdown the DUT, fake disconnect AC, and then turn on the DUT to 488 try to recover the EC. 489 490 When Cr50 enters factory mode on Wilco, the EC disables charging. 491 Try to run the sequence to get the Wilco EC out of the factory mode 492 state, so it reenables charging. 493 """ 494 if self.faft_config.chrome_ec: 495 return 496 charge_state = self.host.get_power_supply_info()['Battery']['state'] 497 logging.info('Charge state: %r', charge_state) 498 if 'Discharging' not in charge_state: 499 logging.info('Charge state is ok') 500 return 501 502 if not self.servo.is_servo_v4_type_c(): 503 raise error.TestError( 504 'Cannot recover charging without Type C servo') 505 # Disconnect the charger and reset the dut to recover charging. 506 logging.info('Recovering charging') 507 self.faft_client.system.run_shell_command('poweroff') 508 time.sleep(self.cr50.SHORT_WAIT) 509 self.servo.set_nocheck('servo_v4_uart_cmd', 'fakedisconnect 100 20000') 510 time.sleep(self.cr50.SHORT_WAIT) 511 self._try_to_bring_dut_up() 512 charge_state = self.host.get_power_supply_info()['Battery']['state'] 513 logging.info('Charge state: %r', charge_state) 514 if 'Discharging' in charge_state: 515 logging.warning('DUT still discharging') 516 517 def _cleanup_required(self, state_mismatch, image_type): 518 """Return True if the update can fix something in the mismatched state. 519 520 @param state_mismatch: a dictionary of the mismatched state. 521 @param image_type: The integer representing the type of image 522 """ 523 state_image_restores = set(self.STATE_IMAGE_RESTORES[image_type]) 524 restore = state_image_restores.intersection(state_mismatch.keys()) 525 if restore and not self._saved_cr50_state(image_type): 526 raise error.TestError( 527 'Did not save images to restore %s' % (', '.join(restore))) 528 return not not restore 529 530 def _get_image_information(self, ext): 531 """Get the image information for the .prod or .prepvt image. 532 533 @param ext: The extension string prod or prepvt 534 @param returns: The image version or None if the image doesn't exist. 535 """ 536 dut_path = cr50_utils.GetDevicePath(ext) 537 file_exists = self.host.path_exists(dut_path) 538 if file_exists: 539 return cr50_utils.GetBinVersion(self.host, dut_path) 540 return None 541 542 def get_image_and_bid_state(self): 543 """Get a dict with the current device cr50 information. 544 545 The state dict will include the platform brand, chip board id, the 546 running cr50 image version, the running cr50 image board id, and the 547 device cr50 image version. 548 """ 549 state = {} 550 state['cros_config / brand-code'] = self.host.run( 551 'cros_config / brand-code', ignore_status=True).stdout.strip() 552 state['prod_version'] = self._get_image_information('prod') 553 state['prepvt_version'] = self._get_image_information('prepvt') 554 state['chip_bid'] = cr50_utils.GetChipBoardId(self.host) 555 state['chip_bid_str'] = '%08x:%08x:%08x' % state['chip_bid'] 556 state['running_image_ver'] = cr50_utils.GetRunningVersion(self.host) 557 state['running_image_bid'] = self.cr50.get_active_board_id_str() 558 559 logging.debug('Current Cr50 state:\n%s', pprint.pformat(state)) 560 return state 561 562 def _check_running_image_and_board_id(self, expected_state): 563 """Compare the current image and board id to the given state. 564 565 @param expected_state: A dictionary of the state to compare to. 566 @return: A dictionary with the state that is wrong as the key and the 567 expected and current state as the value. 568 """ 569 if not (self._saved_state & self.INITIAL_IMAGE_STATE): 570 logging.warning( 571 'Did not save the original state. Cannot verify it ' 572 'matches') 573 return 574 # Make sure the /var/cache/cr50* state is up to date. 575 cr50_utils.ClearUpdateStateAndReboot(self.host) 576 577 mismatch = {} 578 state = self.get_image_and_bid_state() 579 580 for k, expected_val in six.iteritems(expected_state): 581 val = state[k] 582 if val != expected_val: 583 mismatch[k] = 'expected: %s, current: %s' % (expected_val, val) 584 585 if mismatch: 586 logging.warning('State Mismatch:\n%s', pprint.pformat(mismatch)) 587 return mismatch 588 589 def _check_original_image_state(self): 590 """Compare the current cr50 state to the original state. 591 592 @return: A dictionary with the state that is wrong as the key and the 593 new and old state as the value 594 """ 595 mismatch = self._check_running_image_and_board_id( 596 self._original_image_state) 597 if not mismatch: 598 logging.info('The device is in the original state') 599 return mismatch 600 601 def _reset_ccd_settings(self): 602 """Reset the ccd lock and capability states.""" 603 if not self.cr50.ccd_is_reset(): 604 # Try to open cr50 and enable testlab mode if it isn't enabled. 605 try: 606 self.fast_ccd_open(True) 607 except: 608 # Even if we can't open cr50, do our best to reset the rest of 609 # the system state. Log a warning here. 610 logging.warning('Unable to Open cr50', exc_info=True) 611 self.cr50.ccd_reset(servo_en=False) 612 if not self.cr50.ccd_is_reset(): 613 raise error.TestFail('Could not reset ccd') 614 615 current_settings = self.cr50.get_cap_dict(info=self.cr50.CAP_SETTING) 616 if self.original_ccd_settings != current_settings: 617 if not self.can_set_ccd_level: 618 raise error.TestError("CCD state has changed, but we can't " 619 "restore it") 620 self.fast_ccd_open(True) 621 self.cr50.set_caps(self.original_ccd_settings) 622 623 # First try using testlab open to open the device 624 if self.original_ccd_level == 'open': 625 self.fast_ccd_open(True) 626 elif self.original_ccd_level != self.cr50.get_ccd_level(): 627 self.cr50.set_ccd_level(self.original_ccd_level) 628 629 def fast_ccd_open(self, 630 enable_testlab=False, 631 reset_ccd=True, 632 dev_mode=False): 633 """Check for watchdog resets after opening ccd. 634 635 Args: 636 enable_testlab: If True, enable testlab mode after cr50 is open. 637 reset_ccd: If True, reset ccd after open. 638 dev_mode: True if the device should be in dev mode after ccd is 639 is opened. 640 """ 641 try: 642 super(Cr50Test, self).fast_ccd_open(enable_testlab, reset_ccd, 643 dev_mode) 644 except Exception as e: 645 # Check for cr50 watchdog resets. 646 self.cr50.check_for_console_errors('Fast ccd open') 647 raise 648 649 def cleanup(self): 650 """Attempt to cleanup the cr50 state. Then run firmware cleanup""" 651 try: 652 # Reset the password as the first thing in cleanup. It is important 653 # that if some other part of cleanup fails, the password has at 654 # least been reset. 655 # DO NOT PUT ANYTHING BEFORE THIS. 656 self._try_quick_ccd_cleanup() 657 658 self.servo.enable_main_servo_device() 659 660 self._try_to_bring_dut_up() 661 self._restore_cr50_state() 662 663 # Make sure the sarien EC isn't stuck in factory mode. 664 self._discharging_factory_mode_cleanup() 665 finally: 666 super(Cr50Test, self).cleanup() 667 668 # Check the logs captured during firmware_test cleanup for cr50 errors. 669 self.cr50.check_for_console_errors('Test Cleanup') 670 self.servo.allow_ccd_watchdog_for_test() 671 672 def _update_device_images_and_running_cr50_firmware( 673 self, state, release_path, prod_path, prepvt_path): 674 """Update cr50, set the board id, and copy firmware to the DUT. 675 676 @param state: A dictionary with the expected running version, board id, 677 device cr50 firmware versions. 678 @param release_path: The image to update cr50 to 679 @param prod_path: The path to the .prod image 680 @param prepvt_path: The path to the .prepvt image 681 @raises TestError: if setting any state failed 682 """ 683 mismatch = self._check_running_image_and_board_id(state) 684 if not mismatch: 685 logging.info('Nothing to do.') 686 return 687 688 # Use the DBG image to restore the original image. 689 if self._cleanup_required(mismatch, self.DBG_IMAGE): 690 self.update_cr50_image_and_board_id(release_path, 691 state['chip_bid']) 692 693 self._try_to_bring_dut_up() 694 new_mismatch = self._check_running_image_and_board_id(state) 695 # Copy the original .prod and .prepvt images back onto the DUT. 696 if (self._cleanup_required(new_mismatch, self.DEVICE_IMAGES) 697 and filesystem_util.is_rootfs_writable(self.host)): 698 # Copy the .prod file onto the DUT. 699 if prod_path and 'prod_version' in new_mismatch: 700 cr50_utils.InstallImage(self.host, prod_path, 701 cr50_utils.CR50_PROD) 702 # Copy the .prepvt file onto the DUT. 703 if prepvt_path and 'prepvt_version' in new_mismatch: 704 cr50_utils.InstallImage(self.host, prepvt_path, 705 cr50_utils.CR50_PREPVT) 706 707 final_mismatch = self._check_running_image_and_board_id(state) 708 if final_mismatch: 709 raise error.TestError( 710 'Could not update cr50 image state: %s' % final_mismatch) 711 logging.info('Successfully updated all device cr50 firmware state.') 712 713 def _restore_device_images_and_running_cr50_firmware(self): 714 """Restore the images on the device and the running cr50 image.""" 715 if self._provision_update: 716 return 717 mismatch = self._check_original_image_state() 718 if not mismatch: 719 return 720 self._update_device_images_and_running_cr50_firmware( 721 self._original_image_state, 722 self.get_saved_cr50_original_path(), self._device_prod_image, 723 self._device_prepvt_image) 724 725 if self._raise_error_on_mismatch and mismatch: 726 raise error.TestError('Unexpected state mismatch during ' 727 'cleanup %s' % mismatch) 728 729 def _try_quick_ccd_cleanup(self): 730 """Try to clear all ccd state.""" 731 # This is just a first pass at cleanup. Don't raise any errors. 732 try: 733 self.cr50.ccd_enable() 734 except Exception as e: 735 logging.warning('Ignored exception enabling ccd %r', str(e)) 736 self.cr50.send_command('ccd testlab open') 737 self.cr50.send_command('rddkeepalive disable') 738 self.cr50.ccd_reset() 739 self.cr50.send_command('wp follow_batt_pres atboot') 740 741 def _restore_ccd_settings(self): 742 """Restore the original ccd state.""" 743 self._try_quick_ccd_cleanup() 744 745 # Reboot cr50 if the console is accessible. This will reset most state. 746 if self.cr50.get_cap('GscFullConsole')[self.cr50.CAP_IS_ACCESSIBLE]: 747 self.cr50.reboot() 748 749 # Reenable servo v4 CCD 750 self.cr50.ccd_enable() 751 752 # reboot to normal mode if the device is in dev mode. 753 self.enter_mode_after_checking_cr50_state('normal') 754 755 self._try_to_bring_dut_up() 756 tpm_utils.ClearTPMOwnerRequest(self.host, wait_for_ready=True) 757 self.clear_fwmp() 758 759 # Restore the ccd privilege level 760 self._reset_ccd_settings() 761 762 def _restore_cr50_state(self): 763 """Restore cr50 state, so the device can be used for further testing. 764 765 Restore the cr50 image and board id first. Then CCD, because flashing 766 dev signed images completely clears the CCD state. 767 """ 768 try: 769 self._restore_device_images_and_running_cr50_firmware() 770 except Exception as e: 771 logging.warning('Issue restoring Cr50 image: %s', str(e)) 772 raise 773 finally: 774 self._restore_ccd_settings() 775 776 def find_cr50_gs_image(self, gsurl): 777 """Find the cr50 gs image name. 778 779 @param gsurl: the cr50 image location 780 @return: a list of the gsutil bucket, filename or None if the file 781 can't be found 782 """ 783 try: 784 return utils.gs_ls(gsurl)[0].rsplit('/', 1) 785 except error.CmdError: 786 logging.info('%s does not exist', gsurl) 787 return None 788 789 def _extract_cr50_image(self, archive, fn): 790 """Extract the filename from the given archive 791 Aargs: 792 archive: the archive location on the host 793 fn: the file to extract 794 795 Returns: 796 The location of the extracted file 797 """ 798 remote_dir = os.path.dirname(archive) 799 result = self.host.run('tar xfv %s -C %s' % (archive, remote_dir)) 800 for line in result.stdout.splitlines(): 801 if os.path.basename(line) == fn: 802 return os.path.join(remote_dir, line) 803 raise error.TestFail('%s was not extracted from %s' % (fn, archive)) 804 805 def download_cr50_gs_file(self, gsurl, extract_fn): 806 """Download and extract the file at gsurl. 807 808 @param gsurl: The gs url for the cr50 image 809 @param extract_fn: The name of the file to extract from the cr50 image 810 tarball. Don't extract anything if extract_fn is None. 811 @return: a tuple (local path, host path) 812 """ 813 file_info = self.find_cr50_gs_image(gsurl) 814 if not file_info: 815 raise error.TestFail('Could not find %s' % gsurl) 816 bucket, fn = file_info 817 818 remote_temp_dir = '/tmp/' 819 src = os.path.join(remote_temp_dir, fn) 820 dest = os.path.join(self.resultsdir, fn) 821 822 # Copy the image to the dut 823 gsutil_wrapper.copy_private_bucket( 824 host=self.host, 825 bucket=bucket, 826 filename=fn, 827 destination=remote_temp_dir) 828 if extract_fn: 829 src = self._extract_cr50_image(src, extract_fn) 830 logging.info('extracted %s', src) 831 # Remove .tbz2 from the local path. 832 dest = os.path.splitext(dest)[0] 833 834 self.host.get_file(src, dest) 835 return dest, src 836 837 def download_cr50_gs_image(self, gsurl, extract_fn, image_bid): 838 """Get the image from gs and save it in the autotest dir. 839 840 @param gsurl: The gs url for the cr50 image 841 @param extract_fn: The name of the file to extract from the cr50 image 842 tarball. Don't extract anything if extract_fn is None. 843 @param image_bid: the image symbolic board id 844 @return: A tuple with the local path and version 845 """ 846 dest, src = self.download_cr50_gs_file(gsurl, extract_fn) 847 ver = cr50_utils.GetBinVersion(self.host, src) 848 849 # Compare the image board id to the downloaded image to make sure we got 850 # the right file 851 downloaded_bid = cr50_utils.GetBoardIdInfoString(ver[2], symbolic=True) 852 if image_bid and image_bid != downloaded_bid: 853 raise error.TestError( 854 'Could not download image with matching ' 855 'board id wanted %s got %s' % (image_bid, downloaded_bid)) 856 return dest, ver 857 858 def download_cr50_eraseflashinfo_image(self): 859 """download the cr50 image that allows erasing flashinfo. 860 861 Get the file with the matching devid. 862 863 @return: A tuple with the debug image local path and version 864 """ 865 devid = self._devid.replace(' ', '-').replace('0x', '') 866 gsurl = os.path.join(self.GS_PRIVATE_DBG, 867 self.CR50_ERASEFLASHINFO_FILE % devid) 868 return self.download_cr50_gs_image(gsurl, None, None) 869 870 def download_cr50_debug_image(self, devid='', image_bid=''): 871 """download the cr50 debug file. 872 873 Get the file with the matching devid and image board id info 874 875 @param image_bid: the image board id info string or list 876 @return: A tuple with the debug image local path and version 877 """ 878 bid_ext = '' 879 # Add the image bid string to the filename 880 if image_bid: 881 image_bid = cr50_utils.GetBoardIdInfoString( 882 image_bid, symbolic=True) 883 bid_ext = '.' + image_bid.replace(':', '_') 884 885 devid = devid if devid else self._devid 886 dbg_file = self.CR50_DEBUG_FILE % (devid.replace(' ', '_'), bid_ext) 887 gsurl = os.path.join(self.GS_PRIVATE_DBG, dbg_file) 888 return self.download_cr50_gs_image(gsurl, None, image_bid) 889 890 def download_cr50_tot_image(self): 891 """download the cr50 TOT image. 892 893 @return: the local path to the TOT image. 894 """ 895 # TODO(mruthven): use logic from provision_Cr50TOT 896 raise error.TestNAError('Could not download TOT image') 897 898 def _find_release_image_gsurl(self, fn): 899 """Find the gs url for the release image""" 900 for gsbucket in [self.GS_PUBLIC, self.GS_PRIVATE_PROD]: 901 gsurl = os.path.join(gsbucket, fn) 902 if self.find_cr50_gs_image(gsurl): 903 return gsurl 904 raise error.TestFail('%s is not on google storage' % fn) 905 906 def download_cr50_release_image(self, image_rw, image_bid=''): 907 """download the cr50 release file. 908 909 Get the file with the matching version and image board id info 910 911 @param image_rw: the rw version string 912 @param image_bid: the image board id info string or list 913 @return: A tuple with the release image local path and version 914 """ 915 bid_ext = '' 916 # Add the image bid string to the gsurl 917 if image_bid: 918 image_bid = cr50_utils.GetBoardIdInfoString( 919 image_bid, symbolic=True) 920 bid_ext = '_' + image_bid.replace(':', '_') 921 release_fn = self.CR50_PROD_FILE % (image_rw, bid_ext) 922 gsurl = self._find_release_image_gsurl(release_fn) 923 924 # Release images can be found using the rw version 925 # Download the image 926 dest, ver = self.download_cr50_gs_image(gsurl, 'cr50.bin.prod', 927 image_bid) 928 929 # Compare the rw version and board id info to make sure the right image 930 # was found 931 if image_rw != ver[1]: 932 raise error.TestError('Could not download image with matching ' 933 'rw version') 934 return dest, ver 935 936 def _cr50_verify_update(self, expected_rw, expect_rollback): 937 """Verify the expected version is running on cr50. 938 939 @param expected_rw: The RW version string we expect to be running 940 @param expect_rollback: True if cr50 should have rolled back during the 941 update 942 @raise TestFail: if there is any unexpected update state 943 """ 944 errors = [] 945 running_rw = self.cr50.get_version() 946 if expected_rw != running_rw: 947 errors.append('running %s not %s' % (running_rw, expected_rw)) 948 949 if expect_rollback != self.cr50.rolledback(): 950 errors.append('%srollback detected' % 951 'no ' if expect_rollback else '') 952 if len(errors): 953 raise error.TestFail('cr50_update failed: %s' % ', '.join(errors)) 954 logging.info('RUNNING %s after %s', expected_rw, 955 'rollback' if expect_rollback else 'update') 956 957 def _cr50_run_update(self, path): 958 """Install the image at path onto cr50. 959 960 @param path: the location of the image to update to 961 @return: the rw version of the image 962 """ 963 tmp_dest = '/tmp/' + os.path.basename(path) 964 965 # Make sure the dut is sshable before installing the image. 966 self._try_to_bring_dut_up() 967 968 dest, image_ver = cr50_utils.InstallImage(self.host, path, tmp_dest) 969 # Use the -p option to make sure the DUT does a clean reboot. 970 cr50_utils.GSCTool(self.host, ['-a', dest, '-p']) 971 # Reboot the DUT to finish the cr50 update. 972 self.host.reboot(wait=False) 973 return image_ver[1] 974 975 def cr50_update(self, path, rollback=False, expect_rollback=False): 976 """Attempt to update to the given image. 977 978 If rollback is True, we assume that cr50 is already running an image 979 that can rollback. 980 981 @param path: the location of the update image 982 @param rollback: True if we need to force cr50 to rollback to update to 983 the given image 984 @param expect_rollback: True if cr50 should rollback on its own 985 @raise TestFail: if the update failed 986 """ 987 original_rw = self.cr50.get_version() 988 989 # Cr50 is going to reject an update if it hasn't been up for more than 990 # 60 seconds. Wait until that passes before trying to run the update. 991 self.cr50.wait_until_update_is_allowed() 992 993 image_rw = self._cr50_run_update(path) 994 995 # Running the update may cause cr50 to reboot. Wait for that before 996 # sending more commands. The reboot should happen quickly. 997 self.cr50.wait_for_reboot( 998 timeout=self.faft_config.gsc_update_wait_for_reboot) 999 1000 if rollback: 1001 self.cr50.rollback() 1002 1003 expected_rw = original_rw if expect_rollback else image_rw 1004 # If we expect a rollback, the version should remain unchanged 1005 self._cr50_verify_update(expected_rw, rollback or expect_rollback) 1006 1007 def run_gsctool_cmd_with_password(self, password, cmd, name, expect_error): 1008 """Run a gsctool command and input the password 1009 1010 @param password: The cr50 password string 1011 @param cmd: The gsctool command 1012 @param name: The name to give the job 1013 @param expect_error: True if the command should fail 1014 """ 1015 logging.info('Running: %s', cmd) 1016 logging.info('Password: %s', password) 1017 # Make sure the test waits long enough to avoid ccd rate limiting. 1018 time.sleep(self.cr50.CCD_PASSWORD_RATE_LIMIT) 1019 full_cmd = "echo -e '%s\n%s\n' | %s" % (password, password, cmd) 1020 result = self.host.run(full_cmd, ignore_status=expect_error) 1021 if result.exit_status: 1022 message = ('gsctool %s failed using %r: %s %s' % 1023 (name, password, result.exit_status, result.stderr)) 1024 if expect_error: 1025 logging.info(message) 1026 else: 1027 raise error.TestFail(message) 1028 elif expect_error: 1029 raise error.TestFail('%s with %r did not fail when expected' % 1030 (name, password)) 1031 else: 1032 logging.info('ran %s password command: %r', name, result.stdout) 1033 1034 def set_ccd_password(self, password, expect_error=False): 1035 """Set the ccd password""" 1036 # Testlab mode can't be enabled if there is no power button, so we 1037 # shouldn't allow setting the password. 1038 if not self.faft_config.has_powerbutton: 1039 raise error.TestError('No power button') 1040 1041 # If for some reason the test sets a password and is interrupted before 1042 # we can clear it, we want testlab mode to be enabled, so it's possible 1043 # to clear the password without knowing it. 1044 if not self.cr50.testlab_is_on(): 1045 raise error.TestError('Will not set password unless testlab mode ' 1046 'is enabled.') 1047 try: 1048 self.run_gsctool_cmd_with_password(password, 'gsctool -a -P', 1049 'set_password', expect_error) 1050 finally: 1051 logging.info('Cr50 password is %s', 1052 'cleared' if self.cr50.password_is_reset() else 'set') 1053 1054 def ccd_unlock_from_ap(self, password=None, expect_error=False): 1055 """Unlock cr50""" 1056 if not password: 1057 self.host.run('gsctool -a -U') 1058 return 1059 self.run_gsctool_cmd_with_password(password, 'gsctool -a -U', 'unlock', 1060 expect_error) 1061 1062 def tpm_is_responsive(self): 1063 """Check TPM responsiveness by running tpm_version.""" 1064 result = self.host.run('tpm_version', ignore_status=True) 1065 logging.debug(result.stdout.strip()) 1066 return not result.exit_status 1067