• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4import logging
5import re
6import time
7
8from autotest_lib.client.bin import utils
9from autotest_lib.client.common_lib import error
10
11
12def has_ectool():
13    """Determine if ectool shell command is present.
14
15    Returns:
16        boolean true if avail, false otherwise.
17    """
18    cmd = 'which ectool'
19    return (utils.system(cmd, ignore_status=True) == 0)
20
21
22class EC_Common(object):
23    """Class for EC common.
24
25    This incredibly brief base class is intended to encapsulate common elements
26    across various CrOS MCUs (ec proper, USB-PD, Sensor Hub).  At the moment
27    that includes only the use of ectool.
28    """
29
30    def __init__(self, target='cros_ec'):
31        """Constructor.
32
33        @param target: target name of ec to communicate with.
34        """
35        if not has_ectool():
36            ec_info = utils.system_output("mosys ec info",
37                                          ignore_status=True)
38            logging.warning("Ectool absent on this platform ( %s )",
39                         ec_info)
40            raise error.TestNAError("Platform doesn't support ectool")
41        self._target = target
42
43    def ec_command(self, cmd, **kwargs):
44        """Executes ec command and returns results.
45
46        @param cmd: string of command to execute.
47        @param kwargs: optional params passed to utils.system_output
48
49        @returns: string of results from ec command.
50        """
51        full_cmd = 'ectool --name=%s %s' % (self._target, cmd)
52        result = utils.system_output(full_cmd, **kwargs)
53        logging.debug('Command: %s', full_cmd)
54        logging.debug('Result: %s', result)
55        return result
56
57
58class EC(EC_Common):
59    """Class for CrOS embedded controller (EC)."""
60    HELLO_RE = "EC says hello"
61    GET_FANSPEED_RE = "Current fan RPM: ([0-9]*)"
62    SET_FANSPEED_RE = "Fan target RPM set."
63    TEMP_SENSOR_RE = "Reading temperature...([0-9]*)"
64    TOGGLE_AUTO_FAN_RE = "Automatic fan control is now on"
65    # For battery, check we can see a non-zero capacity value.
66    BATTERY_RE = "Design capacity:\s+[1-9]\d*\s+mAh"
67    LIGHTBAR_RE = "^ 05\s+3f\s+3f$"
68
69
70    def hello(self):
71        """Test EC hello command.
72
73        @returns True if success False otherwise.
74        """
75        response = self.ec_command('hello')
76        return (re.search(self.HELLO_RE, response) is not None)
77
78    def auto_fan_ctrl(self):
79        """Turns auto fan ctrl on.
80
81        @returns True if success False otherwise.
82        """
83        response = self.ec_command('autofanctrl')
84        logging.info('Turned on auto fan control.')
85        return (re.search(self.TOGGLE_AUTO_FAN_RE, response) is not None)
86
87    def get_fanspeed(self):
88        """Gets fanspeed.
89
90        @raises error.TestError if regexp fails to match.
91
92        @returns integer of fan speed RPM.
93        """
94        response = self.ec_command('pwmgetfanrpm')
95        match = re.search(self.GET_FANSPEED_RE, response)
96        if not match:
97            raise error.TestError('Unable to read fan speed')
98
99        rpm = int(match.group(1))
100        logging.info('Fan speed: %d', rpm)
101        return rpm
102
103    def set_fanspeed(self, rpm):
104        """Sets fan speed.
105
106        @param rpm: integer of fan speed RPM to set
107
108        @returns True if success False otherwise.
109        """
110        response = self.ec_command('pwmsetfanrpm %d' % rpm)
111        logging.info('Set fan speed: %d', rpm)
112        return (re.search(self.SET_FANSPEED_RE, response) is not None)
113
114    def get_temperature(self, idx):
115        """Gets temperature from idx sensor.
116
117        @param idx: integer of temp sensor to read.
118
119        @raises error.TestError if fails to read sensor.
120
121        @returns integer of temperature reading in degrees Kelvin.
122        """
123        response = self.ec_command('temps %d' % idx)
124        match = re.search(self.TEMP_SENSOR_RE, response)
125        if not match:
126            raise error.TestError('Unable to read temperature sensor %d' % idx)
127
128        return int(match.group(1))
129
130    def get_battery(self):
131        """Get battery presence (design capacity found).
132
133        @returns True if success False otherwise.
134        """
135        response = self.ec_command('battery')
136        return (re.search(self.BATTERY_RE, response) is not None)
137
138    def get_lightbar(self):
139        """Test lightbar.
140
141        @returns True if success False otherwise.
142        """
143        self.ec_command('lightbar on')
144        self.ec_command('lightbar init')
145        self.ec_command('lightbar 4 255 255 255')
146        response = self.ec_command('lightbar')
147        self.ec_command('lightbar off')
148        return (re.search(self.LIGHTBAR_RE, response, re.MULTILINE) is not None)
149
150
151class EC_USBPD_Port(EC_Common):
152    """Class for CrOS embedded controller for USB-PD Port.
153
154    Public attributes:
155        index: integer of USB type-C port index.
156
157    Public Methods:
158        is_dfp: Determine if data role is Downstream Facing Port (DFP).
159        is_amode_supported: Check if alternate mode is supported by port.
160        is_amode_entered: Check if alternate mode is entered.
161        set_amode: Set an alternate mode.
162
163    Private attributes:
164        _port: integer of USB type-C port id.
165        _port_info: holds usbpd protocol info.
166        _amodes: holds alternate mode info.
167
168    Private methods:
169        _invalidate_port_data: Remove port data to force re-eval.
170        _get_port_info: Get USB-PD port info.
171        _get_amodes: parse and return port's svid info.
172    """
173    def __init__(self, index):
174        """Constructor.
175
176        @param index: integer of USB type-C port index.
177        """
178        self.index = index
179        # TODO(crosbug.com/p/38133) target= only works for samus
180        super(EC_USBPD_Port, self).__init__(target='cros_pd')
181
182        # Interrogate port at instantiation.  Use invalidate to force re-eval.
183        self._port_info = self._get_port_info()
184        self._amodes = self._get_amodes()
185
186    def _invalidate_port_data(self):
187        """Remove port data to force re-eval."""
188        self._port_info = None
189        self._amodes = None
190
191    def _get_port_info(self):
192        """Get USB-PD port info.
193
194        ectool command usbpd provides the following information about the port:
195          - Enabled/Disabled
196          - Power & Data Role
197          - Polarity
198          - Protocol State
199
200        At time of authoring it looks like:
201          Port C0 is enabled, Role:SNK UFP Polarity:CC2 State:SNK_READY
202
203        @raises error.TestError if ...
204          port info not parseable.
205
206        @returns dictionary for <port> with keyval pairs:
207          enabled: True | False | None
208          power_role: sink | source | None
209          data_role: UFP | DFP | None
210          is_reversed: True | False | None
211          state: various strings | None
212        """
213        PORT_INFO_RE = 'Port\s+C(\d+)\s+is\s+(\w+),\s+Role:(\w+)\s+(\w+)\s+' + \
214                       'Polarity:CC(\d+)\s+State:(\w+)'
215
216        match = re.search(PORT_INFO_RE,
217                          self.ec_command("usbpd %s" % (self.index)))
218        if not match or int(match.group(1)) != self.index:
219            error.TestError('Unable to determine port %d info' % self.index)
220
221        pinfo = dict(enabled=None, power_role=None, data_role=None,
222                    is_reversed=None, state=None)
223        pinfo['enabled'] = match.group(2) == 'enabled'
224        pinfo['power_role'] = 'sink' if match.group(3) == 'SNK' else 'source'
225        pinfo['data_role'] = match.group(4)
226        pinfo['is_reversed'] = True if match.group(5) == '2' else False
227        pinfo['state'] = match.group(6)
228        logging.debug('port_info = %s', pinfo)
229        return pinfo
230
231    def _get_amodes(self):
232        """Parse alternate modes from pdgetmode.
233
234        Looks like ...
235          *SVID:0xff01 *0x00000485  0x00000000 ...
236          SVID:0x18d1   0x00000001  0x00000000 ...
237
238        @returns dictionary of format:
239          <svid>: {active: True|False, configs: <config_list>, opos:<opos>}
240            where:
241              <svid>        : USB-IF Standard or vendor id as
242                              hex string (i.e. 0xff01)
243              <config_list> : list of uint32_t configs
244              <opos>        : integer of active object position.
245                              Note, this is the config list index + 1
246        """
247        SVID_RE = r'(\*?)SVID:(\S+)\s+(.*)'
248        svids = dict()
249        cmd = 'pdgetmode %d' % self.index
250        for line in self.ec_command(cmd, ignore_status=True).split('\n'):
251            if line.strip() == '':
252                continue
253            logging.debug('pdgetmode line: %s', line)
254            match = re.search(SVID_RE, line)
255            if not match:
256                logging.warning("Unable to parse SVID line %s", line)
257                continue
258            active = match.group(1) == '*'
259            svid = match.group(2)
260            configs_str = match.group(3)
261            configs = list()
262            opos = None
263            for i,config in enumerate(configs_str.split(), 1):
264                if config.startswith('*'):
265                    opos = i
266                    config = config[1:]
267                config = int(config, 16)
268                # ignore unpopulated configs
269                if config == 0:
270                    continue
271                configs.append(config)
272            svids[svid] = dict(active=active, configs=configs, opos=opos)
273
274        logging.debug("Port %d svids = %s", self.index, svids)
275        return svids
276
277    def is_dfp(self):
278        """Determine if data role is Downstream Facing Port (DFP).
279
280        @returns True if DFP False otherwise.
281        """
282        if self._port_info is None:
283            self._port_info = self._get_port_info()
284
285        return self._port_info['data_role'] == 'DFP'
286
287    def is_amode_supported(self, svid):
288        """Check if alternate mode is supported by port partner.
289
290        @param svid: alternate mode SVID hexstring (i.e. 0xff01)
291        """
292        if self._amodes is None:
293            self._amodes = self._get_amodes()
294
295        if svid in self._amodes.keys():
296            return True
297        return False
298
299    def is_amode_entered(self, svid, opos):
300        """Check if alternate mode is entered.
301
302        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
303        @param opos: object position of config to act on.
304
305        @returns True if entered False otherwise
306        """
307        if self._amodes is None:
308            self._amodes = self._get_amodes()
309
310        if not self.is_amode_supported(svid):
311            return False
312
313        if self._amodes[svid]['active'] and self._amodes[svid]['opos'] == opos:
314            return True
315
316        return False
317
318    def set_amode(self, svid, opos, enter, delay_secs=2):
319        """Set alternate mode.
320
321        @param svid: alternate mode SVID hexstring (i.e. 0xff01).
322        @param opos: object position of config to act on.
323        @param enter: Boolean of whether to enter mode.
324
325        @raises error.TestError if ...
326           mode not supported.
327           opos is > number of configs.
328
329        @returns True if successful False otherwise
330        """
331        if self._amodes is None:
332            self._amodes = self._get_amodes()
333
334        if svid not in self._amodes.keys():
335            raise error.TestError("SVID %s not supported", svid)
336
337        if opos > len(self._amodes[svid]['configs']):
338            raise error.TestError("opos > available configs")
339
340        cmd = "pdsetmode %d %s %d %d" % (self.index, svid, opos,
341                                         1 if enter else 0)
342        self.ec_command(cmd, ignore_status=True)
343        self._invalidate_port_data()
344
345        # allow some time for mode entry/exit
346        time.sleep(delay_secs)
347        return self.is_amode_entered(svid, opos) == enter
348
349    def get_flash_info(self):
350        mat0_re = r'has no discovered device'
351        mat1_re = r'.*ptype:(\d+)\s+vid:(\w+)\s+pid:(\w+).*'
352        mat2_re = r'.*DevId:(\d+)\.(\d+)\s+Hash:\s*(\w+.*)\s*CurImg:(\w+).*'
353        flash_dict = dict.fromkeys(['ptype', 'vid', 'pid', 'dev_major',
354                                    'dev_minor', 'rw_hash', 'image_status'])
355
356        cmd = 'infopddev %d' % self.index
357
358        tries = 3
359        while (tries):
360            res = self.ec_command(cmd, ignore_status=True)
361            if not 'has no discovered device' in res:
362                break
363
364            tries -= 1
365            time.sleep(1)
366
367        for ln in res.split('\n'):
368            mat1 = re.match(mat1_re, ln)
369            if mat1:
370                flash_dict['ptype'] = int(mat1.group(1))
371                flash_dict['vid'] = mat1.group(2)
372                flash_dict['pid'] = mat1.group(3)
373                continue
374
375            mat2 = re.match(mat2_re, ln)
376            if mat2:
377                flash_dict['dev_major'] = int(mat2.group(1))
378                flash_dict['dev_minor'] = int(mat2.group(2))
379                flash_dict['rw_hash'] = mat2.group(3)
380                flash_dict['image_status'] = mat2.group(4)
381                break
382
383        return flash_dict
384
385
386class EC_USBPD(EC_Common):
387    """Class for CrOS embedded controller for USB-PD.
388
389    Public attributes:
390        ports: list EC_USBPD_Port instances
391
392    Public Methods:
393        get_num_ports: get number of USB-PD ports device has.
394
395    Private attributes:
396        _num_ports: integer number of USB-PD ports device has.
397    """
398    def __init__(self, num_ports=None):
399        """Constructor.
400
401        @param num_ports: total number of USB-PD ports on device.  This is an
402          override.  If left 'None' will try to determine.
403        """
404        self._num_ports = num_ports
405        self.ports = list()
406
407        # TODO(crosbug.com/p/38133) target= only works for samus
408        super(EC_USBPD, self).__init__(target='cros_pd')
409
410        if (self.get_num_ports() == 0):
411            raise error.TestNAError("Device has no USB-PD ports")
412
413        for i in xrange(self._num_ports):
414            self.ports.append(EC_USBPD_Port(i))
415
416    def get_num_ports(self):
417        """Determine the number of ports for device.
418
419        Uses ectool's usbpdpower command which in turn makes host command call
420        to EC_CMD_USB_PD_PORTS to determine the number of ports.
421
422        TODO(tbroch) May want to consider adding separate ectool command to
423        surface the number of ports directly instead of via usbpdpower
424
425        @returns number of ports.
426        """
427        if (self._num_ports is not None):
428            return self._num_ports
429
430        self._num_ports = len(self.ec_command("usbpdpower").split(b'\n'))
431        return self._num_ports
432