1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import functools 6import logging 7import pprint 8import re 9import time 10 11from autotest_lib.client.bin import utils 12from autotest_lib.client.common_lib import error 13from autotest_lib.client.common_lib.cros import cr50_utils 14from autotest_lib.server.cros.servo import chrome_ec 15 16 17def dts_control_command(func): 18 """For methods that should only run when dts mode control is supported.""" 19 @functools.wraps(func) 20 def wrapper(instance, *args, **kwargs): 21 """Ignore those functions if dts mode control is not supported.""" 22 if instance._servo.dts_mode_is_valid(): 23 return func(instance, *args, **kwargs) 24 logging.info('Servo setup does not support DTS mode. ignoring %s', 25 func.func_name) 26 return wrapper 27 28 29class ChromeCr50(chrome_ec.ChromeConsole): 30 """Manages control of a Chrome Cr50. 31 32 We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50 33 provides many interfaces to set and get its behavior via console commands. 34 This class is to abstract these interfaces. 35 """ 36 PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d'] 37 PROD_RO_KEYIDS = ['0xaa66150f'] 38 OPEN = 'open' 39 UNLOCK = 'unlock' 40 LOCK = 'lock' 41 # The amount of time you need to show physical presence. 42 PP_SHORT = 15 43 PP_LONG = 300 44 CCD_PASSWORD_RATE_LIMIT = 3 45 IDLE_COUNT = 'count: (\d+)\s' 46 SHORT_WAIT = 3 47 # The version has four groups: the partition, the header version, debug 48 # descriptor and then version string. 49 # There are two partitions A and B. The active partition is marked with a 50 # '*'. If it is a debug image '/DBG' is added to the version string. If the 51 # image has been corrupted, the version information will be replaced with 52 # 'Error'. 53 # So the output may look something like this. 54 # RW_A: 0.0.21/cr50_v1.1.6133-fd788b 55 # RW_B: * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d 56 # Or like this if the region was corrupted. 57 # RW_A: * 0.0.21/cr50_v1.1.6133-fd788b 58 # RW_B: Error 59 VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s' 60 INACTIVE_VERSION = VERSION_FORMAT % '' 61 ACTIVE_VERSION = VERSION_FORMAT % '\*' 62 # Following lines of the version output may print the image board id 63 # information. eg. 64 # BID A: 5a5a4146:ffffffff:00007f00 Yes 65 # BID B: 00000000:00000000:00000000 Yes 66 # Use the first group from ACTIVE_VERSION to match the active board id 67 # partition. 68 BID_ERROR = 'read_board_id: failed' 69 BID_FORMAT = ':\s+[a-f0-9:]+ ' 70 ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT, 71 BID_ERROR) 72 WAKE_CHAR = '\n\n' 73 WAKE_RESPONSE = ['(>|Console is enabled)'] 74 START_UNLOCK_TIMEOUT = 20 75 GETTIME = ['= (\S+)'] 76 FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"] 77 FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting'] 78 MAX_RETRY_COUNT = 5 79 CCDSTATE_MAX_RETRY_COUNT = 20 80 START_STR = ['(.*Console is enabled;)'] 81 REBOOT_DELAY_WITH_CCD = 60 82 REBOOT_DELAY_WITH_FLEX = 3 83 ON_STRINGS = ['enable', 'enabled', 'on'] 84 CONSERVATIVE_CCD_WAIT = 10 85 CCD_SHORT_PRESSES = 5 86 CAP_IS_ACCESSIBLE = 0 87 CAP_SETTING = 1 88 CAP_REQ = 2 89 GET_CAP_TRIES = 5 90 # Regex to match the valid capability settings. 91 CAP_STATES = '(Always|Default|IfOpened|UnlessLocked)' 92 # List of all cr50 ccd capabilities. Same order of 'ccd' output 93 CAP_NAMES = [ 94 'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx', 95 'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole', 96 'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe', 'OpenNoLongPP', 97 'BatteryBypassPP', 'UpdateNoTPMWipe', 'I2C', 'FlashRead', 98 'OpenNoDevMode', 'OpenFromUSB' 99 ] 100 # There are two capability formats. Match both. 101 # UartGscRxECTx Y 3=IfOpened 102 # or 103 # UartGscRxECTx Y 0=Default (Always) 104 # Make sure the last word is at the end of the line. The next line will 105 # start with some whitespace, so account for that too. 106 CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES, 107 CAP_STATES) 108 # Name each group, so we can use groupdict to extract all useful information 109 # from the ccd outupt. 110 CCD_FORMAT = [ 111 '(State: (?P<State>Opened|Locked|Unlocked))', 112 '(Password: (?P<Password>set|none))', 113 '(Flags: (?P<Flags>\S*))', 114 '(Capabilities:.*(?P<Capabilities>%s))' % 115 (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT), 116 '(TPM:(?P<TPM>[ \S]*)\r)', 117 ] 118 119 # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h 120 BOARD_PROP = { 121 'BOARD_SLAVE_CONFIG_SPI' : 1 << 0, 122 'BOARD_SLAVE_CONFIG_I2C' : 1 << 1, 123 'BOARD_NEEDS_SYS_RST_PULL_UP' : 1 << 5, 124 'BOARD_USE_PLT_RESET' : 1 << 6, 125 'BOARD_WP_ASSERTED' : 1 << 8, 126 'BOARD_FORCING_WP' : 1 << 9, 127 'BOARD_NO_RO_UART' : 1 << 10, 128 'BOARD_CCD_STATE_MASK' : 3 << 11, 129 'BOARD_DEEP_SLEEP_DISABLED' : 1 << 13, 130 'BOARD_DETECT_AP_WITH_UART' : 1 << 14, 131 'BOARD_ITE_EC_SYNC_NEEDED' : 1 << 15, 132 'BOARD_WP_DISABLE_DELAY' : 1 << 16, 133 'BOARD_CLOSED_SOURCE_SET1' : 1 << 17, 134 'BOARD_CLOSED_LOOP_RESET' : 1 << 18, 135 'BOARD_NO_INA_SUPPORT' : 1 << 19, 136 'BOARD_ALLOW_CHANGE_TPM_MODE' : 1 << 20, 137 } 138 139 # CR50 reset flags as defined in platform ec_commands.h. These are only the 140 # flags used by cr50. 141 RESET_FLAGS = { 142 'RESET_FLAG_OTHER' : 1 << 0, 143 'RESET_FLAG_BROWNOUT' : 1 << 2, 144 'RESET_FLAG_POWER_ON' : 1 << 3, 145 'RESET_FLAG_SOFT' : 1 << 5, 146 'RESET_FLAG_HIBERNATE' : 1 << 6, 147 'RESET_FLAG_RTC_ALARM' : 3 << 7, 148 'RESET_FLAG_WAKE_PIN' : 1 << 8, 149 'RESET_FLAG_HARD' : 1 << 11, 150 'RESET_FLAG_USB_RESUME' : 1 << 14, 151 'RESET_FLAG_RDD' : 1 << 15, 152 'RESET_FLAG_RBOX' : 1 << 16, 153 'RESET_FLAG_SECURITY' : 1 << 17, 154 } 155 156 def __init__(self, servo, faft_config): 157 """Initializes a ChromeCr50 object. 158 159 @param servo: A servo object. 160 @param faft_config: A faft config object. 161 """ 162 super(ChromeCr50, self).__init__(servo, 'cr50_uart') 163 self.faft_config = faft_config 164 165 166 def wake_cr50(self): 167 """Wake up cr50 by sending some linebreaks and wait for the response""" 168 logging.debug(super(ChromeCr50, self).send_command_get_output( 169 self.WAKE_CHAR, self.WAKE_RESPONSE)) 170 171 172 def send_command(self, commands): 173 """Send command through UART. 174 175 Cr50 will drop characters input to the UART when it resumes from sleep. 176 If servo is not using ccd, send some dummy characters before sending the 177 real command to make sure cr50 is awake. 178 179 @param commands: the command string to send to cr50 180 """ 181 if self._servo.main_device_is_flex(): 182 self.wake_cr50() 183 super(ChromeCr50, self).send_command(commands) 184 185 186 def set_cap(self, cap, setting): 187 """Set the capability to setting 188 189 @param cap: The capability string 190 @param setting: The setting to set the capability to. 191 """ 192 self.set_caps({ cap : setting }) 193 194 195 def set_caps(self, cap_dict): 196 """Use cap_dict to set all the cap values 197 198 Set all of the capabilities in cap_dict to the correct config. 199 200 @param cap_dict: A dictionary with the capability as key and the desired 201 setting as values 202 """ 203 for cap, config in cap_dict.iteritems(): 204 self.send_command('ccd set %s %s' % (cap, config)) 205 current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING) 206 for cap, config in cap_dict.iteritems(): 207 if (current_cap_settings[cap].lower() != 208 config.lower()): 209 raise error.TestFail('Failed to set %s to %s' % (cap, config)) 210 211 212 def get_cap_overview(self, cap_dict): 213 """Get a basic overview of the capability dictionary 214 215 If all capabilities are set to Default, ccd has been reset to default. 216 If all capabilities are set to Always, ccd is in factory mode. 217 218 @param cap_dict: A dictionary of the capability settings 219 @return: A tuple of the capability overview (in factory mode, is reset) 220 """ 221 in_factory_mode = True 222 is_reset = True 223 for cap, cap_info in cap_dict.iteritems(): 224 cap_setting = cap_info[self.CAP_SETTING] 225 if cap_setting != 'Always': 226 in_factory_mode = False 227 if cap_setting != 'Default': 228 is_reset = False 229 return in_factory_mode, is_reset 230 231 232 def password_is_reset(self): 233 """Returns True if the password is cleared""" 234 return self.get_ccd_info()['Password'] == 'none' 235 236 237 def ccd_is_reset(self): 238 """Returns True if the ccd is reset 239 240 The password must be cleared, write protect and battery presence must 241 follow battery presence, and all capabilities must be Always 242 """ 243 return (self.password_is_reset() and self.wp_is_reset() and 244 self.batt_pres_is_reset() and 245 self.get_cap_overview(self.get_cap_dict())[1]) 246 247 248 def wp_is_reset(self): 249 """Returns True if wp is reset to follow batt pres at all times""" 250 follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state() 251 return follow_batt_pres and follow_batt_pres_atboot 252 253 254 def get_wp_state(self): 255 """Get the current write protect and atboot state 256 257 The atboot setting cannot really be determined now if it is set to 258 follow battery presence. It is likely to remain the same after reboot, 259 but who knows. If the third element of the tuple is True, the last 260 element will not be that useful 261 262 @return: a tuple with the current write protect state 263 (True if current state is to follow batt presence, 264 True if write protect is enabled, 265 True if current state is to follow batt presence atboot, 266 True if write protect is enabled atboot) 267 """ 268 rv = self.send_command_retry_get_output('wp', 269 ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?' 270 '(follow|enabled|disabled)'], safe=True)[0] 271 _, forced, enabled, _, atboot = rv 272 logging.debug(rv) 273 return (not forced, enabled =='enabled', 274 atboot == 'follow', atboot == 'enabled') 275 276 277 def in_dev_mode(self): 278 """Return True if cr50 thinks the device is in dev mode""" 279 return 'dev_mode' in self.get_ccd_info()['TPM'] 280 281 282 def get_ccd_info(self): 283 """Get the current ccd state. 284 285 Take the output of 'ccd' and convert it to a dictionary. 286 287 @return: A dictionary with the ccd state name as the key and setting as 288 value. 289 """ 290 matched_output = None 291 original_timeout = float(self._servo.get('cr50_uart_timeout')) 292 # Change the console timeout to 10s, it may take longer than 3s to read 293 # ccd info 294 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 295 for i in range(self.GET_CAP_TRIES): 296 try: 297 # If some ccd output is dropped and the output doesn't match the 298 # expected ccd output format, send_command_get_output will wait the 299 # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use 300 # re to search the command output instead of 301 # send_safe_command_get_output, so we don't have to wait the full 302 # timeout if output is dropped. 303 rv = self.send_command_retry_get_output('ccd', ['ccd.*>'], 304 safe=True)[0] 305 matched_output = re.search('.*'.join(self.CCD_FORMAT), rv, 306 re.DOTALL) 307 if matched_output: 308 break 309 logging.info('try %d: could not match ccd output %s', i, rv) 310 except Exception, e: 311 logging.info('try %d got error %s', i, str(e)) 312 313 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 314 if not matched_output: 315 raise error.TestFail('Could not get ccd output') 316 logging.info('Current CCD settings:\n%s', 317 pprint.pformat(matched_output.groupdict())) 318 return matched_output.groupdict() 319 320 321 def get_cap(self, cap): 322 """Returns the capabilitiy from the capability dictionary""" 323 return self.get_cap_dict()[cap] 324 325 326 def get_cap_dict(self, info=None): 327 """Get the current ccd capability settings. 328 329 The capability may be using the 'Default' setting. That doesn't say much 330 about the ccd state required to use the capability. Return all ccd 331 information in the cap_dict 332 [is accessible, setting, requirement] 333 334 @param info: Only fill the cap_dict with the requested information: 335 CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ 336 @return: A dictionary with the capability as the key a list of the 337 current settings as the value [is_accessible, setting, 338 requirement] 339 """ 340 # Add whitespace at the end, so we can still match the last line. 341 cap_info_str = self.get_ccd_info()['Capabilities'] + '\r\n' 342 cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT, 343 cap_info_str) 344 caps = {} 345 for cap, accessible, setting, _, required in cap_settings: 346 # If there's only 1 value after =, then the setting is the 347 # requirement. 348 if not required: 349 required = setting 350 cap_info = [accessible == 'Y', setting, required] 351 if info is not None: 352 caps[cap] = cap_info[info] 353 else: 354 caps[cap] = cap_info 355 logging.debug(pprint.pformat(caps)) 356 return caps 357 358 359 def send_command_get_output(self, command, regexp_list): 360 """Send command through UART and wait for response. 361 362 Cr50 will drop characters input to the UART when it resumes from sleep. 363 If servo is not using ccd, send some dummy characters before sending the 364 real command to make sure cr50 is awake. 365 366 @param command: the command to send 367 @param regexp_list: The list of regular expressions to match in the 368 command output 369 @return: A list of matched output 370 """ 371 if self._servo.main_device_is_flex(): 372 self.wake_cr50() 373 374 # We have started prepending '\n' to separate cr50 console junk from 375 # the real command. If someone is just searching for .*>, then they will 376 # only get the output from the first '\n' we added. Raise an error to 377 # change the test to look for something more specific ex command.*>. 378 # cr50 will print the command in the output, so that is an easy way to 379 # modify '.*>' to match the real command output. 380 if '.*>' in regexp_list: 381 raise error.TestError('Send more specific regexp %r %r' % (command, 382 regexp_list)) 383 384 # prepend \n to separate the command from any junk that may have been 385 # sent to the cr50 uart. 386 command = '\n' + command 387 return super(ChromeCr50, self).send_command_get_output(command, 388 regexp_list) 389 390 391 def send_safe_command_get_output(self, command, regexp_list, 392 channel_mask=0x1): 393 """Restrict the console channels while sending console commands. 394 395 @param command: the command to send 396 @param regexp_list: The list of regular expressions to match in the 397 command output 398 @param channel_mask: The mask to pass to 'chan' prior to running the 399 command, indicating which channels should remain 400 enabled (0x1 is command output) 401 @return: A list of matched output 402 """ 403 self.send_command('chan save') 404 self.send_command('chan 0x%x' % channel_mask) 405 try: 406 rv = self.send_command_get_output(command, regexp_list) 407 finally: 408 self.send_command('chan restore') 409 return rv 410 411 412 def send_command_retry_get_output(self, command, regexp_list, safe=False, 413 compare_output=False): 414 """Retry the command 5 times if you get a timeout or drop some output 415 416 417 @param command: the command string 418 @param regexp_list: the regex to search for 419 @param safe: use send_safe_command_get_output if True otherwise use 420 send_command_get_output 421 @param compare_output: look for reproducible output 422 """ 423 send_command = (self.send_safe_command_get_output if safe else 424 self.send_command_get_output) 425 err = 'no consistent output' if compare_output else 'unknown' 426 past_rv = [] 427 for i in range(self.MAX_RETRY_COUNT): 428 try: 429 rv = send_command(command, regexp_list) 430 if not compare_output or rv in past_rv: 431 return rv 432 if past_rv: 433 logging.debug('%d %s not in %s', i, rv, past_rv) 434 past_rv.append(rv) 435 except Exception, e: 436 err = str(e) 437 logging.info('attempt %d %r: %s', i, command, str(e)) 438 if compare_output: 439 logging.info('No consistent output for %r %s', command, 440 pprint.pformat(past_rv)) 441 raise error.TestError('Issue sending %r command: %s' % (command, err)) 442 443 444 def get_deep_sleep_count(self): 445 """Get the deep sleep count from the idle task""" 446 result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT], 447 safe=True) 448 return int(result[0][1]) 449 450 451 def clear_deep_sleep_count(self): 452 """Clear the deep sleep count""" 453 self.send_command('idle c') 454 if self.get_deep_sleep_count(): 455 raise error.TestFail("Could not clear deep sleep count") 456 457 458 def get_board_properties(self): 459 """Get information from the version command""" 460 rv = self.send_command_retry_get_output('brdprop', 461 ['properties = (\S+)\s'], safe=True) 462 return int(rv[0][1], 16) 463 464 465 def uses_board_property(self, prop_name): 466 """Returns 1 if the given property is set, or 0 otherwise 467 468 @param prop_name: a property name in string type. 469 """ 470 brdprop = self.get_board_properties() 471 prop = self.BOARD_PROP[prop_name] 472 return bool(brdprop & prop) 473 474 475 def has_command(self, cmd): 476 """Returns 1 if cr50 has the command 0 if it doesn't""" 477 try: 478 self.send_safe_command_get_output('help', [cmd]) 479 except: 480 logging.info("Image does not include '%s' command", cmd) 481 return 0 482 return 1 483 484 485 def reboot(self): 486 """Reboot Cr50 and wait for cr50 to reset""" 487 self.wait_for_reboot(cmd='reboot') 488 489 490 def _uart_wait_for_reboot(self, cmd='\n', timeout=60): 491 """Use uart to wait for cr50 to reboot. 492 493 If a command is given run it and wait for cr50 to reboot. Monitor 494 the cr50 uart to detect the reset. Wait up to timeout seconds 495 for the reset. 496 497 @param cmd: the command to run to reset cr50. 498 @param timeout: seconds to wait to detect the reboot. 499 """ 500 original_timeout = float(self._servo.get('cr50_uart_timeout')) 501 # Change the console timeout to timeout, so we wait at least that long 502 # for cr50 to print the start string. 503 self._servo.set_nocheck('cr50_uart_timeout', timeout) 504 try: 505 self.send_command_get_output(cmd, self.START_STR) 506 logging.debug('Detected cr50 reboot') 507 except error.TestFail, e: 508 logging.debug('Failed to detect cr50 reboot') 509 # Reset the timeout. 510 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 511 512 513 def wait_for_reboot(self, cmd='\n', timeout=60): 514 """Wait for cr50 to reboot 515 516 Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if 517 necessary. 518 519 @param cmd: the command to run to reset cr50. 520 @param timeout: seconds to wait to detect the reboot. 521 """ 522 if self._servo.main_device_is_ccd(): 523 self.send_command(cmd) 524 # Cr50 USB is reset when it reboots. Wait for the CCD connection to 525 # go down to detect the reboot. 526 self.wait_for_ccd_disable(timeout, raise_error=False) 527 self.ccd_enable() 528 else: 529 self._uart_wait_for_reboot(cmd, timeout) 530 531 # On most devices, a Cr50 reset will cause an AP reset. Force this to 532 # happen on devices where the AP is left down. 533 if not self.faft_config.ap_up_after_cr50_reboot: 534 logging.info('Resetting DUT after Cr50 reset') 535 self._servo.get_power_state_controller().reset() 536 537 538 def set_board_id(self, chip_bid, chip_flags): 539 """Set the chip board id type and flags.""" 540 self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags)) 541 542 543 def get_board_id(self): 544 """Get the chip board id type and flags. 545 546 bid_type_inv will be '' if the bid output doesn't show it. If no board 547 id type inv is shown, then board id is erased will just check the type 548 and flags. 549 550 @returns a tuple (A string of bid_type:bid_type_inv:bid_flags, 551 True if board id is erased) 552 """ 553 bid = self.send_command_retry_get_output('bid', 554 ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'], 555 safe=True)[0][1:] 556 bid_str = ':'.join(bid) 557 bid_is_erased = set(bid).issubset({'', 'ffffffff'}) 558 logging.info('chip board id: %s', bid_str) 559 logging.info('chip board id is erased: %s', 560 'yes' if bid_is_erased else 'no') 561 return bid_str, bid_is_erased 562 563 564 def eraseflashinfo(self, retries=10): 565 """Run eraseflashinfo. 566 567 @returns True if the board id is erased 568 """ 569 for i in range(retries): 570 # The console could drop characters while matching 'eraseflashinfo'. 571 # Retry if the command times out. It's ok to run eraseflashinfo 572 # multiple times. 573 rv = self.send_command_retry_get_output( 574 'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip() 575 logging.info('eraseflashinfo output: %r', rv) 576 bid_erased = self.get_board_id()[1] 577 eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv 578 if not eraseflashinfo_issue and bid_erased: 579 break 580 logging.info('Retrying eraseflashinfo') 581 return bid_erased 582 583 584 def rollback(self): 585 """Set the reset counter high enough to force a rollback and reboot.""" 586 if not self.has_command('rollback'): 587 raise error.TestError("need image with 'rollback'") 588 589 inactive_partition = self.get_inactive_version_info()[0] 590 591 self.wait_for_reboot(cmd='rollback') 592 593 running_partition = self.get_active_version_info()[0] 594 if inactive_partition != running_partition: 595 raise error.TestError("Failed to rollback to inactive image") 596 597 598 def rolledback(self): 599 """Returns true if cr50 just rolled back""" 600 return 'Rollback detected' in self.send_command_retry_get_output( 601 'sysinfo', ['sysinfo.*>'], safe=True)[0] 602 603 604 def get_version_info(self, regexp): 605 """Get information from the version command""" 606 return self.send_command_retry_get_output('ver', [regexp], 607 safe=True)[0][1::] 608 609 610 def get_inactive_version_info(self): 611 """Get the active partition, version, and hash""" 612 return self.get_version_info(self.INACTIVE_VERSION) 613 614 615 def get_active_version_info(self): 616 """Get the active partition, version, and hash""" 617 return self.get_version_info(self.ACTIVE_VERSION) 618 619 620 def using_prod_rw_keys(self): 621 """Returns True if the RW keyid is prod""" 622 rv = self.send_command_retry_get_output('sysinfo', 623 ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1] 624 logging.info('RW Keyid: 0x%s', rv) 625 return rv in self.PROD_RW_KEYIDS 626 627 628 def get_active_board_id_str(self): 629 """Get the running image board id. 630 631 @return: The board id string or None if the image does not support board 632 id or the image is not board id locked. 633 """ 634 # Getting the board id from the version console command is only 635 # supported in board id locked images .22 and above. Any image that is 636 # board id locked will have support for getting the image board id. 637 # 638 # If board id is not supported on the device, return None. This is 639 # still expected on all current non board id locked release images. 640 try: 641 version_info = self.get_version_info(self.ACTIVE_BID) 642 except error.TestFail, e: 643 logging.info(str(e)) 644 logging.info('Cannot use the version to get the board id') 645 return None 646 647 if self.BID_ERROR in version_info[4]: 648 raise error.TestError(version_info) 649 bid = version_info[4].split()[1] 650 return cr50_utils.GetBoardIdInfoString(bid) 651 652 653 def get_version(self): 654 """Get the RW version""" 655 return self.get_active_version_info()[1].strip() 656 657 658 def ccd_is_enabled(self): 659 """Return True if ccd is enabled. 660 661 If the test is running through ccd, return the ccd_state value. If 662 a flex cable is being used, use the CCD_MODE_L gpio setting to determine 663 if Cr50 has ccd enabled. 664 665 @return: 'off' or 'on' based on whether the cr50 console is working. 666 """ 667 if self._servo.main_device_is_ccd(): 668 return self._servo.get('ccd_state') == 'on' 669 else: 670 return not bool(self.gpioget('CCD_MODE_L')) 671 672 673 @dts_control_command 674 def wait_for_stable_ccd_state(self, state, timeout, raise_error): 675 """Wait up to timeout seconds for CCD to be 'on' or 'off' 676 677 Verify ccd is off or on and remains in that state for 3 seconds. 678 679 @param state: a string either 'on' or 'off'. 680 @param timeout: time in seconds to wait 681 @param raise_error: Raise TestFail if the value is state is not reached. 682 @raise TestFail: if ccd never reaches the specified state 683 """ 684 wait_for_enable = state == 'on' 685 logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off') 686 enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable, 687 timeout_sec=timeout) 688 if enabled != wait_for_enable: 689 error_msg = ("timed out before detecting ccd '%s'" % 690 ('on' if wait_for_enable else 'off')) 691 if raise_error: 692 raise error.TestFail(error_msg) 693 logging.warning(error_msg) 694 else: 695 # Make sure the state doesn't change. 696 enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled, 697 timeout_sec=self.SHORT_WAIT) 698 if enabled != wait_for_enable: 699 error_msg = ("CCD switched %r after briefly being %r" % 700 ('on' if enabled else 'off', state)) 701 if raise_error: 702 raise error.TestFail(error_msg) 703 logging.info(error_msg) 704 logging.info("ccd is %r", 'on' if enabled else 'off') 705 706 707 @dts_control_command 708 def wait_for_ccd_disable(self, timeout=60, raise_error=True): 709 """Wait for the cr50 console to stop working""" 710 self.wait_for_stable_ccd_state('off', timeout, raise_error) 711 712 713 @dts_control_command 714 def wait_for_ccd_enable(self, timeout=60, raise_error=False): 715 """Wait for the cr50 console to start working""" 716 self.wait_for_stable_ccd_state('on', timeout, raise_error) 717 718 719 @dts_control_command 720 def ccd_disable(self, raise_error=True): 721 """Change the values of the CC lines to disable CCD""" 722 logging.info("disable ccd") 723 self._servo.set_dts_mode('off') 724 self.wait_for_ccd_disable(raise_error=raise_error) 725 726 727 @dts_control_command 728 def ccd_enable(self, raise_error=False): 729 """Reenable CCD and reset servo interfaces""" 730 logging.info("reenable ccd") 731 self._servo.set_dts_mode('on') 732 # If the test is actually running with ccd, wait for USB communication 733 # to come up after reset. 734 if self._servo.main_device_is_ccd(): 735 time.sleep(self._servo.USB_DETECTION_DELAY) 736 self.wait_for_ccd_enable(raise_error=raise_error) 737 738 739 def _level_change_req_pp(self, level): 740 """Returns True if setting the level will require physical presence""" 741 testlab_pp = level != 'testlab open' and 'testlab' in level 742 # If the level is open and the ccd capabilities say physical presence 743 # is required, then physical presence will be required. 744 open_pp = (level == 'open' and 745 not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE]) 746 return testlab_pp or open_pp 747 748 749 def _state_to_bool(self, state): 750 """Converts the state string to True or False""" 751 # TODO(mruthven): compare to 'on' once servo is up to date in the lab 752 return state.lower() in self.ON_STRINGS 753 754 755 def testlab_is_on(self): 756 """Returns True of testlab mode is on""" 757 return self._state_to_bool(self._servo.get('cr50_testlab')) 758 759 760 def set_ccd_testlab(self, state): 761 """Set the testlab mode 762 763 @param state: the desired testlab mode string: 'on' or 'off' 764 @raise TestFail: if testlab mode was not changed 765 """ 766 if self._servo.main_device_is_ccd(): 767 raise error.TestError('Cannot set testlab mode with CCD. Use flex ' 768 'cable instead.') 769 if not self.faft_config.has_powerbutton: 770 raise error.TestError('No power button on device') 771 772 request_on = self._state_to_bool(state) 773 testlab_on = self.testlab_is_on() 774 request_str = 'on' if request_on else 'off' 775 776 if testlab_on == request_on: 777 logging.info('ccd testlab already set to %s', request_str) 778 return 779 780 original_level = self.get_ccd_level() 781 782 # We can only change the testlab mode when the device is open. If 783 # testlab mode is already enabled, we can go directly to open using 'ccd 784 # testlab open'. This will save 5 minutes, because we can skip the 785 # physical presence check. 786 if testlab_on: 787 self.send_command('ccd testlab open') 788 else: 789 self.set_ccd_level('open') 790 791 # Set testlab mode 792 rv = self.send_command_get_output('ccd testlab %s' % request_str, 793 ['ccd.*>'])[0] 794 if 'Access Denied' in rv: 795 raise error.TestFail("'ccd %s' %s" % (request_str, rv)) 796 797 # Press the power button once a second for 15 seconds. 798 self.run_pp(self.PP_SHORT) 799 800 self.set_ccd_level(original_level) 801 if request_on != self.testlab_is_on(): 802 raise error.TestFail('Failed to set ccd testlab to %s' % state) 803 804 805 def get_ccd_level(self): 806 """Returns the current ccd privilege level""" 807 return self._servo.get('cr50_ccd_level').lower() 808 809 810 def set_ccd_level(self, level, password=''): 811 """Set the Cr50 CCD privilege level. 812 813 @param level: a string of the ccd privilege level: 'open', 'lock', or 814 'unlock'. 815 @param password: send the ccd command with password. This will still 816 require the same physical presence. 817 @raise TestFail: if the level couldn't be set 818 """ 819 # TODO(mruthven): add support for CCD password 820 level = level.lower() 821 822 if level == self.get_ccd_level(): 823 logging.info('CCD privilege level is already %s', level) 824 return 825 826 if 'testlab' in level: 827 raise error.TestError("Can't change testlab mode using " 828 "ccd_set_level") 829 830 testlab_on = self._state_to_bool(self._servo.get('cr50_testlab')) 831 batt_is_disconnected = self.get_batt_pres_state()[1] 832 req_pp = self._level_change_req_pp(level) 833 has_pp = not self._servo.main_device_is_ccd() 834 dbg_en = 'DBG' in self._servo.get('cr50_version') 835 836 if req_pp and not has_pp: 837 raise error.TestError("Can't change privilege level to '%s' " 838 "without physical presence." % level) 839 840 if not testlab_on and not has_pp: 841 raise error.TestError("Wont change privilege level without " 842 "physical presence or testlab mode enabled") 843 844 original_timeout = float(self._servo.get('cr50_uart_timeout')) 845 # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may 846 # take more than 3 seconds. 847 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 848 # Start the unlock process. 849 850 if level == 'open' or level == 'unlock': 851 logging.info('waiting %d seconds, the minimum time between' 852 ' ccd password attempts', 853 self.CCD_PASSWORD_RATE_LIMIT) 854 time.sleep(self.CCD_PASSWORD_RATE_LIMIT) 855 856 try: 857 cmd = 'ccd %s%s' % (level, (' ' + password) if password else '') 858 # ccd command outputs on the rbox, ccd, and console channels, 859 # respectively. Cr50 uses these channels to print relevant ccd 860 # information. 861 # Restrict all other channels. 862 ccd_output_channels = 0x20000 | 0x8 | 0x1 863 rv = self.send_safe_command_get_output( 864 cmd, [cmd + '(.*)>'], 865 channel_mask=ccd_output_channels)[0][1] 866 finally: 867 self._servo.set('cr50_uart_timeout', original_timeout) 868 logging.info(rv) 869 if 'ccd_open denied: fwmp' in rv: 870 raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv)) 871 if 'Access Denied' in rv: 872 raise error.TestFail("%r %s" % (cmd, rv)) 873 if 'Busy' in rv: 874 raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv)) 875 876 # Press the power button once a second, if we need physical presence. 877 if req_pp and batt_is_disconnected: 878 # DBG images have shorter unlock processes 879 self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG) 880 881 if level != self.get_ccd_level(): 882 raise error.TestFail('Could not set privilege level to %s' % level) 883 884 logging.info('Successfully set CCD privelege level to %s', level) 885 886 887 def run_pp(self, unlock_timeout): 888 """Press the power button a for unlock_timeout seconds. 889 890 This will press the power button many more times than it needs to be 891 pressed. Cr50 doesn't care if you press it too often. It just cares that 892 you press the power button at least once within the detect interval. 893 894 For privilege level changes you need to press the power button 5 times 895 in the short interval and then 4 times within the long interval. 896 Short Interval 897 100msec < power button press < 5 seconds 898 Long Interval 899 60s < power button press < 300s 900 901 For testlab enable/disable you must press the power button 5 times 902 spaced between 100msec and 5 seconds apart. 903 """ 904 ap_on_before = self.ap_is_on() 905 906 end_time = time.time() + unlock_timeout 907 908 logging.info('Pressing power button for %ds to unlock the console.', 909 unlock_timeout) 910 logging.info('The process should end at %s', time.ctime(end_time)) 911 912 # Press the power button once a second to unlock the console. 913 while time.time() < end_time: 914 self._servo.power_short_press() 915 time.sleep(1) 916 917 # If the last power button press left the AP powered off, and it was on 918 # before, turn it back on. 919 time.sleep(self.faft_config.shutdown) 920 ap_on_after = self.ap_is_on() 921 logging.debug('During run_pp, AP %s -> %s', 922 'on' if ap_on_before else 'off', 923 'on' if ap_on_after else 'off') 924 if ap_on_before and not ap_on_after: 925 self._servo.power_short_press() 926 logging.debug('Pressing PP to turn back on') 927 928 929 def gettime(self): 930 """Get the current cr50 system time""" 931 result = self.send_safe_command_get_output('gettime', [' = (.*) s']) 932 return float(result[0][1]) 933 934 935 def servo_dts_mode_is_valid(self): 936 """Returns True if cr50 registers change in servo dts mode.""" 937 # This is to test that Cr50 actually recognizes the change in ccd state 938 # We cant do that with tests using ccd, because the cr50 communication 939 # goes down once ccd is enabled. 940 if not self._servo.dts_mode_is_safe(): 941 return False 942 943 ccd_start = 'on' if self.ccd_is_enabled() else 'off' 944 dts_start = self._servo.get_dts_mode() 945 try: 946 # Verify both ccd enable and disable 947 self.ccd_disable(raise_error=True) 948 self.ccd_enable(raise_error=True) 949 rv = True 950 except Exception, e: 951 logging.info(e) 952 rv = False 953 self._servo.set_dts_mode(dts_start) 954 self.wait_for_stable_ccd_state(ccd_start, 60, True) 955 logging.info('Test setup does%s support servo DTS mode', 956 '' if rv else 'n\'t') 957 return rv 958 959 960 def wait_until_update_is_allowed(self): 961 """Wait until cr50 will be able to accept an update. 962 963 Cr50 rejects any attempt to update if it has been less than 60 seconds 964 since it last recovered from deep sleep or came up from reboot. This 965 will wait until cr50 gettime shows a time greater than 60. 966 """ 967 if self.get_active_version_info()[2]: 968 logging.info("Running DBG image. Don't need to wait for update.") 969 return 970 cr50_time = self.gettime() 971 if cr50_time < 60: 972 sleep_time = 61 - cr50_time 973 logging.info('Cr50 has been up for %ds waiting %ds before update', 974 cr50_time, sleep_time) 975 time.sleep(sleep_time) 976 977 978 def tpm_is_enabled(self): 979 """Query the current TPM mode. 980 981 @return: True if TPM is enabled, False otherwise. 982 """ 983 result = self.send_command_retry_get_output('sysinfo', 984 ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1] 985 logging.debug(result) 986 987 return result.lower() == 'enabled' 988 989 990 def get_keyladder_state(self): 991 """Get the status of H1 Key Ladder. 992 993 @return: The keyladder state string. prod or dev both mean enabled. 994 """ 995 result = self.send_command_retry_get_output('sysinfo', 996 ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'], 997 safe=True)[0][1] 998 logging.debug(result) 999 return result 1000 1001 1002 def keyladder_is_disabled(self): 1003 """Get the status of H1 Key Ladder. 1004 1005 @return: True if H1 Key Ladder is disabled. False otherwise. 1006 """ 1007 return self.get_keyladder_state() == 'disabled' 1008 1009 1010 def get_sleepmask(self): 1011 """Returns the sleepmask as an int""" 1012 rv = self.send_command_retry_get_output('sleepmask', 1013 ['sleep mask: (\S{8})\s+'], safe=True)[0][1] 1014 logging.info('sleepmask %s', rv) 1015 return int(rv, 16) 1016 1017 1018 def get_ccdstate(self): 1019 """Return a dictionary of the ccdstate once it's done debouncing""" 1020 for i in range(self.CCDSTATE_MAX_RETRY_COUNT): 1021 rv = self.send_command_retry_get_output('ccdstate', 1022 ['ccdstate(.*)>'], safe=True)[0][0] 1023 1024 # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or 1025 # 'unknown' may appear transiently. 'debouncing' should transition 1026 # to 'on' or 'off' within 1 second, and 'unknown' should do so 1027 # within 20 seconds. 1028 if 'debouncing' not in rv and 'unknown' not in rv: 1029 break 1030 time.sleep(self.SHORT_WAIT) 1031 ccdstate = {} 1032 for line in rv.splitlines(): 1033 line = line.strip() 1034 if ':' in line: 1035 k, v = line.split(':', 1) 1036 ccdstate[k.strip()] = v.strip() 1037 logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate)) 1038 return ccdstate 1039 1040 1041 def ap_is_on(self): 1042 """Get the power state of the AP. 1043 1044 @return: True if the AP is on; False otherwise. 1045 """ 1046 ap_state = self.get_ccdstate()['AP'] 1047 if ap_state == 'on': 1048 return True 1049 elif ap_state == 'off': 1050 return False 1051 else: 1052 raise error.TestFail('Read unusable AP state from ccdstate: "%s"', 1053 ap_state) 1054 1055 1056 def gpioget(self, signal_name): 1057 """Get the current state of the signal 1058 1059 @return an integer 1 or 0 based on the gpioget value 1060 """ 1061 result = self.send_command_retry_get_output('gpioget', 1062 ['(0|1)[ \S]*%s' % signal_name], safe=True) 1063 return int(result[0][1]) 1064 1065 1066 def batt_pres_is_reset(self): 1067 """Returns True if batt pres is reset to always follow batt pres""" 1068 follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state() 1069 return follow_bp and follow_bp_atboot 1070 1071 1072 def get_batt_pres_state(self): 1073 """Get the current and atboot battery presence state 1074 1075 The atboot setting cannot really be determined now if it is set to 1076 follow battery presence. It is likely to remain the same after reboot, 1077 but who knows. If the third element of the tuple is True, the last 1078 element will not be that useful 1079 1080 @return: a tuple of the current battery presence state 1081 (True if current state is to follow batt presence, 1082 True if battery is connected, 1083 True if current state is to follow batt presence atboot, 1084 True if battery is connected atboot) 1085 """ 1086 # bpforce is added in 4.16. If the image doesn't have the command, cr50 1087 # always follows battery presence. In these images 'gpioget BATT_PRES_L' 1088 # accurately represents the battery presence state, because it can't be 1089 # overidden. 1090 if not self.has_command('bpforce'): 1091 batt_pres = not bool(self.gpioget('BATT_PRES_L')) 1092 return (True, batt_pres, True, batt_pres) 1093 1094 # The bpforce command is very similar to the wp command. It just 1095 # substitutes 'connected' for 'enabled' and 'disconnected' for 1096 # 'disabled'. 1097 rv = self.send_command_retry_get_output('bpforce', 1098 ['batt pres: (forced )?(con|dis).*at boot: (forced )?' 1099 '(follow|discon|con)'], safe=True)[0] 1100 _, forced, connected, _, atboot = rv 1101 logging.info(rv) 1102 return (not forced, connected == 'con', atboot == 'follow', 1103 atboot == 'con') 1104 1105 1106 def set_batt_pres_state(self, state, atboot): 1107 """Override the battery presence state. 1108 1109 @param state: a string of the battery presence setting: 'connected', 1110 'disconnected', or 'follow_batt_pres' 1111 @param atboot: True if we're overriding battery presence atboot 1112 """ 1113 cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '') 1114 logging.info('running %r', cmd) 1115 self.send_command(cmd) 1116 1117 1118 def dump_nvmem(self): 1119 """Print nvmem objects.""" 1120 rv = self.send_safe_command_get_output('dump_nvmem', 1121 ['dump_nvmem(.*)>'])[0][1] 1122 logging.info('NVMEM OUTPUT:\n%s', rv) 1123 1124 1125 def get_reset_cause(self): 1126 """Returns the reset flags for the last reset.""" 1127 rv = self.send_command_retry_get_output('sysinfo', 1128 ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1] 1129 logging.info('reset cause: %s', rv) 1130 return int(rv, 16) 1131 1132 1133 def was_reset(self, reset_type): 1134 """Returns 1 if the reset type is found in the reset_cause. 1135 1136 @param reset_type: reset name in string type. 1137 """ 1138 reset_cause = self.get_reset_cause() 1139 reset_flag = self.RESET_FLAGS[reset_type] 1140 return bool(reset_cause & reset_flag) 1141