# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import ast
import functools
import logging
import re
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros import ec

# Hostevent codes, copied from:
#     ec/include/ec_commands.h
HOSTEVENT_LID_CLOSED        = 0x00000001
HOSTEVENT_LID_OPEN          = 0x00000002
HOSTEVENT_POWER_BUTTON      = 0x00000004
HOSTEVENT_AC_CONNECTED      = 0x00000008
HOSTEVENT_AC_DISCONNECTED   = 0x00000010
HOSTEVENT_BATTERY_LOW       = 0x00000020
HOSTEVENT_BATTERY_CRITICAL  = 0x00000040
HOSTEVENT_BATTERY           = 0x00000080
HOSTEVENT_THERMAL_THRESHOLD = 0x00000100
HOSTEVENT_THERMAL_OVERLOAD  = 0x00000200
HOSTEVENT_THERMAL           = 0x00000400
HOSTEVENT_USB_CHARGER       = 0x00000800
HOSTEVENT_KEY_PRESSED       = 0x00001000
HOSTEVENT_INTERFACE_READY   = 0x00002000
# Keyboard recovery combo has been pressed
HOSTEVENT_KEYBOARD_RECOVERY = 0x00004000
# Shutdown due to thermal overload
HOSTEVENT_THERMAL_SHUTDOWN  = 0x00008000
# Shutdown due to battery level too low
HOSTEVENT_BATTERY_SHUTDOWN  = 0x00010000
HOSTEVENT_INVALID           = 0x80000000

# Time to wait after sending keypress commands.
KEYPRESS_RECOVERY_TIME = 0.5


class ChromeConsole(object):
    """Manages control of a Chrome console.

    We control the Chrome console via the UART of a Servo board. Chrome console
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    # Suffixes appended to the console name to build servo control names.
    CMD = "_cmd"
    REGEXP = "_regexp"
    MULTICMD = "_multicmd"

    def __init__(self, servo, name):
        """Initialize and keep the servo object.

        Args:
          servo: A Servo object.
          name: The console name.
        """
        self.name = name
        self.uart_cmd = self.name + self.CMD
        self.uart_regexp = self.name + self.REGEXP
        self.uart_multicmd = self.name + self.MULTICMD

        self._servo = servo
        # Last regexp successfully pushed to servo; None until the first set.
        self._cached_uart_regexp = None


    def set_uart_regexp(self, regexp):
        """Set the UART regexp control on servo, skipping redundant sets.

        Args:
          regexp: The string value to store in the <name>_regexp control.
        """
        if self._cached_uart_regexp == regexp:
            return
        # Update the cache only after the servo call succeeds. Otherwise a
        # failed set would make later calls with the same value a no-op
        # while servo still holds the previous regexp.
        self._servo.set(self.uart_regexp, regexp)
        self._cached_uart_regexp = regexp


    def send_command(self, commands):
        """Send command through UART.

        This function opens UART pty when called, and then command is sent
        through UART.

        Args:
          commands: The commands to send, either a list or a string.

        Raises:
          error.TestFail: Re-raised from servo when sending a list of
              commands fails for any reason other than the multicmd control
              being unknown.
        """
        self.set_uart_regexp('None')
        if not isinstance(commands, list):
            self._servo.set_nocheck(self.uart_cmd, commands)
            return

        try:
            self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
        except error.TestFail as e:
            if 'No control named' not in str(e):
                raise
            # Fall back to sending the commands one at a time.
            logging.warning(
                    'The servod is too old that uart_multicmd '
                    'not supported. Use uart_cmd instead.')
            for command in commands:
                self._servo.set_nocheck(self.uart_cmd, command)


    def send_command_get_output(self, command, regexp_list):
        r"""Send command through UART and wait for response.

        This function waits for response message matching regular expressions.

        Args:
          command: The command sent.
          regexp_list: List of regular expressions used to match response
            message. Note, list must be ordered.

        Returns:
          List of tuples, each of which contains the entire matched string and
          all the subgroups of the match. None if not matched.
          For example:
            response of the given command:
              High temp: 37.2
              Low temp: 36.4
            regexp_list:
              ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
            returns:
              [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]

        Raises:
          error.TestError: An error when the given regexp_list is not valid.
        """
        if not isinstance(regexp_list, list):
            raise error.TestError('Arugment regexp_list is not a list: %s' %
                                  str(regexp_list))

        # The list is handed to servo as its string representation; the
        # value read back is a Python literal that we parse safely.
        self.set_uart_regexp(str(regexp_list))
        self._servo.set_nocheck(self.uart_cmd, command)
        return ast.literal_eval(self._servo.get(self.uart_cmd))


class ChromeEC(ChromeConsole):
    """Manages control of a Chrome EC.

    We control the Chrome EC via the UART of a Servo board. Chrome EC
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    def __init__(self, servo, name="ec_uart"):
        """Initialize the EC console.

        Args:
          servo: A Servo object.
          name: The console name; defaults to "ec_uart".
        """
        super(ChromeEC, self).__init__(servo, name)


    @staticmethod
    def _kbpress_command(keyname, pressed):
        """Build the 'kbpress' console command for a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
          pressed: 1 for key down, 0 for key up.

        Returns:
          The command string 'kbpress <KEYMATRIX[1]> <KEYMATRIX[0]> <pressed>'.
        """
        position = ec.KEYMATRIX[keyname]
        # The command takes the second KEYMATRIX element first.
        return 'kbpress %d %d %d' % (position[1], position[0], pressed)


    def key_down(self, keyname):
        """Simulate pressing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, 1))


    def key_up(self, keyname):
        """Simulate releasing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, 0))


    def key_press(self, keyname):
        """Press and then release a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command([
                self._kbpress_command(keyname, 1),
                self._kbpress_command(keyname, 0),
                ])
        # Don't spam the EC console as fast as we can; leave some recovery time
        # in between commands.
        time.sleep(KEYPRESS_RECOVERY_TIME)


    def send_key_string_raw(self, string):
        """Send key strokes consisting of only characters.

        Args:
          string: Raw string.
        """
        for char in string:
            self.key_press(char)


    def send_key_string(self, string):
        """Send key strokes including special keys.

        Args:
          string: Character string including special keys. An example
            is "this is an<tab>example<enter>".
        """
        # Each match is either a special key token like "<tab>" (passed to
        # key_press with its angle brackets) or a run of plain characters.
        for m in re.finditer("(<[^>]+>)|([^<>]+)", string):
            special, raw = m.groups()
            if raw is not None:
                self.send_key_string_raw(raw)
            else:
                self.key_press(special)


    def reboot(self, flags=''):
        """Reboot EC with given flags.

        Args:
          flags: Optional, a space-separated string of flags passed to the
                 reboot command, including:
                   default: EC soft reboot;
                   'hard': EC hard/cold reboot;
                   'ap-off': Leave AP off after EC reboot (by default, EC turns
                             AP on after reboot if lid is open).

        Raises:
          error.TestError: If the string of flags is invalid.
        """
        for flag in flags.split():
            if flag not in ('hard', 'ap-off'):
                raise error.TestError(
                        'The flag %s of EC reboot command is invalid.' % flag)
        self.send_command("reboot %s" % flags)


    def set_flash_write_protect(self, enable):
        """Set the software write protect of EC flash.

        Args:
          enable: True to enable write protect, False to disable.
        """
        if enable:
            self.send_command("flashwp enable")
        else:
            self.send_command("flashwp disable")


    def set_hostevent(self, codes):
        """Set the EC hostevent codes.

        Args:
          codes: Hostevent codes, HOSTEVENT_*
        """
        self.send_command("hostevent set %#x" % codes)
        # Allow enough time for EC to process input and set flag.
        # See chromium:371631 for details.
        # FIXME: Stop importing time module if this hack becomes obsolete.
        time.sleep(1)


    def enable_console_channel(self, channel):
        """Find console channel mask and enable that channel only.

        Args:
          channel: Console channel name.
        """
        # The 'chan' command returns a list of console channels,
        # their channel masks and channel numbers
        regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
        matches = self.send_command_get_output('chan', [regexp])
        # Use channel mask and append the 0x for proper hex input value
        cmd = 'chan 0x' + matches[0][2]
        # Set console to only output the desired channel
        self.send_command(cmd)


    def get_version(self):
        """Get version information from the Chrome EC console.

        Additionally, can be used to verify if EC console is available.

        Returns:
          The result of send_command_get_output() for the "version" command
          matched against the Chip/RO/RW/Build patterns.
        """
        # "\\s" keeps the regex token \s without relying on an invalid string
        # escape; "\r\n" are intentionally literal CR/LF characters.
        expected_output = ["Chip:\\s+([^\r\n]*)\r\n",
                           "RO:\\s+([^\r\n]*)\r\n",
                           "RW_?[AB]?:\\s+([^\r\n]*)\r\n",
                           "Build:\\s+([^\r\n]*)\r\n"]
        # Channel mask 0 while reading the version (presumably silences other
        # console output -- confirm against EC 'chan' semantics).
        self.send_command("chan 0")
        try:
            return self.send_command_get_output("version", expected_output)
        finally:
            # Always restore the full channel mask, even if the version
            # query failed, so the console is not left muted.
            self.send_command("chan 0xffffffff")


class ChromeUSBPD(ChromeEC):
    """Manages control of a Chrome USBPD.

    We control the Chrome USBPD via the UART of a Servo board. Chrome USBPD
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    def __init__(self, servo):
        """Initialize the USBPD console under the name "usbpd_uart".

        Args:
          servo: A Servo object.
        """
        super(ChromeUSBPD, self).__init__(servo, "usbpd_uart")