• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import functools
6import logging
7import pprint
8import re
9import time
10
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.common_lib.cros import cr50_utils
14from autotest_lib.server.cros.servo import chrome_ec
15
16
def servo_v4_command(func):
    """Decorator for methods only relevant to tests running with servo v4.

    The wrapped method only runs when instance.using_servo_v4() is true.
    Otherwise the call is skipped, a message is logged and None is returned.

    Args:
        func: the method to guard. Its first argument must provide
              using_servo_v4().

    Returns:
        The guarded method.
    """
    @functools.wraps(func)
    def wrapper(instance, *args, **kwargs):
        """Ignore servo v4 functions if servo v4 is not being used."""
        if instance.using_servo_v4():
            return func(instance, *args, **kwargs)
        # __name__ is available on both Python 2 and Python 3 functions,
        # func_name only exists on Python 2.
        logging.info("not using servo v4. ignoring %s", func.__name__)
        return None
    return wrapper
26
27
class ChromeCr50(chrome_ec.ChromeConsole):
    """Manages control of a Chrome Cr50.

    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
    provides many interfaces to set and get its behavior via console commands.
    This class is to abstract these interfaces.
    """
    # CCD privilege level names.
    OPEN = 'open'
    UNLOCK = 'unlock'
    LOCK = 'lock'
    # The amount of time you need to show physical presence.
    PP_SHORT = 15
    PP_LONG = 300
    # Seconds to wait before sending 'ccd open' or 'ccd unlock'.
    CCD_PASSWORD_RATE_LIMIT = 3
    # Regular expressions are raw strings so that sequences like \d and \s
    # reach the regex engine without relying on invalid string escapes.
    IDLE_COUNT = r'count: (\d+)\s'
    SHORT_WAIT = 3
    # The version has four groups: the partition, the header version, debug
    # descriptor and then version string.
    # There are two partitions A and B. The active partition is marked with a
    # '*'. If it is a debug image '/DBG' is added to the version string. If the
    # image has been corrupted, the version information will be replaced with
    # 'Error'.
    # So the output may look something like this.
    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
    # Or like this if the region was corrupted.
    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
    #   RW_B:    Error
    # The leading '\n' stays a real newline character, the rest is raw.
    VERSION_FORMAT = ('\n'
                      r'RW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s')
    INACTIVE_VERSION = VERSION_FORMAT % ''
    ACTIVE_VERSION = VERSION_FORMAT % r'\*'
    # Following lines of the version output may print the image board id
    # information. eg.
    # BID A:   5a5a4146:ffffffff:00007f00 Yes
    # BID B:   00000000:00000000:00000000 Yes
    # Use the first group from ACTIVE_VERSION to match the active board id
    # partition.
    BID_ERROR = 'read_board_id: failed'
    BID_FORMAT = r':\s+[a-f0-9:]+ '
    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
            BID_ERROR)
    WAKE_CHAR = '\n\n'
    WAKE_RESPONSE = ['(>|Console is enabled)']
    START_UNLOCK_TIMEOUT = 20
    GETTIME = [r'= (\S+)']
    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
    MAX_RETRY_COUNT = 5
    START_STR = ['(.*Console is enabled;)']
    REBOOT_DELAY_WITH_CCD = 60
    REBOOT_DELAY_WITH_FLEX = 3
    # Strings that _state_to_bool treats as an enabled state.
    ON_STRINGS = ['enable', 'enabled', 'on']
    # Console timeout in seconds used while running the 'ccd' command.
    CONSERVATIVE_CCD_WAIT = 10
    CCD_SHORT_PRESSES = 5
    # Indexes into the per-capability list built by get_cap_dict:
    # [is accessible, setting, requirement]
    CAP_IS_ACCESSIBLE = 0
    CAP_SETTING = 1
    CAP_REQ = 2
    # Number of times _get_ccd_cap_string reads the capabilities.
    GET_CAP_TRIES = 3
86
87
88    def __init__(self, servo):
89        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
90
91
92    def wake_cr50(self):
93        """Wake up cr50 by sending some linebreaks and wait for the response"""
94        logging.debug(super(ChromeCr50, self).send_command_get_output(
95                self.WAKE_CHAR, self.WAKE_RESPONSE))
96
97
98    def send_command(self, commands):
99        """Send command through UART.
100
101        Cr50 will drop characters input to the UART when it resumes from sleep.
102        If servo is not using ccd, send some dummy characters before sending the
103        real command to make sure cr50 is awake.
104        """
105        if not self.using_ccd():
106            self.wake_cr50()
107        super(ChromeCr50, self).send_command(commands)
108
109
110    def set_cap(self, cap, setting):
111        """Set the capability to setting"""
112        self.set_caps({ cap : setting })
113
114
115    def set_caps(self, cap_dict):
116        """Use cap_dict to set all the cap values
117
118        Set all of the capabilities in cap_dict to the correct config.
119
120        Args:
121            cap_dict: A dictionary with the capability as key and the desired
122                setting as values
123        """
124        for cap, config in cap_dict.iteritems():
125            self.send_command('ccd set %s %s' % (cap, config))
126        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
127        for cap, config in cap_dict.iteritems():
128            if (current_cap_settings[cap].lower() !=
129                config.lower()):
130                raise error.TestFail('Failed to set %s to %s' % (cap, config))
131
132
133    def get_cap_overview(self, cap_dict):
134        """Returns a tuple (in factory mode, is reset)
135
136        If all capabilities are set to Default, ccd has been reset to default.
137        If all capabilities are set to Always, ccd is in factory mode.
138        """
139        in_factory_mode = True
140        is_reset = True
141        for cap, cap_info in cap_dict.iteritems():
142            cap_setting = cap_info[self.CAP_SETTING]
143            if cap_setting != 'Always':
144                in_factory_mode = False
145            if cap_setting != 'Default':
146                is_reset = False
147        return in_factory_mode, is_reset
148
149
150    def wp_is_reset(self):
151        """Returns True if wp is reset to follow batt pres at all times"""
152        follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state()
153        return follow_batt_pres and follow_batt_pres_atboot
154
155
156    def get_wp_state(self):
157        """Returns a tuple of the current write protect state.
158
159        The atboot setting cannot really be determined now if it is set to
160        follow battery presence. It is likely to remain the same after reboot,
161        but who knows. If the third element of the tuple is True, the last
162        element will not be that useful
163
164        Returns:
165            (True if current state is to follow batt presence,
166             True if write protect is enabled,
167             True if current state is to follow batt presence atboot,
168             True if write protect is enabled atboot)
169        """
170        rv = self.send_command_get_output('wp',
171                ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
172                 '(follow|enabled|disabled)'])[0]
173        _, forced, enabled, _, atboot = rv
174        logging.debug(rv)
175        return (not forced, enabled =='enabled',
176                atboot == 'follow', atboot == 'enabled')
177
178
179    def in_dev_mode(self):
180        """Return True if cr50 thinks the device is in dev mode"""
181        return 'dev_mode' in self.get_ccd_info()['TPM']
182
183
184    def get_ccd_info(self):
185        """Get the current ccd state.
186
187        Take the output of 'ccd' and convert it to a dictionary.
188
189        Returns:
190            A dictionary with the ccd state name as the key and setting as
191            value.
192        """
193        info = {}
194        original_timeout = float(self._servo.get('cr50_uart_timeout'))
195        # Change the console timeout to 10s, it may take longer than 3s to read
196        # ccd info
197        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
198        try:
199            rv = self.send_command_get_output('ccd', ["ccd.*>"])[0]
200        finally:
201            self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
202        for line in rv.splitlines():
203            # CCD information is separated with an :
204            #   State: Opened
205            # Extract the state name and the value.
206            line = line.strip()
207            if ':' not in line or '[' in line:
208                continue
209            key, value = line.split(':')
210            info[key.strip()] = value.strip()
211        logging.info('Current CCD settings:\n%s', pprint.pformat(info))
212        return info
213
214
215    def get_cap(self, cap):
216        """Returns the capabilitiy from the capability dictionary"""
217        return self.get_cap_dict()[cap]
218
219
220    def _get_ccd_cap_string(self):
221        """Return a string with the current capability settings.
222
223        The ccd information is pretty long. Servo micro sometimes drops
224        characters. Run the command a couple of times. Return the capapability
225        string that matches a previous run.
226
227        Raises:
228            TestError if the test could not retrieve consistent capability
229            information.
230        """
231        past_results = []
232        for i in range(self.GET_CAP_TRIES):
233            rv = self.send_safe_command_get_output('ccd',
234                    ["Capabilities:\s+[\da-f]+\s(.*)TPM:"])[0][1]
235            logging.debug(rv)
236            if rv in past_results:
237                return rv
238            past_results.append(rv)
239        logging.debug(past_results)
240        raise error.TestError('Could not get consistent capability information')
241
242
243    def get_cap_dict(self, info=None):
244        """Get the current ccd capability settings.
245
246        The capability may be using the 'Default' setting. That doesn't say much
247        about the ccd state required to use the capability. Return all ccd
248        information in the cap_dict
249        [is accessible, setting, requirement]
250
251        Args:
252            info: Only fill the cap_dict with the requested information:
253                  CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ
254
255        Returns:
256            A dictionary with the capability as the key a list of the current
257            settings as the value [is_accessible, setting, requirement]
258        """
259        caps = {}
260        cap_info_str = self._get_ccd_cap_string()
261        # There are two capability formats. Extract the setting and the
262        # requirement from both formats
263        #  UartGscRxECTx   Y 3=IfOpened
264        #  or
265        #  UartGscRxECTx   Y 0=Default (Always)
266        cap_settings = re.findall('(\S+) +(Y|-).*=(\w+)( \((\S+)\))?',
267                                  cap_info_str)
268        for cap, accessible, setting, _, required in cap_settings:
269            cap_info = [accessible == 'Y', setting, required]
270            # If there's only 1 value after =, then the setting is the
271            # requirement.
272            if not required:
273                cap_info[self.CAP_REQ] = setting
274            if info is not None:
275                caps[cap] = cap_info[info]
276            else:
277                caps[cap] = cap_info
278        logging.debug(pprint.pformat(caps))
279        return caps
280
281
282    def send_command_get_output(self, command, regexp_list):
283        """Send command through UART and wait for response.
284
285        Cr50 will drop characters input to the UART when it resumes from sleep.
286        If servo is not using ccd, send some dummy characters before sending the
287        real command to make sure cr50 is awake.
288        """
289        if not self.using_ccd():
290            self.wake_cr50()
291
292        # We have started prepending '\n' to separate cr50 console junk from
293        # the real command. If someone is just searching for .*>, then they will
294        # only get the output from the first '\n' we added. Raise an error to
295        # change the test to look for something more specific ex command.*>.
296        # cr50 will print the command in the output, so that is an easy way to
297        # modify '.*>' to match the real command output.
298        if '.*>' in regexp_list:
299            raise error.TestError('Send more specific regexp %r %r' % (command,
300                    regexp_list))
301
302        # prepend \n to separate the command from any junk that may have been
303        # sent to the cr50 uart.
304        command = '\n' + command
305        return super(ChromeCr50, self).send_command_get_output(command,
306                                                               regexp_list)
307
308
309    def send_safe_command_get_output(self, command, regexp_list):
310        """Restrict the console channels while sending console commands"""
311        self.send_command('chan save')
312        self.send_command('chan 0')
313        try:
314            rv = self.send_command_get_output(command, regexp_list)
315        finally:
316            self.send_command('chan restore')
317        return rv
318
319
320    def send_command_retry_get_output(self, command, regexp_list, tries=3):
321        """Retry sending a command if we can't find the output.
322
323        Cr50 may print irrelevant output while printing command output. It may
324        prevent the regex from matching. Send command and get the output. If it
325        fails try again.
326
327        If it fails every time, raise an error.
328
329        Don't use this to set something that should only be set once.
330        """
331        # TODO(b/80319784): once chan is unrestricted, use it to restrict what
332        # output cr50 prints while we are sending commands.
333        for i in range(tries):
334            try:
335                return self.send_command_get_output(command, regexp_list)
336            except error.TestFail, e:
337                logging.info('Failed to get %r output: %r', command, str(e))
338                if i == tries - 1:
339                    # Raise the last error, if we never successfully returned
340                    # the command output
341                    logging.info('Could not get %r output after %d tries',
342                                 command, tries)
343                    raise
344
345
346    def get_deep_sleep_count(self):
347        """Get the deep sleep count from the idle task"""
348        result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT])
349        return int(result[0][1])
350
351
352    def clear_deep_sleep_count(self):
353        """Clear the deep sleep count"""
354        result = self.send_command_retry_get_output('idle c', [self.IDLE_COUNT])
355        if int(result[0][1]):
356            raise error.TestFail("Could not clear deep sleep count")
357
358
359    def get_board_properties(self):
360        """Get information from the version command"""
361        rv = self.send_command_retry_get_output('brdprop',
362                ['properties = (\S+)\s'])
363        return int(rv[0][1], 16)
364
365
366    def has_command(self, cmd):
367        """Returns 1 if cr50 has the command 0 if it doesn't"""
368        try:
369            self.send_command_retry_get_output('help', [cmd])
370        except:
371            logging.info("Image does not include '%s' command", cmd)
372            return 0
373        return 1
374
375
376    def erase_nvmem(self):
377        """Use flasherase to erase both nvmem sections"""
378        if not self.has_command('flasherase'):
379            raise error.TestError("need image with 'flasherase'")
380
381        self.send_command('flasherase 0x7d000 0x3000')
382        self.send_command('flasherase 0x3d000 0x3000')
383
384
385    def reboot(self):
386        """Reboot Cr50 and wait for cr50 to reset"""
387        self.wait_for_reboot(cmd='reboot')
388
389
390    def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
391        """Use uart to wait for cr50 to reboot.
392
393        If a command is given run it and wait for cr50 to reboot. Monitor
394        the cr50 uart to detect the reset. Wait up to timeout seconds
395        for the reset.
396
397        Args:
398            cmd: the command to run to reset cr50.
399            timeout: seconds to wait to detect the reboot.
400        """
401        original_timeout = float(self._servo.get('cr50_uart_timeout'))
402        # Change the console timeout to timeout, so we wait at least that long
403        # for cr50 to print the start string.
404        self._servo.set_nocheck('cr50_uart_timeout', timeout)
405        try:
406            self.send_command_get_output(cmd, self.START_STR)
407            logging.debug('Detected cr50 reboot')
408        except error.TestFail, e:
409            logging.debug('Failed to detect cr50 reboot')
410        # Reset the timeout.
411        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
412
413
414    def wait_for_reboot(self, cmd='\n', timeout=60):
415        """Wait for cr50 to reboot
416
417        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
418        necessary.
419
420        Args:
421            cmd: the command to run to reset cr50.
422            timeout: seconds to wait to detect the reboot.
423        """
424        if self.using_ccd():
425            self.send_command(cmd)
426            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
427            # go down to detect the reboot.
428            self.wait_for_ccd_disable(timeout, raise_error=False)
429            self.ccd_enable()
430        else:
431            self._uart_wait_for_reboot(cmd, timeout)
432
433
434    def rollback(self, eraseflashinfo=True, chip_bid=None, chip_flags=None):
435        """Set the reset counter high enough to force a rollback then reboot
436
437        Set the new board id before rolling back if one is given.
438
439        Args:
440            eraseflashinfo: True if eraseflashinfo should be run before rollback
441            chip_bid: the integer representation of chip board id or None if the
442                      board id should be erased during rollback
443            chip_flags: the integer representation of chip board id flags or
444                        None if the board id should be erased during rollback
445        """
446        if (not self.has_command('rollback') or not
447            self.has_command('eraseflashinfo')):
448            raise error.TestError("need image with 'rollback' and "
449                "'eraseflashinfo'")
450
451        inactive_partition = self.get_inactive_version_info()[0]
452        # Set the board id if both the board id and flags have been given.
453        set_bid = chip_bid and chip_flags
454
455        # Erase the infomap
456        if eraseflashinfo or set_bid:
457            self.send_command('eraseflashinfo')
458
459        # Update the board id after it has been erased
460        if set_bid:
461            self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
462
463        self.wait_for_reboot(cmd='rollback')
464
465        running_partition = self.get_active_version_info()[0]
466        if inactive_partition != running_partition:
467            raise error.TestError("Failed to rollback to inactive image")
468
469
470    def rolledback(self):
471        """Returns true if cr50 just rolled back"""
472        return 'Rollback detected' in self.send_safe_command_get_output(
473                'sysinfo', ['sysinfo.*>'])[0]
474
475
476    def get_version_info(self, regexp):
477        """Get information from the version command"""
478        return self.send_command_retry_get_output('ver', [regexp])[0][1::]
479
480
481    def get_inactive_version_info(self):
482        """Get the active partition, version, and hash"""
483        return self.get_version_info(self.INACTIVE_VERSION)
484
485
486    def get_active_version_info(self):
487        """Get the active partition, version, and hash"""
488        return self.get_version_info(self.ACTIVE_VERSION)
489
490
491    def using_prod_rw_keys(self):
492        """Returns True if the RW keyid is prod"""
493        rv = self.send_safe_command_retry_get_output('sysinfo',
494                ['RW keyid:.*\(([a-z]+)\)'])
495        logging.info(rv)
496        return rv[0][1] == 'prod'
497
498
499    def get_active_board_id_str(self):
500        """Get the running image board id.
501
502        Returns:
503            The board id string or None if the image does not support board id
504            or the image is not board id locked.
505        """
506        # Getting the board id from the version console command is only
507        # supported in board id locked images .22 and above. Any image that is
508        # board id locked will have support for getting the image board id.
509        #
510        # If board id is not supported on the device, return None. This is
511        # still expected on all current non board id locked release images.
512        try:
513            version_info = self.get_version_info(self.ACTIVE_BID)
514        except error.TestFail, e:
515            logging.info(str(e))
516            logging.info('Cannot use the version to get the board id')
517            return None
518
519        if self.BID_ERROR in version_info[4]:
520            raise error.TestError(version_info)
521        bid = version_info[4].split()[1]
522        return cr50_utils.GetBoardIdInfoString(bid)
523
524
525    def get_version(self):
526        """Get the RW version"""
527        return self.get_active_version_info()[1].strip()
528
529
530    def using_servo_v4(self):
531        """Returns true if the console is being served using servo v4"""
532        return 'servo_v4' in self._servo.get_servo_version()
533
534
535    def using_ccd(self):
536        """Returns true if the console is being served using CCD"""
537        return 'ccd_cr50' in self._servo.get_servo_version()
538
539
540    def ccd_is_enabled(self):
541        """Return True if ccd is enabled.
542
543        If the test is running through ccd, return the ccd_state value. If
544        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
545        if Cr50 has ccd enabled.
546
547        Returns:
548            'off' or 'on' based on whether the cr50 console is working.
549        """
550        if self.using_ccd():
551            return self._servo.get('ccd_state') == 'on'
552        else:
553            result = self.send_command_retry_get_output('gpioget',
554                    ['(0|1)[ \S]*CCD_MODE_L'])
555            return not bool(int(result[0][1]))
556
557
558    @servo_v4_command
559    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
560        """Wait up to timeout seconds for CCD to be 'on' or 'off'
561
562        Verify ccd is off or on and remains in that state for 3 seconds.
563
564        Args:
565            state: a string either 'on' or 'off'.
566            timeout: time in seconds to wait
567            raise_error: Raise TestFail if the value is state is not reached.
568
569        Raises:
570            TestFail if ccd never reaches the specified state
571        """
572        wait_for_enable = state == 'on'
573        logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off')
574        enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
575                                       timeout_sec=timeout)
576        if enabled != wait_for_enable:
577            error_msg = ("timed out before detecting ccd '%s'" %
578                         ('on' if wait_for_enable else 'off'))
579            if raise_error:
580                raise error.TestFail(error_msg)
581            logging.warning(error_msg)
582        else:
583            # Make sure the state doesn't change.
584            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
585                                           timeout_sec=self.SHORT_WAIT)
586            if enabled != wait_for_enable:
587                error_msg = ("CCD switched %r after briefly being %r" %
588                             ('on' if enabled else 'off', state))
589                if raise_error:
590                    raise error.TestFail(error_msg)
591                logging.info(error_msg)
592        logging.info("ccd is %r", 'on' if enabled else 'off')
593
594
595    @servo_v4_command
596    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
597        """Wait for the cr50 console to stop working"""
598        self.wait_for_stable_ccd_state('off', timeout, raise_error)
599
600
601    @servo_v4_command
602    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
603        """Wait for the cr50 console to start working"""
604        self.wait_for_stable_ccd_state('on', timeout, raise_error)
605
606
607    @servo_v4_command
608    def ccd_disable(self, raise_error=True):
609        """Change the values of the CC lines to disable CCD"""
610        logging.info("disable ccd")
611        self._servo.set_nocheck('servo_v4_dts_mode', 'off')
612        self.wait_for_ccd_disable(raise_error=raise_error)
613
614
615    @servo_v4_command
616    def ccd_enable(self, raise_error=False):
617        """Reenable CCD and reset servo interfaces"""
618        logging.info("reenable ccd")
619        self._servo.set_nocheck('servo_v4_dts_mode', 'on')
620        # If the test is actually running with ccd, wait for USB communication
621        # to come up after reset.
622        if self.using_ccd():
623            time.sleep(self._servo.USB_DETECTION_DELAY)
624        self.wait_for_ccd_enable(raise_error=raise_error)
625
626
627    def _level_change_req_pp(self, level):
628        """Returns True if setting the level will require physical presence"""
629        testlab_pp = level != 'testlab open' and 'testlab' in level
630        # If the level is open and the ccd capabilities say physical presence
631        # is required, then physical presence will be required.
632        open_pp = (level == 'open' and
633                   not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
634        return testlab_pp or open_pp
635
636
637    def _state_to_bool(self, state):
638        """Converts the state string to True or False"""
639        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
640        return state.lower() in self.ON_STRINGS
641
642
643    def testlab_is_on(self):
644        """Returns True of testlab mode is on"""
645        return self._state_to_bool(self._servo.get('cr50_testlab'))
646
647
648    def set_ccd_testlab(self, state):
649        """Set the testlab mode
650
651        Args:
652            state: the desired testlab mode string: 'on' or 'off'
653
654        Raises:
655            TestFail if testlab mode was not changed
656        """
657        if self.using_ccd():
658            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
659                    'cable instead.')
660
661        request_on = self._state_to_bool(state)
662        testlab_on = self.testlab_is_on()
663        request_str = 'on' if request_on else 'off'
664
665        if testlab_on == request_on:
666            logging.info('ccd testlab already set to %s', request_str)
667            return
668
669        original_level = self.get_ccd_level()
670
671        # We can only change the testlab mode when the device is open. If
672        # testlab mode is already enabled, we can go directly to open using 'ccd
673        # testlab open'. This will save 5 minutes, because we can skip the
674        # physical presence check.
675        if testlab_on:
676            self.send_command('ccd testlab open')
677        else:
678            self.set_ccd_level('open')
679
680        # Set testlab mode
681        rv = self.send_command_get_output('ccd testlab %s' % request_str,
682                ['ccd.*>'])[0]
683        if 'Access Denied' in rv:
684            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
685
686        # Press the power button once a second for 15 seconds.
687        self.run_pp(self.PP_SHORT)
688
689        self.set_ccd_level(original_level)
690
691        if request_on != self.testlab_is_on():
692            raise error.TestFail('Failed to set ccd testlab to %s' % state)
693
694
695    def get_ccd_level(self):
696        """Returns the current ccd privilege level"""
697        return self._servo.get('cr50_ccd_level').lower()
698
699
700    def set_ccd_level(self, level, password=''):
701        """Set the Cr50 CCD privilege level.
702
703        Args:
704            level: a string of the ccd privilege level: 'open', 'lock', or
705                   'unlock'.
706            password: send the ccd command with password. This will still
707                    require the same physical presence.
708
709        Raises:
710            TestFail if the level couldn't be set
711        ."""
712        # TODO(mruthven): add support for CCD password
713        level = level.lower()
714
715        if level == self.get_ccd_level():
716            logging.info('CCD privilege level is already %s', level)
717            return
718
719        if 'testlab' in level:
720            raise error.TestError("Can't change testlab mode using "
721                "ccd_set_level")
722
723        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
724        req_pp = self._level_change_req_pp(level)
725        has_pp = not self.using_ccd()
726        dbg_en = 'DBG' in self._servo.get('cr50_version')
727
728        if req_pp and not has_pp:
729            raise error.TestError("Can't change privilege level to '%s' "
730                "without physical presence." % level)
731
732        if not testlab_on and not has_pp:
733            raise error.TestError("Wont change privilege level without "
734                "physical presence or testlab mode enabled")
735
736        original_timeout = float(self._servo.get('cr50_uart_timeout'))
737        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
738        # take more than 3 seconds.
739        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
740        # Start the unlock process.
741
742        if level == 'open' or level == 'unlock':
743            logging.info('waiting %d seconds, the minimum time between'
744                         ' ccd password attempts',
745                         self.CCD_PASSWORD_RATE_LIMIT)
746            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)
747
748        try:
749            cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
750            rv = self.send_command_get_output(cmd, [cmd + '(.*)>'])[0][1]
751        finally:
752            self._servo.set('cr50_uart_timeout', original_timeout)
753        logging.info(rv)
754        if 'ccd_open denied: fwmp' in rv:
755            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
756        if 'Access Denied' in rv:
757            raise error.TestFail("%r %s" % (cmd, rv))
758        if 'Busy' in rv:
759            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))
760
761        # Press the power button once a second, if we need physical presence.
762        if req_pp:
763            # DBG images have shorter unlock processes
764            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG)
765
766        if level != self.get_ccd_level():
767            raise error.TestFail('Could not set privilege level to %s' % level)
768
769        logging.info('Successfully set CCD privelege level to %s', level)
770
771
772    def run_pp(self, unlock_timeout):
773        """Press the power button a for unlock_timeout seconds.
774
775        This will press the power button many more times than it needs to be
776        pressed. Cr50 doesn't care if you press it too often. It just cares that
777        you press the power button at least once within the detect interval.
778
779        For privilege level changes you need to press the power button 5 times
780        in the short interval and then 4 times within the long interval.
781        Short Interval
782        100msec < power button press < 5 seconds
783        Long Interval
784        60s < power button press < 300s
785
786        For testlab enable/disable you must press the power button 5 times
787        spaced between 100msec and 5 seconds apart.
788        """
789        end_time = time.time() + unlock_timeout
790
791        logging.info('Pressing power button for %ds to unlock the console.',
792                     unlock_timeout)
793        logging.info('The process should end at %s', time.ctime(end_time))
794
795        # Press the power button once a second to unlock the console.
796        while time.time() < end_time:
797            self._servo.power_short_press()
798            time.sleep(1)
799
800
801    def gettime(self):
802        """Get the current cr50 system time"""
803        result = self.send_command_retry_get_output('gettime', [' = (.*) s'])
804        return float(result[0][1])
805
806
807    def servo_v4_supports_dts_mode(self):
808        """Returns True if cr50 registers changes in servo v4 dts mode."""
809        # This is to test that Cr50 actually recognizes the change in ccd state
810        # We cant do that with tests using ccd, because the cr50 communication
811        # goes down once ccd is enabled.
812        if 'servo_v4_with_servo_micro' != self._servo.get_servo_version():
813            return False
814
815        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
816        dts_start = self._servo.get('servo_v4_dts_mode')
817        try:
818            # Verify both ccd enable and disable
819            self.ccd_disable(raise_error=True)
820            self.ccd_enable(raise_error=True)
821            rv = True
822        except Exception, e:
823            logging.info(e)
824            rv = False
825        self._servo.set_nocheck('servo_v4_dts_mode', dts_start)
826        self.wait_for_stable_ccd_state(ccd_start, 60, True)
827        logging.info('Test setup does%s support servo DTS mode',
828                '' if rv else 'n\'t')
829        return rv
830
831
832    def wait_until_update_is_allowed(self):
833        """Wait until cr50 will be able to accept an update.
834
835        Cr50 rejects any attempt to update if it has been less than 60 seconds
836        since it last recovered from deep sleep or came up from reboot. This
837        will wait until cr50 gettime shows a time greater than 60.
838        """
839        if self.get_active_version_info()[2]:
840            logging.info("Running DBG image. Don't need to wait for update.")
841            return
842        cr50_time = self.gettime()
843        if cr50_time < 60:
844            sleep_time = 61 - cr50_time
845            logging.info('Cr50 has been up for %ds waiting %ds before update',
846                         cr50_time, sleep_time)
847            time.sleep(sleep_time)
848
849    def tpm_is_enabled(self):
850        """Query the current TPM mode.
851
852        Returns  True if TPM is enabled,
853                 False otherwise.
854        """
855        result = self.send_command_get_output('sysinfo',
856                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'])[0][1]
857        logging.debug(result)
858
859        return result.lower() == 'enabled'
860
861    def keyladder_is_enabled(self):
862        """Get the status of H1 Key Ladder.
863
864        Returns True if H1 Key Ladder is enabled.
865                False otherwise.
866        """
867        result = self.send_command_get_output('sysinfo',
868                ['(?i)Key\s+Ladder:\s+(enabled|disabled)'])[0][1]
869        logging.debug(result)
870
871        return result.lower() == 'enabled'
872