• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import logging
5import re
6import time
7
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import error
10
# en-US key matrix (from "kb membrane pin matrix.pdf").
# Each key name maps to a (row, col) pair; EC._key_action() unpacks the pair in
# that order and passes both numbers to the ectool 'kbpress' command.
KEYMATRIX = {
    # Number row.
    '`': (3, 1),
    '1': (6, 1),
    '2': (6, 4),
    '3': (6, 2),
    '4': (6, 3),
    '5': (3, 3),
    '6': (3, 6),
    '7': (6, 6),
    '8': (6, 5),
    '9': (6, 9),
    '0': (6, 8),
    '-': (3, 8),
    '=': (0, 8),
    # Top letter row.
    'q': (7, 1),
    'w': (7, 4),
    'e': (7, 2),
    'r': (7, 3),
    't': (2, 3),
    'y': (2, 6),
    'u': (7, 6),
    'i': (7, 5),
    'o': (7, 9),
    'p': (7, 8),
    '[': (2, 8),
    ']': (2, 5),
    '\\': (3, 11),
    # Home row.
    'a': (4, 1),
    's': (4, 4),
    'd': (4, 2),
    'f': (4, 3),
    'g': (1, 3),
    'h': (1, 6),
    'j': (4, 6),
    'k': (4, 5),
    'l': (4, 9),
    ';': (4, 8),
    "'": (1, 8),
    # Bottom letter row.
    'z': (5, 1),
    'x': (5, 4),
    'c': (5, 2),
    'v': (5, 3),
    'b': (0, 3),
    'n': (0, 6),
    'm': (5, 6),
    ',': (5, 5),
    '.': (5, 9),
    '/': (5, 8),
    # Space bar.
    ' ': (5, 11),
    # Named (non-printing) keys.
    '<right>': (6, 12),
    '<alt_r>': (0, 10),
    '<down>': (6, 11),
    '<tab>': (2, 1),
    '<f10>': (0, 4),
    '<shift_r>': (7, 7),
    '<ctrl_r>': (4, 0),
    '<esc>': (1, 1),
    '<backspace>': (1, 11),
    '<f2>': (3, 2),
    '<alt_l>': (6, 10),
    '<ctrl_l>': (2, 0),
    '<f1>': (0, 2),
    '<search>': (0, 1),
    '<f3>': (2, 2),
    '<f4>': (1, 2),
    '<f5>': (3, 4),
    '<f6>': (2, 4),
    '<f7>': (1, 4),
    '<f8>': (2, 9),
    '<f9>': (1, 9),
    '<up>': (7, 11),
    '<shift_l>': (5, 7),
    '<enter>': (4, 11),
    '<left>': (7, 12),
}
30
31
def has_ectool():
    """Check whether the ectool shell command can be located.

    Runs 'which ectool' through utils.system with ignore_status=True and
    compares the value it returns against zero.

    Returns:
        True if that value is 0, False otherwise.
    """
    exit_status = utils.system('which ectool', ignore_status=True)
    return exit_status == 0
40
41
class ECError(Exception):
    """Base exception for failures while communicating with the EC."""
45
46
class EC_Common(object):
    """Shared base class for the ectool-driven helpers.

    Kept deliberately small: it only holds what is common across the various
    CrOS MCUs (ec proper, USB-PD, Sensor Hub), which for now is just the use
    of ectool.
    """

    def __init__(self, target='cros_ec'):
        """Constructor.

        @param target: target name of ec to communicate with.

        @raises error.TestNAError if ectool is not found by has_ectool().
        """
        ectool_present = has_ectool()
        if not ectool_present:
            # Record what mosys reports about the EC before giving up.
            mosys_output = utils.system_output("mosys ec info",
                                               ignore_status=True)
            logging.warning("Ectool absent on this platform ( %s )",
                            mosys_output)
            raise error.TestNAError("Platform doesn't support ectool")
        self._target = target

    def ec_command(self, cmd, **kwargs):
        """Run an ectool command against this object's target.

        The command line is 'ectool --name=<target> <cmd>'; both the command
        line and its output are logged at debug level.

        @param cmd: string of command to execute.
        @param kwargs: optional params passed to utils.system_output

        @returns: string of results from ec command.
        """
        command_line = 'ectool --name=%s %s' % (self._target, cmd)
        logging.debug('Command: %s', command_line)
        output = utils.system_output(command_line, **kwargs)
        logging.debug('Result: %s', output)
        return output
81
82
class EC(EC_Common):
    """Class for CrOS embedded controller (EC).

    Each public method issues one or more ectool sub-commands through
    ec_command() and interprets the textual output with the *_RE patterns
    defined below.
    """
    HELLO_RE = "EC says hello"
    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
    SET_FANSPEED_RE = "Fan target RPM set."
    TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)"
    # <sensor idx>: <sensor type> <sensor name>
    TEMP_SENSOR_INFO_RE = r"(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
    # For battery, check we can see a non-zero capacity value.
    BATTERY_RE = r"Design capacity:\s+[1-9]\d*\s+mAh"
    LIGHTBAR_RE = r"^ 05\s+3f\s+3f$"

    def __init__(self):
        """Constructor."""
        super(EC, self).__init__()
        # Cache of {sensor name: sensor idx}, filled in lazily by
        # _get_temperature_dict().
        self._temperature_dict = None

    def hello(self, **kwargs):
        """Test EC hello command.

        @param kwargs: optional params passed to utils.system_output

        @returns True if success False otherwise.
        """
        response = self.ec_command('hello', **kwargs)
        return re.search(self.HELLO_RE, response) is not None

    def auto_fan_ctrl(self):
        """Turns auto fan ctrl on.

        @returns True if success False otherwise.
        """
        response = self.ec_command('autofanctrl')
        logging.info('Turned on auto fan control.')
        return re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None

    def get_fanspeed(self):
        """Gets fanspeed.

        @raises error.TestError if regexp fails to match or no RPM digits
                are present in the output.

        @returns integer of fan speed RPM.
        """
        response = self.ec_command('pwmgetfanrpm')
        match = re.search(self.GET_FANSPEED_RE, response)
        # The digit group can match an empty string; treat that as a parse
        # failure instead of letting int('') raise ValueError.
        if not match or not match.group(1):
            raise error.TestError('Unable to read fan speed')

        rpm = int(match.group(1))
        logging.info('Fan speed: %d', rpm)
        return rpm

    def set_fanspeed(self, rpm):
        """Sets fan speed.

        @param rpm: integer of fan speed RPM to set

        @returns True if success False otherwise.
        """
        response = self.ec_command('pwmsetfanrpm %d' % rpm)
        logging.info('Set fan speed: %d', rpm)
        return re.search(self.SET_FANSPEED_RE, response) is not None

    def _get_temperature_dict(self):
        """Read EC temperature name and idx into a dict.

        The result is cached on the instance once it is non-empty.

        @returns dict where key=<sensor name>, value =<sensor idx>
        """
        # The sensor (name, idx) mapping does not change.
        if self._temperature_dict:
            return self._temperature_dict

        temperature_dict = {}
        response = self.ec_command('tempsinfo all')
        for rline in response.split('\n'):
            match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
            if match:
                temperature_dict[match.group(3)] = int(match.group(1))

        self._temperature_dict = temperature_dict
        return temperature_dict

    def get_temperature(self, idx=None, name=None):
        """Gets temperature from idx sensor.

        Reads temperature either directly if idx is provided or by discovering
        idx using name.  When idx is given, name is ignored.

        @param idx:  integer of temp sensor to read.  Default=None
        @param name: string of temp sensor to read.  Default=None.
            For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro

        @raises ECError if idx is None and name is not a known sensor name
                (this includes name=None).
        @raises error.TestError if the temperature cannot be parsed from the
                ectool output.

        @returns integer of temperature reading in degrees Kelvin.
        """
        if idx is None:
            temperature_dict = self._get_temperature_dict()
            if name not in temperature_dict:
                raise ECError('Finding temp idx for name %s' % name)
            idx = temperature_dict[name]

        response = self.ec_command('temps %d' % idx)
        match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
        # As in get_fanspeed(): an empty digit group means nothing was read.
        if not match or not match.group(1):
            raise error.TestError('Reading temperature idx %d' % idx)

        return int(match.group(1))

    def get_battery(self):
        """Get battery presence (design capacity found).

        @raises ECError if the ectool battery command raises error.CmdError.

        @returns True if success False otherwise.
        """
        try:
            response = self.ec_command('battery')
        except error.CmdError:
            raise ECError('calling EC battery command')

        return re.search(self.BATTERY_RE, response) is not None

    def get_lightbar(self):
        """Test lightbar.

        Turns the lightbar on, initializes it, issues 'lightbar 4 255 255 255',
        dumps the lightbar state, turns the lightbar off again and finally
        checks the dump against LIGHTBAR_RE.

        @returns True if success False otherwise.
        """
        self.ec_command('lightbar on')
        self.ec_command('lightbar init')
        self.ec_command('lightbar 4 255 255 255')
        response = self.ec_command('lightbar')
        self.ec_command('lightbar off')
        return re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None

    def key_press(self, key):
        """Emit key down and up signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self.key_down(key)
        self.key_up(key)

    def _key_action(self, key, action_type):
        """Send one 'kbpress' command for the given key.

        @param key: name of a key defined in KEYMATRIX.
        @param action_type: integer passed as the last kbpress argument;
                key_down() uses 1 and key_up() uses 0.

        @raises error.TestError if key is not in KEYMATRIX.
        """
        if key not in KEYMATRIX:
            # %-format so that a non-string key still yields TestError.
            raise error.TestError('Unknown key: %s' % (key,))
        row, col = KEYMATRIX[key]
        self.ec_command('kbpress %d %d %d' % (row, col, action_type))

    def key_down(self, key):
        """Emit key down signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 1)

    def key_up(self, key):
        """Emit key up signal of the keyboard.

        @param key: name of a key defined in KEYMATRIX.
        """
        self._key_action(key, 0)
247
248
class EC_USBPD_Port(EC_Common):
    """Class for CrOS embedded controller for USB-PD Port.

    Public attributes:
        index: integer of USB type-C port index.

    Public Methods:
        is_dfp: Determine if data role is Downstream Facing Port (DFP).
        is_amode_supported: Check if alternate mode is supported by port.
        is_amode_entered: Check if alternate mode is entered.
        set_amode: Set an alternate mode.
        get_flash_info: Parse 'infopddev' output for the port.

    Private attributes:
        _port_info: holds usbpd protocol info.
        _amodes: holds alternate mode info.

    Private methods:
        _invalidate_port_data: Remove port data to force re-eval.
        _get_port_info: Get USB-PD port info.
        _get_amodes: parse and return port's svid info.
    """
    def __init__(self, index):
        """Constructor.

        @param index: integer of USB type-C port index.
        """
        self.index = index
        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD_Port, self).__init__(target='cros_pd')

        # Interrogate port at instantiation.  Use invalidate to force re-eval.
        self._port_info = self._get_port_info()
        self._amodes = self._get_amodes()

    def _invalidate_port_data(self):
        """Remove port data to force re-eval."""
        self._port_info = None
        self._amodes = None

    def _get_port_info(self):
        """Get USB-PD port info.

        ectool command usbpd provides the following information about the port:
          - Enabled/Disabled
          - Power & Data Role
          - Polarity
          - Protocol State

        At time of authoring it looks like:
          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY

        @raises error.TestError if ...
          port info not parseable, or the reported port number differs from
          self.index.

        @returns dictionary for <port> with keyval pairs:
          enabled: True | False
          power_role: 'sink' if the role field is SNK, otherwise 'source'
          data_role: second role field as reported (e.g. UFP, DFP)
          is_reversed: True if polarity is CC2, False otherwise
          state: state string as reported
        """
        PORT_INFO_RE = (r'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+'
                        r'Polarity:CC(\d+)\s+State:(\w+)')

        match = re.search(PORT_INFO_RE,
                          self.ec_command("usbpd %s" % (self.index)))
        if not match or int(match.group(1)) != self.index:
            raise error.TestError('Unable to determine port %d info' %
                                  self.index)

        pinfo = dict(
            enabled=(match.group(2) == 'enabled'),
            power_role=('sink' if match.group(3) == 'SNK' else 'source'),
            data_role=match.group(4),
            is_reversed=(match.group(5) == '2'),
            state=match.group(6))
        logging.debug('port_info = %s', pinfo)
        return pinfo

    def _get_amodes(self):
        """Parse alternate modes from pdgetmode.

        Looks like ...
          *SVID:0xff01 *0x00000485  0x00000000 ...
          SVID:0x18d1   0x00000001  0x00000000 ...

        Blank lines are skipped; lines that do not match are logged and
        skipped.

        @returns dictionary of format:
          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
            where:
              <svid>        : USB-IF Standard or vendor id as
                              hex string (i.e. 0xff01)
              <config_list> : list of uint32_t configs (zero entries dropped)
              <opos>        : integer of active object position, or None if
                              no config on the line is marked with '*'.
                              Note, this is the config list index + 1
        """
        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
        svids = dict()
        cmd = 'pdgetmode %d' % self.index
        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
            if line.strip() == '':
                continue
            logging.debug('pdgetmode line: %s', line)
            match = re.search(SVID_RE, line)
            if not match:
                logging.warning("Unable to parse SVID line %s", line)
                continue
            active = match.group(1) == '*'
            svid = match.group(2)
            configs_str = match.group(3)
            configs = list()
            opos = None
            # Positions are 1-based and count every token on the line.
            for i, config in enumerate(configs_str.split(), 1):
                if config.startswith('*'):
                    opos = i
                    config = config[1:]
                config = int(config, 16)
                # ignore unpopulated configs
                if config == 0:
                    continue
                configs.append(config)
            svids[svid] = dict(active=active, configs=configs, opos=opos)

        logging.debug("Port %d svids = %s", self.index, svids)
        return svids

    def is_dfp(self):
        """Determine if data role is Downstream Facing Port (DFP).

        @returns True if DFP False otherwise.
        """
        if self._port_info is None:
            self._port_info = self._get_port_info()

        return self._port_info['data_role'] == 'DFP'

    def is_amode_supported(self, svid):
        """Check if alternate mode is supported by port partner.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01)

        @returns True if svid is among the parsed alternate modes, False
                 otherwise.
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()

        return svid in self._amodes

    def is_amode_entered(self, svid, opos):
        """Check if alternate mode is entered.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.

        @returns True if entered False otherwise
        """
        # is_amode_supported() re-reads the modes if they were invalidated.
        if not self.is_amode_supported(svid):
            return False

        amode = self._amodes[svid]
        return bool(amode['active'] and amode['opos'] == opos)

    def set_amode(self, svid, opos, enter, delay_secs=2):
        """Set alternate mode.

        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
        @param opos: object position of config to act on.
        @param enter: Boolean of whether to enter mode.
        @param delay_secs: seconds to sleep after issuing the command and
                before re-reading the port's modes.

        @raises error.TestError if ...
           mode not supported.
           opos is > number of configs.

        @returns True if successful False otherwise
        """
        if self._amodes is None:
            self._amodes = self._get_amodes()

        if svid not in self._amodes:
            # Format the SVID into the message (it must not be passed as a
            # separate exception argument).
            raise error.TestError("SVID %s not supported" % svid)

        if opos > len(self._amodes[svid]['configs']):
            raise error.TestError("opos > available configs")

        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
                                         1 if enter else 0)
        self.ec_command(cmd, ignore_status=True)
        self._invalidate_port_data()

        # allow some time for mode entry/exit
        time.sleep(delay_secs)
        return self.is_amode_entered(svid, opos) == enter

    def get_flash_info(self):
        """Parse the 'infopddev' output for this port.

        The command is issued up to 3 times, sleeping 1 second after each
        attempt whose output contains 'has no discovered device'.

        @returns dictionary with keys:
          ptype, dev_major, dev_minor: integers
          vid, pid, rw_hash, image_status: strings
          Any key whose field was not found in the output is None.
        """
        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
                                    'dev_minor', 'rw_hash', 'image_status'])

        cmd = 'infopddev %d' % self.index

        tries = 3
        while tries:
            res = self.ec_command(cmd, ignore_status=True)
            if 'has no discovered device' not in res:
                break

            tries -= 1
            time.sleep(1)

        for ln in res.split('\n'):
            mat1 = re.match(mat1_re, ln)
            if mat1:
                flash_dict['ptype'] = int(mat1.group(1))
                flash_dict['vid'] = mat1.group(2)
                flash_dict['pid'] = mat1.group(3)
                continue

            mat2 = re.match(mat2_re, ln)
            if mat2:
                flash_dict['dev_major'] = int(mat2.group(1))
                flash_dict['dev_minor'] = int(mat2.group(2))
                flash_dict['rw_hash'] = mat2.group(3)
                flash_dict['image_status'] = mat2.group(4)
                # Stop at the first DevId line.
                break

        return flash_dict
482
483
class EC_USBPD(EC_Common):
    """Class for CrOS embedded controller for USB-PD.

    Public attributes:
        ports: list EC_USBPD_Port instances

    Public Methods:
        get_num_ports: get number of USB-PD ports device has.

    Private attributes:
        _num_ports: integer number of USB-PD ports device has.
    """
    def __init__(self, num_ports=None):
        """Constructor.

        @param num_ports: total number of USB-PD ports on device.  This is an
          override.  If left 'None' will try to determine.

        @raises error.TestNAError if the number of ports is 0.
        """
        self._num_ports = num_ports
        self.ports = list()

        # TODO(crosbug.com/p/38133) target= only works for samus
        super(EC_USBPD, self).__init__(target='cros_pd')

        if self.get_num_ports() == 0:
            raise error.TestNAError("Device has no USB-PD ports")

        for i in range(self._num_ports):
            self.ports.append(EC_USBPD_Port(i))

    def get_num_ports(self):
        """Determine the number of ports for device.

        Uses ectool's usbpdpower command which in turn makes host command call
        to EC_CMD_USB_PD_PORTS to determine the number of ports.  The number
        of non-blank output lines is taken as the port count, and the result
        is cached in _num_ports.

        TODO(tbroch) May want to consider adding separate ectool command to
        surface the number of ports directly instead of via usbpdpower

        @returns number of ports.
        """
        if self._num_ports is not None:
            return self._num_ports

        output = self.ec_command("usbpdpower")
        # Count only non-blank lines so that empty output yields 0 ports;
        # ''.split('\n') would give [''], i.e. a count of 1.
        port_lines = [ln for ln in output.splitlines() if ln.strip()]
        self._num_ports = len(port_lines)
        return self._num_ports
530