1# Lint as: python2, python3 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5from __future__ import absolute_import 6from __future__ import division 7from __future__ import print_function 8 9import logging 10import os 11import re 12from six.moves import range 13import time 14 15from autotest_lib.client.bin import utils 16from autotest_lib.client.common_lib import error 17 18# en-US key matrix (from "kb membrane pin matrix.pdf") 19KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3), 20 '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9), 21 '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4), 22 'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6), 23 'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5), 24 '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3), 25 'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9), 26 ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2), 27 'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5), 28 '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12), 29 '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1), 30 '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0), 31 '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2), 32 '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2), 33 '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4), 34 '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9), 35 '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11), 36 '<left>': (7, 12)} 37 38 39def has_ectool(): 40 """Determine if ectool shell command is present. 41 42 Returns: 43 boolean true if avail, false otherwise. 44 """ 45 cmd = 'which ectool' 46 return (utils.system(cmd, ignore_status=True) == 0) 47 48 49def has_cros_ec(): 50 """Check whether DUT has chromium ec or not. 51 52 Returns: 53 boolean whether device has ec or not. 54 """ 55 return os.path.exists('/dev/cros_ec') 56 57 58class ECError(Exception): 59 """Base class for a failure when communicating with EC.""" 60 pass 61 62 63class EC_Common(object): 64 """Class for EC common. 65 66 This incredibly brief base class is intended to encapsulate common elements 67 across various CrOS MCUs (ec proper, USB-PD, Sensor Hub). At the moment 68 that includes only the use of ectool. 69 """ 70 71 def __init__(self, target='cros_ec'): 72 """Constructor. 73 74 @param target: target name of ec to communicate with. 75 """ 76 if not has_ectool(): 77 ec_info = utils.system_output("mosys ec info", 78 ignore_status=True) 79 logging.warning("Ectool absent on this platform ( %s )", 80 ec_info) 81 raise error.TestNAError("Platform doesn't support ectool") 82 self._target = target 83 84 def ec_command(self, cmd, **kwargs): 85 """Executes ec command and returns results. 86 87 @param cmd: string of command to execute. 88 @param kwargs: optional params passed to utils.system_output 89 90 @returns: string of results from ec command. 91 """ 92 full_cmd = 'ectool --name=%s %s' % (self._target, cmd) 93 logging.debug('Command: %s', full_cmd) 94 result = utils.system_output(full_cmd, **kwargs) 95 logging.debug('Result: %s', result) 96 return result 97 98 99class EC(EC_Common): 100 """Class for CrOS embedded controller (EC).""" 101 HELLO_RE = "EC says hello" 102 GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)" 103 SET_FANSPEED_RE = "Fan target RPM set." 104 TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)" 105 # <sensor idx>: <sensor type> <sensor name> 106 TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)" 107 TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on" 108 # For battery, check we can see a non-zero capacity value. 109 BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh" 110 LIGHTBAR_RE = "^ 05\s+3f\s+3f$" 111 112 def __init__(self): 113 """Constructor.""" 114 super(EC, self).__init__() 115 self._temperature_dict = None 116 117 def hello(self, **kwargs): 118 """Test EC hello command. 119 120 @param kwargs: optional params passed to utils.system_output 121 122 @returns True if success False otherwise. 123 """ 124 response = self.ec_command('hello', **kwargs) 125 return (re.search(self.HELLO_RE, response) is not None) 126 127 def auto_fan_ctrl(self): 128 """Turns auto fan ctrl on. 129 130 @returns True if success False otherwise. 131 """ 132 response = self.ec_command('autofanctrl') 133 logging.info('Turned on auto fan control.') 134 return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None) 135 136 def get_fanspeed(self): 137 """Gets fanspeed. 138 139 @raises error.TestError if regexp fails to match. 140 141 @returns integer of fan speed RPM. 142 """ 143 response = self.ec_command('pwmgetfanrpm') 144 match = re.search(self.GET_FANSPEED_RE, response) 145 if not match: 146 raise error.TestError('Unable to read fan speed') 147 148 rpm = int(match.group(1)) 149 logging.info('Fan speed: %d', rpm) 150 return rpm 151 152 def set_fanspeed(self, rpm): 153 """Sets fan speed. 154 155 @param rpm: integer of fan speed RPM to set 156 157 @returns True if success False otherwise. 158 """ 159 response = self.ec_command('pwmsetfanrpm %d' % rpm) 160 logging.info('Set fan speed: %d', rpm) 161 return (re.search(self.SET_FANSPEED_RE, response) is not None) 162 163 def _get_temperature_dict(self): 164 """Read EC temperature name and idx into a dict. 165 166 @returns dict where key=<sensor name>, value =<sensor idx> 167 """ 168 # The sensor (name, idx) mapping does not change. 169 if self._temperature_dict: 170 return self._temperature_dict 171 172 temperature_dict = {} 173 response = self.ec_command('tempsinfo all') 174 for rline in response.split('\n'): 175 match = re.search(self.TEMP_SENSOR_INFO_RE, rline) 176 if match: 177 temperature_dict[match.group(3)] = int(match.group(1)) 178 179 self._temperature_dict = temperature_dict 180 return temperature_dict 181 182 def get_temperature(self, idx=None, name=None): 183 """Gets temperature from idx sensor. 184 185 Reads temperature either directly if idx is provided or by discovering 186 idx using name. 187 188 @param idx: integer of temp sensor to read. Default=None 189 @param name: string of temp sensor to read. Default=None. 190 For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro 191 192 @raises ECError if fails to find idx of name. 193 @raises error.TestError if fails to read sensor or fails to identify 194 sensor to read from idx & name param. 195 196 @returns integer of temperature reading in degrees Kelvin. 197 """ 198 if idx is None: 199 temperature_dict = self._get_temperature_dict() 200 if name in temperature_dict: 201 idx = temperature_dict[name] 202 else: 203 raise ECError('Finding temp idx for name %s' % name) 204 205 response = self.ec_command('temps %d' % idx) 206 match = re.search(self.TEMP_SENSOR_TEMP_RE, response) 207 if not match: 208 raise error.TestError('Reading temperature idx %d' % idx) 209 210 return int(match.group(1)) 211 212 def get_battery(self): 213 """Get battery presence (design capacity found). 214 215 @returns True if success False otherwise. 216 """ 217 try: 218 response = self.ec_command('battery') 219 except error.CmdError: 220 raise ECError('calling EC battery command') 221 222 return (re.search(self.BATTERY_RE, response) is not None) 223 224 def get_lightbar(self): 225 """Test lightbar. 226 227 @returns True if success False otherwise. 228 """ 229 self.ec_command('lightbar on') 230 self.ec_command('lightbar init') 231 self.ec_command('lightbar 4 255 255 255') 232 response = self.ec_command('lightbar') 233 self.ec_command('lightbar off') 234 return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None) 235 236 def key_press(self, key): 237 """Emit key down and up signal of the keyboard. 238 239 @param key: name of a key defined in KEYMATRIX. 240 """ 241 self.key_down(key) 242 self.key_up(key) 243 244 def _key_action(self, key, action_type): 245 if not key in KEYMATRIX: 246 raise error.TestError('Unknown key: ' + key) 247 row, col = KEYMATRIX[key] 248 self.ec_command('kbpress %d %d %d' % (row, col, action_type)) 249 250 def key_down(self, key): 251 """Emit key down signal of the keyboard. 252 253 @param key: name of a key defined in KEYMATRIX. 254 """ 255 self._key_action(key, 1) 256 257 def key_up(self, key): 258 """Emit key up signal of the keyboard. 259 260 @param key: name of a key defined in KEYMATRIX. 261 """ 262 self._key_action(key, 0) 263 264 265class EC_USBPD_Port(EC_Common): 266 """Class for CrOS embedded controller for USB-PD Port. 267 268 Public attributes: 269 index: integer of USB type-C port index. 270 271 Public Methods: 272 is_dfp: Determine if data role is Downstream Facing Port (DFP). 273 is_amode_supported: Check if alternate mode is supported by port. 274 is_amode_entered: Check if alternate mode is entered. 275 set_amode: Set an alternate mode. 276 277 Private attributes: 278 _port: integer of USB type-C port id. 279 _port_info: holds usbpd protocol info. 280 _amodes: holds alternate mode info. 281 282 Private methods: 283 _invalidate_port_data: Remove port data to force re-eval. 284 _get_port_info: Get USB-PD port info. 285 _get_amodes: parse and return port's svid info. 286 """ 287 def __init__(self, index): 288 """Constructor. 289 290 @param index: integer of USB type-C port index. 291 """ 292 self.index = index 293 # TODO(crosbug.com/p/38133) target= only works for samus 294 super(EC_USBPD_Port, self).__init__(target='cros_pd') 295 296 # Interrogate port at instantiation. Use invalidate to force re-eval. 297 self._port_info = self._get_port_info() 298 self._amodes = self._get_amodes() 299 300 def _invalidate_port_data(self): 301 """Remove port data to force re-eval.""" 302 self._port_info = None 303 self._amodes = None 304 305 def _get_port_info(self): 306 """Get USB-PD port info. 307 308 ectool command usbpd provides the following information about the port: 309 - Enabled/Disabled 310 - Power & Data Role 311 - Polarity 312 - Protocol State 313 314 At time of authoring it looks like: 315 Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY 316 317 @raises error.TestError if ... 318 port info not parseable. 319 320 @returns dictionary for <port> with keyval pairs: 321 enabled: True | False | None 322 power_role: sink | source | None 323 data_role: UFP | DFP | None 324 is_reversed: True | False | None 325 state: various strings | None 326 """ 327 PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \ 328 'Polarity:CC(\d+)\s+State:(\w+)' 329 330 match = re.search(PORT_INFO_RE, 331 self.ec_command("usbpd %s" % (self.index))) 332 if not match or int(match.group(1)) != self.index: 333 raise error.TestError('Unable to determine port %d info' % 334 self.index) 335 336 pinfo = dict(enabled=None, power_role=None, data_role=None, 337 is_reversed=None, state=None) 338 pinfo['enabled'] = match.group(2) == 'enabled' 339 pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source' 340 pinfo['data_role'] = match.group(4) 341 pinfo['is_reversed'] = True if match.group(5) == '2' else False 342 pinfo['state'] = match.group(6) 343 logging.debug('port_info = %s', pinfo) 344 return pinfo 345 346 def _get_amodes(self): 347 """Parse alternate modes from pdgetmode. 348 349 Looks like ... 350 *SVID:0xff01 *0x00000485 0x00000000 ... 351 SVID:0x18d1 0x00000001 0x00000000 ... 352 353 @returns dictionary of format: 354 <svid>: {active: True|False, configs: <config_list>, opos:<opos>} 355 where: 356 <svid> : USB-IF Standard or vendor id as 357 hex string (i.e. 0xff01) 358 <config_list> : list of uint32_t configs 359 <opos> : integer of active object position. 360 Note, this is the config list index + 1 361 """ 362 SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)' 363 svids = dict() 364 cmd = 'pdgetmode %d' % self.index 365 for line in self.ec_command(cmd, ignore_status=True).split('\n'): 366 if line.strip() == '': 367 continue 368 logging.debug('pdgetmode line: %s', line) 369 match = re.search(SVID_RE, line) 370 if not match: 371 logging.warning("Unable to parse SVID line %s", line) 372 continue 373 active = match.group(1) == '*' 374 svid = match.group(2) 375 configs_str = match.group(3) 376 configs = list() 377 opos = None 378 for i,config in enumerate(configs_str.split(), 1): 379 if config.startswith('*'): 380 opos = i 381 config = config[1:] 382 config = int(config, 16) 383 # ignore unpopulated configs 384 if config == 0: 385 continue 386 configs.append(config) 387 svids[svid] = dict(active=active, configs=configs, opos=opos) 388 389 logging.debug("Port %d svids = %s", self.index, svids) 390 return svids 391 392 def is_dfp(self): 393 """Determine if data role is Downstream Facing Port (DFP). 394 395 @returns True if DFP False otherwise. 396 """ 397 if self._port_info is None: 398 self._port_info = self._get_port_info() 399 400 return self._port_info['data_role'] == 'DFP' 401 402 def is_amode_supported(self, svid): 403 """Check if alternate mode is supported by port partner. 404 405 @param svid: alternate mode SVID hexstring (i.e. 0xff01) 406 """ 407 if self._amodes is None: 408 self._amodes = self._get_amodes() 409 410 if svid in self._amodes.keys(): 411 return True 412 return False 413 414 def is_amode_entered(self, svid, opos): 415 """Check if alternate mode is entered. 416 417 @param svid: alternate mode SVID hexstring (i.e. 0xff01). 418 @param opos: object position of config to act on. 419 420 @returns True if entered False otherwise 421 """ 422 if self._amodes is None: 423 self._amodes = self._get_amodes() 424 425 if not self.is_amode_supported(svid): 426 return False 427 428 if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos: 429 return True 430 431 return False 432 433 def set_amode(self, svid, opos, enter, delay_secs=2): 434 """Set alternate mode. 435 436 @param svid: alternate mode SVID hexstring (i.e. 0xff01). 437 @param opos: object position of config to act on. 438 @param enter: Boolean of whether to enter mode. 439 440 @raises error.TestError if ... 441 mode not supported. 442 opos is > number of configs. 443 444 @returns True if successful False otherwise 445 """ 446 if self._amodes is None: 447 self._amodes = self._get_amodes() 448 449 if svid not in self._amodes.keys(): 450 raise error.TestError("SVID %s not supported" % svid) 451 452 if opos > len(self._amodes[svid]['configs']): 453 raise error.TestError("opos > available configs") 454 455 cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos, 456 1 if enter else 0) 457 self.ec_command(cmd, ignore_status=True) 458 self._invalidate_port_data() 459 460 # allow some time for mode entry/exit 461 time.sleep(delay_secs) 462 return self.is_amode_entered(svid, opos) == enter 463 464 def get_flash_info(self): 465 mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*' 466 mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*' 467 flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major', 468 'dev_minor', 'rw_hash', 'image_status']) 469 470 cmd = 'infopddev %d' % self.index 471 472 tries = 3 473 while (tries): 474 res = self.ec_command(cmd, ignore_status=True) 475 if not 'has no discovered device' in res: 476 break 477 478 tries -= 1 479 time.sleep(1) 480 481 for ln in res.split('\n'): 482 mat1 = re.match(mat1_re, ln) 483 if mat1: 484 flash_dict['ptype'] = int(mat1.group(1)) 485 flash_dict['vid'] = mat1.group(2) 486 flash_dict['pid'] = mat1.group(3) 487 continue 488 489 mat2 = re.match(mat2_re, ln) 490 if mat2: 491 flash_dict['dev_major'] = int(mat2.group(1)) 492 flash_dict['dev_minor'] = int(mat2.group(2)) 493 flash_dict['rw_hash'] = mat2.group(3) 494 flash_dict['image_status'] = mat2.group(4) 495 break 496 497 return flash_dict 498 499 500class EC_USBPD(EC_Common): 501 """Class for CrOS embedded controller for USB-PD. 502 503 Public attributes: 504 ports: list EC_USBPD_Port instances 505 506 Public Methods: 507 get_num_ports: get number of USB-PD ports device has. 508 509 Private attributes: 510 _num_ports: integer number of USB-PD ports device has. 511 """ 512 def __init__(self, num_ports=None): 513 """Constructor. 514 515 @param num_ports: total number of USB-PD ports on device. This is an 516 override. If left 'None' will try to determine. 517 """ 518 self._num_ports = num_ports 519 self.ports = list() 520 521 # TODO(crosbug.com/p/38133) target= only works for samus 522 super(EC_USBPD, self).__init__(target='cros_pd') 523 524 if (self.get_num_ports() == 0): 525 raise error.TestNAError("Device has no USB-PD ports") 526 527 for i in range(self._num_ports): 528 self.ports.append(EC_USBPD_Port(i)) 529 530 def get_num_ports(self): 531 """Determine the number of ports for device. 532 533 Uses ectool's usbpdpower command which in turn makes host command call 534 to EC_CMD_USB_PD_PORTS to determine the number of ports. 535 536 TODO(tbroch) May want to consider adding separate ectool command to 537 surface the number of ports directly instead of via usbpdpower 538 539 @returns number of ports. 540 """ 541 if (self._num_ports is not None): 542 return self._num_ports 543 544 self._num_ports = len(self.ec_command("usbpdpower").split(b'\n')) 545 return self._num_ports 546