1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import functools 6import logging 7import time 8 9from autotest_lib.client.bin import utils 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib.cros import cr50_utils 12from autotest_lib.server.cros.servo import chrome_ec 13 14 15def servo_v4_command(func): 16 """Decorator for methods only relevant to tests running with servo v4.""" 17 @functools.wraps(func) 18 def wrapper(instance, *args, **kwargs): 19 """Ignore servo v4 functions it's not being used.""" 20 if instance.using_servo_v4(): 21 return func(instance, *args, **kwargs) 22 logging.info("not using servo v4. ignoring %s", func.func_name) 23 return wrapper 24 25 26class ChromeCr50(chrome_ec.ChromeConsole): 27 """Manages control of a Chrome Cr50. 28 29 We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50 30 provides many interfaces to set and get its behavior via console commands. 31 This class is to abstract these interfaces. 32 """ 33 # The amount of time you need to show physical presence. 34 PP_SHORT = 15 35 PP_LONG = 300 36 IDLE_COUNT = 'count: (\d+)' 37 # The version has four groups: the partition, the header version, debug 38 # descriptor and then version string. 39 # There are two partitions A and B. The active partition is marked with a 40 # '*'. If it is a debug image '/DBG' is added to the version string. If the 41 # image has been corrupted, the version information will be replaced with 42 # 'Error'. 43 # So the output may look something like this. 44 # RW_A: 0.0.21/cr50_v1.1.6133-fd788b 45 # RW_B: * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d 46 # Or like this if the region was corrupted. 47 # RW_A: * 0.0.21/cr50_v1.1.6133-fd788b 48 # RW_B: Error 49 VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s' 50 INACTIVE_VERSION = VERSION_FORMAT % '' 51 ACTIVE_VERSION = VERSION_FORMAT % '\*' 52 # Following lines of the version output may print the image board id 53 # information. eg. 54 # BID A: 5a5a4146:ffffffff:00007f00 Yes 55 # BID B: 00000000:00000000:00000000 Yes 56 # Use the first group from ACTIVE_VERSION to match the active board id 57 # partition. 58 BID_ERROR = 'read_board_id: failed' 59 BID_FORMAT = ':\s+[a-f0-9:]+ ' 60 ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT, 61 BID_ERROR) 62 WAKE_CHAR = '\n' 63 START_UNLOCK_TIMEOUT = 20 64 GETTIME = ['= (\S+)'] 65 FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"] 66 FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting'] 67 MAX_RETRY_COUNT = 5 68 START_STR = ['(.*Console is enabled;)'] 69 REBOOT_DELAY_WITH_CCD = 60 70 REBOOT_DELAY_WITH_FLEX = 3 71 ON_STRINGS = ['enable', 'enabled', 'on'] 72 73 74 def __init__(self, servo): 75 super(ChromeCr50, self).__init__(servo, 'cr50_uart') 76 77 78 def send_command(self, commands): 79 """Send command through UART. 80 81 Cr50 will drop characters input to the UART when it resumes from sleep. 82 If servo is not using ccd, send some dummy characters before sending the 83 real command to make sure cr50 is awake. 84 """ 85 if not self.using_ccd(): 86 super(ChromeCr50, self).send_command(self.WAKE_CHAR) 87 super(ChromeCr50, self).send_command(commands) 88 89 90 def send_command_get_output(self, command, regexp_list): 91 """Send command through UART and wait for response. 92 93 Cr50 will drop characters input to the UART when it resumes from sleep. 94 If servo is not using ccd, send some dummy characters before sending the 95 real command to make sure cr50 is awake. 96 """ 97 if not self.using_ccd(): 98 super(ChromeCr50, self).send_command(self.WAKE_CHAR) 99 return super(ChromeCr50, self).send_command_get_output(command, 100 regexp_list) 101 102 103 def get_deep_sleep_count(self): 104 """Get the deep sleep count from the idle task""" 105 result = self.send_command_get_output('idle', [self.IDLE_COUNT]) 106 return int(result[0][1]) 107 108 109 def clear_deep_sleep_count(self): 110 """Clear the deep sleep count""" 111 result = self.send_command_get_output('idle c', [self.IDLE_COUNT]) 112 if int(result[0][1]): 113 raise error.TestFail("Could not clear deep sleep count") 114 115 116 def has_command(self, cmd): 117 """Returns 1 if cr50 has the command 0 if it doesn't""" 118 try: 119 self.send_command_get_output('help', [cmd]) 120 except: 121 logging.info("Image does not include '%s' command", cmd) 122 return 0 123 return 1 124 125 126 def erase_nvmem(self): 127 """Use flasherase to erase both nvmem sections""" 128 if not self.has_command('flasherase'): 129 raise error.TestError("need image with 'flasherase'") 130 131 self.send_command('flasherase 0x7d000 0x3000') 132 self.send_command('flasherase 0x3d000 0x3000') 133 134 135 def reboot(self): 136 """Reboot Cr50 and wait for cr50 to reset""" 137 response = [] if self.using_ccd() else self.START_STR 138 self.send_command_get_output('reboot', response) 139 140 # ccd will stop working after the reboot. Wait until that happens and 141 # reenable it. 142 if self.using_ccd(): 143 self.wait_for_reboot() 144 145 146 def _uart_wait_for_reboot(self, timeout=60): 147 """Wait for the cr50 to reboot and enable the console. 148 149 This will wait up to timeout seconds for cr50 to print the start string. 150 151 Args: 152 timeout: seconds to wait to detect the reboot. 153 """ 154 original_timeout = float(self._servo.get('cr50_uart_timeout')) 155 # Change the console timeout to timeout, so we wait at least that long 156 # for cr50 to print the start string. 157 self._servo.set_nocheck('cr50_uart_timeout', timeout) 158 try: 159 self.send_command_get_output('\n', self.START_STR) 160 logging.debug('Detected cr50 reboot') 161 except error.TestFail, e: 162 logging.debug('Failed to detect cr50 reboot') 163 # Reset the timeout. 164 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 165 166 167 def wait_for_reboot(self, timeout=60): 168 """Wait for cr50 to reboot""" 169 if self.using_ccd(): 170 # Cr50 USB is reset when it reboots. Wait for the CCD connection to 171 # go down to detect the reboot. 172 self.wait_for_ccd_disable(timeout, raise_error=False) 173 self.ccd_enable() 174 else: 175 self._uart_wait_for_reboot(timeout) 176 177 178 def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None): 179 """Set the reset counter high enough to force a rollback then reboot 180 181 Set the new board id before rolling back if one is given. 182 183 Args: 184 eraseflashinfo: True if eraseflashinfo should be run before rollback 185 chip_bid: the integer representation of chip board id or None if the 186 board id should be erased during rollback 187 chip_flags: the integer representation of chip board id flags or 188 None if the board id should be erased during rollback 189 """ 190 if (not self.has_command('rollback') or not 191 self.has_command('eraseflashinfo')): 192 raise error.TestError("need image with 'rollback' and " 193 "'eraseflashinfo'") 194 195 inactive_partition = self.get_inactive_version_info()[0] 196 # Set the board id if both the board id and flags have been given. 197 set_bid = chip_bid and chip_flags 198 199 # Erase the infomap 200 if eraseflashinfo or set_bid: 201 self.send_command('eraseflashinfo') 202 203 # Update the board id after it has been erased 204 if set_bid: 205 self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags)) 206 207 self.send_command('rollback') 208 209 # If we aren't using ccd, we should be able to detect the reboot 210 # almost immediately 211 self.wait_for_reboot(self.REBOOT_DELAY_WITH_CCD if self.using_ccd() else 212 self.REBOOT_DELAY_WITH_FLEX) 213 214 running_partition = self.get_active_version_info()[0] 215 if inactive_partition != running_partition: 216 raise error.TestError("Failed to rollback to inactive image") 217 218 219 def rolledback(self): 220 """Returns true if cr50 just rolled back""" 221 return int(self._servo.get('cr50_reset_count')) > self.MAX_RETRY_COUNT 222 223 224 def get_version_info(self, regexp): 225 """Get information from the version command""" 226 return self.send_command_get_output('ver', [regexp])[0][1::] 227 228 229 def get_inactive_version_info(self): 230 """Get the active partition, version, and hash""" 231 return self.get_version_info(self.INACTIVE_VERSION) 232 233 234 def get_active_version_info(self): 235 """Get the active partition, version, and hash""" 236 return self.get_version_info(self.ACTIVE_VERSION) 237 238 239 def get_active_board_id_str(self): 240 """Get the running image board id. 241 242 Returns: 243 The board id string or None if the image does not support board id 244 or the image is not board id locked. 245 """ 246 # Getting the board id from the version console command is only 247 # supported in board id locked images .22 and above. Any image that is 248 # board id locked will have support for getting the image board id. 249 # 250 # If board id is not supported on the device, return None. This is 251 # still expected on all current non board id locked release images. 252 try: 253 version_info = self.get_version_info(self.ACTIVE_BID) 254 except error.TestFail, e: 255 logging.info(e.message) 256 logging.info('Cannot use the version to get the board id') 257 return None 258 259 if self.BID_ERROR in version_info[4]: 260 raise error.TestError(version_info) 261 bid = version_info[4].split()[1] 262 return bid if bid != cr50_utils.EMPTY_IMAGE_BID else None 263 264 265 def get_version(self): 266 """Get the RW version""" 267 return self.get_active_version_info()[1].strip() 268 269 270 def using_servo_v4(self): 271 """Returns true if the console is being served using servo v4""" 272 return 'servo_v4' in self._servo.get_servo_version() 273 274 275 def using_ccd(self): 276 """Returns true if the console is being served using CCD""" 277 return 'ccd_cr50' in self._servo.get_servo_version() 278 279 280 def ccd_is_enabled(self): 281 """Return True if ccd is enabled. 282 283 If the test is running through ccd, return the ccd_state value. If 284 a flex cable is being used, use the CCD_MODE_L gpio setting to determine 285 if Cr50 has ccd enabled. 286 287 Returns: 288 'off' or 'on' based on whether the cr50 console is working. 289 """ 290 if self.using_ccd(): 291 return self._servo.get('ccd_state') == 'on' 292 else: 293 result = self.send_command_get_output('gpioget', 294 ['(0|1)..CCD_MODE_L']) 295 return not bool(int(result[0][1])) 296 297 298 @servo_v4_command 299 def wait_for_ccd_state(self, state, timeout, raise_error): 300 """Wait up to timeout seconds for CCD to be 'on' or 'off' 301 Args: 302 state: a string either 'on' or 'off'. 303 timeout: time in seconds to wait 304 raise_error: Raise TestFail if the value is state is not reached. 305 306 Raises 307 TestFail if ccd never reaches the specified state 308 """ 309 wait_for_enable = state == 'on' 310 logging.info("Wait until ccd is '%s'", state) 311 value = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable, 312 timeout_sec=timeout) 313 if value != wait_for_enable: 314 error_msg = "timed out before detecting ccd '%s'" % state 315 if raise_error: 316 raise error.TestFail(error_msg) 317 logging.warning(error_msg) 318 logging.info("ccd is '%s'", state) 319 320 321 @servo_v4_command 322 def wait_for_ccd_disable(self, timeout=60, raise_error=True): 323 """Wait for the cr50 console to stop working""" 324 self.wait_for_ccd_state('off', timeout, raise_error) 325 326 327 @servo_v4_command 328 def wait_for_ccd_enable(self, timeout=60, raise_error=False): 329 """Wait for the cr50 console to start working""" 330 self.wait_for_ccd_state('on', timeout, raise_error) 331 332 333 @servo_v4_command 334 def ccd_disable(self, raise_error=True): 335 """Change the values of the CC lines to disable CCD""" 336 logging.info("disable ccd") 337 self._servo.set_nocheck('servo_v4_dts_mode', 'off') 338 self.wait_for_ccd_disable(raise_error=raise_error) 339 340 341 @servo_v4_command 342 def ccd_enable(self, raise_error=False): 343 """Reenable CCD and reset servo interfaces""" 344 logging.info("reenable ccd") 345 self._servo.set_nocheck('servo_v4_dts_mode', 'on') 346 # If the test is actually running with ccd, reset usb and wait for 347 # communication to come up. 348 if self.using_ccd(): 349 self._servo.set_nocheck('power_state', 'ccd_reset') 350 self.wait_for_ccd_enable(raise_error=raise_error) 351 352 353 def _level_change_req_pp(self, level): 354 """Returns True if setting the level will require physical presence""" 355 testlab_pp = level != 'testlab open' and 'testlab' in level 356 open_pp = level == 'open' 357 return testlab_pp or open_pp 358 359 360 def _state_to_bool(self, state): 361 """Converts the state string to True or False""" 362 # TODO(mruthven): compare to 'on' once servo is up to date in the lab 363 return state.lower() in self.ON_STRINGS 364 365 366 def testlab_is_on(self): 367 """Returns True of testlab mode is on""" 368 return self._state_to_bool(self._servo.get('cr50_testlab')) 369 370 371 def set_ccd_testlab(self, state): 372 """Set the testlab mode 373 374 Args: 375 state: the desired testlab mode string: 'on' or 'off' 376 377 Raises: 378 TestFail if testlab mode was not changed 379 """ 380 if self.using_ccd(): 381 raise error.TestError('Cannot set testlab mode with CCD. Use flex ' 382 'cable instead.') 383 384 request_on = self._state_to_bool(state) 385 testlab_on = self.testlab_is_on() 386 request_str = 'on' if request_on else 'off' 387 388 if testlab_on == request_on: 389 logging.info('ccd testlab already set to %s', request_str) 390 return 391 392 original_level = self.get_ccd_level() 393 394 # We can only change the testlab mode when the device is open. If 395 # testlab mode is already enabled, we can go directly to open using 'ccd 396 # testlab open'. This will save 5 minutes, because we can skip the 397 # physical presence check. 398 if testlab_on: 399 self.send_command('ccd testlab open') 400 else: 401 self.set_ccd_level('open') 402 403 # Set testlab mode 404 rv = self.send_command_get_output('ccd testlab %s' % request_str, 405 ['.*>'])[0] 406 if 'Access Denied' in rv: 407 raise error.TestFail("'ccd %s' %s" % (request_str, rv)) 408 409 # Press the power button once a second for 15 seconds. 410 self.run_pp(self.PP_SHORT) 411 412 self.set_ccd_level(original_level) 413 414 if request_on != self.testlab_is_on(): 415 raise error.TestFail('Failed to set ccd testlab to %s' % state) 416 417 418 def get_ccd_level(self): 419 """Returns the current ccd privilege level""" 420 # TODO(mruthven): delete the part removing the trailing 'ed' once 421 # servo is up to date in the lab 422 return self._servo.get('cr50_ccd_level').lower().rstrip('ed') 423 424 425 def set_ccd_level(self, level): 426 """Set the Cr50 CCD privilege level. 427 428 Args: 429 level: a string of the ccd privilege level: 'open', 'lock', or 430 'unlock'. 431 432 Raises: 433 TestFail if the level couldn't be set 434 .""" 435 # TODO(mruthven): add support for CCD password 436 level = level.lower() 437 438 if level == self.get_ccd_level(): 439 logging.info('CCD privilege level is already %s', level) 440 return 441 442 if 'testlab' in level: 443 raise error.TestError("Can't change testlab mode using " 444 "ccd_set_level") 445 446 testlab_on = self._state_to_bool(self._servo.get('cr50_testlab')) 447 req_pp = self._level_change_req_pp(level) 448 has_pp = not self.using_ccd() 449 dbg_en = 'DBG' in self._servo.get('cr50_version') 450 451 if req_pp and not has_pp: 452 raise error.TestError("Can't change privilege level to '%s' " 453 "without physical presence." % level) 454 455 if not testlab_on and not has_pp: 456 raise error.TestError("Wont change privilege level without " 457 "physical presence or testlab mode enabled") 458 459 # Start the unlock process. 460 rv = self.send_command_get_output('ccd %s' % level, ['.*>'])[0] 461 logging.info(rv) 462 if 'Access Denied' in rv: 463 raise error.TestFail("'ccd %s' %s" % (level, rv)) 464 465 # Press the power button once a second, if we need physical presence. 466 if req_pp: 467 # DBG images have shorter unlock processes 468 self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG) 469 470 if level != self.get_ccd_level(): 471 raise error.TestFail('Could not set privilege level to %s' % level) 472 473 logging.info('Successfully set CCD privelege level to %s', level) 474 475 476 def run_pp(self, unlock_timeout): 477 """Press the power button a for unlock_timeout seconds. 478 479 This will press the power button many more times than it needs to be 480 pressed. Cr50 doesn't care if you press it too often. It just cares that 481 you press the power button at least once within the detect interval. 482 483 For privilege level changes you need to press the power button 5 times 484 in the short interval and then 4 times within the long interval. 485 Short Interval 486 100msec < power button press < 5 seconds 487 Long Interval 488 60s < power button press < 300s 489 490 For testlab enable/disable you must press the power button 5 times 491 spaced between 100msec and 5 seconds apart. 492 """ 493 end_time = time.time() + unlock_timeout 494 495 logging.info('Pressing power button for %ds to unlock the console.', 496 unlock_timeout) 497 logging.info('The process should end at %s', time.ctime(end_time)) 498 499 # Press the power button once a second to unlock the console. 500 while time.time() < end_time: 501 self._servo.power_short_press() 502 time.sleep(1) 503 504 505 def gettime(self): 506 """Get the current cr50 system time""" 507 result = self.send_command_get_output('gettime', [' = (.*) s']) 508 return float(result[0][1]) 509 510 511 def servo_v4_supports_dts_mode(self): 512 """Returns True if cr50 registers changes in servo v4 dts mode.""" 513 # This is to test that Cr50 actually recognizes the change in ccd state 514 # We cant do that with tests using ccd, because the cr50 communication 515 # goes down once ccd is enabled. 516 if 'servo_v4_with_servo_micro' != self._servo.get_servo_version(): 517 return False 518 519 start_val = self._servo.get('servo_v4_dts_mode') 520 try: 521 # Verify both ccd enable and disable 522 self.ccd_disable(raise_error=True) 523 self.ccd_enable(raise_error=True) 524 rv = True 525 except Exception, e: 526 logging.info(e) 527 rv = False 528 self._servo.set_nocheck('servo_v4_dts_mode', start_val) 529 self.wait_for_ccd_state(start_val, 60, True) 530 logging.info('Test setup does%s support servo DTS mode', 531 '' if rv else 'n\'t') 532 return rv 533 534 535 def wait_until_update_is_allowed(self): 536 """Wait until cr50 will be able to accept an update. 537 538 Cr50 rejects any attempt to update if it has been less than 60 seconds 539 since it last recovered from deep sleep or came up from reboot. This 540 will wait until cr50 gettime shows a time greater than 60. 541 """ 542 if self.get_active_version_info()[2]: 543 logging.info("Running DBG image. Don't need to wait for update.") 544 return 545 cr50_time = self.gettime() 546 if cr50_time < 60: 547 sleep_time = 61 - cr50_time 548 logging.info('Cr50 has been up for %ds waiting %ds before update', 549 cr50_time, sleep_time) 550 time.sleep(sleep_time) 551