• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import ast
6import logging
7import re
8import time
9
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.cros import ec
12from autotest_lib.server.cros.servo import servo
13
# Hostevent codes, copied from:
#     ec/include/ec_commands.h
# Every code below is a single-bit mask, so each is spelled as a shift of 1
# by its bit position.
HOSTEVENT_LID_CLOSED        = 1 << 0
HOSTEVENT_LID_OPEN          = 1 << 1
HOSTEVENT_POWER_BUTTON      = 1 << 2
HOSTEVENT_AC_CONNECTED      = 1 << 3
HOSTEVENT_AC_DISCONNECTED   = 1 << 4
HOSTEVENT_BATTERY_LOW       = 1 << 5
HOSTEVENT_BATTERY_CRITICAL  = 1 << 6
HOSTEVENT_BATTERY           = 1 << 7
HOSTEVENT_THERMAL_THRESHOLD = 1 << 8
HOSTEVENT_THERMAL_OVERLOAD  = 1 << 9
HOSTEVENT_THERMAL           = 1 << 10
HOSTEVENT_USB_CHARGER       = 1 << 11
HOSTEVENT_KEY_PRESSED       = 1 << 12
HOSTEVENT_INTERFACE_READY   = 1 << 13
# Keyboard recovery combo has been pressed
HOSTEVENT_KEYBOARD_RECOVERY = 1 << 14
# Shutdown due to thermal overload
HOSTEVENT_THERMAL_SHUTDOWN  = 1 << 15
# Shutdown due to battery level too low
HOSTEVENT_BATTERY_SHUTDOWN  = 1 << 16
HOSTEVENT_INVALID           = 1 << 31

# Pause, in seconds, inserted after a key press/release pair has been sent.
KEYPRESS_RECOVERY_TIME = 0.5
40
41
class ChromeConsole(object):
    """Manages control of a Chrome console.

    We control the Chrome console via the UART of a Servo board. Chrome console
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.

    The servo control names used by an instance are derived from the console
    name by appending the CMD, REGEXP and MULTICMD suffixes below.
    """

    CMD = "_cmd"
    REGEXP = "_regexp"
    MULTICMD = "_multicmd"

    def __init__(self, servo, name):
        """Initialize and keep the servo object.

        Args:
          servo: A Servo object.
          name: The console name.
        """
        self.name = name
        self.uart_cmd = self.name + self.CMD
        self.uart_regexp = self.name + self.REGEXP
        self.uart_multicmd = self.name + self.MULTICMD

        self._servo = servo
        # Last regexp value successfully written to the uart_regexp control;
        # None means "unknown / nothing written yet".
        self._cached_uart_regexp = None


    def set_uart_regexp(self, regexp):
        """Program the regexp control, skipping the write if it is unchanged.

        Args:
          regexp: The value to write to the uart_regexp servo control.
        """
        if self._cached_uart_regexp == regexp:
            return
        # Drop the cached value before writing and only record the new one
        # once the write has succeeded. If the write raises, the next call
        # must not be short-circuited by a value that was never applied.
        self._cached_uart_regexp = None
        self._servo.set(self.uart_regexp, regexp)
        self._cached_uart_regexp = regexp


    def send_command(self, commands):
        """Send command through UART.

        This function opens UART pty when called, and then command is sent
        through UART.

        Args:
          commands: The commands to send, either a list or a string.
        """
        # No response is awaited here, so the regexp control is set to 'None'.
        self.set_uart_regexp('None')
        if isinstance(commands, list):
            try:
                self._servo.set_nocheck(self.uart_multicmd, ';'.join(commands))
            # 'servo' here is the module imported at the top of the file.
            except servo.ControlUnavailableError:
                logging.warning('The servod is too old that uart_multicmd not '
                                'supported. Use uart_cmd instead.')
                # Fall back to sending the commands one at a time.
                for command in commands:
                    self._servo.set_nocheck(self.uart_cmd, command)
        else:
            self._servo.set_nocheck(self.uart_cmd, commands)

    def has_command(self, command):
        """Check whether EC console supports |command|.

        Args:
          command: Command to look for.

        Returns:
          True: If the |command| exist on the EC image of the device.
          False: If the |command| does not exist on the EC image of the device.
        """
        result = None
        try:
            # Throws error.TestFail (on timeout) if it cannot find a line with
            # 'command' in the output. Thus return False in that case.
            result = self.send_command_get_output('help', [command])
        except error.TestFail:
            return False
        return result is not None

    def send_command_get_output(self, command, regexp_list):
        r"""Send command through UART and wait for response.

        This function waits for response message matching regular expressions.

        Args:
          command: The command sent.
          regexp_list: List of regular expressions used to match response
            message. Note, list must be ordered.

        Returns:
          List of tuples, each of which contains the entire matched string and
          all the subgroups of the match. None if not matched.
          For example:
            response of the given command:
              High temp: 37.2
              Low temp: 36.4
            regexp_list:
              ['High temp: (\d+)\.(\d+)', 'Low temp: (\d+)\.(\d+)']
            returns:
              [('High temp: 37.2', '37', '2'), ('Low temp: 36.4', '36', '4')]

        Raises:
          error.TestError: An error when the given regexp_list is not valid.
        """
        if not isinstance(regexp_list, list):
            raise error.TestError('Argument regexp_list is not a list: %s' %
                                  str(regexp_list))

        # The list is handed to the regexp control in its string form.
        self.set_uart_regexp(str(regexp_list))
        self._servo.set_nocheck(self.uart_cmd, command)
        # Reading the cmd control back yields a string holding a Python
        # literal; literal_eval parses it without executing arbitrary code.
        return ast.literal_eval(self._servo.get(self.uart_cmd))
149
150
class ChromeEC(ChromeConsole):
    """Manages control of a Chrome EC.

    We control the Chrome EC via the UART of a Servo board. Chrome EC
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """

    def __init__(self, servo, name="ec_uart"):
        """Initialize the EC console.

        Args:
          servo: A Servo object.
          name: The console name; defaults to "ec_uart".
        """
        super(ChromeEC, self).__init__(servo, name)


    @staticmethod
    def _kbpress_command(keyname, pressed):
        """Build the 'kbpress' console command for one key transition.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
          pressed: True for key down, False for key up.

        Returns:
          The command string 'kbpress <a> <b> <1|0>', where <a> and <b> are
          elements 1 and 0 (in that order) of the KEYMATRIX entry.
        """
        entry = ec.KEYMATRIX[keyname]
        return 'kbpress %d %d %d' % (entry[1], entry[0], 1 if pressed else 0)


    def key_down(self, keyname):
        """Simulate pressing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, True))


    def key_up(self, keyname):
        """Simulate releasing a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        self.send_command(self._kbpress_command(keyname, False))


    def key_press(self, keyname):
        """Press and then release a key.

        Args:
          keyname: Key name, one of the keys of KEYMATRIX.
        """
        # Both transitions are passed as one list to send_command.
        self.send_command([
                self._kbpress_command(keyname, True),
                self._kbpress_command(keyname, False),
                ])
        # Don't spam the EC console as fast as we can; leave some recovery time
        # in between commands.
        time.sleep(KEYPRESS_RECOVERY_TIME)


    def send_key_string_raw(self, string):
        """Send key strokes consisting of only characters.

        Args:
          string: Raw string.
        """
        for c in string:
            self.key_press(c)


    def send_key_string(self, string):
        """Send key strokes including special keys.

        Args:
          string: Character string including special keys. An example
            is "this is an<tab>example<enter>".
        """
        # Each match is either one "<...>" special-key token or a run of
        # ordinary characters; exactly one of the two groups is set.
        for m in re.finditer("(<[^>]+>)|([^<>]+)", string):
            special, raw = m.groups()
            if raw is not None:
                self.send_key_string_raw(raw)
            else:
                # The token is passed with its angle brackets intact.
                self.key_press(special)


    def reboot(self, flags=''):
        """Reboot EC with given flags.

        Args:
          flags: Optional, a space-separated string of flags passed to the
                 reboot command, including:
                   default: EC soft reboot;
                   'hard': EC hard/cold reboot;
                   'ap-off': Leave AP off after EC reboot (by default, EC turns
                             AP on after reboot if lid is open).

        Raises:
          error.TestError: If the string of flags is invalid.
        """
        # Validate every flag before anything is sent to the console.
        for flag in flags.split():
            if flag not in ('hard', 'ap-off'):
                raise error.TestError(
                        'The flag %s of EC reboot command is invalid.' % flag)
        self.send_command("reboot %s" % flags)


    def set_flash_write_protect(self, enable):
        """Set the software write protect of EC flash.

        Args:
          enable: True to enable write protect, False to disable.
        """
        if enable:
            self.send_command("flashwp enable")
        else:
            self.send_command("flashwp disable")


    def set_hostevent(self, codes):
        """Set the EC hostevent codes.

        Args:
          codes: Hostevent codes, HOSTEVENT_*
        """
        self.send_command("hostevent set %#x" % codes)
        # Allow enough time for EC to process input and set flag.
        # See chromium:371631 for details.
        # FIXME: Stop importing time module if this hack becomes obsolete.
        time.sleep(1)


    def enable_console_channel(self, channel):
        """Find console channel mask and enable that channel only

        @param channel: console channel name
        """
        # The 'chan' command returns a list of console channels,
        # their channel masks and channel numbers
        regexp = r'(\d+)\s+([\w]+)\s+\*?\s+{0}'.format(channel)
        matches = self.send_command_get_output('chan', [regexp])
        # matches[0] is (whole match, group 1, group 2); group 2 is used as
        # the channel mask, prefixed with 0x for proper hex input value.
        cmd = 'chan 0x' + matches[0][2]
        # Set console to only output the desired channel
        self.send_command(cmd)


    def get_version(self):
        """Get version information from the Chrome EC console.
           Additionally, can be used to verify if EC console is available.

        Returns:
          The result of send_command_get_output for the "version" command:
          one match tuple per expected line (Chip, RO, RW, Build).
        """
        # Presumably mask 0 silences all console channels so that unrelated
        # output does not interleave with the version text.
        self.send_command("chan 0")
        # NOTE(review): these patterns are intentionally kept byte-identical
        # (non-raw) because their str() form is what gets sent to servod.
        expected_output = ["Chip:\s+([^\r\n]*)\r\n",
                           "RO:\s+([^\r\n]*)\r\n",
                           "RW_?[AB]?:\s+([^\r\n]*)\r\n",
                           "Build:\s+([^\r\n]*)\r\n"]
        try:
            return self.send_command_get_output("version", expected_output)
        finally:
            # Always restore the channel mask, even when the version query
            # fails; otherwise the console would be left with mask 0.
            self.send_command("chan 0xffffffff")
298
299
class ChromeUSBPD(ChromeEC):
    """Manages control of a Chrome USBPD.

    A ChromeEC whose console name is fixed to "usbpd_uart", so all commands
    go through the servo controls derived from that name instead of the
    default EC ones. Every console operation is inherited unchanged from
    ChromeEC.
    """

    def __init__(self, servo):
        """Bind this console to the "usbpd_uart" controls.

        Args:
          servo: A Servo object.
        """
        console_name = "usbpd_uart"
        super(ChromeUSBPD, self).__init__(servo, name=console_name)
310