1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import functools 6import logging 7import pprint 8import re 9import time 10 11from autotest_lib.client.bin import utils 12from autotest_lib.client.common_lib import error 13from autotest_lib.client.common_lib.cros import cr50_utils 14from autotest_lib.server.cros.servo import chrome_ec 15 16 17def servo_v4_command(func): 18 """Decorator for methods only relevant to tests running with servo v4.""" 19 @functools.wraps(func) 20 def wrapper(instance, *args, **kwargs): 21 """Ignore servo v4 functions it's not being used.""" 22 if instance.using_servo_v4(): 23 return func(instance, *args, **kwargs) 24 logging.info("not using servo v4. ignoring %s", func.func_name) 25 return wrapper 26 27 28class ChromeCr50(chrome_ec.ChromeConsole): 29 """Manages control of a Chrome Cr50. 30 31 We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50 32 provides many interfaces to set and get its behavior via console commands. 33 This class is to abstract these interfaces. 34 """ 35 OPEN = 'open' 36 UNLOCK = 'unlock' 37 LOCK = 'lock' 38 # The amount of time you need to show physical presence. 39 PP_SHORT = 15 40 PP_LONG = 300 41 CCD_PASSWORD_RATE_LIMIT = 3 42 IDLE_COUNT = 'count: (\d+)\s' 43 SHORT_WAIT = 3 44 # The version has four groups: the partition, the header version, debug 45 # descriptor and then version string. 46 # There are two partitions A and B. The active partition is marked with a 47 # '*'. If it is a debug image '/DBG' is added to the version string. If the 48 # image has been corrupted, the version information will be replaced with 49 # 'Error'. 50 # So the output may look something like this. 51 # RW_A: 0.0.21/cr50_v1.1.6133-fd788b 52 # RW_B: * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d 53 # Or like this if the region was corrupted. 54 # RW_A: * 0.0.21/cr50_v1.1.6133-fd788b 55 # RW_B: Error 56 VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s' 57 INACTIVE_VERSION = VERSION_FORMAT % '' 58 ACTIVE_VERSION = VERSION_FORMAT % '\*' 59 # Following lines of the version output may print the image board id 60 # information. eg. 61 # BID A: 5a5a4146:ffffffff:00007f00 Yes 62 # BID B: 00000000:00000000:00000000 Yes 63 # Use the first group from ACTIVE_VERSION to match the active board id 64 # partition. 65 BID_ERROR = 'read_board_id: failed' 66 BID_FORMAT = ':\s+[a-f0-9:]+ ' 67 ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT, 68 BID_ERROR) 69 WAKE_CHAR = '\n\n' 70 WAKE_RESPONSE = ['(>|Console is enabled)'] 71 START_UNLOCK_TIMEOUT = 20 72 GETTIME = ['= (\S+)'] 73 FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"] 74 FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting'] 75 MAX_RETRY_COUNT = 5 76 START_STR = ['(.*Console is enabled;)'] 77 REBOOT_DELAY_WITH_CCD = 60 78 REBOOT_DELAY_WITH_FLEX = 3 79 ON_STRINGS = ['enable', 'enabled', 'on'] 80 CONSERVATIVE_CCD_WAIT = 10 81 CCD_SHORT_PRESSES = 5 82 CAP_IS_ACCESSIBLE = 0 83 CAP_SETTING = 1 84 CAP_REQ = 2 85 GET_CAP_TRIES = 3 86 87 88 def __init__(self, servo): 89 super(ChromeCr50, self).__init__(servo, 'cr50_uart') 90 91 92 def wake_cr50(self): 93 """Wake up cr50 by sending some linebreaks and wait for the response""" 94 logging.debug(super(ChromeCr50, self).send_command_get_output( 95 self.WAKE_CHAR, self.WAKE_RESPONSE)) 96 97 98 def send_command(self, commands): 99 """Send command through UART. 100 101 Cr50 will drop characters input to the UART when it resumes from sleep. 102 If servo is not using ccd, send some dummy characters before sending the 103 real command to make sure cr50 is awake. 104 """ 105 if not self.using_ccd(): 106 self.wake_cr50() 107 super(ChromeCr50, self).send_command(commands) 108 109 110 def set_cap(self, cap, setting): 111 """Set the capability to setting""" 112 self.set_caps({ cap : setting }) 113 114 115 def set_caps(self, cap_dict): 116 """Use cap_dict to set all the cap values 117 118 Set all of the capabilities in cap_dict to the correct config. 119 120 Args: 121 cap_dict: A dictionary with the capability as key and the desired 122 setting as values 123 """ 124 for cap, config in cap_dict.iteritems(): 125 self.send_command('ccd set %s %s' % (cap, config)) 126 current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING) 127 for cap, config in cap_dict.iteritems(): 128 if (current_cap_settings[cap].lower() != 129 config.lower()): 130 raise error.TestFail('Failed to set %s to %s' % (cap, config)) 131 132 133 def get_cap_overview(self, cap_dict): 134 """Returns a tuple (in factory mode, is reset) 135 136 If all capabilities are set to Default, ccd has been reset to default. 137 If all capabilities are set to Always, ccd is in factory mode. 138 """ 139 in_factory_mode = True 140 is_reset = True 141 for cap, cap_info in cap_dict.iteritems(): 142 cap_setting = cap_info[self.CAP_SETTING] 143 if cap_setting != 'Always': 144 in_factory_mode = False 145 if cap_setting != 'Default': 146 is_reset = False 147 return in_factory_mode, is_reset 148 149 150 def wp_is_reset(self): 151 """Returns True if wp is reset to follow batt pres at all times""" 152 follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state() 153 return follow_batt_pres and follow_batt_pres_atboot 154 155 156 def get_wp_state(self): 157 """Returns a tuple of the current write protect state. 158 159 The atboot setting cannot really be determined now if it is set to 160 follow battery presence. It is likely to remain the same after reboot, 161 but who knows. If the third element of the tuple is True, the last 162 element will not be that useful 163 164 Returns: 165 (True if current state is to follow batt presence, 166 True if write protect is enabled, 167 True if current state is to follow batt presence atboot, 168 True if write protect is enabled atboot) 169 """ 170 rv = self.send_command_get_output('wp', 171 ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?' 172 '(follow|enabled|disabled)'])[0] 173 _, forced, enabled, _, atboot = rv 174 logging.debug(rv) 175 return (not forced, enabled =='enabled', 176 atboot == 'follow', atboot == 'enabled') 177 178 179 def in_dev_mode(self): 180 """Return True if cr50 thinks the device is in dev mode""" 181 return 'dev_mode' in self.get_ccd_info()['TPM'] 182 183 184 def get_ccd_info(self): 185 """Get the current ccd state. 186 187 Take the output of 'ccd' and convert it to a dictionary. 188 189 Returns: 190 A dictionary with the ccd state name as the key and setting as 191 value. 192 """ 193 info = {} 194 original_timeout = float(self._servo.get('cr50_uart_timeout')) 195 # Change the console timeout to 10s, it may take longer than 3s to read 196 # ccd info 197 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 198 try: 199 rv = self.send_command_get_output('ccd', ["ccd.*>"])[0] 200 finally: 201 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 202 for line in rv.splitlines(): 203 # CCD information is separated with an : 204 # State: Opened 205 # Extract the state name and the value. 206 line = line.strip() 207 if ':' not in line or '[' in line: 208 continue 209 key, value = line.split(':') 210 info[key.strip()] = value.strip() 211 logging.info('Current CCD settings:\n%s', pprint.pformat(info)) 212 return info 213 214 215 def get_cap(self, cap): 216 """Returns the capabilitiy from the capability dictionary""" 217 return self.get_cap_dict()[cap] 218 219 220 def _get_ccd_cap_string(self): 221 """Return a string with the current capability settings. 222 223 The ccd information is pretty long. Servo micro sometimes drops 224 characters. Run the command a couple of times. Return the capapability 225 string that matches a previous run. 226 227 Raises: 228 TestError if the test could not retrieve consistent capability 229 information. 230 """ 231 past_results = [] 232 for i in range(self.GET_CAP_TRIES): 233 rv = self.send_safe_command_get_output('ccd', 234 ["Capabilities:\s+[\da-f]+\s(.*)TPM:"])[0][1] 235 logging.debug(rv) 236 if rv in past_results: 237 return rv 238 past_results.append(rv) 239 logging.debug(past_results) 240 raise error.TestError('Could not get consistent capability information') 241 242 243 def get_cap_dict(self, info=None): 244 """Get the current ccd capability settings. 245 246 The capability may be using the 'Default' setting. That doesn't say much 247 about the ccd state required to use the capability. Return all ccd 248 information in the cap_dict 249 [is accessible, setting, requirement] 250 251 Args: 252 info: Only fill the cap_dict with the requested information: 253 CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ 254 255 Returns: 256 A dictionary with the capability as the key a list of the current 257 settings as the value [is_accessible, setting, requirement] 258 """ 259 caps = {} 260 cap_info_str = self._get_ccd_cap_string() 261 # There are two capability formats. Extract the setting and the 262 # requirement from both formats 263 # UartGscRxECTx Y 3=IfOpened 264 # or 265 # UartGscRxECTx Y 0=Default (Always) 266 cap_settings = re.findall('(\S+) +(Y|-).*=(\w+)( \((\S+)\))?', 267 cap_info_str) 268 for cap, accessible, setting, _, required in cap_settings: 269 cap_info = [accessible == 'Y', setting, required] 270 # If there's only 1 value after =, then the setting is the 271 # requirement. 272 if not required: 273 cap_info[self.CAP_REQ] = setting 274 if info is not None: 275 caps[cap] = cap_info[info] 276 else: 277 caps[cap] = cap_info 278 logging.debug(pprint.pformat(caps)) 279 return caps 280 281 282 def send_command_get_output(self, command, regexp_list): 283 """Send command through UART and wait for response. 284 285 Cr50 will drop characters input to the UART when it resumes from sleep. 286 If servo is not using ccd, send some dummy characters before sending the 287 real command to make sure cr50 is awake. 288 """ 289 if not self.using_ccd(): 290 self.wake_cr50() 291 292 # We have started prepending '\n' to separate cr50 console junk from 293 # the real command. If someone is just searching for .*>, then they will 294 # only get the output from the first '\n' we added. Raise an error to 295 # change the test to look for something more specific ex command.*>. 296 # cr50 will print the command in the output, so that is an easy way to 297 # modify '.*>' to match the real command output. 298 if '.*>' in regexp_list: 299 raise error.TestError('Send more specific regexp %r %r' % (command, 300 regexp_list)) 301 302 # prepend \n to separate the command from any junk that may have been 303 # sent to the cr50 uart. 304 command = '\n' + command 305 return super(ChromeCr50, self).send_command_get_output(command, 306 regexp_list) 307 308 309 def send_safe_command_get_output(self, command, regexp_list): 310 """Restrict the console channels while sending console commands""" 311 self.send_command('chan save') 312 self.send_command('chan 0') 313 try: 314 rv = self.send_command_get_output(command, regexp_list) 315 finally: 316 self.send_command('chan restore') 317 return rv 318 319 320 def send_command_retry_get_output(self, command, regexp_list, tries=3): 321 """Retry sending a command if we can't find the output. 322 323 Cr50 may print irrelevant output while printing command output. It may 324 prevent the regex from matching. Send command and get the output. If it 325 fails try again. 326 327 If it fails every time, raise an error. 328 329 Don't use this to set something that should only be set once. 330 """ 331 # TODO(b/80319784): once chan is unrestricted, use it to restrict what 332 # output cr50 prints while we are sending commands. 333 for i in range(tries): 334 try: 335 return self.send_command_get_output(command, regexp_list) 336 except error.TestFail, e: 337 logging.info('Failed to get %r output: %r', command, str(e)) 338 if i == tries - 1: 339 # Raise the last error, if we never successfully returned 340 # the command output 341 logging.info('Could not get %r output after %d tries', 342 command, tries) 343 raise 344 345 346 def get_deep_sleep_count(self): 347 """Get the deep sleep count from the idle task""" 348 result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT]) 349 return int(result[0][1]) 350 351 352 def clear_deep_sleep_count(self): 353 """Clear the deep sleep count""" 354 result = self.send_command_retry_get_output('idle c', [self.IDLE_COUNT]) 355 if int(result[0][1]): 356 raise error.TestFail("Could not clear deep sleep count") 357 358 359 def get_board_properties(self): 360 """Get information from the version command""" 361 rv = self.send_command_retry_get_output('brdprop', 362 ['properties = (\S+)\s']) 363 return int(rv[0][1], 16) 364 365 366 def has_command(self, cmd): 367 """Returns 1 if cr50 has the command 0 if it doesn't""" 368 try: 369 self.send_command_retry_get_output('help', [cmd]) 370 except: 371 logging.info("Image does not include '%s' command", cmd) 372 return 0 373 return 1 374 375 376 def erase_nvmem(self): 377 """Use flasherase to erase both nvmem sections""" 378 if not self.has_command('flasherase'): 379 raise error.TestError("need image with 'flasherase'") 380 381 self.send_command('flasherase 0x7d000 0x3000') 382 self.send_command('flasherase 0x3d000 0x3000') 383 384 385 def reboot(self): 386 """Reboot Cr50 and wait for cr50 to reset""" 387 self.wait_for_reboot(cmd='reboot') 388 389 390 def _uart_wait_for_reboot(self, cmd='\n', timeout=60): 391 """Use uart to wait for cr50 to reboot. 392 393 If a command is given run it and wait for cr50 to reboot. Monitor 394 the cr50 uart to detect the reset. Wait up to timeout seconds 395 for the reset. 396 397 Args: 398 cmd: the command to run to reset cr50. 399 timeout: seconds to wait to detect the reboot. 400 """ 401 original_timeout = float(self._servo.get('cr50_uart_timeout')) 402 # Change the console timeout to timeout, so we wait at least that long 403 # for cr50 to print the start string. 404 self._servo.set_nocheck('cr50_uart_timeout', timeout) 405 try: 406 self.send_command_get_output(cmd, self.START_STR) 407 logging.debug('Detected cr50 reboot') 408 except error.TestFail, e: 409 logging.debug('Failed to detect cr50 reboot') 410 # Reset the timeout. 411 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 412 413 414 def wait_for_reboot(self, cmd='\n', timeout=60): 415 """Wait for cr50 to reboot 416 417 Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if 418 necessary. 419 420 Args: 421 cmd: the command to run to reset cr50. 422 timeout: seconds to wait to detect the reboot. 423 """ 424 if self.using_ccd(): 425 self.send_command(cmd) 426 # Cr50 USB is reset when it reboots. Wait for the CCD connection to 427 # go down to detect the reboot. 428 self.wait_for_ccd_disable(timeout, raise_error=False) 429 self.ccd_enable() 430 else: 431 self._uart_wait_for_reboot(cmd, timeout) 432 433 434 def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None): 435 """Set the reset counter high enough to force a rollback then reboot 436 437 Set the new board id before rolling back if one is given. 438 439 Args: 440 eraseflashinfo: True if eraseflashinfo should be run before rollback 441 chip_bid: the integer representation of chip board id or None if the 442 board id should be erased during rollback 443 chip_flags: the integer representation of chip board id flags or 444 None if the board id should be erased during rollback 445 """ 446 if (not self.has_command('rollback') or not 447 self.has_command('eraseflashinfo')): 448 raise error.TestError("need image with 'rollback' and " 449 "'eraseflashinfo'") 450 451 inactive_partition = self.get_inactive_version_info()[0] 452 # Set the board id if both the board id and flags have been given. 453 set_bid = chip_bid and chip_flags 454 455 # Erase the infomap 456 if eraseflashinfo or set_bid: 457 self.send_command('eraseflashinfo') 458 459 # Update the board id after it has been erased 460 if set_bid: 461 self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags)) 462 463 self.wait_for_reboot(cmd='rollback') 464 465 running_partition = self.get_active_version_info()[0] 466 if inactive_partition != running_partition: 467 raise error.TestError("Failed to rollback to inactive image") 468 469 470 def rolledback(self): 471 """Returns true if cr50 just rolled back""" 472 return 'Rollback detected' in self.send_safe_command_get_output( 473 'sysinfo', ['sysinfo.*>'])[0] 474 475 476 def get_version_info(self, regexp): 477 """Get information from the version command""" 478 return self.send_command_retry_get_output('ver', [regexp])[0][1::] 479 480 481 def get_inactive_version_info(self): 482 """Get the active partition, version, and hash""" 483 return self.get_version_info(self.INACTIVE_VERSION) 484 485 486 def get_active_version_info(self): 487 """Get the active partition, version, and hash""" 488 return self.get_version_info(self.ACTIVE_VERSION) 489 490 491 def using_prod_rw_keys(self): 492 """Returns True if the RW keyid is prod""" 493 rv = self.send_safe_command_retry_get_output('sysinfo', 494 ['RW keyid:.*\(([a-z]+)\)']) 495 logging.info(rv) 496 return rv[0][1] == 'prod' 497 498 499 def get_active_board_id_str(self): 500 """Get the running image board id. 501 502 Returns: 503 The board id string or None if the image does not support board id 504 or the image is not board id locked. 505 """ 506 # Getting the board id from the version console command is only 507 # supported in board id locked images .22 and above. Any image that is 508 # board id locked will have support for getting the image board id. 509 # 510 # If board id is not supported on the device, return None. This is 511 # still expected on all current non board id locked release images. 512 try: 513 version_info = self.get_version_info(self.ACTIVE_BID) 514 except error.TestFail, e: 515 logging.info(str(e)) 516 logging.info('Cannot use the version to get the board id') 517 return None 518 519 if self.BID_ERROR in version_info[4]: 520 raise error.TestError(version_info) 521 bid = version_info[4].split()[1] 522 return cr50_utils.GetBoardIdInfoString(bid) 523 524 525 def get_version(self): 526 """Get the RW version""" 527 return self.get_active_version_info()[1].strip() 528 529 530 def using_servo_v4(self): 531 """Returns true if the console is being served using servo v4""" 532 return 'servo_v4' in self._servo.get_servo_version() 533 534 535 def using_ccd(self): 536 """Returns true if the console is being served using CCD""" 537 return 'ccd_cr50' in self._servo.get_servo_version() 538 539 540 def ccd_is_enabled(self): 541 """Return True if ccd is enabled. 542 543 If the test is running through ccd, return the ccd_state value. If 544 a flex cable is being used, use the CCD_MODE_L gpio setting to determine 545 if Cr50 has ccd enabled. 546 547 Returns: 548 'off' or 'on' based on whether the cr50 console is working. 549 """ 550 if self.using_ccd(): 551 return self._servo.get('ccd_state') == 'on' 552 else: 553 result = self.send_command_retry_get_output('gpioget', 554 ['(0|1)[ \S]*CCD_MODE_L']) 555 return not bool(int(result[0][1])) 556 557 558 @servo_v4_command 559 def wait_for_stable_ccd_state(self, state, timeout, raise_error): 560 """Wait up to timeout seconds for CCD to be 'on' or 'off' 561 562 Verify ccd is off or on and remains in that state for 3 seconds. 563 564 Args: 565 state: a string either 'on' or 'off'. 566 timeout: time in seconds to wait 567 raise_error: Raise TestFail if the value is state is not reached. 568 569 Raises: 570 TestFail if ccd never reaches the specified state 571 """ 572 wait_for_enable = state == 'on' 573 logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off') 574 enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable, 575 timeout_sec=timeout) 576 if enabled != wait_for_enable: 577 error_msg = ("timed out before detecting ccd '%s'" % 578 ('on' if wait_for_enable else 'off')) 579 if raise_error: 580 raise error.TestFail(error_msg) 581 logging.warning(error_msg) 582 else: 583 # Make sure the state doesn't change. 584 enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled, 585 timeout_sec=self.SHORT_WAIT) 586 if enabled != wait_for_enable: 587 error_msg = ("CCD switched %r after briefly being %r" % 588 ('on' if enabled else 'off', state)) 589 if raise_error: 590 raise error.TestFail(error_msg) 591 logging.info(error_msg) 592 logging.info("ccd is %r", 'on' if enabled else 'off') 593 594 595 @servo_v4_command 596 def wait_for_ccd_disable(self, timeout=60, raise_error=True): 597 """Wait for the cr50 console to stop working""" 598 self.wait_for_stable_ccd_state('off', timeout, raise_error) 599 600 601 @servo_v4_command 602 def wait_for_ccd_enable(self, timeout=60, raise_error=False): 603 """Wait for the cr50 console to start working""" 604 self.wait_for_stable_ccd_state('on', timeout, raise_error) 605 606 607 @servo_v4_command 608 def ccd_disable(self, raise_error=True): 609 """Change the values of the CC lines to disable CCD""" 610 logging.info("disable ccd") 611 self._servo.set_nocheck('servo_v4_dts_mode', 'off') 612 self.wait_for_ccd_disable(raise_error=raise_error) 613 614 615 @servo_v4_command 616 def ccd_enable(self, raise_error=False): 617 """Reenable CCD and reset servo interfaces""" 618 logging.info("reenable ccd") 619 self._servo.set_nocheck('servo_v4_dts_mode', 'on') 620 # If the test is actually running with ccd, wait for USB communication 621 # to come up after reset. 622 if self.using_ccd(): 623 time.sleep(self._servo.USB_DETECTION_DELAY) 624 self.wait_for_ccd_enable(raise_error=raise_error) 625 626 627 def _level_change_req_pp(self, level): 628 """Returns True if setting the level will require physical presence""" 629 testlab_pp = level != 'testlab open' and 'testlab' in level 630 # If the level is open and the ccd capabilities say physical presence 631 # is required, then physical presence will be required. 632 open_pp = (level == 'open' and 633 not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE]) 634 return testlab_pp or open_pp 635 636 637 def _state_to_bool(self, state): 638 """Converts the state string to True or False""" 639 # TODO(mruthven): compare to 'on' once servo is up to date in the lab 640 return state.lower() in self.ON_STRINGS 641 642 643 def testlab_is_on(self): 644 """Returns True of testlab mode is on""" 645 return self._state_to_bool(self._servo.get('cr50_testlab')) 646 647 648 def set_ccd_testlab(self, state): 649 """Set the testlab mode 650 651 Args: 652 state: the desired testlab mode string: 'on' or 'off' 653 654 Raises: 655 TestFail if testlab mode was not changed 656 """ 657 if self.using_ccd(): 658 raise error.TestError('Cannot set testlab mode with CCD. Use flex ' 659 'cable instead.') 660 661 request_on = self._state_to_bool(state) 662 testlab_on = self.testlab_is_on() 663 request_str = 'on' if request_on else 'off' 664 665 if testlab_on == request_on: 666 logging.info('ccd testlab already set to %s', request_str) 667 return 668 669 original_level = self.get_ccd_level() 670 671 # We can only change the testlab mode when the device is open. If 672 # testlab mode is already enabled, we can go directly to open using 'ccd 673 # testlab open'. This will save 5 minutes, because we can skip the 674 # physical presence check. 675 if testlab_on: 676 self.send_command('ccd testlab open') 677 else: 678 self.set_ccd_level('open') 679 680 # Set testlab mode 681 rv = self.send_command_get_output('ccd testlab %s' % request_str, 682 ['ccd.*>'])[0] 683 if 'Access Denied' in rv: 684 raise error.TestFail("'ccd %s' %s" % (request_str, rv)) 685 686 # Press the power button once a second for 15 seconds. 687 self.run_pp(self.PP_SHORT) 688 689 self.set_ccd_level(original_level) 690 691 if request_on != self.testlab_is_on(): 692 raise error.TestFail('Failed to set ccd testlab to %s' % state) 693 694 695 def get_ccd_level(self): 696 """Returns the current ccd privilege level""" 697 return self._servo.get('cr50_ccd_level').lower() 698 699 700 def set_ccd_level(self, level, password=''): 701 """Set the Cr50 CCD privilege level. 702 703 Args: 704 level: a string of the ccd privilege level: 'open', 'lock', or 705 'unlock'. 706 password: send the ccd command with password. This will still 707 require the same physical presence. 708 709 Raises: 710 TestFail if the level couldn't be set 711 .""" 712 # TODO(mruthven): add support for CCD password 713 level = level.lower() 714 715 if level == self.get_ccd_level(): 716 logging.info('CCD privilege level is already %s', level) 717 return 718 719 if 'testlab' in level: 720 raise error.TestError("Can't change testlab mode using " 721 "ccd_set_level") 722 723 testlab_on = self._state_to_bool(self._servo.get('cr50_testlab')) 724 req_pp = self._level_change_req_pp(level) 725 has_pp = not self.using_ccd() 726 dbg_en = 'DBG' in self._servo.get('cr50_version') 727 728 if req_pp and not has_pp: 729 raise error.TestError("Can't change privilege level to '%s' " 730 "without physical presence." % level) 731 732 if not testlab_on and not has_pp: 733 raise error.TestError("Wont change privilege level without " 734 "physical presence or testlab mode enabled") 735 736 original_timeout = float(self._servo.get('cr50_uart_timeout')) 737 # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may 738 # take more than 3 seconds. 739 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 740 # Start the unlock process. 741 742 if level == 'open' or level == 'unlock': 743 logging.info('waiting %d seconds, the minimum time between' 744 ' ccd password attempts', 745 self.CCD_PASSWORD_RATE_LIMIT) 746 time.sleep(self.CCD_PASSWORD_RATE_LIMIT) 747 748 try: 749 cmd = 'ccd %s%s' % (level, (' ' + password) if password else '') 750 rv = self.send_command_get_output(cmd, [cmd + '(.*)>'])[0][1] 751 finally: 752 self._servo.set('cr50_uart_timeout', original_timeout) 753 logging.info(rv) 754 if 'ccd_open denied: fwmp' in rv: 755 raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv)) 756 if 'Access Denied' in rv: 757 raise error.TestFail("%r %s" % (cmd, rv)) 758 if 'Busy' in rv: 759 raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv)) 760 761 # Press the power button once a second, if we need physical presence. 762 if req_pp: 763 # DBG images have shorter unlock processes 764 self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG) 765 766 if level != self.get_ccd_level(): 767 raise error.TestFail('Could not set privilege level to %s' % level) 768 769 logging.info('Successfully set CCD privelege level to %s', level) 770 771 772 def run_pp(self, unlock_timeout): 773 """Press the power button a for unlock_timeout seconds. 774 775 This will press the power button many more times than it needs to be 776 pressed. Cr50 doesn't care if you press it too often. It just cares that 777 you press the power button at least once within the detect interval. 778 779 For privilege level changes you need to press the power button 5 times 780 in the short interval and then 4 times within the long interval. 781 Short Interval 782 100msec < power button press < 5 seconds 783 Long Interval 784 60s < power button press < 300s 785 786 For testlab enable/disable you must press the power button 5 times 787 spaced between 100msec and 5 seconds apart. 788 """ 789 end_time = time.time() + unlock_timeout 790 791 logging.info('Pressing power button for %ds to unlock the console.', 792 unlock_timeout) 793 logging.info('The process should end at %s', time.ctime(end_time)) 794 795 # Press the power button once a second to unlock the console. 796 while time.time() < end_time: 797 self._servo.power_short_press() 798 time.sleep(1) 799 800 801 def gettime(self): 802 """Get the current cr50 system time""" 803 result = self.send_command_retry_get_output('gettime', [' = (.*) s']) 804 return float(result[0][1]) 805 806 807 def servo_v4_supports_dts_mode(self): 808 """Returns True if cr50 registers changes in servo v4 dts mode.""" 809 # This is to test that Cr50 actually recognizes the change in ccd state 810 # We cant do that with tests using ccd, because the cr50 communication 811 # goes down once ccd is enabled. 812 if 'servo_v4_with_servo_micro' != self._servo.get_servo_version(): 813 return False 814 815 ccd_start = 'on' if self.ccd_is_enabled() else 'off' 816 dts_start = self._servo.get('servo_v4_dts_mode') 817 try: 818 # Verify both ccd enable and disable 819 self.ccd_disable(raise_error=True) 820 self.ccd_enable(raise_error=True) 821 rv = True 822 except Exception, e: 823 logging.info(e) 824 rv = False 825 self._servo.set_nocheck('servo_v4_dts_mode', dts_start) 826 self.wait_for_stable_ccd_state(ccd_start, 60, True) 827 logging.info('Test setup does%s support servo DTS mode', 828 '' if rv else 'n\'t') 829 return rv 830 831 832 def wait_until_update_is_allowed(self): 833 """Wait until cr50 will be able to accept an update. 834 835 Cr50 rejects any attempt to update if it has been less than 60 seconds 836 since it last recovered from deep sleep or came up from reboot. This 837 will wait until cr50 gettime shows a time greater than 60. 838 """ 839 if self.get_active_version_info()[2]: 840 logging.info("Running DBG image. Don't need to wait for update.") 841 return 842 cr50_time = self.gettime() 843 if cr50_time < 60: 844 sleep_time = 61 - cr50_time 845 logging.info('Cr50 has been up for %ds waiting %ds before update', 846 cr50_time, sleep_time) 847 time.sleep(sleep_time) 848 849 def tpm_is_enabled(self): 850 """Query the current TPM mode. 851 852 Returns True if TPM is enabled, 853 False otherwise. 854 """ 855 result = self.send_command_get_output('sysinfo', 856 ['(?i)TPM\s+MODE:\s+(enabled|disabled)'])[0][1] 857 logging.debug(result) 858 859 return result.lower() == 'enabled' 860 861 def keyladder_is_enabled(self): 862 """Get the status of H1 Key Ladder. 863 864 Returns True if H1 Key Ladder is enabled. 865 False otherwise. 866 """ 867 result = self.send_command_get_output('sysinfo', 868 ['(?i)Key\s+Ladder:\s+(enabled|disabled)'])[0][1] 869 logging.debug(result) 870 871 return result.lower() == 'enabled' 872