1# Copyright 2015 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4import logging 5import re 6import time 7 8from autotest_lib.client.bin import utils 9from autotest_lib.client.common_lib import error 10 11 12def has_ectool(): 13 """Determine if ectool shell command is present. 14 15 Returns: 16 boolean true if avail, false otherwise. 17 """ 18 cmd = 'which ectool' 19 return (utils.system(cmd, ignore_status=True) == 0) 20 21 22class EC_Common(object): 23 """Class for EC common. 24 25 This incredibly brief base class is intended to encapsulate common elements 26 across various CrOS MCUs (ec proper, USB-PD, Sensor Hub). At the moment 27 that includes only the use of ectool. 28 """ 29 30 def __init__(self, target='cros_ec'): 31 """Constructor. 32 33 @param target: target name of ec to communicate with. 34 """ 35 if not has_ectool(): 36 ec_info = utils.system_output("mosys ec info", 37 ignore_status=True) 38 logging.warning("Ectool absent on this platform ( %s )", 39 ec_info) 40 raise error.TestNAError("Platform doesn't support ectool") 41 self._target = target 42 43 def ec_command(self, cmd, **kwargs): 44 """Executes ec command and returns results. 45 46 @param cmd: string of command to execute. 47 @param kwargs: optional params passed to utils.system_output 48 49 @returns: string of results from ec command. 50 """ 51 full_cmd = 'ectool --name=%s %s' % (self._target, cmd) 52 result = utils.system_output(full_cmd, **kwargs) 53 logging.debug('Command: %s', full_cmd) 54 logging.debug('Result: %s', result) 55 return result 56 57 58class EC(EC_Common): 59 """Class for CrOS embedded controller (EC).""" 60 HELLO_RE = "EC says hello" 61 GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)" 62 SET_FANSPEED_RE = "Fan target RPM set." 63 TEMP_SENSOR_RE = "Reading temperature...([0-9]*)" 64 TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on" 65 # For battery, check we can see a non-zero capacity value. 66 BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh" 67 LIGHTBAR_RE = "^ 05\s+3f\s+3f$" 68 69 70 def hello(self): 71 """Test EC hello command. 72 73 @returns True if success False otherwise. 74 """ 75 response = self.ec_command('hello') 76 return (re.search(self.HELLO_RE, response) is not None) 77 78 def auto_fan_ctrl(self): 79 """Turns auto fan ctrl on. 80 81 @returns True if success False otherwise. 82 """ 83 response = self.ec_command('autofanctrl') 84 logging.info('Turned on auto fan control.') 85 return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None) 86 87 def get_fanspeed(self): 88 """Gets fanspeed. 89 90 @raises error.TestError if regexp fails to match. 91 92 @returns integer of fan speed RPM. 93 """ 94 response = self.ec_command('pwmgetfanrpm') 95 match = re.search(self.GET_FANSPEED_RE, response) 96 if not match: 97 raise error.TestError('Unable to read fan speed') 98 99 rpm = int(match.group(1)) 100 logging.info('Fan speed: %d', rpm) 101 return rpm 102 103 def set_fanspeed(self, rpm): 104 """Sets fan speed. 105 106 @param rpm: integer of fan speed RPM to set 107 108 @returns True if success False otherwise. 109 """ 110 response = self.ec_command('pwmsetfanrpm %d' % rpm) 111 logging.info('Set fan speed: %d', rpm) 112 return (re.search(self.SET_FANSPEED_RE, response) is not None) 113 114 def get_temperature(self, idx): 115 """Gets temperature from idx sensor. 116 117 @param idx: integer of temp sensor to read. 118 119 @raises error.TestError if fails to read sensor. 120 121 @returns integer of temperature reading in degrees Kelvin. 122 """ 123 response = self.ec_command('temps %d' % idx) 124 match = re.search(self.TEMP_SENSOR_RE, response) 125 if not match: 126 raise error.TestError('Unable to read temperature sensor %d' % idx) 127 128 return int(match.group(1)) 129 130 def get_battery(self): 131 """Get battery presence (design capacity found). 132 133 @returns True if success False otherwise. 134 """ 135 response = self.ec_command('battery') 136 return (re.search(self.BATTERY_RE, response) is not None) 137 138 def get_lightbar(self): 139 """Test lightbar. 140 141 @returns True if success False otherwise. 142 """ 143 self.ec_command('lightbar on') 144 self.ec_command('lightbar init') 145 self.ec_command('lightbar 4 255 255 255') 146 response = self.ec_command('lightbar') 147 self.ec_command('lightbar off') 148 return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None) 149 150 151class EC_USBPD_Port(EC_Common): 152 """Class for CrOS embedded controller for USB-PD Port. 153 154 Public attributes: 155 index: integer of USB type-C port index. 156 157 Public Methods: 158 is_dfp: Determine if data role is Downstream Facing Port (DFP). 159 is_amode_supported: Check if alternate mode is supported by port. 160 is_amode_entered: Check if alternate mode is entered. 161 set_amode: Set an alternate mode. 162 163 Private attributes: 164 _port: integer of USB type-C port id. 165 _port_info: holds usbpd protocol info. 166 _amodes: holds alternate mode info. 167 168 Private methods: 169 _invalidate_port_data: Remove port data to force re-eval. 170 _get_port_info: Get USB-PD port info. 171 _get_amodes: parse and return port's svid info. 172 """ 173 def __init__(self, index): 174 """Constructor. 175 176 @param index: integer of USB type-C port index. 177 """ 178 self.index = index 179 # TODO(crosbug.com/p/38133) target= only works for samus 180 super(EC_USBPD_Port, self).__init__(target='cros_pd') 181 182 # Interrogate port at instantiation. Use invalidate to force re-eval. 183 self._port_info = self._get_port_info() 184 self._amodes = self._get_amodes() 185 186 def _invalidate_port_data(self): 187 """Remove port data to force re-eval.""" 188 self._port_info = None 189 self._amodes = None 190 191 def _get_port_info(self): 192 """Get USB-PD port info. 193 194 ectool command usbpd provides the following information about the port: 195 - Enabled/Disabled 196 - Power & Data Role 197 - Polarity 198 - Protocol State 199 200 At time of authoring it looks like: 201 Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY 202 203 @raises error.TestError if ... 204 port info not parseable. 205 206 @returns dictionary for <port> with keyval pairs: 207 enabled: True | False | None 208 power_role: sink | source | None 209 data_role: UFP | DFP | None 210 is_reversed: True | False | None 211 state: various strings | None 212 """ 213 PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \ 214 'Polarity:CC(\d+)\s+State:(\w+)' 215 216 match = re.search(PORT_INFO_RE, 217 self.ec_command("usbpd %s" % (self.index))) 218 if not match or int(match.group(1)) != self.index: 219 error.TestError('Unable to determine port %d info' % self.index) 220 221 pinfo = dict(enabled=None, power_role=None, data_role=None, 222 is_reversed=None, state=None) 223 pinfo['enabled'] = match.group(2) == 'enabled' 224 pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source' 225 pinfo['data_role'] = match.group(4) 226 pinfo['is_reversed'] = True if match.group(5) == '2' else False 227 pinfo['state'] = match.group(6) 228 logging.debug('port_info = %s', pinfo) 229 return pinfo 230 231 def _get_amodes(self): 232 """Parse alternate modes from pdgetmode. 233 234 Looks like ... 235 *SVID:0xff01 *0x00000485 0x00000000 ... 236 SVID:0x18d1 0x00000001 0x00000000 ... 237 238 @returns dictionary of format: 239 <svid>: {active: True|False, configs: <config_list>, opos:<opos>} 240 where: 241 <svid> : USB-IF Standard or vendor id as 242 hex string (i.e. 0xff01) 243 <config_list> : list of uint32_t configs 244 <opos> : integer of active object position. 245 Note, this is the config list index + 1 246 """ 247 SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)' 248 svids = dict() 249 cmd = 'pdgetmode %d' % self.index 250 for line in self.ec_command(cmd, ignore_status=True).split('\n'): 251 if line.strip() == '': 252 continue 253 logging.debug('pdgetmode line: %s', line) 254 match = re.search(SVID_RE, line) 255 if not match: 256 logging.warning("Unable to parse SVID line %s", line) 257 continue 258 active = match.group(1) == '*' 259 svid = match.group(2) 260 configs_str = match.group(3) 261 configs = list() 262 opos = None 263 for i,config in enumerate(configs_str.split(), 1): 264 if config.startswith('*'): 265 opos = i 266 config = config[1:] 267 config = int(config, 16) 268 # ignore unpopulated configs 269 if config == 0: 270 continue 271 configs.append(config) 272 svids[svid] = dict(active=active, configs=configs, opos=opos) 273 274 logging.debug("Port %d svids = %s", self.index, svids) 275 return svids 276 277 def is_dfp(self): 278 """Determine if data role is Downstream Facing Port (DFP). 279 280 @returns True if DFP False otherwise. 281 """ 282 if self._port_info is None: 283 self._port_info = self._get_port_info() 284 285 return self._port_info['data_role'] == 'DFP' 286 287 def is_amode_supported(self, svid): 288 """Check if alternate mode is supported by port partner. 289 290 @param svid: alternate mode SVID hexstring (i.e. 0xff01) 291 """ 292 if self._amodes is None: 293 self._amodes = self._get_amodes() 294 295 if svid in self._amodes.keys(): 296 return True 297 return False 298 299 def is_amode_entered(self, svid, opos): 300 """Check if alternate mode is entered. 301 302 @param svid: alternate mode SVID hexstring (i.e. 0xff01). 303 @param opos: object position of config to act on. 304 305 @returns True if entered False otherwise 306 """ 307 if self._amodes is None: 308 self._amodes = self._get_amodes() 309 310 if not self.is_amode_supported(svid): 311 return False 312 313 if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos: 314 return True 315 316 return False 317 318 def set_amode(self, svid, opos, enter, delay_secs=2): 319 """Set alternate mode. 320 321 @param svid: alternate mode SVID hexstring (i.e. 0xff01). 322 @param opos: object position of config to act on. 323 @param enter: Boolean of whether to enter mode. 324 325 @raises error.TestError if ... 326 mode not supported. 327 opos is > number of configs. 328 329 @returns True if successful False otherwise 330 """ 331 if self._amodes is None: 332 self._amodes = self._get_amodes() 333 334 if svid not in self._amodes.keys(): 335 raise error.TestError("SVID %s not supported", svid) 336 337 if opos > len(self._amodes[svid]['configs']): 338 raise error.TestError("opos > available configs") 339 340 cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos, 341 1 if enter else 0) 342 self.ec_command(cmd, ignore_status=True) 343 self._invalidate_port_data() 344 345 # allow some time for mode entry/exit 346 time.sleep(delay_secs) 347 return self.is_amode_entered(svid, opos) == enter 348 349 def get_flash_info(self): 350 mat0_re = r'has no discovered device' 351 mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*' 352 mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*' 353 flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major', 354 'dev_minor', 'rw_hash', 'image_status']) 355 356 cmd = 'infopddev %d' % self.index 357 358 tries = 3 359 while (tries): 360 res = self.ec_command(cmd, ignore_status=True) 361 if not 'has no discovered device' in res: 362 break 363 364 tries -= 1 365 time.sleep(1) 366 367 for ln in res.split('\n'): 368 mat1 = re.match(mat1_re, ln) 369 if mat1: 370 flash_dict['ptype'] = int(mat1.group(1)) 371 flash_dict['vid'] = mat1.group(2) 372 flash_dict['pid'] = mat1.group(3) 373 continue 374 375 mat2 = re.match(mat2_re, ln) 376 if mat2: 377 flash_dict['dev_major'] = int(mat2.group(1)) 378 flash_dict['dev_minor'] = int(mat2.group(2)) 379 flash_dict['rw_hash'] = mat2.group(3) 380 flash_dict['image_status'] = mat2.group(4) 381 break 382 383 return flash_dict 384 385 386class EC_USBPD(EC_Common): 387 """Class for CrOS embedded controller for USB-PD. 388 389 Public attributes: 390 ports: list EC_USBPD_Port instances 391 392 Public Methods: 393 get_num_ports: get number of USB-PD ports device has. 394 395 Private attributes: 396 _num_ports: integer number of USB-PD ports device has. 397 """ 398 def __init__(self, num_ports=None): 399 """Constructor. 400 401 @param num_ports: total number of USB-PD ports on device. This is an 402 override. If left 'None' will try to determine. 403 """ 404 self._num_ports = num_ports 405 self.ports = list() 406 407 # TODO(crosbug.com/p/38133) target= only works for samus 408 super(EC_USBPD, self).__init__(target='cros_pd') 409 410 if (self.get_num_ports() == 0): 411 raise error.TestNAError("Device has no USB-PD ports") 412 413 for i in xrange(self._num_ports): 414 self.ports.append(EC_USBPD_Port(i)) 415 416 def get_num_ports(self): 417 """Determine the number of ports for device. 418 419 Uses ectool's usbpdpower command which in turn makes host command call 420 to EC_CMD_USB_PD_PORTS to determine the number of ports. 421 422 TODO(tbroch) May want to consider adding separate ectool command to 423 surface the number of ports directly instead of via usbpdpower 424 425 @returns number of ports. 426 """ 427 if (self._num_ports is not None): 428 return self._num_ports 429 430 self._num_ports = len(self.ec_command("usbpdpower").split(b'\n')) 431 return self._num_ports 432