1# Lint as: python2, python3 2# Copyright 2017 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import functools 11import logging 12import pprint 13import re 14import six 15from six.moves import range 16import time 17 18from autotest_lib.client.bin import utils 19from autotest_lib.client.common_lib import error 20from autotest_lib.client.common_lib.cros import cr50_utils 21from autotest_lib.server.cros.servo import chrome_ec 22from autotest_lib.server.cros.servo import servo 23 24 25def dts_control_command(func): 26 """For methods that should only run when dts mode control is supported.""" 27 @functools.wraps(func) 28 def wrapper(instance, *args, **kwargs): 29 """Ignore those functions if dts mode control is not supported.""" 30 if instance._servo.dts_mode_is_valid(): 31 return func(instance, *args, **kwargs) 32 logging.info('Servo setup does not support DTS mode. ignoring %s', 33 func.__name__) 34 return wrapper 35 36 37class ChromeCr50(chrome_ec.ChromeConsole): 38 """Manages control of a Chrome Cr50. 39 40 We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50 41 provides many interfaces to set and get its behavior via console commands. 42 This class is to abstract these interfaces. 43 """ 44 PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d'] 45 PROD_RO_KEYIDS = ['0xaa66150f'] 46 OPEN = 'open' 47 UNLOCK = 'unlock' 48 LOCK = 'lock' 49 # The amount of time you need to show physical presence. 50 PP_SHORT = 15 51 PP_LONG = 300 52 CCD_PASSWORD_RATE_LIMIT = 3 53 IDLE_COUNT = 'count: (\d+)\s' 54 SHORT_WAIT = 3 55 # The version has four groups: the partition, the header version, debug 56 # descriptor and then version string. 57 # There are two partitions A and B. The active partition is marked with a 58 # '*'. If it is a debug image '/DBG' is added to the version string. If the 59 # image has been corrupted, the version information will be replaced with 60 # 'Error'. 61 # So the output may look something like this. 62 # RW_A: 0.0.21/cr50_v1.1.6133-fd788b 63 # RW_B: * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d 64 # Or like this if the region was corrupted. 65 # RW_A: * 0.0.21/cr50_v1.1.6133-fd788b 66 # RW_B: Error 67 VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s' 68 INACTIVE_VERSION = VERSION_FORMAT % '' 69 ACTIVE_VERSION = VERSION_FORMAT % '\*' 70 # Following lines of the version output may print the image board id 71 # information. eg. 72 # BID A: 5a5a4146:ffffffff:00007f00 Yes 73 # BID B: 00000000:00000000:00000000 Yes 74 # Use the first group from ACTIVE_VERSION to match the active board id 75 # partition. 76 BID_ERROR = 'read_board_id: failed' 77 BID_FORMAT = ':\s+[a-f0-9:]{26} ' 78 ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT, 79 BID_ERROR) 80 WAKE_CHAR = '\n\n\n\n' 81 WAKE_RESPONSE = ['(>|Console is enabled)'] 82 START_UNLOCK_TIMEOUT = 20 83 GETTIME = ['= (\S+)'] 84 FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"] 85 FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting'] 86 MAX_RETRY_COUNT = 5 87 CCDSTATE_MAX_RETRY_COUNT = 20 88 START_STR = ['((Havn|UART).*Console is enabled;)'] 89 REBOOT_DELAY_WITH_CCD = 60 90 REBOOT_DELAY_WITH_FLEX = 3 91 ON_STRINGS = ['enable', 'enabled', 'on'] 92 CONSERVATIVE_CCD_WAIT = 10 93 CCD_SHORT_PRESSES = 5 94 CAP_IS_ACCESSIBLE = 0 95 CAP_SETTING = 1 96 CAP_REQ = 2 97 GET_CAP_TRIES = 20 98 # Regex to match the valid capability settings. 99 CAP_STATES = '(Always|Default|IfOpened|UnlessLocked)' 100 # List of all cr50 ccd capabilities. Same order of 'ccd' output 101 CAP_NAMES = [ 102 'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx', 103 'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole', 104 'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe', 'OpenNoLongPP', 105 'BatteryBypassPP', 'UpdateNoTPMWipe', 'I2C', 'FlashRead', 106 'OpenNoDevMode', 'OpenFromUSB', 'OverrideBatt' 107 ] 108 # There are two capability formats. Match both. 109 # UartGscRxECTx Y 3=IfOpened 110 # or 111 # UartGscRxECTx Y 0=Default (Always) 112 # Make sure the last word is at the end of the line. The next line will 113 # start with some whitespace, so account for that too. 114 CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES, 115 CAP_STATES) 116 # Be as specific as possible with the 'ccd' output, so the test will notice 117 # missing characters and retry getting the output. Name each group, so the 118 # test can extract the field information into a dictionary. 119 # CCD_FIELDS is used to order the regex when searching for multiple fields 120 CCD_FIELDS = ['State', 'Password', 'Flags', 'Capabilities', 'TPM'] 121 # CCD_FORMAT has the field names as keys and the expected output as the 122 # value. 123 CCD_FORMAT = { 124 'State' : '(State: (?P<State>Opened|Locked|Unlocked))', 125 'Password' : '(Password: (?P<Password>set|none))', 126 'Flags' : '(Flags: (?P<Flags>\S*))', 127 'Capabilities' : '(Capabilities:.*(?P<Capabilities>%s))' % 128 (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT), 129 'TPM' : '(TPM:(?P<TPM>[ \S]*)\r)', 130 } 131 132 # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h 133 BOARD_PROP = { 134 'BOARD_SLAVE_CONFIG_SPI' : 1 << 0, 135 'BOARD_SLAVE_CONFIG_I2C' : 1 << 1, 136 'BOARD_NEEDS_SYS_RST_PULL_UP' : 1 << 5, 137 'BOARD_USE_PLT_RESET' : 1 << 6, 138 'BOARD_WP_ASSERTED' : 1 << 8, 139 'BOARD_FORCING_WP' : 1 << 9, 140 'BOARD_NO_RO_UART' : 1 << 10, 141 'BOARD_CCD_STATE_MASK' : 3 << 11, 142 'BOARD_DEEP_SLEEP_DISABLED' : 1 << 13, 143 'BOARD_DETECT_AP_WITH_UART' : 1 << 14, 144 'BOARD_ITE_EC_SYNC_NEEDED' : 1 << 15, 145 'BOARD_WP_DISABLE_DELAY' : 1 << 16, 146 'BOARD_CLOSED_SOURCE_SET1' : 1 << 17, 147 'BOARD_CLOSED_LOOP_RESET' : 1 << 18, 148 'BOARD_NO_INA_SUPPORT' : 1 << 19, 149 'BOARD_ALLOW_CHANGE_TPM_MODE' : 1 << 20, 150 'BOARD_EC_CR50_COMM_SUPPORT' : 1 << 21, 151 'BOARD_CCD_REC_LID_PIN_DIOA1' : 0x01 << 22, 152 'BOARD_CCD_REC_LID_PIN_DIOA9' : 0x02 << 22, 153 'BOARD_CCD_REC_LID_PIN_DIOA12': 0x03 << 22, 154 } 155 156 # CR50 reset flags as defined in platform ec_commands.h. These are only the 157 # flags used by cr50. 158 RESET_FLAGS = { 159 'RESET_FLAG_OTHER' : 1 << 0, 160 'RESET_FLAG_BROWNOUT' : 1 << 2, 161 'RESET_FLAG_POWER_ON' : 1 << 3, 162 'RESET_FLAG_SOFT' : 1 << 5, 163 'RESET_FLAG_HIBERNATE' : 1 << 6, 164 'RESET_FLAG_RTC_ALARM' : 3 << 7, 165 'RESET_FLAG_WAKE_PIN' : 1 << 8, 166 'RESET_FLAG_HARD' : 1 << 11, 167 'RESET_FLAG_USB_RESUME' : 1 << 14, 168 'RESET_FLAG_RDD' : 1 << 15, 169 'RESET_FLAG_RBOX' : 1 << 16, 170 'RESET_FLAG_SECURITY' : 1 << 17, 171 } 172 173 def __init__(self, servo, faft_config): 174 """Initializes a ChromeCr50 object. 175 176 @param servo: A servo object. 177 @param faft_config: A faft config object. 178 """ 179 super(ChromeCr50, self).__init__(servo, 'cr50_uart') 180 self.faft_config = faft_config 181 182 183 def wake_cr50(self): 184 """Wake up cr50 by sending some linebreaks and wait for the response""" 185 for i in range(self.MAX_RETRY_COUNT): 186 try: 187 rv = super(ChromeCr50, self).send_command_get_output( 188 self.WAKE_CHAR, self.WAKE_RESPONSE) 189 logging.debug('wake result %r', rv) 190 return 191 except servo.ResponsiveConsoleError as e: 192 logging.info("Console responsive, but couldn't match wake " 193 "response %s", e) 194 raise servo.ResponsiveConsoleError('Unable to wake cr50') 195 196 197 def send_command(self, commands): 198 """Send command through UART. 199 200 Cr50 will drop characters input to the UART when it resumes from sleep. 201 If servo is not using ccd, send some dummy characters before sending the 202 real command to make sure cr50 is awake. 203 204 @param commands: the command string to send to cr50 205 """ 206 if self._servo.main_device_is_flex(): 207 self.wake_cr50() 208 super(ChromeCr50, self).send_command(commands) 209 210 211 def set_cap(self, cap, setting): 212 """Set the capability to setting 213 214 @param cap: The capability string 215 @param setting: The setting to set the capability to. 216 """ 217 self.set_caps({ cap : setting }) 218 219 220 def set_caps(self, cap_dict): 221 """Use cap_dict to set all the cap values 222 223 Set all of the capabilities in cap_dict to the correct config. 224 225 @param cap_dict: A dictionary with the capability as key and the desired 226 setting as values 227 """ 228 for cap, config in six.iteritems(cap_dict): 229 self.send_command('ccd set %s %s' % (cap, config)) 230 current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING) 231 for cap, config in six.iteritems(cap_dict): 232 if (current_cap_settings[cap].lower() != 233 config.lower()): 234 raise error.TestFail('Failed to set %s to %s' % (cap, config)) 235 236 237 def get_cap_overview(self, cap_dict): 238 """Get a basic overview of the capability dictionary 239 240 If all capabilities are set to Default, ccd has been reset to default. 241 If all capabilities are set to Always, ccd is in factory mode. 242 243 @param cap_dict: A dictionary of the capability settings 244 @return: A tuple of the capability overview (in factory mode, is reset) 245 """ 246 in_factory_mode = True 247 is_reset = True 248 for cap, cap_info in six.iteritems(cap_dict): 249 cap_setting = cap_info[self.CAP_SETTING] 250 if cap_setting != 'Always': 251 in_factory_mode = False 252 if cap_setting != 'Default': 253 is_reset = False 254 return in_factory_mode, is_reset 255 256 257 def password_is_reset(self): 258 """Returns True if the password is cleared""" 259 return self.get_ccd_info('Password') == 'none' 260 261 262 def ccd_is_reset(self): 263 """Returns True if the ccd is reset 264 265 The password must be cleared, write protect and battery presence must 266 follow battery presence, and all capabilities must be Always 267 """ 268 return (self.password_is_reset() and self.wp_is_reset() and 269 self.batt_pres_is_reset() and 270 self.get_cap_overview(self.get_cap_dict())[1]) 271 272 273 def wp_is_reset(self): 274 """Returns True if wp is reset to follow batt pres at all times""" 275 follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state() 276 return follow_batt_pres and follow_batt_pres_atboot 277 278 279 def get_wp_state(self): 280 """Get the current write protect and atboot state 281 282 The atboot setting cannot really be determined now if it is set to 283 follow battery presence. It is likely to remain the same after reboot, 284 but who knows. If the third element of the tuple is True, the last 285 element will not be that useful 286 287 @return: a tuple with the current write protect state 288 (True if current state is to follow batt presence, 289 True if write protect is enabled, 290 True if current state is to follow batt presence atboot, 291 True if write protect is enabled atboot) 292 """ 293 rv = self.send_command_retry_get_output('wp', 294 ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?' 295 '(follow|enabled|disabled)'], safe=True)[0] 296 _, forced, enabled, _, atboot = rv 297 logging.debug(rv) 298 return (not forced, enabled =='enabled', 299 atboot == 'follow', atboot == 'enabled') 300 301 302 def in_dev_mode(self): 303 """Return True if cr50 thinks the device is in dev mode""" 304 return 'dev_mode' in self.get_ccd_info('TPM') 305 306 307 def get_ccd_info(self, field=None): 308 """Get the current ccd state. 309 310 Take the output of 'ccd' and convert it to a dictionary. 311 312 @param: the ccd info param to get or None to get the full ccd output 313 dictionary. 314 @return: the field value or a dictionary with the ccd field name as the 315 key and the setting as the value. 316 """ 317 318 if field: 319 match_value = self.CCD_FORMAT[field] 320 else: 321 values = [ self.CCD_FORMAT[field] for field in self.CCD_FIELDS ] 322 match_value = '.*'.join(values) 323 matched_output = None 324 original_timeout = float(self._servo.get('cr50_uart_timeout')) 325 # Change the console timeout to 10s, it may take longer than 3s to read 326 # ccd info 327 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 328 for i in range(self.GET_CAP_TRIES): 329 try: 330 # If some ccd output is dropped and the output doesn't match the 331 # expected ccd output format, send_command_get_output will wait the 332 # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use 333 # re to search the command output instead of 334 # send_safe_command_get_output, so we don't have to wait the full 335 # timeout if output is dropped. 336 rv = self.send_command_retry_get_output('ccd', ['ccd.*>'], 337 safe=True)[0] 338 matched_output = re.search(match_value, rv, re.DOTALL) 339 if matched_output: 340 break 341 logging.info('try %d: could not match ccd output %s', i, rv) 342 except Exception as e: 343 logging.info('try %d got error %s', i, str(e)) 344 345 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 346 if not matched_output: 347 raise error.TestFail('Could not get ccd output') 348 matched_dict = matched_output.groupdict() 349 logging.info('Current CCD settings:\n%s', pprint.pformat(matched_dict)) 350 if field: 351 return matched_dict.get(field) 352 return matched_dict 353 354 355 def get_cap(self, cap): 356 """Returns the capabilitiy from the capability dictionary""" 357 return self.get_cap_dict()[cap] 358 359 360 def get_cap_dict(self, info=None): 361 """Get the current ccd capability settings. 362 363 The capability may be using the 'Default' setting. That doesn't say much 364 about the ccd state required to use the capability. Return all ccd 365 information in the cap_dict 366 [is accessible, setting, requirement] 367 368 @param info: Only fill the cap_dict with the requested information: 369 CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ 370 @return: A dictionary with the capability as the key a list of the 371 current settings as the value [is_accessible, setting, 372 requirement] 373 """ 374 # Add whitespace at the end, so we can still match the last line. 375 cap_info_str = self.get_ccd_info('Capabilities') + '\r\n' 376 cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT, 377 cap_info_str) 378 caps = {} 379 for cap, accessible, setting, _, required in cap_settings: 380 # If there's only 1 value after =, then the setting is the 381 # requirement. 382 if not required: 383 required = setting 384 cap_info = [accessible == 'Y', setting, required] 385 if info is not None: 386 caps[cap] = cap_info[info] 387 else: 388 caps[cap] = cap_info 389 logging.debug(pprint.pformat(caps)) 390 return caps 391 392 393 def send_command_get_output(self, command, regexp_list): 394 """Send command through UART and wait for response. 395 396 Cr50 will drop characters input to the UART when it resumes from sleep. 397 If servo is not using ccd, send some dummy characters before sending the 398 real command to make sure cr50 is awake. 399 400 @param command: the command to send 401 @param regexp_list: The list of regular expressions to match in the 402 command output 403 @return: A list of matched output 404 """ 405 if self._servo.main_device_is_flex(): 406 self.wake_cr50() 407 408 # We have started prepending '\n' to separate cr50 console junk from 409 # the real command. If someone is just searching for .*>, then they will 410 # only get the output from the first '\n' we added. Raise an error to 411 # change the test to look for something more specific ex command.*>. 412 # cr50 will print the command in the output, so that is an easy way to 413 # modify '.*>' to match the real command output. 414 if '.*>' in regexp_list: 415 raise error.TestError('Send more specific regexp %r %r' % (command, 416 regexp_list)) 417 418 # prepend \n to separate the command from any junk that may have been 419 # sent to the cr50 uart. 420 command = '\n' + command 421 return super(ChromeCr50, self).send_command_get_output(command, 422 regexp_list) 423 424 425 def send_safe_command_get_output(self, command, regexp_list, 426 channel_mask=0x1): 427 """Restrict the console channels while sending console commands. 428 429 @param command: the command to send 430 @param regexp_list: The list of regular expressions to match in the 431 command output 432 @param channel_mask: The mask to pass to 'chan' prior to running the 433 command, indicating which channels should remain 434 enabled (0x1 is command output) 435 @return: A list of matched output 436 """ 437 self.send_command('chan save') 438 self.send_command('chan 0x%x' % channel_mask) 439 try: 440 rv = self.send_command_get_output(command, regexp_list) 441 finally: 442 self.send_command('chan restore') 443 return rv 444 445 446 def send_command_retry_get_output(self, command, regexp_list, safe=False, 447 compare_output=False, retries=MAX_RETRY_COUNT): 448 """Retry the command 5 times if you get a timeout or drop some output 449 450 451 @param command: the command string 452 @param regexp_list: the regex to search for 453 @param safe: use send_safe_command_get_output if True otherwise use 454 send_command_get_output 455 @param compare_output: look for reproducible output 456 """ 457 send_command = (self.send_safe_command_get_output if safe else 458 self.send_command_get_output) 459 err = 'no consistent output' if compare_output else 'unknown' 460 past_rv = [] 461 for i in range(retries): 462 try: 463 rv = send_command(command, regexp_list) 464 if not compare_output or rv in past_rv: 465 return rv 466 if past_rv: 467 logging.debug('%d %s not in %s', i, rv, past_rv) 468 past_rv.append(rv) 469 except Exception as e: 470 err = e 471 logging.info('attempt %d %r: %s %s', i, command, type(e), 472 str(e)) 473 if compare_output: 474 logging.info('No consistent output for %r %s', command, 475 pprint.pformat(past_rv)) 476 raise error.TestError('Issue sending %r command: %r' % (command, err)) 477 478 479 def get_deep_sleep_count(self): 480 """Get the deep sleep count from the idle task""" 481 result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT], 482 safe=True) 483 return int(result[0][1]) 484 485 486 def clear_deep_sleep_count(self): 487 """Clear the deep sleep count""" 488 self.send_command('idle c') 489 if self.get_deep_sleep_count(): 490 raise error.TestFail("Could not clear deep sleep count") 491 492 493 def get_board_properties(self): 494 """Get information from the version command""" 495 rv = self.send_command_retry_get_output('brdprop', 496 ['properties = (\S+)\s'], safe=True) 497 return int(rv[0][1], 16) 498 499 500 def uses_board_property(self, prop_name): 501 """Returns 1 if the given property is set, or 0 otherwise 502 503 @param prop_name: a property name in string type. 504 """ 505 brdprop = self.get_board_properties() 506 prop = self.BOARD_PROP[prop_name] 507 return (brdprop & prop) == prop 508 509 510 def has_command(self, cmd): 511 """Returns 1 if cr50 has the command 0 if it doesn't""" 512 try: 513 self.send_safe_command_get_output('help', [cmd]) 514 except: 515 logging.info("Image does not include '%s' command", cmd) 516 return 0 517 return 1 518 519 520 def reboot(self): 521 """Reboot Cr50 and wait for cr50 to reset""" 522 self.wait_for_reboot(cmd='reboot', timeout=10) 523 524 525 def _uart_wait_for_reboot(self, cmd='\n', timeout=60): 526 """Use uart to wait for cr50 to reboot. 527 528 If a command is given run it and wait for cr50 to reboot. Monitor 529 the cr50 uart to detect the reset. Wait up to timeout seconds 530 for the reset. 531 532 @param cmd: the command to run to reset cr50. 533 @param timeout: seconds to wait to detect the reboot. 534 """ 535 original_timeout = float(self._servo.get('cr50_uart_timeout')) 536 # Change the console timeout to timeout, so we wait at least that long 537 # for cr50 to print the start string. 538 self._servo.set_nocheck('cr50_uart_timeout', timeout) 539 try: 540 self.send_command_get_output(cmd, self.START_STR) 541 logging.debug('Detected cr50 reboot') 542 except error.TestFail as e: 543 logging.debug('Failed to detect cr50 reboot') 544 # Reset the timeout. 545 self._servo.set_nocheck('cr50_uart_timeout', original_timeout) 546 547 548 def wait_for_reboot(self, cmd='\n', timeout=60): 549 """Wait for cr50 to reboot 550 551 Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if 552 necessary. 553 554 @param cmd: the command to run to reset cr50. 555 @param timeout: seconds to wait to detect the reboot. 556 """ 557 logging.info('Wait up to %s seconds for reboot (%s)', timeout, 558 cmd.strip()) 559 if self._servo.main_device_is_ccd(): 560 self.send_command(cmd) 561 # Cr50 USB is reset when it reboots. Wait for the CCD connection to 562 # go down to detect the reboot. 563 self.wait_for_ccd_disable(timeout, raise_error=False) 564 self.ccd_enable() 565 else: 566 self._uart_wait_for_reboot(cmd, timeout) 567 568 # On most devices, a Cr50 reset will cause an AP reset. Force this to 569 # happen on devices where the AP is left down. 570 if not self.faft_config.ap_up_after_cr50_reboot: 571 # Reset the DUT a few seconds after cr50 reboot. 572 time.sleep(self.SHORT_WAIT) 573 logging.info('Resetting DUT after Cr50 reset') 574 self._servo.get_power_state_controller().reset() 575 576 577 def set_board_id(self, chip_bid, chip_flags): 578 """Set the chip board id type and flags.""" 579 self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags)) 580 581 582 def get_board_id(self): 583 """Get the chip board id type and flags. 584 585 bid_type_inv will be '' if the bid output doesn't show it. If no board 586 id type inv is shown, then board id is erased will just check the type 587 and flags. 588 589 @returns a tuple (A string of bid_type:bid_type_inv:bid_flags, 590 True if board id is erased) 591 """ 592 bid = self.send_command_retry_get_output('bid', 593 ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'], 594 safe=True)[0][1:] 595 bid_str = ':'.join(bid) 596 bid_is_erased = set(bid).issubset({'', 'ffffffff'}) 597 logging.info('chip board id: %s', bid_str) 598 logging.info('chip board id is erased: %s', 599 'yes' if bid_is_erased else 'no') 600 return bid_str, bid_is_erased 601 602 603 def eraseflashinfo(self, retries=10): 604 """Run eraseflashinfo. 605 606 @returns True if the board id is erased 607 """ 608 for i in range(retries): 609 # The console could drop characters while matching 'eraseflashinfo'. 610 # Retry if the command times out. It's ok to run eraseflashinfo 611 # multiple times. 612 rv = self.send_command_retry_get_output( 613 'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip() 614 logging.info('eraseflashinfo output: %r', rv) 615 bid_erased = self.get_board_id()[1] 616 eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv 617 if not eraseflashinfo_issue and bid_erased: 618 break 619 logging.info('Retrying eraseflashinfo') 620 return bid_erased 621 622 623 def rollback(self): 624 """Set the reset counter high enough to force a rollback and reboot.""" 625 if not self.has_command('rollback'): 626 raise error.TestError("need image with 'rollback'") 627 628 inactive_partition = self.get_inactive_version_info()[0] 629 630 self.wait_for_reboot(cmd='rollback', timeout=10) 631 632 running_partition = self.get_active_version_info()[0] 633 if inactive_partition != running_partition: 634 raise error.TestError("Failed to rollback to inactive image") 635 636 637 def rolledback(self): 638 """Returns true if cr50 just rolled back""" 639 return 'Rollback detected' in self.send_command_retry_get_output( 640 'sysinfo', ['sysinfo.*>'], safe=True)[0] 641 642 643 def get_version_info(self, regexp): 644 """Get information from the version command""" 645 return self.send_command_retry_get_output('version', [regexp], 646 safe=True, 647 compare_output=True)[0][1::] 648 649 650 def get_inactive_version_info(self): 651 """Get the active partition, version, and hash""" 652 return self.get_version_info(self.INACTIVE_VERSION) 653 654 655 def get_active_version_info(self): 656 """Get the active partition, version, and hash""" 657 return self.get_version_info(self.ACTIVE_VERSION) 658 659 660 def using_prod_rw_keys(self): 661 """Returns True if the RW keyid is prod""" 662 rv = self.send_command_retry_get_output('sysinfo', 663 ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1] 664 logging.info('RW Keyid: 0x%s', rv) 665 return rv in self.PROD_RW_KEYIDS 666 667 668 def get_active_board_id_str(self): 669 """Get the running image board id. 670 671 @return: The board id string or None if the image does not support board 672 id or the image is not board id locked. 673 """ 674 # Getting the board id from the version console command is only 675 # supported in board id locked images .22 and above. Any image that is 676 # board id locked will have support for getting the image board id. 677 # 678 # If board id is not supported on the device, return None. This is 679 # still expected on all current non board id locked release images. 680 try: 681 version_info = self.get_version_info(self.ACTIVE_BID) 682 except error.TestFail as e: 683 logging.info(str(e)) 684 logging.info('Cannot use the version to get the board id') 685 return None 686 687 if self.BID_ERROR in version_info[4]: 688 raise error.TestError(version_info) 689 bid = version_info[4].split()[1] 690 return cr50_utils.GetBoardIdInfoString(bid) 691 692 693 def get_version(self): 694 """Get the RW version""" 695 return self.get_active_version_info()[1].strip() 696 697 698 def get_full_version(self): 699 """Get the complete RW version string.""" 700 _, rw_ver, dbg, ver_str = self.get_active_version_info() 701 return rw_ver + (dbg if dbg else '') + ver_str 702 703 704 def ccd_is_enabled(self): 705 """Return True if ccd is enabled. 706 707 If the test is running through ccd, return the ccd_state value. If 708 a flex cable is being used, use the CCD_MODE_L gpio setting to determine 709 if Cr50 has ccd enabled. 710 711 @return: 'off' or 'on' based on whether the cr50 console is working. 712 """ 713 if self._servo.main_device_is_ccd(): 714 return self._servo.get('ccd_state') == 'on' 715 else: 716 return not bool(self.gpioget('CCD_MODE_L')) 717 718 719 @dts_control_command 720 def wait_for_stable_ccd_state(self, state, timeout, raise_error): 721 """Wait up to timeout seconds for CCD to be 'on' or 'off' 722 723 Verify ccd is off or on and remains in that state for 3 seconds. 724 725 @param state: a string either 'on' or 'off'. 726 @param timeout: time in seconds to wait 727 @param raise_error: Raise TestFail if the value is state is not reached. 728 @raise TestFail: if ccd never reaches the specified state 729 """ 730 wait_for_enable = state == 'on' 731 logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off') 732 enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable, 733 timeout_sec=timeout) 734 if enabled != wait_for_enable: 735 error_msg = ("timed out before detecting ccd '%s'" % 736 ('on' if wait_for_enable else 'off')) 737 if raise_error: 738 raise error.TestFail(error_msg) 739 logging.warning(error_msg) 740 else: 741 # Make sure the state doesn't change. 742 enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled, 743 timeout_sec=self.SHORT_WAIT) 744 if enabled != wait_for_enable: 745 error_msg = ("CCD switched %r after briefly being %r" % 746 ('on' if enabled else 'off', state)) 747 if raise_error: 748 raise error.TestFail(error_msg) 749 logging.info(error_msg) 750 logging.info("ccd is %r", 'on' if enabled else 'off') 751 752 753 @dts_control_command 754 def wait_for_ccd_disable(self, timeout=60, raise_error=True): 755 """Wait for the cr50 console to stop working""" 756 self.wait_for_stable_ccd_state('off', timeout, raise_error) 757 758 759 @dts_control_command 760 def wait_for_ccd_enable(self, timeout=60, raise_error=False): 761 """Wait for the cr50 console to start working""" 762 self.wait_for_stable_ccd_state('on', timeout, raise_error) 763 764 765 @dts_control_command 766 def ccd_disable(self, raise_error=True): 767 """Change the values of the CC lines to disable CCD""" 768 logging.info("disable ccd") 769 self._servo.set_dts_mode('off') 770 self.wait_for_ccd_disable(raise_error=raise_error) 771 772 773 @dts_control_command 774 def ccd_enable(self, raise_error=False): 775 """Reenable CCD and reset servo interfaces""" 776 logging.info("reenable ccd") 777 self._servo.set_dts_mode('on') 778 # If the test is actually running with ccd, wait for USB communication 779 # to come up after reset. 780 if self._servo.main_device_is_ccd(): 781 time.sleep(self._servo.USB_DETECTION_DELAY) 782 self.wait_for_ccd_enable(raise_error=raise_error) 783 784 785 def _level_change_req_pp(self, level): 786 """Returns True if setting the level will require physical presence""" 787 testlab_pp = level != 'testlab open' and 'testlab' in level 788 # If the level is open and the ccd capabilities say physical presence 789 # is required, then physical presence will be required. 790 open_pp = (level == 'open' and 791 not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE]) 792 return testlab_pp or open_pp 793 794 795 def _state_to_bool(self, state): 796 """Converts the state string to True or False""" 797 # TODO(mruthven): compare to 'on' once servo is up to date in the lab 798 return state.lower() in self.ON_STRINGS 799 800 801 def testlab_is_on(self): 802 """Returns True of testlab mode is on""" 803 return self._state_to_bool(self._servo.get('cr50_testlab')) 804 805 806 def set_ccd_testlab(self, state): 807 """Set the testlab mode 808 809 @param state: the desired testlab mode string: 'on' or 'off' 810 @raise TestFail: if testlab mode was not changed 811 """ 812 if self._servo.main_device_is_ccd(): 813 raise error.TestError('Cannot set testlab mode with CCD. Use flex ' 814 'cable instead.') 815 if not self.faft_config.has_powerbutton: 816 raise error.TestError('No power button on device') 817 818 request_on = self._state_to_bool(state) 819 testlab_on = self.testlab_is_on() 820 request_str = 'on' if request_on else 'off' 821 822 if testlab_on == request_on: 823 logging.info('ccd testlab already set to %s', request_str) 824 return 825 826 original_level = self.get_ccd_level() 827 828 # We can only change the testlab mode when the device is open. If 829 # testlab mode is already enabled, we can go directly to open using 'ccd 830 # testlab open'. This will save 5 minutes, because we can skip the 831 # physical presence check. 832 if testlab_on: 833 self.send_command('ccd testlab open') 834 else: 835 self.set_ccd_level('open') 836 837 ap_is_on = self.ap_is_on() 838 # Set testlab mode 839 rv = self.send_command_get_output('ccd testlab %s' % request_str, 840 ['ccd.*>'])[0] 841 if 'Access Denied' in rv: 842 raise error.TestFail("'ccd %s' %s" % (request_str, rv)) 843 844 # Press the power button once a second for 15 seconds. If the AP is 845 # currently on, make sure it's on at the end of the open process. 846 self.run_pp(self.PP_SHORT, ensure_ap_on=ap_is_on) 847 848 self.set_ccd_level(original_level) 849 if request_on != self.testlab_is_on(): 850 raise error.TestFail('Failed to set ccd testlab to %s' % state) 851 852 853 def get_ccd_level(self): 854 """Returns the current ccd privilege level""" 855 return self.get_ccd_info('State').lower().rstrip('ed') 856 857 858 def set_ccd_level(self, level, password=''): 859 """Set the Cr50 CCD privilege level. 860 861 @param level: a string of the ccd privilege level: 'open', 'lock', or 862 'unlock'. 863 @param password: send the ccd command with password. This will still 864 require the same physical presence. 865 @raise TestFail: if the level couldn't be set 866 """ 867 # TODO(mruthven): add support for CCD password 868 level = level.lower() 869 870 if level == self.get_ccd_level(): 871 logging.info('CCD privilege level is already %s', level) 872 return 873 874 if 'testlab' in level: 875 raise error.TestError("Can't change testlab mode using " 876 "ccd_set_level") 877 878 testlab_on = self._state_to_bool(self._servo.get('cr50_testlab')) 879 batt_is_disconnected = self.get_batt_pres_state()[1] 880 req_pp = self._level_change_req_pp(level) 881 has_pp = not self._servo.main_device_is_ccd() 882 dbg_en = self.get_active_version_info()[2] 883 884 if req_pp and not has_pp: 885 raise error.TestError("Can't change privilege level to '%s' " 886 "without physical presence." % level) 887 888 if not testlab_on and not has_pp: 889 raise error.TestError("Wont change privilege level without " 890 "physical presence or testlab mode enabled") 891 892 original_timeout = float(self._servo.get('cr50_uart_timeout')) 893 # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may 894 # take more than 3 seconds. 895 self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT) 896 # Start the unlock process. 897 898 if level == 'open' or level == 'unlock': 899 logging.info('waiting %d seconds, the minimum time between' 900 ' ccd password attempts', 901 self.CCD_PASSWORD_RATE_LIMIT) 902 time.sleep(self.CCD_PASSWORD_RATE_LIMIT) 903 904 ap_is_on = self.ap_is_on() 905 try: 906 cmd = 'ccd %s%s' % (level, (' ' + password) if password else '') 907 # ccd command outputs on the rbox, ccd, and console channels, 908 # respectively. Cr50 uses these channels to print relevant ccd 909 # information. 910 # Restrict all other channels. 911 ccd_output_channels = 0x20000 | 0x8 | 0x1 912 rv = self.send_safe_command_get_output( 913 cmd, [cmd + '(.*)>'], 914 channel_mask=ccd_output_channels)[0][1] 915 finally: 916 self._servo.set('cr50_uart_timeout', original_timeout) 917 logging.info(rv) 918 if 'ccd_open denied: fwmp' in rv: 919 raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv)) 920 if 'Access Denied' in rv: 921 raise error.TestFail("%r %s" % (cmd, rv)) 922 if 'Busy' in rv: 923 raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv)) 924 925 # Press the power button once a second, if we need physical presence. 926 if req_pp and batt_is_disconnected: 927 # DBG images have shorter unlock processes. If the AP is currently 928 # on, make sure it's on at the end of the open process. 929 self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG, 930 ensure_ap_on=ap_is_on) 931 932 if level != self.get_ccd_level(): 933 raise error.TestFail('Could not set privilege level to %s' % level) 934 935 logging.info('Successfully set CCD privelege level to %s', level) 936 937 938 def run_pp(self, unlock_timeout, ensure_ap_on=False): 939 """Press the power button a for unlock_timeout seconds. 940 941 This will press the power button many more times than it needs to be 942 pressed. Cr50 doesn't care if you press it too often. It just cares that 943 you press the power button at least once within the detect interval. 944 945 For privilege level changes you need to press the power button 5 times 946 in the short interval and then 4 times within the long interval. 947 Short Interval 948 100msec < power button press < 5 seconds 949 Long Interval 950 60s < power button press < 300s 951 952 For testlab enable/disable you must press the power button 5 times 953 spaced between 100msec and 5 seconds apart. 954 955 @param unlock_timeout: time to press the power button in seconds. 956 @param ensure_ap_on: If true, press the power to turn on the AP. 957 """ 958 end_time = time.time() + unlock_timeout 959 960 logging.info('Pressing power button for %ds to unlock the console.', 961 unlock_timeout) 962 logging.info('The process should end at %s', time.ctime(end_time)) 963 964 # Press the power button once a second to unlock the console. 965 while time.time() < end_time: 966 self._servo.power_short_press() 967 time.sleep(1) 968 969 # If the last power button press left the AP powered off, and it was on 970 # before, turn it back on. 971 time.sleep(self.faft_config.shutdown) 972 if ensure_ap_on and not self.ap_is_on(): 973 logging.info('AP is off. Pressing the power button to turn it on') 974 self._servo.power_short_press() 975 logging.debug('Pressing PP to turn back on') 976 977 978 def gettime(self): 979 """Get the current cr50 system time""" 980 result = self.send_safe_command_get_output('gettime', [' = (.*) s']) 981 return float(result[0][1]) 982 983 984 def servo_dts_mode_is_valid(self): 985 """Returns True if cr50 registers change in servo dts mode.""" 986 # This is to test that Cr50 actually recognizes the change in ccd state 987 # We cant do that with tests using ccd, because the cr50 communication 988 # goes down once ccd is enabled. 989 if not self._servo.dts_mode_is_safe(): 990 return False 991 992 ccd_start = 'on' if self.ccd_is_enabled() else 'off' 993 dts_start = self._servo.get_dts_mode() 994 try: 995 # Verify both ccd enable and disable 996 self.ccd_disable(raise_error=True) 997 self.ccd_enable(raise_error=True) 998 rv = True 999 except Exception as e: 1000 logging.info(e) 1001 rv = False 1002 self._servo.set_dts_mode(dts_start) 1003 self.wait_for_stable_ccd_state(ccd_start, 60, True) 1004 logging.info('Test setup does%s support servo DTS mode', 1005 '' if rv else 'n\'t') 1006 return rv 1007 1008 1009 def wait_until_update_is_allowed(self): 1010 """Wait until cr50 will be able to accept an update. 1011 1012 Cr50 rejects any attempt to update if it has been less than 60 seconds 1013 since it last recovered from deep sleep or came up from reboot. This 1014 will wait until cr50 gettime shows a time greater than 60. 1015 """ 1016 if self.get_active_version_info()[2]: 1017 logging.info("Running DBG image. Don't need to wait for update.") 1018 return 1019 cr50_time = self.gettime() 1020 if cr50_time < 60: 1021 sleep_time = 61 - cr50_time 1022 logging.info('Cr50 has been up for %ds waiting %ds before update', 1023 cr50_time, sleep_time) 1024 time.sleep(sleep_time) 1025 1026 1027 def tpm_is_enabled(self): 1028 """Query the current TPM mode. 1029 1030 @return: True if TPM is enabled, False otherwise. 1031 """ 1032 result = self.send_command_retry_get_output('sysinfo', 1033 ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1] 1034 logging.debug(result) 1035 1036 return result.lower() == 'enabled' 1037 1038 1039 def get_keyladder_state(self): 1040 """Get the status of H1 Key Ladder. 1041 1042 @return: The keyladder state string. prod or dev both mean enabled. 1043 """ 1044 result = self.send_command_retry_get_output('sysinfo', 1045 ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'], 1046 safe=True)[0][1] 1047 logging.debug(result) 1048 return result 1049 1050 1051 def keyladder_is_disabled(self): 1052 """Get the status of H1 Key Ladder. 1053 1054 @return: True if H1 Key Ladder is disabled. False otherwise. 1055 """ 1056 return self.get_keyladder_state() == 'disabled' 1057 1058 1059 def get_sleepmask(self): 1060 """Returns the sleepmask as an int""" 1061 rv = self.send_command_retry_get_output('sleepmask', 1062 ['sleep mask: (\S{8})\s+'], safe=True)[0][1] 1063 logging.info('sleepmask %s', rv) 1064 return int(rv, 16) 1065 1066 1067 def get_ccdstate(self): 1068 """Return a dictionary of the ccdstate once it's done debouncing""" 1069 for i in range(self.CCDSTATE_MAX_RETRY_COUNT): 1070 rv = self.send_command_retry_get_output('ccdstate', 1071 ['ccdstate(.*)>'], safe=True, compare_output=True)[0][0] 1072 1073 # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or 1074 # 'unknown' may appear transiently. 'debouncing' should transition 1075 # to 'on' or 'off' within 1 second, and 'unknown' should do so 1076 # within 20 seconds. 1077 if 'debouncing' not in rv and 'unknown' not in rv: 1078 break 1079 time.sleep(self.SHORT_WAIT) 1080 ccdstate = {} 1081 for line in rv.splitlines(): 1082 line = line.strip() 1083 if ':' in line: 1084 k, v = line.split(':', 1) 1085 ccdstate[k.strip()] = v.strip() 1086 logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate)) 1087 return ccdstate 1088 1089 1090 def ap_is_on(self): 1091 """Get the power state of the AP. 1092 1093 @return: True if the AP is on; False otherwise. 1094 """ 1095 ap_state = self.get_ccdstate()['AP'] 1096 if ap_state == 'on': 1097 return True 1098 elif ap_state == 'off': 1099 return False 1100 else: 1101 raise error.TestFail('Read unusable AP state from ccdstate: %r' % 1102 ap_state) 1103 1104 1105 def gpioget(self, signal_name): 1106 """Get the current state of the signal 1107 1108 @return an integer 1 or 0 based on the gpioget value 1109 """ 1110 result = self.send_command_retry_get_output('gpioget', 1111 ['(0|1)[ \S]*%s' % signal_name], safe=True) 1112 return int(result[0][1]) 1113 1114 1115 def batt_pres_is_reset(self): 1116 """Returns True if batt pres is reset to always follow batt pres""" 1117 follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state() 1118 return follow_bp and follow_bp_atboot 1119 1120 1121 def get_batt_pres_state(self): 1122 """Get the current and atboot battery presence state 1123 1124 The atboot setting cannot really be determined now if it is set to 1125 follow battery presence. It is likely to remain the same after reboot, 1126 but who knows. If the third element of the tuple is True, the last 1127 element will not be that useful 1128 1129 @return: a tuple of the current battery presence state 1130 (True if current state is to follow batt presence, 1131 True if battery is connected, 1132 True if current state is to follow batt presence atboot, 1133 True if battery is connected atboot) 1134 """ 1135 # bpforce is added in 4.16. If the image doesn't have the command, cr50 1136 # always follows battery presence. In these images 'gpioget BATT_PRES_L' 1137 # accurately represents the battery presence state, because it can't be 1138 # overidden. 1139 if not self.has_command('bpforce'): 1140 batt_pres = not bool(self.gpioget('BATT_PRES_L')) 1141 return (True, batt_pres, True, batt_pres) 1142 1143 # The bpforce command is very similar to the wp command. It just 1144 # substitutes 'connected' for 'enabled' and 'disconnected' for 1145 # 'disabled'. 1146 rv = self.send_command_retry_get_output('bpforce', 1147 ['batt pres: (forced )?(con|dis).*at boot: (forced )?' 1148 '(follow|discon|con)'], safe=True)[0] 1149 _, forced, connected, _, atboot = rv 1150 logging.info(rv) 1151 return (not forced, connected == 'con', atboot == 'follow', 1152 atboot == 'con') 1153 1154 1155 def set_batt_pres_state(self, state, atboot): 1156 """Override the battery presence state. 1157 1158 @param state: a string of the battery presence setting: 'connected', 1159 'disconnected', or 'follow_batt_pres' 1160 @param atboot: True if we're overriding battery presence atboot 1161 """ 1162 cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '') 1163 logging.info('running %r', cmd) 1164 self.send_command(cmd) 1165 1166 1167 def dump_nvmem(self): 1168 """Print nvmem objects.""" 1169 rv = self.send_safe_command_get_output('dump_nvmem', 1170 ['dump_nvmem(.*)>'])[0][1] 1171 logging.info('NVMEM OUTPUT:\n%s', rv) 1172 1173 1174 def get_reset_cause(self): 1175 """Returns the reset flags for the last reset.""" 1176 rv = self.send_command_retry_get_output('sysinfo', 1177 ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1] 1178 logging.info('reset cause: %s', rv) 1179 return int(rv, 16) 1180 1181 1182 def was_reset(self, reset_type): 1183 """Returns 1 if the reset type is found in the reset_cause. 1184 1185 @param reset_type: reset name in string type. 1186 """ 1187 reset_cause = self.get_reset_cause() 1188 reset_flag = self.RESET_FLAGS[reset_type] 1189 return bool(reset_cause & reset_flag) 1190 1191 1192 def get_devid(self): 1193 """Returns the cr50 serial number.""" 1194 return self.send_command_retry_get_output('sysinfo', 1195 ['DEV_ID:\s+(0x[0-9a-f]{8} 0x[0-9a-f]{8})'])[0][1] 1196 1197 1198 def get_serial(self): 1199 """Returns the cr50 serial number.""" 1200 serial = self.get_devid().replace('0x', '').replace(' ', '-').upper() 1201 logging.info('CCD serial: %s', serial) 1202 return serial 1203 1204 def check_boot_mode(self, mode_exp='NORMAL'): 1205 """Query the boot mode to Cr50, and compare it against mode_exp. 1206 1207 Args: 1208 mode_exp: expecting boot mode. It should be either 'NORMAL' 1209 or 'NO_BOOT'. 1210 Returns: 1211 True if the boot mode matches mode_exp. 1212 False, otherwise. 1213 Raises: 1214 TestError: Input parameter is not valid. 1215 """ 1216 1217 if mode_exp not in ['NORMAL', 'NO_BOOT']: 1218 raise error.TestError('parameter, mode_exp is not valid: %s' % 1219 mode_exp) 1220 rv = self.send_command_retry_get_output('ec_comm', 1221 ['boot_mode\s*:\s*(NORMAL|NO_BOOT)'], safe=True) 1222 return mode_exp == rv[0][1] 1223 1224 def get_reset_count(self): 1225 """Returns the cr50 reset count""" 1226 return self.send_command_retry_get_output('sysinfo', 1227 ['Reset count: (\d+)'], 1228 safe=True)[0][1] 1229 1230 def check_servo_monitor(self): 1231 """Returns True if cr50 can detect servo connect/disconnect""" 1232 orig_dts = self._servo.get('servo_v4_dts_mode') 1233 # Detach ccd so EC uart won't interfere with servo detection 1234 self._servo.set_dts_mode('off') 1235 self._servo.set('ec_uart_en', 'off') 1236 time.sleep(self.SHORT_WAIT) 1237 if self.get_ccdstate()['Servo'] != 'disconnected': 1238 self._servo.set_dts_mode(orig_dts) 1239 return False 1240 1241 self._servo.set('ec_uart_en', 'on') 1242 time.sleep(self.SHORT_WAIT) 1243 if self.get_ccdstate()['Servo'] != 'connected': 1244 self._servo.set_dts_mode(orig_dts) 1245 return False 1246 self._servo.set_dts_mode(orig_dts) 1247 return True 1248