# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import ast
import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import ec
from autotest_lib.server.cros.servo import servo

# Hostevent codes, copied from:
#     ec/include/ec_commands.h
HOSTEVENT_LID_CLOSED = 0x00000001
HOSTEVENT_LID_OPEN = 0x00000002
HOSTEVENT_POWER_BUTTON = 0x00000004
HOSTEVENT_AC_CONNECTED = 0x00000008
HOSTEVENT_AC_DISCONNECTED = 0x00000010
HOSTEVENT_BATTERY_LOW = 0x00000020
HOSTEVENT_BATTERY_CRITICAL = 0x00000040
HOSTEVENT_BATTERY = 0x00000080
HOSTEVENT_THERMAL_THRESHOLD = 0x00000100
HOSTEVENT_THERMAL_OVERLOAD = 0x00000200
HOSTEVENT_THERMAL = 0x00000400
HOSTEVENT_USB_CHARGER = 0x00000800
HOSTEVENT_KEY_PRESSED = 0x00001000
HOSTEVENT_INTERFACE_READY = 0x00002000
# Keyboard recovery combo has been pressed
HOSTEVENT_KEYBOARD_RECOVERY = 0x00004000
# Shutdown due to thermal overload
HOSTEVENT_THERMAL_SHUTDOWN = 0x00008000
# Shutdown due to battery level too low
HOSTEVENT_BATTERY_SHUTDOWN = 0x00010000
HOSTEVENT_INVALID = 0x80000000

# Time to wait after sending keypress commands.
KEYPRESS_RECOVERY_TIME = 0.5


class ChromeConsole(object):
    """Manages control of a Chrome console.

    We control the Chrome console via the UART of a Servo board. Chrome console
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    # Suffixes appended to the console name to form the servo control names.
    CMD = "_cmd"
    REGEXP = "_regexp"
    MULTICMD = "_multicmd"

    def __init__(self, servo, name):
        """Initialize and keep the servo object.

        Args:
          servo: A Servo object.
          name: The console name.
        """
        self.name = name
        self.uart_cmd = self.name + self.CMD
        self.uart_regexp = self.name + self.REGEXP
        self.uart_multicmd = self.name + self.MULTICMD

        self._servo = servo
        # Last regexp successfully pushed to the servo; None means unknown.
        self._cached_uart_regexp = None


    def set_uart_regexp(self, regexp):
        """Set the UART regexp control, skipping the call if unchanged.

        Args:
          regexp: The value to write to the <name>_regexp servo control.
        """
        if self._cached_uart_regexp == regexp:
            return
        # Invalidate first and record the new value only after the servo
        # accepted it, so a failed set is never mistaken for an applied one
        # and a later retry is not skipped.
        self._cached_uart_regexp = None
        self._servo.set(self.uart_regexp, regexp)
        self._cached_uart_regexp = regexp


    def send_command(self, commands):
        """Send command through UART.

        This function opens UART pty when called, and then command is sent
        through UART.

        Args:
          commands: The commands to send, either a list or a string.
        """
        self.set_uart_regexp('None')
        if isinstance(commands, list):
            try:
                self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
            except servo.ControlUnavailableError:
                logging.warning('The servod is too old that uart_multicmd not '
                                'supported. Use uart_cmd instead.')
                for command in commands:
                    self._servo.set_nocheck(self.uart_cmd, command)
        else:
            self._servo.set_nocheck(self.uart_cmd, commands)


    def has_command(self, command):
        """Check whether EC console supports |command|.

        Args:
          command: Command to look for.

        Returns:
          True: If the |command| exist on the EC image of the device.
          False: If the |command| does not exist on the EC image of the device.
        """
        try:
            # Throws error.TestFail (on timeout) if it cannot find a line with
            # 'command' in the output. Thus return False in that case.
            result = self.send_command_get_output('help', [command])
        except error.TestFail:
            return False
        return result is not None


    def send_command_get_output(self, command, regexp_list):
        r"""Send command through UART and wait for response.

        This function waits for response message matching regular expressions.

        Args:
          command: The command sent.
          regexp_list: List of regular expressions used to match response
            message. Note, list must be ordered.

        Returns:
          List of tuples, each of which contains the entire matched string and
          all the subgroups of the match. None if not matched.
          For example:
            response of the given command:
              High temp: 37.2
              Low temp: 36.4
            regexp_list:
              ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
            returns:
              [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]

        Raises:
          error.TestError: An error when the given regexp_list is not valid.
        """
        if not isinstance(regexp_list, list):
            raise error.TestError('Argument regexp_list is not a list: %s' %
                                  str(regexp_list))

        # The regexp list is handed to the servo as its string representation;
        # the reply read back from the command control is parsed as a Python
        # literal.
        self.set_uart_regexp(str(regexp_list))
        self._servo.set_nocheck(self.uart_cmd, command)
        return ast.literal_eval(self._servo.get(self.uart_cmd))


class ChromeEC(ChromeConsole):
    """Manages control of a Chrome EC.

    We control the Chrome EC via the UART of a Servo board. Chrome EC
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    def __init__(self, servo, name="ec_uart"):
        """Initialize the EC console.

        Args:
          servo: A Servo object.
          name: The console name, "ec_uart" by default.
        """
        super(ChromeEC, self).__init__(servo, name)


    @staticmethod
    def _kbpress_command(keyname, pressed):
        """Build the 'kbpress' console command for one key transition.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
          pressed: True for key down, False for key up.

        Returns:
          The command string, e.g. 'kbpress 1 3 1'.
        """
        position = ec.KEYMATRIX[keyname]
        # The command takes KEYMATRIX entry [1] before entry [0].
        return 'kbpress %d %d %d' % (position[1], position[0], int(pressed))


    def key_down(self, keyname):
        """Simulate pressing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, True))


    def key_up(self, keyname):
        """Simulate releasing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, False))


    def key_press(self, keyname):
        """Press and then release a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command([
                self._kbpress_command(keyname, True),
                self._kbpress_command(keyname, False),
                ])
        # Don't spam the EC console as fast as we can; leave some recovery time
        # in between commands.
        time.sleep(KEYPRESS_RECOVERY_TIME)


    def send_key_string_raw(self, string):
        """Send key strokes consisting of only characters.

        Args:
          string: Raw string.
        """
        for c in string:
            self.key_press(c)


    def send_key_string(self, string):
        """Send key strokes including special keys.

        Args:
          string: Character string including special keys. An example
            is "this is an<tab>example<enter>".
        """
        # Each match is either a special key in angle brackets (group 1) or a
        # run of ordinary characters (group 2).
        for m in re.finditer(r"(<[^>]+>)|([^<>]+)", string):
            sp, raw = m.groups()
            if raw is not None:
                self.send_key_string_raw(raw)
            else:
                # The special key is passed on with its angle brackets.
                self.key_press(sp)


    def reboot(self, flags=''):
        """Reboot EC with given flags.

        Args:
          flags: Optional, a space-separated string of flags passed to the
                 reboot command, including:
                   default: EC soft reboot;
                   'hard': EC hard/cold reboot;
                   'ap-off': Leave AP off after EC reboot (by default, EC turns
                             AP on after reboot if lid is open).

        Raises:
          error.TestError: If the string of flags is invalid.
        """
        for flag in flags.split():
            if flag not in ('hard', 'ap-off'):
                raise error.TestError(
                        'The flag %s of EC reboot command is invalid.' % flag)
        self.send_command("reboot %s" % flags)


    def set_flash_write_protect(self, enable):
        """Set the software write protect of EC flash.

        Args:
          enable: True to enable write protect, False to disable.
        """
        if enable:
            self.send_command("flashwp enable")
        else:
            self.send_command("flashwp disable")


    def set_hostevent(self, codes):
        """Set the EC hostevent codes.

        Args:
          codes: Hostevent codes, HOSTEVENT_*
        """
        self.send_command("hostevent set %#x" % codes)
        # Allow enough time for EC to process input and set flag.
        # See chromium:371631 for details.
        # FIXME: Stop importing time module if this hack becomes obsolete.
        time.sleep(1)


    def enable_console_channel(self, channel):
        """Find console channel mask and enable that channel only.

        Args:
          channel: Console channel name.
        """
        # The 'chan' command returns a list of console channels,
        # their channel masks and channel numbers
        regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
        matches = self.send_command_get_output('chan', [regexp])
        # Use channel mask and append the 0x for proper hex input value
        cmd = 'chan 0x' + matches[0][2]
        # Set console to only output the desired channel
        self.send_command(cmd)


    def get_version(self):
        """Get version information from the Chrome EC console.

        Additionally, can be used to verify if EC console is available.

        Returns:
          The result of send_command_get_output() for the "version" command:
          one tuple per expected line (Chip, RO, RW, Build).
        """
        self.send_command("chan 0")
        # Only the backslash of '\s' is escaped: the patterns keep carrying
        # real CR/LF characters exactly as before, but no longer rely on an
        # invalid string escape sequence.
        expected_output = ["Chip:\\s+([^\r\n]*)\r\n",
                           "RO:\\s+([^\r\n]*)\r\n",
                           "RW_?[AB]?:\\s+([^\r\n]*)\r\n",
                           "Build:\\s+([^\r\n]*)\r\n"]
        try:
            return self.send_command_get_output("version", expected_output)
        finally:
            # Re-enable all console channels even when the query fails, so a
            # failure here does not leave the console with channels masked.
            self.send_command("chan 0xffffffff")


class ChromeUSBPD(ChromeEC):
    """Manages control of a Chrome USBPD.

    We control the Chrome EC via the UART of a Servo board. Chrome USBPD
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    def __init__(self, servo):
        """Initialize the USBPD console, named "usbpd_uart".

        Args:
          servo: A Servo object.
        """
        super(ChromeUSBPD, self).__init__(servo, "usbpd_uart")