• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import functools
6import logging
7import time
8
9from autotest_lib.client.bin import utils
10from autotest_lib.client.common_lib import error
11from autotest_lib.client.common_lib.cros import cr50_utils
12from autotest_lib.server.cros.servo import chrome_ec
13
14
15def servo_v4_command(func):
16    """Decorator for methods only relevant to tests running with servo v4."""
17    @functools.wraps(func)
18    def wrapper(instance, *args, **kwargs):
19        """Ignore servo v4 functions it's not being used."""
20        if instance.using_servo_v4():
21            return func(instance, *args, **kwargs)
22        logging.info("not using servo v4. ignoring %s", func.func_name)
23    return wrapper
24
25
26class ChromeCr50(chrome_ec.ChromeConsole):
27    """Manages control of a Chrome Cr50.
28
29    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
30    provides many interfaces to set and get its behavior via console commands.
31    This class is to abstract these interfaces.
32    """
33    # The amount of time you need to show physical presence.
34    PP_SHORT = 15
35    PP_LONG = 300
36    IDLE_COUNT = 'count: (\d+)'
37    # The version has four groups: the partition, the header version, debug
38    # descriptor and then version string.
39    # There are two partitions A and B. The active partition is marked with a
40    # '*'. If it is a debug image '/DBG' is added to the version string. If the
41    # image has been corrupted, the version information will be replaced with
42    # 'Error'.
43    # So the output may look something like this.
44    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
45    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
46    # Or like this if the region was corrupted.
47    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
48    #   RW_B:    Error
49    VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
50    INACTIVE_VERSION = VERSION_FORMAT % ''
51    ACTIVE_VERSION = VERSION_FORMAT % '\*'
52    # Following lines of the version output may print the image board id
53    # information. eg.
54    # BID A:   5a5a4146:ffffffff:00007f00 Yes
55    # BID B:   00000000:00000000:00000000 Yes
56    # Use the first group from ACTIVE_VERSION to match the active board id
57    # partition.
58    BID_ERROR = 'read_board_id: failed'
59    BID_FORMAT = ':\s+[a-f0-9:]+ '
60    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
61            BID_ERROR)
62    WAKE_CHAR = '\n'
63    START_UNLOCK_TIMEOUT = 20
64    GETTIME = ['= (\S+)']
65    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
66    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
67    MAX_RETRY_COUNT = 5
68    START_STR = ['(.*Console is enabled;)']
69    REBOOT_DELAY_WITH_CCD = 60
70    REBOOT_DELAY_WITH_FLEX = 3
71    ON_STRINGS = ['enable', 'enabled', 'on']
72
73
74    def __init__(self, servo):
75        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
76
77
78    def send_command(self, commands):
79        """Send command through UART.
80
81        Cr50 will drop characters input to the UART when it resumes from sleep.
82        If servo is not using ccd, send some dummy characters before sending the
83        real command to make sure cr50 is awake.
84        """
85        if not self.using_ccd():
86            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
87        super(ChromeCr50, self).send_command(commands)
88
89
90    def send_command_get_output(self, command, regexp_list):
91        """Send command through UART and wait for response.
92
93        Cr50 will drop characters input to the UART when it resumes from sleep.
94        If servo is not using ccd, send some dummy characters before sending the
95        real command to make sure cr50 is awake.
96        """
97        if not self.using_ccd():
98            super(ChromeCr50, self).send_command(self.WAKE_CHAR)
99        return super(ChromeCr50, self).send_command_get_output(command,
100                                                               regexp_list)
101
102
103    def get_deep_sleep_count(self):
104        """Get the deep sleep count from the idle task"""
105        result = self.send_command_get_output('idle', [self.IDLE_COUNT])
106        return int(result[0][1])
107
108
109    def clear_deep_sleep_count(self):
110        """Clear the deep sleep count"""
111        result = self.send_command_get_output('idle c', [self.IDLE_COUNT])
112        if int(result[0][1]):
113            raise error.TestFail("Could not clear deep sleep count")
114
115
116    def has_command(self, cmd):
117        """Returns 1 if cr50 has the command 0 if it doesn't"""
118        try:
119            self.send_command_get_output('help', [cmd])
120        except:
121            logging.info("Image does not include '%s' command", cmd)
122            return 0
123        return 1
124
125
126    def erase_nvmem(self):
127        """Use flasherase to erase both nvmem sections"""
128        if not self.has_command('flasherase'):
129            raise error.TestError("need image with 'flasherase'")
130
131        self.send_command('flasherase 0x7d000 0x3000')
132        self.send_command('flasherase 0x3d000 0x3000')
133
134
135    def reboot(self):
136        """Reboot Cr50 and wait for cr50 to reset"""
137        response = [] if self.using_ccd() else self.START_STR
138        self.send_command_get_output('reboot', response)
139
140        # ccd will stop working after the reboot. Wait until that happens and
141        # reenable it.
142        if self.using_ccd():
143            self.wait_for_reboot()
144
145
146    def _uart_wait_for_reboot(self, timeout=60):
147        """Wait for the cr50 to reboot and enable the console.
148
149        This will wait up to timeout seconds for cr50 to print the start string.
150
151        Args:
152            timeout: seconds to wait to detect the reboot.
153        """
154        original_timeout = float(self._servo.get('cr50_uart_timeout'))
155        # Change the console timeout to timeout, so we wait at least that long
156        # for cr50 to print the start string.
157        self._servo.set_nocheck('cr50_uart_timeout', timeout)
158        try:
159            self.send_command_get_output('\n', self.START_STR)
160            logging.debug('Detected cr50 reboot')
161        except error.TestFail, e:
162            logging.debug('Failed to detect cr50 reboot')
163        # Reset the timeout.
164        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
165
166
167    def wait_for_reboot(self, timeout=60):
168        """Wait for cr50 to reboot"""
169        if self.using_ccd():
170            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
171            # go down to detect the reboot.
172            self.wait_for_ccd_disable(timeout, raise_error=False)
173            self.ccd_enable()
174        else:
175            self._uart_wait_for_reboot(timeout)
176
177
178    def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None):
179        """Set the reset counter high enough to force a rollback then reboot
180
181        Set the new board id before rolling back if one is given.
182
183        Args:
184            eraseflashinfo: True if eraseflashinfo should be run before rollback
185            chip_bid: the integer representation of chip board id or None if the
186                      board id should be erased during rollback
187            chip_flags: the integer representation of chip board id flags or
188                        None if the board id should be erased during rollback
189        """
190        if (not self.has_command('rollback') or not
191            self.has_command('eraseflashinfo')):
192            raise error.TestError("need image with 'rollback' and "
193                "'eraseflashinfo'")
194
195        inactive_partition = self.get_inactive_version_info()[0]
196        # Set the board id if both the board id and flags have been given.
197        set_bid = chip_bid and chip_flags
198
199        # Erase the infomap
200        if eraseflashinfo or set_bid:
201            self.send_command('eraseflashinfo')
202
203        # Update the board id after it has been erased
204        if set_bid:
205            self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
206
207        self.send_command('rollback')
208
209        # If we aren't using ccd, we should be able to detect the reboot
210        # almost immediately
211        self.wait_for_reboot(self.REBOOT_DELAY_WITH_CCD if self.using_ccd() else
212                self.REBOOT_DELAY_WITH_FLEX)
213
214        running_partition = self.get_active_version_info()[0]
215        if inactive_partition != running_partition:
216            raise error.TestError("Failed to rollback to inactive image")
217
218
219    def rolledback(self):
220        """Returns true if cr50 just rolled back"""
221        return int(self._servo.get('cr50_reset_count')) > self.MAX_RETRY_COUNT
222
223
224    def get_version_info(self, regexp):
225        """Get information from the version command"""
226        return self.send_command_get_output('ver', [regexp])[0][1::]
227
228
229    def get_inactive_version_info(self):
230        """Get the active partition, version, and hash"""
231        return self.get_version_info(self.INACTIVE_VERSION)
232
233
234    def get_active_version_info(self):
235        """Get the active partition, version, and hash"""
236        return self.get_version_info(self.ACTIVE_VERSION)
237
238
239    def get_active_board_id_str(self):
240        """Get the running image board id.
241
242        Returns:
243            The board id string or None if the image does not support board id
244            or the image is not board id locked.
245        """
246        # Getting the board id from the version console command is only
247        # supported in board id locked images .22 and above. Any image that is
248        # board id locked will have support for getting the image board id.
249        #
250        # If board id is not supported on the device, return None. This is
251        # still expected on all current non board id locked release images.
252        try:
253            version_info = self.get_version_info(self.ACTIVE_BID)
254        except error.TestFail, e:
255            logging.info(e.message)
256            logging.info('Cannot use the version to get the board id')
257            return None
258
259        if self.BID_ERROR in version_info[4]:
260            raise error.TestError(version_info)
261        bid = version_info[4].split()[1]
262        return bid if bid != cr50_utils.EMPTY_IMAGE_BID else None
263
264
265    def get_version(self):
266        """Get the RW version"""
267        return self.get_active_version_info()[1].strip()
268
269
270    def using_servo_v4(self):
271        """Returns true if the console is being served using servo v4"""
272        return 'servo_v4' in self._servo.get_servo_version()
273
274
275    def using_ccd(self):
276        """Returns true if the console is being served using CCD"""
277        return 'ccd_cr50' in self._servo.get_servo_version()
278
279
280    def ccd_is_enabled(self):
281        """Return True if ccd is enabled.
282
283        If the test is running through ccd, return the ccd_state value. If
284        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
285        if Cr50 has ccd enabled.
286
287        Returns:
288            'off' or 'on' based on whether the cr50 console is working.
289        """
290        if self.using_ccd():
291            return self._servo.get('ccd_state') == 'on'
292        else:
293            result = self.send_command_get_output('gpioget',
294                    ['(0|1)..CCD_MODE_L'])
295            return not bool(int(result[0][1]))
296
297
298    @servo_v4_command
299    def wait_for_ccd_state(self, state, timeout, raise_error):
300        """Wait up to timeout seconds for CCD to be 'on' or 'off'
301        Args:
302            state: a string either 'on' or 'off'.
303            timeout: time in seconds to wait
304            raise_error: Raise TestFail if the value is state is not reached.
305
306        Raises
307            TestFail if ccd never reaches the specified state
308        """
309        wait_for_enable = state == 'on'
310        logging.info("Wait until ccd is '%s'", state)
311        value = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
312                                     timeout_sec=timeout)
313        if value != wait_for_enable:
314            error_msg = "timed out before detecting ccd '%s'" % state
315            if raise_error:
316                raise error.TestFail(error_msg)
317            logging.warning(error_msg)
318        logging.info("ccd is '%s'", state)
319
320
321    @servo_v4_command
322    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
323        """Wait for the cr50 console to stop working"""
324        self.wait_for_ccd_state('off', timeout, raise_error)
325
326
327    @servo_v4_command
328    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
329        """Wait for the cr50 console to start working"""
330        self.wait_for_ccd_state('on', timeout, raise_error)
331
332
333    @servo_v4_command
334    def ccd_disable(self, raise_error=True):
335        """Change the values of the CC lines to disable CCD"""
336        logging.info("disable ccd")
337        self._servo.set_nocheck('servo_v4_dts_mode', 'off')
338        self.wait_for_ccd_disable(raise_error=raise_error)
339
340
341    @servo_v4_command
342    def ccd_enable(self, raise_error=False):
343        """Reenable CCD and reset servo interfaces"""
344        logging.info("reenable ccd")
345        self._servo.set_nocheck('servo_v4_dts_mode', 'on')
346        # If the test is actually running with ccd, reset usb and wait for
347        # communication to come up.
348        if self.using_ccd():
349            self._servo.set_nocheck('power_state', 'ccd_reset')
350        self.wait_for_ccd_enable(raise_error=raise_error)
351
352
353    def _level_change_req_pp(self, level):
354        """Returns True if setting the level will require physical presence"""
355        testlab_pp = level != 'testlab open' and 'testlab' in level
356        open_pp = level == 'open'
357        return testlab_pp or open_pp
358
359
360    def _state_to_bool(self, state):
361        """Converts the state string to True or False"""
362        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
363        return state.lower() in self.ON_STRINGS
364
365
366    def testlab_is_on(self):
367        """Returns True of testlab mode is on"""
368        return self._state_to_bool(self._servo.get('cr50_testlab'))
369
370
371    def set_ccd_testlab(self, state):
372        """Set the testlab mode
373
374        Args:
375            state: the desired testlab mode string: 'on' or 'off'
376
377        Raises:
378            TestFail if testlab mode was not changed
379        """
380        if self.using_ccd():
381            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
382                    'cable instead.')
383
384        request_on = self._state_to_bool(state)
385        testlab_on = self.testlab_is_on()
386        request_str = 'on' if request_on else 'off'
387
388        if testlab_on == request_on:
389            logging.info('ccd testlab already set to %s', request_str)
390            return
391
392        original_level = self.get_ccd_level()
393
394        # We can only change the testlab mode when the device is open. If
395        # testlab mode is already enabled, we can go directly to open using 'ccd
396        # testlab open'. This will save 5 minutes, because we can skip the
397        # physical presence check.
398        if testlab_on:
399            self.send_command('ccd testlab open')
400        else:
401            self.set_ccd_level('open')
402
403        # Set testlab mode
404        rv = self.send_command_get_output('ccd testlab %s' % request_str,
405                ['.*>'])[0]
406        if 'Access Denied' in rv:
407            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
408
409        # Press the power button once a second for 15 seconds.
410        self.run_pp(self.PP_SHORT)
411
412        self.set_ccd_level(original_level)
413
414        if request_on != self.testlab_is_on():
415            raise error.TestFail('Failed to set ccd testlab to %s' % state)
416
417
418    def get_ccd_level(self):
419        """Returns the current ccd privilege level"""
420        # TODO(mruthven): delete the part removing the trailing 'ed' once
421        # servo is up to date in the lab
422        return self._servo.get('cr50_ccd_level').lower().rstrip('ed')
423
424
425    def set_ccd_level(self, level):
426        """Set the Cr50 CCD privilege level.
427
428        Args:
429            level: a string of the ccd privilege level: 'open', 'lock', or
430                   'unlock'.
431
432        Raises:
433            TestFail if the level couldn't be set
434        ."""
435        # TODO(mruthven): add support for CCD password
436        level = level.lower()
437
438        if level == self.get_ccd_level():
439            logging.info('CCD privilege level is already %s', level)
440            return
441
442        if 'testlab' in level:
443            raise error.TestError("Can't change testlab mode using "
444                "ccd_set_level")
445
446        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
447        req_pp = self._level_change_req_pp(level)
448        has_pp = not self.using_ccd()
449        dbg_en = 'DBG' in self._servo.get('cr50_version')
450
451        if req_pp and not has_pp:
452            raise error.TestError("Can't change privilege level to '%s' "
453                "without physical presence." % level)
454
455        if not testlab_on and not has_pp:
456            raise error.TestError("Wont change privilege level without "
457                "physical presence or testlab mode enabled")
458
459        # Start the unlock process.
460        rv = self.send_command_get_output('ccd %s' % level, ['.*>'])[0]
461        logging.info(rv)
462        if 'Access Denied' in rv:
463            raise error.TestFail("'ccd %s' %s" % (level, rv))
464
465        # Press the power button once a second, if we need physical presence.
466        if req_pp:
467            # DBG images have shorter unlock processes
468            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG)
469
470        if level != self.get_ccd_level():
471            raise error.TestFail('Could not set privilege level to %s' % level)
472
473        logging.info('Successfully set CCD privelege level to %s', level)
474
475
476    def run_pp(self, unlock_timeout):
477        """Press the power button a for unlock_timeout seconds.
478
479        This will press the power button many more times than it needs to be
480        pressed. Cr50 doesn't care if you press it too often. It just cares that
481        you press the power button at least once within the detect interval.
482
483        For privilege level changes you need to press the power button 5 times
484        in the short interval and then 4 times within the long interval.
485        Short Interval
486        100msec < power button press < 5 seconds
487        Long Interval
488        60s < power button press < 300s
489
490        For testlab enable/disable you must press the power button 5 times
491        spaced between 100msec and 5 seconds apart.
492        """
493        end_time = time.time() + unlock_timeout
494
495        logging.info('Pressing power button for %ds to unlock the console.',
496                     unlock_timeout)
497        logging.info('The process should end at %s', time.ctime(end_time))
498
499        # Press the power button once a second to unlock the console.
500        while time.time() < end_time:
501            self._servo.power_short_press()
502            time.sleep(1)
503
504
505    def gettime(self):
506        """Get the current cr50 system time"""
507        result = self.send_command_get_output('gettime', [' = (.*) s'])
508        return float(result[0][1])
509
510
511    def servo_v4_supports_dts_mode(self):
512        """Returns True if cr50 registers changes in servo v4 dts mode."""
513        # This is to test that Cr50 actually recognizes the change in ccd state
514        # We cant do that with tests using ccd, because the cr50 communication
515        # goes down once ccd is enabled.
516        if 'servo_v4_with_servo_micro' != self._servo.get_servo_version():
517            return False
518
519        start_val = self._servo.get('servo_v4_dts_mode')
520        try:
521            # Verify both ccd enable and disable
522            self.ccd_disable(raise_error=True)
523            self.ccd_enable(raise_error=True)
524            rv = True
525        except Exception, e:
526            logging.info(e)
527            rv = False
528        self._servo.set_nocheck('servo_v4_dts_mode', start_val)
529        self.wait_for_ccd_state(start_val, 60, True)
530        logging.info('Test setup does%s support servo DTS mode',
531                '' if rv else 'n\'t')
532        return rv
533
534
535    def wait_until_update_is_allowed(self):
536        """Wait until cr50 will be able to accept an update.
537
538        Cr50 rejects any attempt to update if it has been less than 60 seconds
539        since it last recovered from deep sleep or came up from reboot. This
540        will wait until cr50 gettime shows a time greater than 60.
541        """
542        if self.get_active_version_info()[2]:
543            logging.info("Running DBG image. Don't need to wait for update.")
544            return
545        cr50_time = self.gettime()
546        if cr50_time < 60:
547            sleep_time = 61 - cr50_time
548            logging.info('Cr50 has been up for %ds waiting %ds before update',
549                         cr50_time, sleep_time)
550            time.sleep(sleep_time)
551