# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import re
import time

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error

# en-US key matrix (from "kb membrane pin matrix.pdf").
# Maps a key name to the (row, col) pair handed to 'ectool kbpress'.
KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
             '<left>': (7, 12)}


def has_ectool():
    """Determine if ectool shell command is present.

    Runs 'which ectool' and treats a zero exit status as "present".

    Returns:
        boolean true if avail, false otherwise.
    """
    cmd = 'which ectool'
    return utils.system(cmd, ignore_status=True) == 0


class EC_Common(object):
    """Class for EC common.

    This incredibly brief base class is intended to encapsulate common elements
    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
    that includes only the use of ectool.
    """

    def __init__(self, target='cros_ec'):
        """Constructor.

        @param target: target name of ec to communicate with.

        @raises error.TestNAError if ectool is not available.
        """
        if not has_ectool():
            # Log whatever mosys reports about the EC to help diagnose why
            # ectool is missing; its own failure is deliberately ignored.
            ec_info = utils.system_output("mosys ec info",
                                          ignore_status=True)
            logging.warning("Ectool absent on this platform ( %s )",
                            ec_info)
            raise error.TestNAError("Platform doesn't support ectool")
        self._target = target

    def ec_command(self, cmd, **kwargs):
        """Executes ec command and returns results.

        @param cmd: string of command to execute.
        @param kwargs: optional params passed to utils.system_output

        @returns: string of results from ec command.
        """
        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
        # Log before running so the command is recorded even when
        # system_output raises.
        logging.debug('Command: %s', full_cmd)
        result = utils.system_output(full_cmd, **kwargs)
        logging.debug('Result: %s', result)
        return result


class EC(EC_Common):
    """Class for CrOS embedded controller (EC)."""
    HELLO_RE = r"EC says hello"
    # Require at least one digit so a reading without a number is reported
    # as "no match" (error.TestError) rather than failing in int('').
    GET_FANSPEED_RE = r"Current fan RPM: ([0-9]+)"
    SET_FANSPEED_RE = r"Fan target RPM set."
    TEMP_SENSOR_RE = r"Reading temperature...([0-9]+)"
    TOGGLE_AUTO_FAN_RE = r"Automatic fan control is now on"
    # For battery, check we can see a non-zero capacity value.
    BATTERY_RE = r"Design capacity:\s+[1-9]\d*\s+mAh"
    LIGHTBAR_RE = r"^ 05\s+3f\s+3f$"

    def hello(self):
        """Test EC hello command.

        @returns True if success False otherwise.
        """
        response = self.ec_command('hello')
        return re.search(self.HELLO_RE, response) is not None

    def auto_fan_ctrl(self):
        """Turns auto fan ctrl on.

        @returns True if success False otherwise.
        """
        response = self.ec_command('autofanctrl')
        logging.info('Turned on auto fan control.')
        return re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None

    def get_fanspeed(self):
        """Gets fanspeed.

        @raises error.TestError if regexp fails to match.

        @returns integer of fan speed RPM.
        """
        response = self.ec_command('pwmgetfanrpm')
        match = re.search(self.GET_FANSPEED_RE, response)
        if not match:
            raise error.TestError('Unable to read fan speed')

        rpm = int(match.group(1))
        logging.info('Fan speed: %d', rpm)
        return rpm

    def set_fanspeed(self, rpm):
        """Sets fan speed.

        @param rpm: integer of fan speed RPM to set

        @returns True if success False otherwise.
        """
        response = self.ec_command('pwmsetfanrpm %d' % rpm)
        logging.info('Set fan speed: %d', rpm)
        return re.search(self.SET_FANSPEED_RE, response) is not None

    def get_temperature(self, idx):
        """Gets temperature from idx sensor.

        @param idx: integer of temp sensor to read.

        @raises error.TestError if fails to read sensor.

        @returns integer of temperature reading in degrees Kelvin.
        """
        response = self.ec_command('temps %d' % idx)
        match = re.search(self.TEMP_SENSOR_RE, response)
        if not match:
            raise error.TestError('Unable to read temperature sensor %d' % idx)

        return int(match.group(1))

    def get_battery(self):
        """Get battery presence (design capacity found).

        @returns True if success False otherwise.
        """
        response = self.ec_command('battery')
        return re.search(self.BATTERY_RE, response) is not None

    def get_lightbar(self):
        """Test lightbar.

        Turns the lightbar on, initializes it, issues
        'lightbar 4 255 255 255', dumps the lightbar state, turns it off
        again and checks the dump for a line matching LIGHTBAR_RE.

        @returns True if success False otherwise.
        """
        self.ec_command('lightbar on')
        self.ec_command('lightbar init')
        self.ec_command('lightbar 4 255 255 255')
        response = self.ec_command('lightbar')
        self.ec_command('lightbar off')
        # LIGHTBAR_RE is anchored with ^/$, so it must be matched per line.
        return re.search(self.LIGHTBAR_RE, response,
                         re.MULTILINE) is not None

    def key_press(self, key):
        """Emit key down and up signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self.key_down(key)
        self.key_up(key)

    def _key_action(self, key, action_type):
        """Send a single keyboard matrix event through 'ectool kbpress'.

        @param key: name of a key defined in KEYMATRIX.
        @param action_type: integer passed as the last kbpress argument;
                            key_down uses 1 and key_up uses 0.

        @raises error.TestError if key is not in KEYMATRIX.
        """
        if key not in KEYMATRIX:
            raise error.TestError('Unknown key: %s' % (key,))
        row, col = KEYMATRIX[key]
        self.ec_command('kbpress %d %d %d' % (row, col, action_type))

    def key_down(self, key):
        """Emit key down signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 1)

    def key_up(self, key):
        """Emit key up signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 0)


class EC_USBPD_Port(EC_Common):
    """Class for CrOS embedded controller for USB-PD Port.

    Public attributes:
        index: integer of USB type-C port index.

    Public Methods:
        is_dfp: Determine if data role is Downstream Facing Port (DFP).
        is_amode_supported: Check if alternate mode is supported by port.
        is_amode_entered: Check if alternate mode is entered.
        set_amode: Set an alternate mode.
        get_flash_info: Parse 'infopddev' output for the port.

    Private attributes:
        _port_info: holds usbpd protocol info.
        _amodes: holds alternate mode info.

    Private methods:
        _invalidate_port_data: Remove port data to force re-eval.
        _get_port_info: Get USB-PD port info.
        _get_amodes: parse and return port's svid info.
    """

    def __init__(self, index):
        """Constructor.

        @param index: integer of USB type-C port index.
        """
        self.index = index
        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD_Port, self).__init__(target='cros_pd')

        # Interrogate port at instantiation.  Use invalidate to force re-eval.
        self._port_info = self._get_port_info()
        self._amodes = self._get_amodes()

    def _invalidate_port_data(self):
        """Remove port data to force re-eval."""
        self._port_info = None
        self._amodes = None

    def _get_port_info(self):
        """Get USB-PD port info.

        ectool command usbpd provides the following information about the port:
          - Enabled/Disabled
          - Power & Data Role
          - Polarity
          - Protocol State

        At time of authoring it looks like:
          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY

        @raises error.TestError if ...
          port info not parseable, or the reported port number differs
          from self.index.

        @returns dictionary for <port> with keyval pairs:
          enabled: True | False
          power_role: sink | source
          data_role: the data role token as reported (e.g. UFP, DFP)
          is_reversed: True | False (True when polarity is CC2)
          state: the protocol state token as reported
        """
        PORT_INFO_RE = (r'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+'
                        r'Polarity:CC(\d+)\s+State:(\w+)')

        match = re.search(PORT_INFO_RE,
                          self.ec_command("usbpd %s" % (self.index)))
        if not match or int(match.group(1)) != self.index:
            raise error.TestError('Unable to determine port %d info' %
                                  self.index)

        pinfo = dict(
            enabled=match.group(2) == 'enabled',
            # Anything other than SNK is reported as a source.
            power_role='sink' if match.group(3) == 'SNK' else 'source',
            data_role=match.group(4),
            is_reversed=match.group(5) == '2',
            state=match.group(6))
        logging.debug('port_info = %s', pinfo)
        return pinfo

    def _get_amodes(self):
        """Parse alternate modes from pdgetmode.

        Looks like ...
          *SVID:0xff01 *0x00000485  0x00000000 ...
          SVID:0x18d1   0x00000001  0x00000000 ...

        Lines that do not look like an SVID line are logged and skipped.

        @returns dictionary of format:
          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
            where:
              <svid>        : USB-IF Standard or vendor id as
                              hex string (i.e. 0xff01)
              <config_list> : list of uint32_t configs
              <opos>        : integer of active object position, or None
                              when no config is marked active.
                              Note, this is the config list index + 1
        """
        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
        svids = dict()
        cmd = 'pdgetmode %d' % self.index
        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
            if line.strip() == '':
                continue
            logging.debug('pdgetmode line: %s', line)
            match = re.search(SVID_RE, line)
            if not match:
                logging.warning("Unable to parse SVID line %s", line)
                continue
            # A leading '*' marks the SVID as active.
            active = match.group(1) == '*'
            svid = match.group(2)
            configs_str = match.group(3)
            configs = list()
            opos = None
            # Object positions are 1-based, hence enumerate(..., 1).
            for i, config in enumerate(configs_str.split(), 1):
                # A leading '*' marks the active config.
                if config.startswith('*'):
                    opos = i
                    config = config[1:]
                config = int(config, 16)
                # ignore unpopulated configs
                if config == 0:
                    continue
                configs.append(config)
            svids[svid] = dict(active=active, configs=configs, opos=opos)

        logging.debug("Port %d svids = %s", self.index, svids)
        return svids

    def is_dfp(self):
        """Determine if data role is Downstream Facing Port (DFP).

        Re-reads the port info first if it has been invalidated.

        @returns True if DFP False otherwise.
        """
        if self._port_info is None:
            self._port_info = self._get_port_info()

        return self._port_info['data_role'] == 'DFP'

    def is_amode_supported(self, svid):
        """Check if alternate mode is supported by port partner.

        Re-reads the alternate modes first if they have been invalidated.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01)

        @returns True if svid was reported by pdgetmode False otherwise.
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()

        return svid in self._amodes

    def is_amode_entered(self, svid, opos):
        """Check if alternate mode is entered.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.

        @returns True if entered False otherwise
        """
        # is_amode_supported() refreshes self._amodes when it is stale.
        if not self.is_amode_supported(svid):
            return False

        amode = self._amodes[svid]
        return bool(amode['active'] and amode['opos'] == opos)

    def set_amode(self, svid, opos, enter, delay_secs=2):
        """Set alternate mode.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.
        @param enter: Boolean of whether to enter mode.
        @param delay_secs: seconds to wait after issuing the command before
                           re-reading the mode state.

        @raises error.TestError if ...
           mode not supported.
           opos is > number of configs.

        @returns True if successful False otherwise
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()

        if svid not in self._amodes:
            raise error.TestError("SVID %s not supported" % svid)

        if opos > len(self._amodes[svid]['configs']):
            raise error.TestError("opos > available configs")

        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
                                         1 if enter else 0)
        self.ec_command(cmd, ignore_status=True)
        # Cached state is stale now; drop it so the check below re-reads it.
        self._invalidate_port_data()

        # allow some time for mode entry/exit
        time.sleep(delay_secs)
        # Compare on truthiness, matching how 'enter' was used for the command.
        return self.is_amode_entered(svid, opos) == bool(enter)

    def get_flash_info(self):
        """Parse 'infopddev' output for this port.

        The command is tried up to three times, one second apart, while the
        output reports that no device has been discovered.

        @returns dictionary with keys ptype, vid, pid, dev_major, dev_minor,
                 rw_hash and image_status.  ptype, dev_major and dev_minor
                 are integers, the rest are strings as matched from the
                 output.  Any value that could not be parsed is None.
        """
        no_device_msg = 'has no discovered device'
        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
        # NOTE: the Hash group is greedy, so it may keep trailing whitespace
        # that precedes 'CurImg:'.
        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
                                    'dev_minor', 'rw_hash', 'image_status'])

        cmd = 'infopddev %d' % self.index

        for _ in range(3):
            res = self.ec_command(cmd, ignore_status=True)
            if no_device_msg not in res:
                break
            time.sleep(1)

        for ln in res.split('\n'):
            mat1 = re.match(mat1_re, ln)
            if mat1:
                flash_dict['ptype'] = int(mat1.group(1))
                flash_dict['vid'] = mat1.group(2)
                flash_dict['pid'] = mat1.group(3)
                continue

            mat2 = re.match(mat2_re, ln)
            if mat2:
                flash_dict['dev_major'] = int(mat2.group(1))
                flash_dict['dev_minor'] = int(mat2.group(2))
                flash_dict['rw_hash'] = mat2.group(3)
                flash_dict['image_status'] = mat2.group(4)
                # Nothing further is parsed after the DevId line.
                break

        return flash_dict


class EC_USBPD(EC_Common):
    """Class for CrOS embedded controller for USB-PD.

    Public attributes:
        ports: list EC_USBPD_Port instances

    Public Methods:
        get_num_ports: get number of USB-PD ports device has.

    Private attributes:
        _num_ports: integer number of USB-PD ports device has.
    """

    def __init__(self, num_ports=None):
        """Constructor.

        @param num_ports: total number of USB-PD ports on device.  This is an
          override.  If left 'None' will try to determine.

        @raises error.TestNAError if the device has no USB-PD ports.
        """
        self._num_ports = num_ports
        self.ports = list()

        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD, self).__init__(target='cros_pd')

        if self.get_num_ports() == 0:
            raise error.TestNAError("Device has no USB-PD ports")

        for i in range(self._num_ports):
            self.ports.append(EC_USBPD_Port(i))

    def get_num_ports(self):
        """Determine the number of ports for device.

        Uses ectool's usbpdpower command which in turn makes host command call
        to EC_CMD_USB_PD_PORTS to determine the number of ports.  The count is
        the number of lines in that command's output, and is cached.

        TODO(tbroch) May want to consider adding separate ectool command to
        surface the number of ports directly instead of via usbpdpower

        @returns number of ports.
        """
        if self._num_ports is not None:
            return self._num_ports

        # Split on a text newline, as the other ec_command callers do.
        self._num_ports = len(self.ec_command("usbpdpower").split('\n'))
        return self._num_ports