• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import functools
6import logging
7import time
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.server.cros.servo import chrome_ec
12
13
14def ccd_command(func):
15    """Decorator for methods only relevant to devices using CCD."""
16    @functools.wraps(func)
17    def wrapper(instance, *args, **kwargs):
18        if instance.using_ccd():
19            return func(instance, *args, **kwargs)
20        logging.info("not using ccd. ignoring %s", func.func_name)
21    return wrapper
22
23
24class ChromeCr50(chrome_ec.ChromeConsole):
25    """Manages control of a Chrome Cr50.
26
27    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
28    provides many interfaces to set and get its behavior via console commands.
29    This class is to abstract these interfaces.
30    """
31    IDLE_COUNT = 'count: (\d+)'
32    VERSION_FORMAT = '\d+\.\d+\.\d+'
33    VERSION_ERROR = 'Error'
34    INACTIVE = '\nRW_(A|B): +(%s|%s)(/DBG|)?' % (VERSION_FORMAT, VERSION_ERROR)
35    ACTIVE = '\nRW_(A|B): +\* +(%s)(/DBG|)?' % (VERSION_FORMAT)
36    WAKE_CHAR = '\n'
37    START_UNLOCK_TIMEOUT = 20
38    GETTIME = ['= (\S+)']
39    UNLOCK = ['Unlock sequence starting. Continue until (\S+)']
40    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
41    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
42    MAX_RETRY_COUNT = 5
43    START_STR = ['(.*Console is enabled;)']
44
45
46    def __init__(self, servo):
47        super(ChromeCr50, self).__init__(servo, "cr50_console")
48
49
50    def send_command(self, commands):
51        """Send command through UART.
52
53        Cr50 will drop characters input to the UART when it resumes from sleep.
54        If servo is not using ccd, send some dummy characters before sending the
55        real command to make sure cr50 is awake.
56        """
57        if not self.using_ccd():
58            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
59        super(ChromeCr50, self).send_command(commands)
60
61
62    def send_command_get_output(self, command, regexp_list):
63        """Send command through UART and wait for response.
64
65        Cr50 will drop characters input to the UART when it resumes from sleep.
66        If servo is not using ccd, send some dummy characters before sending the
67        real command to make sure cr50 is awake.
68        """
69        if not self.using_ccd():
70            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
71        return super(ChromeCr50, self).send_command_get_output(command,
72                                                               regexp_list)
73
74
75    def get_deep_sleep_count(self):
76        """Get the deep sleep count from the idle task"""
77        result = self.send_command_get_output('idle', [self.IDLE_COUNT])
78        return int(result[0][1])
79
80
81    def clear_deep_sleep_count(self):
82        """Clear the deep sleep count"""
83        result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
84        if int(result[0][1]):
85            raise error.TestFail("Could not clear deep sleep count")
86
87
88    def has_command(self, cmd):
89        """Returns 1 if cr50 has the command 0 if it doesn't"""
90        try:
91            self.send_command_get_output('help', [cmd])
92        except:
93            logging.info("Image does not include '%s' command", cmd)
94            return 0
95        return 1
96
97
98    def erase_nvmem(self):
99        """Use flasherase to erase both nvmem sections"""
100        if not self.has_command('flasherase'):
101            raise error.TestError("need image with 'flasherase'")
102
103        self.send_command('flasherase 0x7d000 0x3000')
104        self.send_command('flasherase 0x3d000 0x3000')
105
106
107    def reboot(self):
108        """Reboot Cr50 and wait for CCD to be enabled"""
109        self.send_command('reboot')
110        self.wait_for_reboot()
111
112
113    def wait_for_reboot(self, timeout=60):
114        """Wait for cr50 to reboot"""
115        if self.using_ccd():
116            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
117            # go down to detect the reboot.
118            self.wait_for_ccd_disable(timeout, raise_error=False)
119            self.ccd_enable()
120        else:
121            # Look for the boot string declaring the console is ready. If we
122            # don't find it, its ok. The command will timeout after 3 seconds
123            # which is longer than the time it takes for cr50 to reboot.
124            try:
125                rv = self.send_command_get_output('\n\n', self.START_STR)
126                logging.debug(rv[0][0])
127            except:
128                pass
129
130
131    def rollback(self, eraseflashinfo=True):
132        """Set the reset counter high enough to force a rollback then reboot"""
133        if not self.has_command('rw') or not self.has_command('eraseflashinfo'):
134            raise error.TestError("need image with 'rw' and 'eraseflashinfo'")
135
136        inactive_partition = self.get_inactive_version_info()[0]
137        # Increase the reset count to above the rollback threshold
138        self.send_command('rw 0x40000128 1')
139        self.send_command('rw 0x4000012c %d' % (self.MAX_RETRY_COUNT + 2))
140
141        if eraseflashinfo:
142            self.send_command('eraseflashinfo')
143
144        self.reboot()
145
146        running_partition = self.get_active_version_info()[0]
147        if inactive_partition != running_partition:
148            raise error.TestError("Failed to rollback to inactive image")
149
150
151    def rolledback(self):
152        """Returns true if cr50 just rolled back"""
153        return int(self._servo.get('cr50_reset_count')) > self.MAX_RETRY_COUNT
154
155
156    def get_version_info(self, regexp):
157        """Get information from the version command"""
158        return self.send_command_get_output('ver', [regexp])[0][1::]
159
160
161    def get_inactive_version_info(self):
162        """Get the active partition, version, and hash"""
163        return self.get_version_info(self.INACTIVE)
164
165
166    def get_active_version_info(self):
167        """Get the active partition, version, and hash"""
168        return self.get_version_info(self.ACTIVE)
169
170
171    def get_version(self):
172        """Get the RW version"""
173        return self.get_active_version_info()[1].strip()
174
175
176    def using_servo_v4(self):
177        """Returns true if the console is being served using servo v4"""
178        return 'servo_v4' in self._servo.get_servo_version()
179
180
181    def using_ccd(self):
182        """Returns true if the console is being served using CCD"""
183        return 'ccd_cr50' in self._servo.get_servo_version()
184
185
186    @ccd_command
187    def get_ccd_state(self):
188        """Get the CCD state from servo
189
190        Returns:
191            'off' or 'on' based on whether the cr50 console is working.
192        """
193        return self._servo.get('ccd_state')
194
195
196    @ccd_command
197    def wait_for_ccd_state(self, state, timeout, raise_error=True):
198        """Wait up to timeout seconds for CCD to be 'on' or 'off'
199        Args:
200            state: a string either 'on' or 'off'.
201            timeout: time in seconds to wait
202            raise_error: Raise TestFail if the value is state is not reached.
203
204        Raises
205            TestFail if ccd never reaches the specified state
206        """
207        logging.info("Wait until ccd is '%s'", state)
208        value = utils.wait_for_value(self.get_ccd_state, state,
209                                     timeout_sec=timeout)
210        if value != state:
211            error_msg = "timed out before detecting ccd '%s'" % state
212            if raise_error:
213                raise error.TestFail(error_msg)
214            logging.warning(error_msg)
215        logging.info("ccd is '%s'", value)
216
217
218    @ccd_command
219    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
220        """Wait for the cr50 console to stop working"""
221        self.wait_for_ccd_state('off', timeout, raise_error)
222
223
224    @ccd_command
225    def wait_for_ccd_enable(self, timeout=60):
226        """Wait for the cr50 console to start working"""
227        self.wait_for_ccd_state('on', timeout)
228
229
230    def ccd_disable(self):
231        """Change the values of the CC lines to disable CCD"""
232        if self.using_servo_v4():
233            logging.info("disable ccd")
234            self._servo.set_nocheck('servo_v4_dts_mode', 'off')
235            self.wait_for_ccd_disable()
236
237
238    @ccd_command
239    def ccd_enable(self):
240        """Reenable CCD and reset servo interfaces"""
241        logging.info("reenable ccd")
242        self._servo.set_nocheck('servo_v4_ccd_mode', 'ccd')
243        self._servo.set_nocheck('servo_v4_dts_mode', 'on')
244        self._servo.set_nocheck('power_state', 'ccd_reset')
245        self.wait_for_ccd_enable()
246
247
248    def lock_enable(self):
249        """Enable the lock on cr50"""
250        # Lock enable can be run, but we won't be able to use the power button
251        # to disable the lock. Let's not allow the console lock to be enabled
252        # if it can't be disabled without some change to the test setup
253        if self.using_ccd():
254            raise error.TestError("Cannot run 'lock enable' using CCD.")
255        self.send_command_get_output('lock enable',
256                                     ['The restricted console lock is enabled'])
257
258
259    def _attempt_unlock(self):
260        """Try to unlock the console.
261
262        Raises:
263            TestError if the unlock process fails.
264        """
265        # Get the current time.
266        rv = self.send_command_get_output('gettime', self.GETTIME)
267        current_time = float(rv[0][1])
268
269        # Start the unlock process.
270        rv = self.send_command_get_output('lock disable', self.UNLOCK)
271        unlock_finished = float(rv[0][1])
272
273        # Calculate the unlock timeout. There is a 10s countdown to start the
274        # unlock process, so unlock_timeout will be around 10s longer than
275        # necessary.
276        unlock_timeout = int(unlock_finished - current_time)
277        end_time = time.time() + unlock_timeout
278
279        logging.info('Pressing power button for %ds to unlock the console.',
280                     unlock_timeout)
281        logging.info('The process should end at %s', time.ctime(end_time))
282
283        # Press the power button once a second to unlock the console.
284        while time.time() < end_time:
285            self._servo.power_short_press()
286            time.sleep(1)
287
288        if self._servo.get('ccd_lock') != 'off':
289            raise error.TestError('Could not disable lock')
290
291
292    def lock_disable(self):
293        """Increase the console timeout and try disabling the lock."""
294        # We cannot press the power button using ccd.
295        if self.using_ccd():
296            raise error.TestError("Cannot run 'lock disable' using CCD.")
297
298        # The unlock process takes a while to start. Increase the cr50 console
299        # timeout so we can get the entire output of the 'lock disable' start
300        # process
301        original_timeout = self._servo.get('cr50_console_timeout')
302        self._servo.set_nocheck('cr50_console_timeout',
303                                self.START_UNLOCK_TIMEOUT)
304
305        try:
306            # Try to disable the lock
307            self._attempt_unlock()
308        finally:
309            # Reset the cr50_console timeout
310            self._servo.set_nocheck('cr50_console_timeout', original_timeout)
311
312        logging.info('Successfully disabled the lock')
313