• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import logging
5import re
6import time
7
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import error
10
11# en-US key matrix (from "kb membrane pin matrix.pdf")
12KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
13             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
14             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
15             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
16             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
17             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
18             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
19             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
20             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
21             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
22             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
23             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
24             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
25             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
26             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
27             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
28             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
29             '<left>': (7, 12)}
30
31
32def has_ectool():
33    """Determine if ectool shell command is present.
34
35    Returns:
36        boolean true if avail, false otherwise.
37    """
38    cmd = 'which ectool'
39    return (utils.system(cmd, ignore_status=True) == 0)
40
41
42class ECError(Exception):
43    """Base class for a failure when communicating with EC."""
44    pass
45
46
47class EC_Common(object):
48    """Class for EC common.
49
50    This incredibly brief base class is intended to encapsulate common elements
51    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
52    that includes only the use of ectool.
53    """
54
55    def __init__(self, target='cros_ec'):
56        """Constructor.
57
58        @param target: target name of ec to communicate with.
59        """
60        if not has_ectool():
61            ec_info = utils.system_output("mosys ec info",
62                                          ignore_status=True)
63            logging.warning("Ectool absent on this platform ( %s )",
64                         ec_info)
65            raise error.TestNAError("Platform doesn't support ectool")
66        self._target = target
67
68    def ec_command(self, cmd, **kwargs):
69        """Executes ec command and returns results.
70
71        @param cmd: string of command to execute.
72        @param kwargs: optional params passed to utils.system_output
73
74        @returns: string of results from ec command.
75        """
76        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
77        logging.debug('Command: %s', full_cmd)
78        result = utils.system_output(full_cmd, **kwargs)
79        logging.debug('Result: %s', result)
80        return result
81
82
83class EC(EC_Common):
84    """Class for CrOS embedded controller (EC)."""
85    HELLO_RE = "EC says hello"
86    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
87    SET_FANSPEED_RE = "Fan target RPM set."
88    TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)"
89    # <sensor idx>: <sensor type> <sensor name>
90    TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
91    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
92    # For battery, check we can see a non-zero capacity value.
93    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
94    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
95
96    def __init__(self):
97        """Constructor."""
98        super(EC, self).__init__()
99        self._temperature_dict = None
100
101    def hello(self, **kwargs):
102        """Test EC hello command.
103
104        @param kwargs: optional params passed to utils.system_output
105
106        @returns True if success False otherwise.
107        """
108        response = self.ec_command('hello', **kwargs)
109        return (re.search(self.HELLO_RE, response) is not None)
110
111    def auto_fan_ctrl(self):
112        """Turns auto fan ctrl on.
113
114        @returns True if success False otherwise.
115        """
116        response = self.ec_command('autofanctrl')
117        logging.info('Turned on auto fan control.')
118        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
119
120    def get_fanspeed(self):
121        """Gets fanspeed.
122
123        @raises error.TestError if regexp fails to match.
124
125        @returns integer of fan speed RPM.
126        """
127        response = self.ec_command('pwmgetfanrpm')
128        match = re.search(self.GET_FANSPEED_RE, response)
129        if not match:
130            raise error.TestError('Unable to read fan speed')
131
132        rpm = int(match.group(1))
133        logging.info('Fan speed: %d', rpm)
134        return rpm
135
136    def set_fanspeed(self, rpm):
137        """Sets fan speed.
138
139        @param rpm: integer of fan speed RPM to set
140
141        @returns True if success False otherwise.
142        """
143        response = self.ec_command('pwmsetfanrpm %d' % rpm)
144        logging.info('Set fan speed: %d', rpm)
145        return (re.search(self.SET_FANSPEED_RE, response) is not None)
146
147    def _get_temperature_dict(self):
148        """Read EC temperature name and idx into a dict.
149
150        @returns dict where key=<sensor name>, value =<sensor idx>
151        """
152        # The sensor (name, idx) mapping does not change.
153        if self._temperature_dict:
154            return self._temperature_dict
155
156        temperature_dict = {}
157        response = self.ec_command('tempsinfo all')
158        for rline in response.split('\n'):
159            match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
160            if match:
161                temperature_dict[match.group(3)] = int(match.group(1))
162
163        self._temperature_dict = temperature_dict
164        return temperature_dict
165
166    def get_temperature(self, idx=None, name=None):
167        """Gets temperature from idx sensor.
168
169        Reads temperature either directly if idx is provided or by discovering
170        idx using name.
171
172        @param idx:  integer of temp sensor to read.  Default=None
173        @param name: string of temp sensor to read.  Default=None.
174            For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro
175
176        @raises ECError if fails to find idx of name.
177        @raises error.TestError if fails to read sensor or fails to identify
178        sensor to read from idx & name param.
179
180        @returns integer of temperature reading in degrees Kelvin.
181        """
182        if idx is None:
183            temperature_dict = self._get_temperature_dict()
184            if name in temperature_dict:
185                idx = temperature_dict[name]
186            else:
187                raise ECError('Finding temp idx for name %s' % name)
188
189        response = self.ec_command('temps %d' % idx)
190        match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
191        if not match:
192            raise error.TestError('Reading temperature idx %d' % idx)
193
194        return int(match.group(1))
195
196    def get_battery(self):
197        """Get battery presence (design capacity found).
198
199        @returns True if success False otherwise.
200        """
201        try:
202            response = self.ec_command('battery')
203        except error.CmdError:
204            raise ECError('calling EC battery command')
205
206        return (re.search(self.BATTERY_RE, response) is not None)
207
208    def get_lightbar(self):
209        """Test lightbar.
210
211        @returns True if success False otherwise.
212        """
213        self.ec_command('lightbar on')
214        self.ec_command('lightbar init')
215        self.ec_command('lightbar 4 255 255 255')
216        response = self.ec_command('lightbar')
217        self.ec_command('lightbar off')
218        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
219
220    def key_press(self, key):
221        """Emit key down and up signal of the keyboard.
222
223        @param key: name of a key defined in KEYMATRIX.
224        """
225        self.key_down(key)
226        self.key_up(key)
227
228    def _key_action(self, key, action_type):
229        if not key in KEYMATRIX:
230            raise error.TestError('Unknown key: ' + key)
231        row, col = KEYMATRIX[key]
232        self.ec_command('kbpress %d %d %d' % (row, col, action_type))
233
234    def key_down(self, key):
235        """Emit key down signal of the keyboard.
236
237        @param key: name of a key defined in KEYMATRIX.
238        """
239        self._key_action(key, 1)
240
241    def key_up(self, key):
242        """Emit key up signal of the keyboard.
243
244        @param key: name of a key defined in KEYMATRIX.
245        """
246        self._key_action(key, 0)
247
248
249class EC_USBPD_Port(EC_Common):
250    """Class for CrOS embedded controller for USB-PD Port.
251
252    Public attributes:
253        index: integer of USB type-C port index.
254
255    Public Methods:
256        is_dfp: Determine if data role is Downstream Facing Port (DFP).
257        is_amode_supported: Check if alternate mode is supported by port.
258        is_amode_entered: Check if alternate mode is entered.
259        set_amode: Set an alternate mode.
260
261    Private attributes:
262        _port: integer of USB type-C port id.
263        _port_info: holds usbpd protocol info.
264        _amodes: holds alternate mode info.
265
266    Private methods:
267        _invalidate_port_data: Remove port data to force re-eval.
268        _get_port_info: Get USB-PD port info.
269        _get_amodes: parse and return port's svid info.
270    """
271    def __init__(self, index):
272        """Constructor.
273
274        @param index: integer of USB type-C port index.
275        """
276        self.index = index
277        # TODO(crosbug.com/p/38133) target= only works for samus
278        super(EC_USBPD_Port, self).__init__(target='cros_pd')
279
280        # Interrogate port at instantiation.  Use invalidate to force re-eval.
281        self._port_info = self._get_port_info()
282        self._amodes = self._get_amodes()
283
284    def _invalidate_port_data(self):
285        """Remove port data to force re-eval."""
286        self._port_info = None
287        self._amodes = None
288
289    def _get_port_info(self):
290        """Get USB-PD port info.
291
292        ectool command usbpd provides the following information about the port:
293          - Enabled/Disabled
294          - Power & Data Role
295          - Polarity
296          - Protocol State
297
298        At time of authoring it looks like:
299          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
300
301        @raises error.TestError if ...
302          port info not parseable.
303
304        @returns dictionary for <port> with keyval pairs:
305          enabled: True | False | None
306          power_role: sink | source | None
307          data_role: UFP | DFP | None
308          is_reversed: True | False | None
309          state: various strings | None
310        """
311        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
312                       'Polarity:CC(\d+)\s+State:(\w+)'
313
314        match = re.search(PORT_INFO_RE,
315                          self.ec_command("usbpd %s" % (self.index)))
316        if not match or int(match.group(1)) != self.index:
317            error.TestError('Unable to determine port %d info' % self.index)
318
319        pinfo = dict(enabled=None, power_role=None, data_role=None,
320                    is_reversed=None, state=None)
321        pinfo['enabled'] = match.group(2) == 'enabled'
322        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
323        pinfo['data_role'] = match.group(4)
324        pinfo['is_reversed'] = True if match.group(5) == '2' else False
325        pinfo['state'] = match.group(6)
326        logging.debug('port_info = %s', pinfo)
327        return pinfo
328
329    def _get_amodes(self):
330        """Parse alternate modes from pdgetmode.
331
332        Looks like ...
333          *SVID:0xff01 *0x00000485  0x00000000 ...
334          SVID:0x18d1   0x00000001  0x00000000 ...
335
336        @returns dictionary of format:
337          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
338            where:
339              <svid>        : USB-IF Standard or vendor id as
340                              hex string (i.e. 0xff01)
341              <config_list> : list of uint32_t configs
342              <opos>        : integer of active object position.
343                              Note, this is the config list index + 1
344        """
345        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
346        svids = dict()
347        cmd = 'pdgetmode %d' % self.index
348        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
349            if line.strip() == '':
350                continue
351            logging.debug('pdgetmode line: %s', line)
352            match = re.search(SVID_RE, line)
353            if not match:
354                logging.warning("Unable to parse SVID line %s", line)
355                continue
356            active = match.group(1) == '*'
357            svid = match.group(2)
358            configs_str = match.group(3)
359            configs = list()
360            opos = None
361            for i,config in enumerate(configs_str.split(), 1):
362                if config.startswith('*'):
363                    opos = i
364                    config = config[1:]
365                config = int(config, 16)
366                # ignore unpopulated configs
367                if config == 0:
368                    continue
369                configs.append(config)
370            svids[svid] = dict(active=active, configs=configs, opos=opos)
371
372        logging.debug("Port %d svids = %s", self.index, svids)
373        return svids
374
375    def is_dfp(self):
376        """Determine if data role is Downstream Facing Port (DFP).
377
378        @returns True if DFP False otherwise.
379        """
380        if self._port_info is None:
381            self._port_info = self._get_port_info()
382
383        return self._port_info['data_role'] == 'DFP'
384
385    def is_amode_supported(self, svid):
386        """Check if alternate mode is supported by port partner.
387
388        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
389        """
390        if self._amodes is None:
391            self._amodes = self._get_amodes()
392
393        if svid in self._amodes.keys():
394            return True
395        return False
396
397    def is_amode_entered(self, svid, opos):
398        """Check if alternate mode is entered.
399
400        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
401        @param opos: object position of config to act on.
402
403        @returns True if entered False otherwise
404        """
405        if self._amodes is None:
406            self._amodes = self._get_amodes()
407
408        if not self.is_amode_supported(svid):
409            return False
410
411        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
412            return True
413
414        return False
415
416    def set_amode(self, svid, opos, enter, delay_secs=2):
417        """Set alternate mode.
418
419        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
420        @param opos: object position of config to act on.
421        @param enter: Boolean of whether to enter mode.
422
423        @raises error.TestError if ...
424           mode not supported.
425           opos is > number of configs.
426
427        @returns True if successful False otherwise
428        """
429        if self._amodes is None:
430            self._amodes = self._get_amodes()
431
432        if svid not in self._amodes.keys():
433            raise error.TestError("SVID %s not supported", svid)
434
435        if opos > len(self._amodes[svid]['configs']):
436            raise error.TestError("opos > available configs")
437
438        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
439                                         1 if enter else 0)
440        self.ec_command(cmd, ignore_status=True)
441        self._invalidate_port_data()
442
443        # allow some time for mode entry/exit
444        time.sleep(delay_secs)
445        return self.is_amode_entered(svid, opos) == enter
446
447    def get_flash_info(self):
448        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
449        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
450        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
451                                    'dev_minor', 'rw_hash', 'image_status'])
452
453        cmd = 'infopddev %d' % self.index
454
455        tries = 3
456        while (tries):
457            res = self.ec_command(cmd, ignore_status=True)
458            if not 'has no discovered device' in res:
459                break
460
461            tries -= 1
462            time.sleep(1)
463
464        for ln in res.split('\n'):
465            mat1 = re.match(mat1_re, ln)
466            if mat1:
467                flash_dict['ptype'] = int(mat1.group(1))
468                flash_dict['vid'] = mat1.group(2)
469                flash_dict['pid'] = mat1.group(3)
470                continue
471
472            mat2 = re.match(mat2_re, ln)
473            if mat2:
474                flash_dict['dev_major'] = int(mat2.group(1))
475                flash_dict['dev_minor'] = int(mat2.group(2))
476                flash_dict['rw_hash'] = mat2.group(3)
477                flash_dict['image_status'] = mat2.group(4)
478                break
479
480        return flash_dict
481
482
483class EC_USBPD(EC_Common):
484    """Class for CrOS embedded controller for USB-PD.
485
486    Public attributes:
487        ports: list EC_USBPD_Port instances
488
489    Public Methods:
490        get_num_ports: get number of USB-PD ports device has.
491
492    Private attributes:
493        _num_ports: integer number of USB-PD ports device has.
494    """
495    def __init__(self, num_ports=None):
496        """Constructor.
497
498        @param num_ports: total number of USB-PD ports on device.  This is an
499          override.  If left 'None' will try to determine.
500        """
501        self._num_ports = num_ports
502        self.ports = list()
503
504        # TODO(crosbug.com/p/38133) target= only works for samus
505        super(EC_USBPD, self).__init__(target='cros_pd')
506
507        if (self.get_num_ports() == 0):
508            raise error.TestNAError("Device has no USB-PD ports")
509
510        for i in xrange(self._num_ports):
511            self.ports.append(EC_USBPD_Port(i))
512
513    def get_num_ports(self):
514        """Determine the number of ports for device.
515
516        Uses ectool's usbpdpower command which in turn makes host command call
517        to EC_CMD_USB_PD_PORTS to determine the number of ports.
518
519        TODO(tbroch) May want to consider adding separate ectool command to
520        surface the number of ports directly instead of via usbpdpower
521
522        @returns number of ports.
523        """
524        if (self._num_ports is not None):
525            return self._num_ports
526
527        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
528        return self._num_ports
529