• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2015 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8
9import logging
10import os
11import re
12from six.moves import range
13import time
14
15from autotest_lib.client.bin import utils
16from autotest_lib.client.common_lib import error
17
18# en-US key matrix (from "kb membrane pin matrix.pdf")
19KEYMATRIX = {'`': (3, 1), '1': (6, 1), '2': (6, 4), '3': (6, 2), '4': (6, 3),
20             '5': (3, 3), '6': (3, 6), '7': (6, 6), '8': (6, 5), '9': (6, 9),
21             '0': (6, 8), '-': (3, 8), '=': (0, 8), 'q': (7, 1), 'w': (7, 4),
22             'e': (7, 2), 'r': (7, 3), 't': (2, 3), 'y': (2, 6), 'u': (7, 6),
23             'i': (7, 5), 'o': (7, 9), 'p': (7, 8), '[': (2, 8), ']': (2, 5),
24             '\\': (3, 11), 'a': (4, 1), 's': (4, 4), 'd': (4, 2), 'f': (4, 3),
25             'g': (1, 3), 'h': (1, 6), 'j': (4, 6), 'k': (4, 5), 'l': (4, 9),
26             ';': (4, 8), '\'': (1, 8), 'z': (5, 1), 'x': (5, 4), 'c': (5, 2),
27             'v': (5, 3), 'b': (0, 3), 'n': (0, 6), 'm': (5, 6), ',': (5, 5),
28             '.': (5, 9), '/': (5, 8), ' ': (5, 11), '<right>': (6, 12),
29             '<alt_r>': (0, 10), '<down>': (6, 11), '<tab>': (2, 1),
30             '<f10>': (0, 4), '<shift_r>': (7, 7), '<ctrl_r>': (4, 0),
31             '<esc>': (1, 1), '<backspace>': (1, 11), '<f2>': (3, 2),
32             '<alt_l>': (6, 10), '<ctrl_l>': (2, 0), '<f1>': (0, 2),
33             '<search>': (0, 1), '<f3>': (2, 2), '<f4>': (1, 2), '<f5>': (3, 4),
34             '<f6>': (2, 4), '<f7>': (1, 4), '<f8>': (2, 9), '<f9>': (1, 9),
35             '<up>': (7, 11), '<shift_l>': (5, 7), '<enter>': (4, 11),
36             '<left>': (7, 12)}
37
38
39def has_ectool():
40    """Determine if ectool shell command is present.
41
42    Returns:
43        boolean true if avail, false otherwise.
44    """
45    cmd = 'which ectool'
46    return (utils.system(cmd, ignore_status=True) == 0)
47
48
49def has_cros_ec():
50    """Check whether DUT has chromium ec or not.
51
52    Returns:
53        boolean whether device has ec or not.
54    """
55    return os.path.exists('/dev/cros_ec')
56
57
58class ECError(Exception):
59    """Base class for a failure when communicating with EC."""
60    pass
61
62
63class EC_Common(object):
64    """Class for EC common.
65
66    This incredibly brief base class is intended to encapsulate common elements
67    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
68    that includes only the use of ectool.
69    """
70
71    def __init__(self, target='cros_ec'):
72        """Constructor.
73
74        @param target: target name of ec to communicate with.
75        """
76        if not has_ectool():
77            ec_info = utils.system_output("mosys ec info",
78                                          ignore_status=True)
79            logging.warning("Ectool absent on this platform ( %s )",
80                         ec_info)
81            raise error.TestNAError("Platform doesn't support ectool")
82        self._target = target
83
84    def ec_command(self, cmd, **kwargs):
85        """Executes ec command and returns results.
86
87        @param cmd: string of command to execute.
88        @param kwargs: optional params passed to utils.system_output
89
90        @returns: string of results from ec command.
91        """
92        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
93        logging.debug('Command: %s', full_cmd)
94        result = utils.system_output(full_cmd, **kwargs)
95        logging.debug('Result: %s', result)
96        return result
97
98
99class EC(EC_Common):
100    """Class for CrOS embedded controller (EC)."""
101    HELLO_RE = "EC says hello"
102    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
103    SET_FANSPEED_RE = "Fan target RPM set."
104    TEMP_SENSOR_TEMP_RE = "Reading temperature...([0-9]*)"
105    # <sensor idx>: <sensor type> <sensor name>
106    TEMP_SENSOR_INFO_RE = "(\d+):\s+(\d+)\s+([a-zA-Z_0-9]+)"
107    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
108    # For battery, check we can see a non-zero capacity value.
109    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
110    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
111
112    def __init__(self):
113        """Constructor."""
114        super(EC, self).__init__()
115        self._temperature_dict = None
116
117    def hello(self, **kwargs):
118        """Test EC hello command.
119
120        @param kwargs: optional params passed to utils.system_output
121
122        @returns True if success False otherwise.
123        """
124        response = self.ec_command('hello', **kwargs)
125        return (re.search(self.HELLO_RE, response) is not None)
126
127    def auto_fan_ctrl(self):
128        """Turns auto fan ctrl on.
129
130        @returns True if success False otherwise.
131        """
132        response = self.ec_command('autofanctrl')
133        logging.info('Turned on auto fan control.')
134        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
135
136    def get_fanspeed(self):
137        """Gets fanspeed.
138
139        @raises error.TestError if regexp fails to match.
140
141        @returns integer of fan speed RPM.
142        """
143        response = self.ec_command('pwmgetfanrpm')
144        match = re.search(self.GET_FANSPEED_RE, response)
145        if not match:
146            raise error.TestError('Unable to read fan speed')
147
148        rpm = int(match.group(1))
149        logging.info('Fan speed: %d', rpm)
150        return rpm
151
152    def set_fanspeed(self, rpm):
153        """Sets fan speed.
154
155        @param rpm: integer of fan speed RPM to set
156
157        @returns True if success False otherwise.
158        """
159        response = self.ec_command('pwmsetfanrpm %d' % rpm)
160        logging.info('Set fan speed: %d', rpm)
161        return (re.search(self.SET_FANSPEED_RE, response) is not None)
162
163    def _get_temperature_dict(self):
164        """Read EC temperature name and idx into a dict.
165
166        @returns dict where key=<sensor name>, value =<sensor idx>
167        """
168        # The sensor (name, idx) mapping does not change.
169        if self._temperature_dict:
170            return self._temperature_dict
171
172        temperature_dict = {}
173        response = self.ec_command('tempsinfo all')
174        for rline in response.split('\n'):
175            match = re.search(self.TEMP_SENSOR_INFO_RE, rline)
176            if match:
177                temperature_dict[match.group(3)] = int(match.group(1))
178
179        self._temperature_dict = temperature_dict
180        return temperature_dict
181
182    def get_temperature(self, idx=None, name=None):
183        """Gets temperature from idx sensor.
184
185        Reads temperature either directly if idx is provided or by discovering
186        idx using name.
187
188        @param idx:  integer of temp sensor to read.  Default=None
189        @param name: string of temp sensor to read.  Default=None.
190            For example: Battery, Ambient, Charger, DRAM, eMMC, Gyro
191
192        @raises ECError if fails to find idx of name.
193        @raises error.TestError if fails to read sensor or fails to identify
194        sensor to read from idx & name param.
195
196        @returns integer of temperature reading in degrees Kelvin.
197        """
198        if idx is None:
199            temperature_dict = self._get_temperature_dict()
200            if name in temperature_dict:
201                idx = temperature_dict[name]
202            else:
203                raise ECError('Finding temp idx for name %s' % name)
204
205        response = self.ec_command('temps %d' % idx)
206        match = re.search(self.TEMP_SENSOR_TEMP_RE, response)
207        if not match:
208            raise error.TestError('Reading temperature idx %d' % idx)
209
210        return int(match.group(1))
211
212    def get_battery(self):
213        """Get battery presence (design capacity found).
214
215        @returns True if success False otherwise.
216        """
217        try:
218            response = self.ec_command('battery')
219        except error.CmdError:
220            raise ECError('calling EC battery command')
221
222        return (re.search(self.BATTERY_RE, response) is not None)
223
224    def get_lightbar(self):
225        """Test lightbar.
226
227        @returns True if success False otherwise.
228        """
229        self.ec_command('lightbar on')
230        self.ec_command('lightbar init')
231        self.ec_command('lightbar 4 255 255 255')
232        response = self.ec_command('lightbar')
233        self.ec_command('lightbar off')
234        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
235
236    def key_press(self, key):
237        """Emit key down and up signal of the keyboard.
238
239        @param key: name of a key defined in KEYMATRIX.
240        """
241        self.key_down(key)
242        self.key_up(key)
243
244    def _key_action(self, key, action_type):
245        if not key in KEYMATRIX:
246            raise error.TestError('Unknown key: ' + key)
247        row, col = KEYMATRIX[key]
248        self.ec_command('kbpress %d %d %d' % (row, col, action_type))
249
250    def key_down(self, key):
251        """Emit key down signal of the keyboard.
252
253        @param key: name of a key defined in KEYMATRIX.
254        """
255        self._key_action(key, 1)
256
257    def key_up(self, key):
258        """Emit key up signal of the keyboard.
259
260        @param key: name of a key defined in KEYMATRIX.
261        """
262        self._key_action(key, 0)
263
264
265class EC_USBPD_Port(EC_Common):
266    """Class for CrOS embedded controller for USB-PD Port.
267
268    Public attributes:
269        index: integer of USB type-C port index.
270
271    Public Methods:
272        is_dfp: Determine if data role is Downstream Facing Port (DFP).
273        is_amode_supported: Check if alternate mode is supported by port.
274        is_amode_entered: Check if alternate mode is entered.
275        set_amode: Set an alternate mode.
276
277    Private attributes:
278        _port: integer of USB type-C port id.
279        _port_info: holds usbpd protocol info.
280        _amodes: holds alternate mode info.
281
282    Private methods:
283        _invalidate_port_data: Remove port data to force re-eval.
284        _get_port_info: Get USB-PD port info.
285        _get_amodes: parse and return port's svid info.
286    """
287    def __init__(self, index):
288        """Constructor.
289
290        @param index: integer of USB type-C port index.
291        """
292        self.index = index
293        # TODO(crosbug.com/p/38133) target= only works for samus
294        super(EC_USBPD_Port, self).__init__(target='cros_pd')
295
296        # Interrogate port at instantiation.  Use invalidate to force re-eval.
297        self._port_info = self._get_port_info()
298        self._amodes = self._get_amodes()
299
300    def _invalidate_port_data(self):
301        """Remove port data to force re-eval."""
302        self._port_info = None
303        self._amodes = None
304
305    def _get_port_info(self):
306        """Get USB-PD port info.
307
308        ectool command usbpd provides the following information about the port:
309          - Enabled/Disabled
310          - Power & Data Role
311          - Polarity
312          - Protocol State
313
314        At time of authoring it looks like:
315          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
316
317        @raises error.TestError if ...
318          port info not parseable.
319
320        @returns dictionary for <port> with keyval pairs:
321          enabled: True | False | None
322          power_role: sink | source | None
323          data_role: UFP | DFP | None
324          is_reversed: True | False | None
325          state: various strings | None
326        """
327        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
328                       'Polarity:CC(\d+)\s+State:(\w+)'
329
330        match = re.search(PORT_INFO_RE,
331                          self.ec_command("usbpd %s" % (self.index)))
332        if not match or int(match.group(1)) != self.index:
333            raise error.TestError('Unable to determine port %d info' %
334                                  self.index)
335
336        pinfo = dict(enabled=None, power_role=None, data_role=None,
337                    is_reversed=None, state=None)
338        pinfo['enabled'] = match.group(2) == 'enabled'
339        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
340        pinfo['data_role'] = match.group(4)
341        pinfo['is_reversed'] = True if match.group(5) == '2' else False
342        pinfo['state'] = match.group(6)
343        logging.debug('port_info = %s', pinfo)
344        return pinfo
345
346    def _get_amodes(self):
347        """Parse alternate modes from pdgetmode.
348
349        Looks like ...
350          *SVID:0xff01 *0x00000485  0x00000000 ...
351          SVID:0x18d1   0x00000001  0x00000000 ...
352
353        @returns dictionary of format:
354          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
355            where:
356              <svid>        : USB-IF Standard or vendor id as
357                              hex string (i.e. 0xff01)
358              <config_list> : list of uint32_t configs
359              <opos>        : integer of active object position.
360                              Note, this is the config list index + 1
361        """
362        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
363        svids = dict()
364        cmd = 'pdgetmode %d' % self.index
365        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
366            if line.strip() == '':
367                continue
368            logging.debug('pdgetmode line: %s', line)
369            match = re.search(SVID_RE, line)
370            if not match:
371                logging.warning("Unable to parse SVID line %s", line)
372                continue
373            active = match.group(1) == '*'
374            svid = match.group(2)
375            configs_str = match.group(3)
376            configs = list()
377            opos = None
378            for i,config in enumerate(configs_str.split(), 1):
379                if config.startswith('*'):
380                    opos = i
381                    config = config[1:]
382                config = int(config, 16)
383                # ignore unpopulated configs
384                if config == 0:
385                    continue
386                configs.append(config)
387            svids[svid] = dict(active=active, configs=configs, opos=opos)
388
389        logging.debug("Port %d svids = %s", self.index, svids)
390        return svids
391
392    def is_dfp(self):
393        """Determine if data role is Downstream Facing Port (DFP).
394
395        @returns True if DFP False otherwise.
396        """
397        if self._port_info is None:
398            self._port_info = self._get_port_info()
399
400        return self._port_info['data_role'] == 'DFP'
401
402    def is_amode_supported(self, svid):
403        """Check if alternate mode is supported by port partner.
404
405        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
406        """
407        if self._amodes is None:
408            self._amodes = self._get_amodes()
409
410        if svid in self._amodes.keys():
411            return True
412        return False
413
414    def is_amode_entered(self, svid, opos):
415        """Check if alternate mode is entered.
416
417        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
418        @param opos: object position of config to act on.
419
420        @returns True if entered False otherwise
421        """
422        if self._amodes is None:
423            self._amodes = self._get_amodes()
424
425        if not self.is_amode_supported(svid):
426            return False
427
428        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
429            return True
430
431        return False
432
433    def set_amode(self, svid, opos, enter, delay_secs=2):
434        """Set alternate mode.
435
436        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
437        @param opos: object position of config to act on.
438        @param enter: Boolean of whether to enter mode.
439
440        @raises error.TestError if ...
441           mode not supported.
442           opos is > number of configs.
443
444        @returns True if successful False otherwise
445        """
446        if self._amodes is None:
447            self._amodes = self._get_amodes()
448
449        if svid not in self._amodes.keys():
450            raise error.TestError("SVID %s not supported" % svid)
451
452        if opos > len(self._amodes[svid]['configs']):
453            raise error.TestError("opos > available configs")
454
455        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
456                                         1 if enter else 0)
457        self.ec_command(cmd, ignore_status=True)
458        self._invalidate_port_data()
459
460        # allow some time for mode entry/exit
461        time.sleep(delay_secs)
462        return self.is_amode_entered(svid, opos) == enter
463
464    def get_flash_info(self):
465        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
466        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
467        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
468                                    'dev_minor', 'rw_hash', 'image_status'])
469
470        cmd = 'infopddev %d' % self.index
471
472        tries = 3
473        while (tries):
474            res = self.ec_command(cmd, ignore_status=True)
475            if not 'has no discovered device' in res:
476                break
477
478            tries -= 1
479            time.sleep(1)
480
481        for ln in res.split('\n'):
482            mat1 = re.match(mat1_re, ln)
483            if mat1:
484                flash_dict['ptype'] = int(mat1.group(1))
485                flash_dict['vid'] = mat1.group(2)
486                flash_dict['pid'] = mat1.group(3)
487                continue
488
489            mat2 = re.match(mat2_re, ln)
490            if mat2:
491                flash_dict['dev_major'] = int(mat2.group(1))
492                flash_dict['dev_minor'] = int(mat2.group(2))
493                flash_dict['rw_hash'] = mat2.group(3)
494                flash_dict['image_status'] = mat2.group(4)
495                break
496
497        return flash_dict
498
499
500class EC_USBPD(EC_Common):
501    """Class for CrOS embedded controller for USB-PD.
502
503    Public attributes:
504        ports: list EC_USBPD_Port instances
505
506    Public Methods:
507        get_num_ports: get number of USB-PD ports device has.
508
509    Private attributes:
510        _num_ports: integer number of USB-PD ports device has.
511    """
512    def __init__(self, num_ports=None):
513        """Constructor.
514
515        @param num_ports: total number of USB-PD ports on device.  This is an
516          override.  If left 'None' will try to determine.
517        """
518        self._num_ports = num_ports
519        self.ports = list()
520
521        # TODO(crosbug.com/p/38133) target= only works for samus
522        super(EC_USBPD, self).__init__(target='cros_pd')
523
524        if (self.get_num_ports() == 0):
525            raise error.TestNAError("Device has no USB-PD ports")
526
527        for i in range(self._num_ports):
528            self.ports.append(EC_USBPD_Port(i))
529
530    def get_num_ports(self):
531        """Determine the number of ports for device.
532
533        Uses ectool's usbpdpower command which in turn makes host command call
534        to EC_CMD_USB_PD_PORTS to determine the number of ports.
535
536        TODO(tbroch) May want to consider adding separate ectool command to
537        surface the number of ports directly instead of via usbpdpower
538
539        @returns number of ports.
540        """
541        if (self._num_ports is not None):
542            return self._num_ports
543
544        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
545        return self._num_ports
546