# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import functools
import logging
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.servo import chrome_ec


def ccd_command(func):
    """Decorator for methods only relevant to devices using CCD.

    The wrapped method is only invoked when instance.using_ccd() is true.
    Otherwise the call is logged and skipped, and the wrapper returns None.
    """
    @functools.wraps(func)
    def wrapper(instance, *args, **kwargs):
        if instance.using_ccd():
            return func(instance, *args, **kwargs)
        # __name__ is available on both python 2 and python 3 functions;
        # func_name only exists on python 2.
        logging.info("not using ccd. ignoring %s", func.__name__)
    return wrapper


class ChromeCr50(chrome_ec.ChromeConsole):
    """Manages control of a Chrome Cr50.

    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """
    # Regular expressions are raw strings so the backslash escapes reach the
    # regex engine untouched. The leading '\n' of INACTIVE and ACTIVE is kept
    # as a separate non-raw literal so the pattern still starts with a real
    # newline character.
    IDLE_COUNT = r'count: (\d+)'
    VERSION_FORMAT = r'\d+\.\d+\.\d+'
    VERSION_ERROR = 'Error'
    INACTIVE = '\n' r'RW_(A|B): +(%s|%s)(/DBG|)?' % (VERSION_FORMAT,
                                                     VERSION_ERROR)
    ACTIVE = '\n' r'RW_(A|B): +\* +(%s)(/DBG|)?' % (VERSION_FORMAT)
    WAKE_CHAR = '\n'
    START_UNLOCK_TIMEOUT = 20
    GETTIME = [r'= (\S+)']
    UNLOCK = [r'Unlock sequence starting. Continue until (\S+)']
    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
    MAX_RETRY_COUNT = 5
    START_STR = ['(.*Console is enabled;)']


    def __init__(self, servo):
        """Initialize the console using the servo "cr50_console" interface.

        Args:
            servo: the servo object used to reach the cr50 console.
        """
        super(ChromeCr50, self).__init__(servo, "cr50_console")


    def send_command(self, commands):
        """Send command through UART.

        Cr50 will drop characters input to the UART when it resumes from sleep.
        If servo is not using ccd, send some dummy characters before sending the
        real command to make sure cr50 is awake.

        Args:
            commands: the command(s) to pass to the parent send_command.
        """
        if not self.using_ccd():
            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
        super(ChromeCr50, self).send_command(commands)


    def send_command_get_output(self, command, regexp_list):
        """Send command through UART and wait for response.

        Cr50 will drop characters input to the UART when it resumes from sleep.
        If servo is not using ccd, send some dummy characters before sending the
        real command to make sure cr50 is awake.

        Args:
            command: the command to send.
            regexp_list: list of regular expressions to match in the output.

        Returns:
            Whatever the parent send_command_get_output returns.
        """
        if not self.using_ccd():
            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
        return super(ChromeCr50, self).send_command_get_output(command,
                                                               regexp_list)


    def get_deep_sleep_count(self):
        """Get the deep sleep count from the idle task

        Returns:
            The count reported by the 'idle' command as an int.
        """
        result = self.send_command_get_output('idle', [self.IDLE_COUNT])
        return int(result[0][1])


    def clear_deep_sleep_count(self):
        """Clear the deep sleep count

        Raises:
            TestFail if the count reported after 'idle c' is not zero.
        """
        result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
        if int(result[0][1]):
            raise error.TestFail("Could not clear deep sleep count")


    def has_command(self, cmd):
        """Returns 1 if cr50 has the command 0 if it doesn't

        Args:
            cmd: the command name. It is used as the regular expression that
                 is searched for in the output of 'help'.
        """
        try:
            self.send_command_get_output('help', [cmd])
        except Exception:
            # Failing to find cmd in the help output is how a missing command
            # is detected. Only catch Exception so KeyboardInterrupt and
            # SystemExit can still stop the test.
            logging.info("Image does not include '%s' command", cmd)
            return 0
        return 1


    def erase_nvmem(self):
        """Use flasherase to erase both nvmem sections

        Raises:
            TestError if the running image does not have 'flasherase'.
        """
        if not self.has_command('flasherase'):
            raise error.TestError("need image with 'flasherase'")

        self.send_command('flasherase 0x7d000 0x3000')
        self.send_command('flasherase 0x3d000 0x3000')


    def reboot(self):
        """Reboot Cr50 and wait for CCD to be enabled"""
        self.send_command('reboot')
        self.wait_for_reboot()


    def wait_for_reboot(self, timeout=60):
        """Wait for cr50 to reboot

        Args:
            timeout: seconds to wait for the CCD connection to go down. It is
                     only used when the console is served over CCD.
        """
        if self.using_ccd():
            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
            # go down to detect the reboot.
            self.wait_for_ccd_disable(timeout, raise_error=False)
            self.ccd_enable()
        else:
            # Look for the boot string declaring the console is ready. If we
            # don't find it, it's ok. The command will timeout after 3 seconds
            # which is longer than the time it takes for cr50 to reboot.
            try:
                rv = self.send_command_get_output('\n\n', self.START_STR)
                logging.debug(rv[0][0])
            except Exception as e:
                # Missing the boot string is not fatal. Record why instead of
                # silently dropping the failure.
                logging.debug("Did not find the cr50 boot string: %s", e)


    def rollback(self, eraseflashinfo=True):
        """Set the reset counter high enough to force a rollback then reboot

        Args:
            eraseflashinfo: if True, run 'eraseflashinfo' before rebooting.

        Raises:
            TestError if the image does not have the 'rw' and 'eraseflashinfo'
            commands, or if cr50 is not running from the previously inactive
            partition after the reboot.
        """
        if not self.has_command('rw') or not self.has_command('eraseflashinfo'):
            raise error.TestError("need image with 'rw' and 'eraseflashinfo'")

        inactive_partition = self.get_inactive_version_info()[0]
        # Increase the reset count to above the rollback threshold
        self.send_command('rw 0x40000128 1')
        self.send_command('rw 0x4000012c %d' % (self.MAX_RETRY_COUNT + 2))

        if eraseflashinfo:
            self.send_command('eraseflashinfo')

        self.reboot()

        running_partition = self.get_active_version_info()[0]
        if inactive_partition != running_partition:
            raise error.TestError("Failed to rollback to inactive image")


    def rolledback(self):
        """Returns true if cr50 just rolled back

        The servo 'cr50_reset_count' value is compared against
        MAX_RETRY_COUNT.
        """
        return int(self._servo.get('cr50_reset_count')) > self.MAX_RETRY_COUNT


    def get_version_info(self, regexp):
        """Get information from the version command

        Args:
            regexp: the regular expression to match in the 'ver' output.

        Returns:
            The first result with its first element dropped. This is
            presumably the regexp's capture groups without the full match.
        """
        return self.send_command_get_output('ver', [regexp])[0][1:]


    def get_inactive_version_info(self):
        """Get the inactive partition, version, and /DBG suffix"""
        return self.get_version_info(self.INACTIVE)


    def get_active_version_info(self):
        """Get the active partition, version, and /DBG suffix"""
        return self.get_version_info(self.ACTIVE)


    def get_version(self):
        """Get the RW version"""
        return self.get_active_version_info()[1].strip()


    def using_servo_v4(self):
        """Returns true if the console is being served using servo v4"""
        return 'servo_v4' in self._servo.get_servo_version()


    def using_ccd(self):
        """Returns true if the console is being served using CCD"""
        return 'ccd_cr50' in self._servo.get_servo_version()


    @ccd_command
    def get_ccd_state(self):
        """Get the CCD state from servo

        Returns:
            'off' or 'on' based on whether the cr50 console is working.
        """
        return self._servo.get('ccd_state')


    @ccd_command
    def wait_for_ccd_state(self, state, timeout, raise_error=True):
        """Wait up to timeout seconds for CCD to be 'on' or 'off'

        Args:
            state: a string either 'on' or 'off'.
            timeout: time in seconds to wait
            raise_error: Raise TestFail if the state is not reached. If False,
                         a warning is logged instead.

        Raises:
            TestFail if ccd never reaches the specified state
        """
        logging.info("Wait until ccd is '%s'", state)
        value = utils.wait_for_value(self.get_ccd_state, state,
                                     timeout_sec=timeout)
        if value != state:
            error_msg = "timed out before detecting ccd '%s'" % state
            if raise_error:
                raise error.TestFail(error_msg)
            logging.warning(error_msg)
        logging.info("ccd is '%s'", value)


    @ccd_command
    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
        """Wait for the cr50 console to stop working

        Args:
            timeout: time in seconds to wait
            raise_error: Raise TestFail if ccd does not turn off in time.
        """
        self.wait_for_ccd_state('off', timeout, raise_error)


    @ccd_command
    def wait_for_ccd_enable(self, timeout=60):
        """Wait for the cr50 console to start working

        Args:
            timeout: time in seconds to wait
        """
        self.wait_for_ccd_state('on', timeout)


    def ccd_disable(self):
        """Change the values of the CC lines to disable CCD"""
        if self.using_servo_v4():
            logging.info("disable ccd")
            self._servo.set_nocheck('servo_v4_dts_mode', 'off')
            self.wait_for_ccd_disable()


    @ccd_command
    def ccd_enable(self):
        """Reenable CCD and reset servo interfaces"""
        logging.info("reenable ccd")
        self._servo.set_nocheck('servo_v4_ccd_mode', 'ccd')
        self._servo.set_nocheck('servo_v4_dts_mode', 'on')
        self._servo.set_nocheck('power_state', 'ccd_reset')
        self.wait_for_ccd_enable()


    def lock_enable(self):
        """Enable the lock on cr50

        Raises:
            TestError if the console is being served using CCD.
        """
        # Lock enable can be run, but we won't be able to use the power button
        # to disable the lock. Let's not allow the console lock to be enabled
        # if it can't be disabled without some change to the test setup
        if self.using_ccd():
            raise error.TestError("Cannot run 'lock enable' using CCD.")
        self.send_command_get_output('lock enable',
                ['The restricted console lock is enabled'])


    def _attempt_unlock(self):
        """Try to unlock the console.

        Raises:
            TestError if the unlock process fails.
        """
        # Get the current time.
        rv = self.send_command_get_output('gettime', self.GETTIME)
        current_time = float(rv[0][1])

        # Start the unlock process.
        rv = self.send_command_get_output('lock disable', self.UNLOCK)
        unlock_finished = float(rv[0][1])

        # Calculate the unlock timeout. There is a 10s countdown to start the
        # unlock process, so unlock_timeout will be around 10s longer than
        # necessary.
        unlock_timeout = int(unlock_finished - current_time)
        end_time = time.time() + unlock_timeout

        logging.info('Pressing power button for %ds to unlock the console.',
                     unlock_timeout)
        logging.info('The process should end at %s', time.ctime(end_time))

        # Press the power button once a second to unlock the console.
        while time.time() < end_time:
            self._servo.power_short_press()
            time.sleep(1)

        if self._servo.get('ccd_lock') != 'off':
            raise error.TestError('Could not disable lock')


    def lock_disable(self):
        """Increase the console timeout and try disabling the lock.

        Raises:
            TestError if the console is being served using CCD or if the
            unlock process fails.
        """
        # We cannot press the power button using ccd.
        if self.using_ccd():
            raise error.TestError("Cannot run 'lock disable' using CCD.")

        # The unlock process takes a while to start. Increase the cr50 console
        # timeout so we can get the entire output of the 'lock disable' start
        # process
        original_timeout = self._servo.get('cr50_console_timeout')
        self._servo.set_nocheck('cr50_console_timeout',
                                self.START_UNLOCK_TIMEOUT)

        try:
            # Try to disable the lock
            self._attempt_unlock()
        finally:
            # Reset the cr50_console timeout
            self._servo.set_nocheck('cr50_console_timeout', original_timeout)

        logging.info('Successfully disabled the lock')