• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright 2017 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import functools
11import logging
12import pprint
13import re
14import six
15from six.moves import range
16import time
17
18from autotest_lib.client.bin import utils
19from autotest_lib.client.common_lib import error
20from autotest_lib.client.common_lib.cros import cr50_utils
21from autotest_lib.server.cros.servo import chrome_ec
22from autotest_lib.server.cros.servo import servo
23
24
25def dts_control_command(func):
26    """For methods that should only run when dts mode control is supported."""
27    @functools.wraps(func)
28    def wrapper(instance, *args, **kwargs):
29        """Ignore those functions if dts mode control is not supported."""
30        if instance._servo.dts_mode_is_valid():
31            return func(instance, *args, **kwargs)
32        logging.info('Servo setup does not support DTS mode. ignoring %s',
33                     func.__name__)
34    return wrapper
35
36
37class ChromeCr50(chrome_ec.ChromeConsole):
38    """Manages control of a Chrome Cr50.
39
40    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
41    provides many interfaces to set and get its behavior via console commands.
42    This class is to abstract these interfaces.
43    """
44    PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d']
45    PROD_RO_KEYIDS = ['0xaa66150f']
46    OPEN = 'open'
47    UNLOCK = 'unlock'
48    LOCK = 'lock'
49    # The amount of time you need to show physical presence.
50    PP_SHORT = 15
51    PP_LONG = 300
52    CCD_PASSWORD_RATE_LIMIT = 3
53    IDLE_COUNT = 'count: (\d+)\s'
54    SHORT_WAIT = 3
55    # The version has four groups: the partition, the header version, debug
56    # descriptor and then version string.
57    # There are two partitions A and B. The active partition is marked with a
58    # '*'. If it is a debug image '/DBG' is added to the version string. If the
59    # image has been corrupted, the version information will be replaced with
60    # 'Error'.
61    # So the output may look something like this.
62    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
63    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
64    # Or like this if the region was corrupted.
65    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
66    #   RW_B:    Error
67    VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
68    INACTIVE_VERSION = VERSION_FORMAT % ''
69    ACTIVE_VERSION = VERSION_FORMAT % '\*'
70    # Following lines of the version output may print the image board id
71    # information. eg.
72    # BID A:   5a5a4146:ffffffff:00007f00 Yes
73    # BID B:   00000000:00000000:00000000 Yes
74    # Use the first group from ACTIVE_VERSION to match the active board id
75    # partition.
76    BID_ERROR = 'read_board_id: failed'
77    BID_FORMAT = ':\s+[a-f0-9:]{26} '
78    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
79            BID_ERROR)
80    WAKE_CHAR = '\n\n\n\n'
81    WAKE_RESPONSE = ['(>|Console is enabled)']
82    START_UNLOCK_TIMEOUT = 20
83    GETTIME = ['= (\S+)']
84    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
85    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
86    MAX_RETRY_COUNT = 10
87    CCDSTATE_MAX_RETRY_COUNT = 20
88    START_STR = ['((Havn|UART).*Console is enabled;)']
89    REBOOT_DELAY_WITH_CCD = 60
90    REBOOT_DELAY_WITH_FLEX = 3
91    ON_STRINGS = ['enable', 'enabled', 'on']
92    CONSERVATIVE_CCD_WAIT = 10
93    CCD_SHORT_PRESSES = 5
94    CAP_IS_ACCESSIBLE = 0
95    CAP_SETTING = 1
96    CAP_REQ = 2
97    GET_CAP_TRIES = 20
98    CAP_ALWAYS = 'Always'
99    # Regex to match the valid capability settings.
100    CAP_STATES = '(%s|Default|IfOpened|UnlessLocked)' % CAP_ALWAYS
101    # List of all cr50 ccd capabilities. Same order of 'ccd' output
102    CAP_NAMES = [
103            'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx',
104            'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole',
105            'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe',
106            'OpenNoLongPP', 'BatteryBypassPP', '(UpdateNoTPMWipe|Unused)',
107            'I2C', 'FlashRead', 'OpenNoDevMode', 'OpenFromUSB', 'OverrideBatt'
108    ]
109    # There are two capability formats. Match both.
110    #  UartGscRxECTx   Y 3=IfOpened
111    #  or
112    #  UartGscRxECTx   Y 0=Default (Always)
113    # Make sure the last word is at the end of the line. The next line will
114    # start with some whitespace, so account for that too.
115    CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES,
116                                                          CAP_STATES)
117    # Be as specific as possible with the 'ccd' output, so the test will notice
118    # missing characters and retry getting the output. Name each group, so the
119    # test can extract the field information into a dictionary.
120    # CCD_FIELDS is used to order the regex when searching for multiple fields
121    CCD_FIELDS = ['State', 'Password', 'Flags', 'Capabilities', 'TPM']
122    # CCD_FORMAT has the field names as keys and the expected output as the
123    # value.
124    CCD_FORMAT = {
125        'State' : '(State: (?P<State>Opened|Locked|Unlocked))',
126        'Password' : '(Password: (?P<Password>set|none))',
127        'Flags' : '(Flags: (?P<Flags>\S*))',
128        'Capabilities' : '(Capabilities:.*(?P<Capabilities>%s))' %
129                         (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT),
130        'TPM' : '(TPM:(?P<TPM>[ \S]*)\r)',
131    }
132
133    # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h
134    BOARD_PROP = {
135            'BOARD_PERIPH_CONFIG_SPI': (1 << 0, None),
136            'BOARD_PERIPH_CONFIG_SPI': (1 << 0, None),
137            'BOARD_PERIPH_CONFIG_I2C': (1 << 1, None),
138            'BOARD_PERIPH_CONFIG_I2C': (1 << 1, None),
139            'BOARD_NEEDS_SYS_RST_PULL_UP': (1 << 5, None),
140            'BOARD_USE_PLT_RESET': (1 << 6, None),
141            'BOARD_WP_ASSERTED': (1 << 8, None),
142            'BOARD_FORCING_WP': (1 << 9, None),
143            'BOARD_NO_RO_UART': (1 << 10, None),
144            'BOARD_CCD_UNLOCKED': (1 << 11, 3 << 11),
145            'BOARD_CCD_OPENED': (2 << 11, 3 << 11),
146            'BOARD_DEEP_SLEEP_DISABLED': (1 << 13, None),
147            'BOARD_DETECT_AP_WITH_UART': (1 << 14, None),
148            'BOARD_ITE_EC_SYNC_NEEDED': (1 << 15, None),
149            'BOARD_WP_DISABLE_DELAY': (1 << 16, None),
150            'BOARD_CLOSED_SOURCE_SET1': (1 << 17, None),
151            'BOARD_CLOSED_LOOP_RESET': (1 << 18, None),
152            'BOARD_NO_INA_SUPPORT': (1 << 19, None),
153            'BOARD_ALLOW_CHANGE_TPM_MODE': (1 << 20, None),
154            'BOARD_EC_CR50_COMM_SUPPORT': (1 << 21, None),
155            'BOARD_CCD_REC_LID_PIN_DIOA1': (1 << 22, 3 << 22),
156            'BOARD_CCD_REC_LID_PIN_DIOA9': (2 << 22, 3 << 22),
157            'BOARD_CCD_REC_LID_PIN_DIOA12': (3 << 22, 3 << 22)
158    }
159
160    # CR50 reset flags as defined in platform ec_commands.h. These are only the
161    # flags used by cr50.
162    RESET_FLAGS = {
163           'RESET_FLAG_OTHER'            : 1 << 0,
164           'RESET_FLAG_BROWNOUT'         : 1 << 2,
165           'RESET_FLAG_POWER_ON'         : 1 << 3,
166           'RESET_FLAG_SOFT'             : 1 << 5,
167           'RESET_FLAG_HIBERNATE'        : 1 << 6,
168           'RESET_FLAG_RTC_ALARM'        : 3 << 7,
169           'RESET_FLAG_WAKE_PIN'         : 1 << 8,
170           'RESET_FLAG_HARD'             : 1 << 11,
171           'RESET_FLAG_USB_RESUME'       : 1 << 14,
172           'RESET_FLAG_RDD'              : 1 << 15,
173           'RESET_FLAG_RBOX'             : 1 << 16,
174           'RESET_FLAG_SECURITY'         : 1 << 17,
175    }
176    FIPS_RE = r' ([^ ]*)approved.*allowed: (1|0)'
177    # CCD Capabilities used for c2d2 control drivers.
178    SERVO_DRV_CAPS = ['OverrideWP', 'GscFullConsole', 'RebootECAP']
179    # Cr50 may have flash operation errors during the test. Here's an example
180    # of one error message.
181    # do_flash_op:245 errors 20 fsh_pe_control 40720004
182    # The stuff after the ':' may change, but all flash operation errors
183    # contain do_flash_op. do_flash_op is only ever printed if there is an
184    # error during the flash operation. Just search for do_flash_op to simplify
185    # the search string and make it applicable to all flash op errors.
186    FLASH_OP_ERROR_MSG = 'do_flash_op'
187    # USB issues may show up with the timer sof calibration overflow interrupt.
188    # Count these during cleanup.
189    USB_ERROR = 'timer_sof_calibration_overflow_int'
190    # Message printed during watchdog reset.
191    WATCHDOG_RST = 'WATCHDOG PC'
192    # ===============================================================
193    # AP_RO strings
194    # Cr50 only supports v2
195    AP_RO_VERSIONS = [1]
196    AP_RO_HASH_RE = r'sha256 (hash) ([0-9a-f]{64})'
197    AP_RO_UNSUPPORTED_UNPROGRAMMED = 'RO verification not programmed'
198    AP_RO_UNSUPPORTED_BID_BLOCKED = 'BID blocked'
199    AP_RO_REASON_RE = r'(ap_ro_check_unsupported): (.*)\]'
200    AP_RO_RESULT_RE = r'(result)\s*: (\d)'
201    AP_RO_SUPPORTED_RE = r'(supported)\s*: (yes|no)'
202    AP_RO_UNSUPPORTED_OUTPUT = [
203            AP_RO_REASON_RE, AP_RO_RESULT_RE, AP_RO_SUPPORTED_RE
204    ]
205    AP_RO_SAVED_OUTPUT = [AP_RO_RESULT_RE, AP_RO_SUPPORTED_RE, AP_RO_HASH_RE]
206
207    # ===============================================================
208
209    def __init__(self, servo, faft_config):
210        """Initializes a ChromeCr50 object.
211
212        @param servo: A servo object.
213        @param faft_config: A faft config object.
214        """
215        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
216        self.faft_config = faft_config
217
218    def wake_cr50(self):
219        """Wake up cr50 by sending some linebreaks and wait for the response"""
220        for i in range(self.MAX_RETRY_COUNT):
221            try:
222                rv = super(ChromeCr50, self).send_command_get_output(
223                        self.WAKE_CHAR, self.WAKE_RESPONSE)
224                logging.debug('wake result %r', rv)
225                return
226            except servo.ResponsiveConsoleError as e:
227                logging.info("Console responsive, but couldn't match wake "
228                             "response %s", e)
229        raise servo.ResponsiveConsoleError('Unable to wake cr50')
230
231
232    def send_command(self, commands):
233        """Send command through UART.
234
235        Cr50 will drop characters input to the UART when it resumes from sleep.
236        If servo is not using ccd, send some characters before sending the
237        real command to make sure cr50 is awake.
238
239        @param commands: the command string to send to cr50
240        """
241        if self._servo.main_device_is_flex():
242            self.wake_cr50()
243        super(ChromeCr50, self).send_command(commands)
244
245
246    def set_cap(self, cap, setting):
247        """Set the capability to setting
248
249        @param cap: The capability string
250        @param setting: The setting to set the capability to.
251        """
252        self.set_caps({ cap : setting })
253
254
255    def set_caps(self, cap_dict):
256        """Use cap_dict to set all the cap values
257
258        Set all of the capabilities in cap_dict to the correct config.
259
260        @param cap_dict: A dictionary with the capability as key and the desired
261                         setting as values
262        """
263        for cap, config in six.iteritems(cap_dict):
264            self.send_command('ccd set %s %s' % (cap, config))
265        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
266        for cap, config in six.iteritems(cap_dict):
267            if (current_cap_settings[cap].lower() !=
268                config.lower()):
269                raise error.TestFail('Failed to set %s to %s' % (cap, config))
270
271
272    def get_cap_overview(self, cap_dict):
273        """Get a basic overview of the capability dictionary
274
275        If all capabilities are set to Default, ccd has been reset to default.
276        If all capabilities are set to Always, ccd is in factory mode.
277
278        @param cap_dict: A dictionary of the capability settings
279        @return: A tuple of the capability overview (in factory mode, is reset)
280        """
281        in_factory_mode = True
282        is_reset = True
283        for cap, cap_info in six.iteritems(cap_dict):
284            cap_setting = cap_info[self.CAP_SETTING]
285            if cap_setting != 'Always':
286                in_factory_mode = False
287            if cap_setting != 'Default':
288                is_reset = False
289        return in_factory_mode, is_reset
290
291
292    def password_is_reset(self):
293        """Returns True if the password is cleared"""
294        return self.get_ccd_info('Password') == 'none'
295
296
297    def ccd_is_reset(self):
298        """Returns True if the ccd is reset
299
300        The password must be cleared, write protect and battery presence must
301        follow battery presence, and all capabilities must be Always
302        """
303        return (self.password_is_reset() and self.wp_is_reset() and
304                self.batt_pres_is_reset() and
305                self.get_cap_overview(self.get_cap_dict())[1])
306
307
308    def wp_is_reset(self):
309        """Returns True if wp is reset to follow batt pres at all times"""
310        follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state()
311        return follow_batt_pres and follow_batt_pres_atboot
312
313
314    def get_wp_state(self):
315        """Get the current write protect and atboot state
316
317        The atboot setting cannot really be determined now if it is set to
318        follow battery presence. It is likely to remain the same after reboot,
319        but who knows. If the third element of the tuple is True, the last
320        element will not be that useful
321
322        @return: a tuple with the current write protect state
323                (True if current state is to follow batt presence,
324                 True if write protect is enabled,
325                 True if current state is to follow batt presence atboot,
326                 True if write protect is enabled atboot)
327        """
328        rv = self.send_command_retry_get_output('wp',
329                ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
330                 '(follow|enabled|disabled)'], safe=True)[0]
331        _, forced, enabled, _, atboot = rv
332        logging.debug(rv)
333        return (not forced, enabled =='enabled',
334                atboot == 'follow', atboot == 'enabled')
335
336
337    def in_dev_mode(self):
338        """Return True if cr50 thinks the device is in dev mode"""
339        return 'dev_mode' in self.get_ccd_info('TPM')
340
341
342    def get_ccd_info(self, field=None):
343        """Get the current ccd state.
344
345        Take the output of 'ccd' and convert it to a dictionary.
346
347        @param: the ccd info param to get or None to get the full ccd output
348                dictionary.
349        @return: the field value or a dictionary with the ccd field name as the
350                 key and the setting as the value.
351        """
352
353        if field:
354            match_value = self.CCD_FORMAT[field]
355        else:
356            values = [ self.CCD_FORMAT[field] for field in self.CCD_FIELDS ]
357            match_value = '.*'.join(values)
358        matched_output = None
359        original_timeout = float(self._servo.get('cr50_uart_timeout'))
360        # Change the console timeout to 10s, it may take longer than 3s to read
361        # ccd info
362        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
363        for i in range(self.GET_CAP_TRIES):
364            try:
365                # If some ccd output is dropped and the output doesn't match the
366                # expected ccd output format, send_command_get_output will wait the
367                # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use
368                # re to search the command output instead of
369                # send_safe_command_get_output, so we don't have to wait the full
370                # timeout if output is dropped.
371                rv = self.send_command_retry_get_output('ccd', ['ccd.*>'],
372                                                        safe=True)[0]
373                matched_output = re.search(match_value, rv, re.DOTALL)
374                if matched_output:
375                    break
376                logging.info('try %d: could not match ccd output %s', i, rv)
377            except Exception as e:
378                logging.info('try %d got error %s', i, str(e))
379
380        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
381        if not matched_output:
382            raise error.TestFail('Could not get ccd output')
383        matched_dict = matched_output.groupdict()
384        logging.info('Current CCD settings:\n%s', pprint.pformat(matched_dict))
385        if field:
386            return matched_dict.get(field)
387        return matched_dict
388
389
390    def get_cap(self, cap):
391        """Returns the capabilitiy from the capability dictionary"""
392        return self.get_cap_dict()[cap]
393
394
395    def get_cap_dict(self, info=None):
396        """Get the current ccd capability settings.
397
398        The capability may be using the 'Default' setting. That doesn't say much
399        about the ccd state required to use the capability. Return all ccd
400        information in the cap_dict
401        [is accessible, setting, requirement]
402
403        @param info: Only fill the cap_dict with the requested information:
404                     CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ
405        @return: A dictionary with the capability as the key a list of the
406                 current settings as the value [is_accessible, setting,
407                 requirement]
408        """
409        # Add whitespace at the end, so we can still match the last line.
410        cap_info_str = self.get_ccd_info('Capabilities') + '\r\n'
411        cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT,
412                                  cap_info_str)
413        caps = {}
414        for cap, accessible, setting, _, required in cap_settings:
415            # If there's only 1 value after =, then the setting is the
416            # requirement.
417            if not required:
418                required = setting
419            cap_info = [accessible == 'Y', setting, required]
420            if info is not None:
421                caps[cap] = cap_info[info]
422            else:
423                caps[cap] = cap_info
424        logging.debug(pprint.pformat(caps))
425        return caps
426
427
428    def send_command_get_output(self, command, regexp_list):
429        """Send command through UART and wait for response.
430
431        Cr50 will drop characters input to the UART when it resumes from sleep.
432        If servo is not using ccd, send some characters before sending the
433        real command to make sure cr50 is awake.
434
435        @param command: the command to send
436        @param regexp_list: The list of regular expressions to match in the
437                            command output
438        @return: A list of matched output
439        """
440        if self._servo.main_device_is_flex():
441            self.wake_cr50()
442
443        # We have started prepending '\n' to separate cr50 console junk from
444        # the real command. If someone is just searching for .*>, then they will
445        # only get the output from the first '\n' we added. Raise an error to
446        # change the test to look for something more specific ex command.*>.
447        # cr50 will print the command in the output, so that is an easy way to
448        # modify '.*>' to match the real command output.
449        if '.*>' in regexp_list:
450            raise error.TestError('Send more specific regexp %r %r' % (command,
451                    regexp_list))
452
453        # prepend \n to separate the command from any junk that may have been
454        # sent to the cr50 uart.
455        command = '\n' + command
456        return super(ChromeCr50, self).send_command_get_output(command,
457                                                               regexp_list)
458
459
460    def send_safe_command_get_output(self, command, regexp_list,
461            channel_mask=0x1):
462        """Restrict the console channels while sending console commands.
463
464        @param command: the command to send
465        @param regexp_list: The list of regular expressions to match in the
466                            command output
467        @param channel_mask: The mask to pass to 'chan' prior to running the
468                             command, indicating which channels should remain
469                             enabled (0x1 is command output)
470        @return: A list of matched output
471        """
472        self.send_command('chan save')
473        self.send_command('chan 0x%x' % channel_mask)
474        try:
475            rv = self.send_command_get_output(command, regexp_list)
476        finally:
477            self.send_command('chan restore')
478        return rv
479
480
481    def send_command_retry_get_output(self, command, regexp_list, safe=False,
482                                      compare_output=False, retries=MAX_RETRY_COUNT):
483        """Retry the command 5 times if you get a timeout or drop some output
484
485
486        @param command: the command string
487        @param regexp_list: the regex to search for
488        @param safe: use send_safe_command_get_output if True otherwise use
489                     send_command_get_output
490        @param compare_output: look for reproducible output
491        """
492        send_command = (self.send_safe_command_get_output if safe else
493                        self.send_command_get_output)
494        err = 'no consistent output' if compare_output else 'unknown'
495        past_rv = []
496        for i in range(retries):
497            try:
498                rv = send_command(command, regexp_list)
499                if not compare_output or rv in past_rv:
500                    return rv
501                if past_rv:
502                    logging.debug('%d %s not in %s', i, rv, past_rv)
503                past_rv.append(rv)
504            except Exception as e:
505                err = e
506                logging.info('attempt %d %r: %s %s', i, command, type(e),
507                             str(e))
508        if compare_output:
509            logging.info('No consistent output for %r %s', command,
510                         pprint.pformat(past_rv))
511        raise error.TestError('Issue sending %r command: %r' % (command, err))
512
513
514    def get_deep_sleep_count(self):
515        """Get the deep sleep count from the idle task"""
516        result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT],
517                                                    safe=True)
518        return int(result[0][1])
519
520
521    def clear_deep_sleep_count(self):
522        """Clear the deep sleep count"""
523        self.send_command('idle c')
524        if self.get_deep_sleep_count():
525            raise error.TestFail("Could not clear deep sleep count")
526
527
528    def get_board_properties(self):
529        """Get information from the version command"""
530        rv = self.send_command_retry_get_output('brdprop',
531                ['properties = (\S+)\s'], safe=True)
532        return int(rv[0][1], 16)
533
534
535    def uses_board_property(self, prop_name):
536        """Returns 1 if the given property is set, or 0 otherwise
537
538        @param prop_name: a property name in string type.
539        """
540        brdprop = self.get_board_properties()
541        (prop, mask) = self.BOARD_PROP[prop_name]
542        # Use the board property value for the mask if no mask is given.
543        mask = mask or prop
544        return (brdprop & mask) == prop
545
546
547    def has_command(self, cmd):
548        """Returns 1 if cr50 has the command 0 if it doesn't"""
549        try:
550            self.send_command_retry_get_output('help', [cmd],
551                                               safe=True,
552                                               retries=3)
553        except:
554            logging.info("Image does not include '%s' command", cmd)
555            return 0
556        return 1
557
558
559    def reboot(self):
560        """Reboot Cr50 and wait for cr50 to reset"""
561        self.wait_for_reboot(cmd='reboot', timeout=10)
562
563
564    def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
565        """Use uart to wait for cr50 to reboot.
566
567        If a command is given run it and wait for cr50 to reboot. Monitor
568        the cr50 uart to detect the reset. Wait up to timeout seconds
569        for the reset.
570
571        @param cmd: the command to run to reset cr50.
572        @param timeout: seconds to wait to detect the reboot.
573        """
574        original_timeout = float(self._servo.get('cr50_uart_timeout'))
575        # Change the console timeout to timeout, so we wait at least that long
576        # for cr50 to print the start string.
577        self._servo.set_nocheck('cr50_uart_timeout', timeout)
578        try:
579            self.send_command_get_output(cmd, self.START_STR)
580            logging.debug('Detected cr50 reboot')
581        except error.TestFail as e:
582            logging.debug('Failed to detect cr50 reboot')
583        # Reset the timeout.
584        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
585
586
587    def wait_for_reboot(self, cmd='\n', timeout=60):
588        """Wait for cr50 to reboot
589
590        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
591        necessary.
592
593        @param cmd: the command to run to reset cr50.
594        @param timeout: seconds to wait to detect the reboot.
595        """
596        logging.info('Wait up to %s seconds for reboot (%s)', timeout,
597                     cmd.strip())
598        if self._servo.main_device_is_ccd():
599            self.send_command(cmd)
600            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
601            # go down to detect the reboot.
602            self.wait_for_ccd_disable(timeout, raise_error=False)
603            self.ccd_enable()
604        else:
605            self._uart_wait_for_reboot(cmd, timeout)
606
607        # On most devices, a Cr50 reset will cause an AP reset. Force this to
608        # happen on devices where the AP is left down.
609        if not self.faft_config.ap_up_after_cr50_reboot:
610            # Reset the DUT a few seconds after cr50 reboot.
611            time.sleep(self.SHORT_WAIT)
612            logging.info('Resetting DUT after Cr50 reset')
613            self._servo.get_power_state_controller().reset()
614
615
616    def set_board_id(self, chip_bid, chip_flags):
617        """Set the chip board id type and flags."""
618        self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
619
620
621    def get_board_id(self):
622        """Get the chip board id type and flags.
623
624        bid_type_inv will be '' if the bid output doesn't show it. If no board
625        id type inv is shown, then board id is erased will just check the type
626        and flags.
627
628        @returns a tuple (A string of bid_type:bid_type_inv:bid_flags,
629                          True if board id is erased)
630        """
631        bid = self.send_command_retry_get_output('bid',
632                    ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'],
633                    safe=True)[0][1:]
634        bid_str = ':'.join(bid)
635        bid_is_erased =  set(bid).issubset({'', 'ffffffff'})
636        logging.info('chip board id: %s', bid_str)
637        logging.info('chip board id is erased: %s',
638                     'yes' if bid_is_erased else 'no')
639        return bid_str, bid_is_erased
640
641
642    def eraseflashinfo(self, retries=10):
643        """Run eraseflashinfo.
644
645        @returns True if the board id is erased
646        """
647        for i in range(retries):
648            # The console could drop characters while matching 'eraseflashinfo'.
649            # Retry if the command times out. It's ok to run eraseflashinfo
650            # multiple times.
651            rv = self.send_command_retry_get_output(
652                    'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip()
653            logging.info('eraseflashinfo output: %r', rv)
654            bid_erased = self.get_board_id()[1]
655            eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv
656            if not eraseflashinfo_issue and bid_erased:
657                break
658            logging.info('Retrying eraseflashinfo')
659        return bid_erased
660
661
662    def rollback(self):
663        """Set the reset counter high enough to force a rollback and reboot."""
664        if not self.has_command('rollback'):
665            raise error.TestError("need image with 'rollback'")
666
667        inactive_partition = self.get_inactive_version_info()[0]
668
669        self.wait_for_reboot(cmd='rollback', timeout=10)
670
671        running_partition = self.get_active_version_info()[0]
672        if inactive_partition != running_partition:
673            raise error.TestError("Failed to rollback to inactive image")
674
675
676    def rolledback(self):
677        """Returns true if cr50 just rolled back"""
678        return 'Rollback detected' in self.send_command_retry_get_output(
679                'sysinfo', ['sysinfo.*>'], safe=True)[0]
680
681
682    def get_version_info(self, regexp):
683        """Get information from the version command"""
684        return self.send_command_retry_get_output('version', [regexp],
685                                                  safe=True,
686                                                  compare_output=True)[0][1::]
687
688
689    def get_inactive_version_info(self):
690        """Get the active partition, version, and hash"""
691        return self.get_version_info(self.INACTIVE_VERSION)
692
693
694    def get_active_version_info(self):
695        """Get the active partition, version, and hash"""
696        return self.get_version_info(self.ACTIVE_VERSION)
697
698
699    def using_prod_rw_keys(self):
700        """Returns True if the RW keyid is prod"""
701        rv = self.send_command_retry_get_output('sysinfo',
702                ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1]
703        logging.info('RW Keyid: 0x%s', rv)
704        return rv in self.PROD_RW_KEYIDS
705
706
707    def get_active_board_id_str(self):
708        """Get the running image board id.
709
710        @return: The board id string or None if the image does not support board
711                 id or the image is not board id locked.
712        """
713        # Getting the board id from the version console command is only
714        # supported in board id locked images .22 and above. Any image that is
715        # board id locked will have support for getting the image board id.
716        #
717        # If board id is not supported on the device, return None. This is
718        # still expected on all current non board id locked release images.
719        try:
720            version_info = self.get_version_info(self.ACTIVE_BID)
721        except error.TestFail as e:
722            logging.info(str(e))
723            logging.info('Cannot use the version to get the board id')
724            return None
725
726        if self.BID_ERROR in version_info[4]:
727            raise error.TestError(version_info)
728        bid = version_info[4].split()[1]
729        return cr50_utils.GetBoardIdInfoString(bid)
730
731
732    def get_version(self):
733        """Get the RW version"""
734        return self.get_active_version_info()[1].strip()
735
736
737    def get_full_version(self):
738        """Get the complete RW version string."""
739        _, rw_ver, dbg, ver_str = self.get_active_version_info()
740        return  rw_ver + (dbg if dbg else '') + ver_str
741
742
743    def ccd_is_enabled(self):
744        """Return True if ccd is enabled.
745
746        If the test is running through ccd, return the ccd_state value. If
747        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
748        if Cr50 has ccd enabled.
749
750        @return: 'off' or 'on' based on whether the cr50 console is working.
751        """
752        if self._servo.main_device_is_ccd():
753            return self._servo.get('ccd_state') == 'on'
754        else:
755            return not bool(self.gpioget('CCD_MODE_L'))
756
757
758    @dts_control_command
759    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
760        """Wait up to timeout seconds for CCD to be 'on' or 'off'
761
762        Verify ccd is off or on and remains in that state for 3 seconds.
763
764        @param state: a string either 'on' or 'off'.
765        @param timeout: time in seconds to wait
766        @param raise_error: Raise TestFail if the value is state is not reached.
767        @raise TestFail: if ccd never reaches the specified state
768        """
769        wait_for_enable = state == 'on'
770        logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off')
771        enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
772                                       timeout_sec=timeout)
773        if enabled != wait_for_enable:
774            error_msg = ("timed out before detecting ccd '%s'" %
775                         ('on' if wait_for_enable else 'off'))
776            if raise_error:
777                raise error.TestFail(error_msg)
778            logging.warning(error_msg)
779        else:
780            # Make sure the state doesn't change.
781            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
782                                           timeout_sec=self.SHORT_WAIT)
783            if enabled != wait_for_enable:
784                error_msg = ("CCD switched %r after briefly being %r" %
785                             ('on' if enabled else 'off', state))
786                if raise_error:
787                    raise error.TestFail(error_msg)
788                logging.info(error_msg)
789        logging.info("ccd is %r", 'on' if enabled else 'off')
790
791
792    @dts_control_command
793    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
794        """Wait for the cr50 console to stop working"""
795        self.wait_for_stable_ccd_state('off', timeout, raise_error)
796
797
798    @dts_control_command
799    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
800        """Wait for the cr50 console to start working"""
801        self.wait_for_stable_ccd_state('on', timeout, raise_error)
802
803
804    @dts_control_command
805    def ccd_disable(self, raise_error=True):
806        """Change the values of the CC lines to disable CCD"""
807        logging.info("disable ccd")
808        self._servo.set_dts_mode('off')
809        self.wait_for_ccd_disable(raise_error=raise_error)
810
811
812    @dts_control_command
813    def ccd_enable(self, raise_error=False):
814        """Reenable CCD and reset servo interfaces"""
815        logging.info("reenable ccd")
816        self._servo.set_dts_mode('on')
817        # If the test is actually running with ccd, wait for USB communication
818        # to come up after reset.
819        if self._servo.main_device_is_ccd():
820            time.sleep(self._servo.USB_DETECTION_DELAY)
821        self.wait_for_ccd_enable(raise_error=raise_error)
822
823
824    def _level_change_req_pp(self, level):
825        """Returns True if setting the level will require physical presence"""
826        testlab_pp = level != 'testlab open' and 'testlab' in level
827        # If the level is open and the ccd capabilities say physical presence
828        # is required, then physical presence will be required.
829        open_pp = (level == 'open' and
830                   not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
831        return testlab_pp or open_pp
832
833
834    def _state_to_bool(self, state):
835        """Converts the state string to True or False"""
836        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
837        return state.lower() in self.ON_STRINGS
838
839
840    def testlab_is_on(self):
841        """Returns True of testlab mode is on"""
842        return self._state_to_bool(self._servo.get('cr50_testlab'))
843
844
845    def set_ccd_testlab(self, state):
846        """Set the testlab mode
847
848        @param state: the desired testlab mode string: 'on' or 'off'
849        @raise TestFail: if testlab mode was not changed
850        """
851        if self._servo.main_device_is_ccd():
852            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
853                    'cable instead.')
854        if not self.faft_config.has_powerbutton:
855            raise error.TestError('No power button on device')
856
857        request_on = self._state_to_bool(state)
858        testlab_on = self.testlab_is_on()
859        request_str = 'on' if request_on else 'off'
860
861        if testlab_on == request_on:
862            logging.info('ccd testlab already set to %s', request_str)
863            return
864
865        original_level = self.get_ccd_level()
866
867        # We can only change the testlab mode when the device is open. If
868        # testlab mode is already enabled, we can go directly to open using 'ccd
869        # testlab open'. This will save 5 minutes, because we can skip the
870        # physical presence check.
871        if testlab_on:
872            self.send_command('ccd testlab open')
873        else:
874            self.set_ccd_level('open')
875
876        ap_is_on = self.ap_is_on()
877        # Set testlab mode
878        rv = self.send_command_get_output('ccd testlab %s' % request_str,
879                ['ccd.*>'])[0]
880        if 'Access Denied' in rv:
881            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
882
883        # Press the power button once a second for 15 seconds. If the AP is
884        # currently on, make sure it's on at the end of the open process.
885        self.run_pp(self.PP_SHORT, ensure_ap_on=ap_is_on)
886
887        self.set_ccd_level(original_level)
888        if request_on != self.testlab_is_on():
889            raise error.TestFail('Failed to set ccd testlab to %s' % state)
890
891
892    def get_ccd_level(self):
893        """Returns the current ccd privilege level"""
894        return self.get_ccd_info('State').lower().rstrip('ed')
895
896
897    def set_ccd_level(self, level, password=''):
898        """Set the Cr50 CCD privilege level.
899
900        @param level: a string of the ccd privilege level: 'open', 'lock', or
901                      'unlock'.
902        @param password: send the ccd command with password. This will still
903                         require the same physical presence.
904        @raise TestFail: if the level couldn't be set
905        """
906        # TODO(mruthven): add support for CCD password
907        level = level.lower()
908
909        if level == self.get_ccd_level():
910            logging.info('CCD privilege level is already %s', level)
911            return
912
913        if 'testlab' in level:
914            raise error.TestError("Can't change testlab mode using "
915                "ccd_set_level")
916
917        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
918        batt_is_disconnected = self.get_batt_pres_state()[1]
919        req_pp = self._level_change_req_pp(level)
920        has_pp = not self._servo.main_device_is_ccd()
921        dbg_en = self.get_active_version_info()[2]
922
923        if req_pp and not has_pp:
924            raise error.TestError("Can't change privilege level to '%s' "
925                "without physical presence." % level)
926
927        if not testlab_on and not has_pp:
928            raise error.TestError("Wont change privilege level without "
929                "physical presence or testlab mode enabled")
930
931        original_timeout = float(self._servo.get('cr50_uart_timeout'))
932        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
933        # take more than 3 seconds.
934        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
935        # Start the unlock process.
936
937        if level == 'open' or level == 'unlock':
938            logging.info('waiting %d seconds, the minimum time between'
939                         ' ccd password attempts',
940                         self.CCD_PASSWORD_RATE_LIMIT)
941            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)
942
943        ap_is_on = self.ap_is_on()
944        try:
945            cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
946            # ccd command outputs on the rbox, ccd, and console channels,
947            # respectively. Cr50 uses these channels to print relevant ccd
948            # information.
949            # Restrict all other channels.
950            ccd_output_channels = 0x20000 | 0x8 | 0x1
951            rv = self.send_safe_command_get_output(
952                    cmd, [cmd + '(.*)>'],
953                    channel_mask=ccd_output_channels)[0][1]
954        finally:
955            self._servo.set('cr50_uart_timeout', original_timeout)
956        logging.info(rv)
957        if 'ccd_open denied: fwmp' in rv:
958            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
959        if 'Access Denied' in rv:
960            raise error.TestFail("%r %s" % (cmd, rv))
961        if 'Busy' in rv:
962            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))
963
964        # Press the power button once a second, if we need physical presence.
965        if req_pp and batt_is_disconnected:
966            # DBG images have shorter unlock processes. If the AP is currently
967            # on, make sure it's on at the end of the open process.
968            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG,
969                        ensure_ap_on=ap_is_on)
970
971        if level != self.get_ccd_level():
972            self.check_for_console_errors('Running console ccd %s' % level)
973            raise error.TestFail('Could not set privilege level to %s' % level)
974
975        logging.info('Successfully set CCD privelege level to %s', level)
976
977
978    def run_pp(self, unlock_timeout, ensure_ap_on=False):
979        """Press the power button a for unlock_timeout seconds.
980
981        This will press the power button many more times than it needs to be
982        pressed. Cr50 doesn't care if you press it too often. It just cares that
983        you press the power button at least once within the detect interval.
984
985        For privilege level changes you need to press the power button 5 times
986        in the short interval and then 4 times within the long interval.
987        Short Interval
988        100msec < power button press < 5 seconds
989        Long Interval
990        60s < power button press < 300s
991
992        For testlab enable/disable you must press the power button 5 times
993        spaced between 100msec and 5 seconds apart.
994
995        @param unlock_timeout: time to press the power button in seconds.
996        @param ensure_ap_on: If true, press the power to turn on the AP.
997        """
998        end_time = time.time() + unlock_timeout
999
1000        logging.info('Pressing power button for %ds to unlock the console.',
1001                     unlock_timeout)
1002        logging.info('The process should end at %s', time.ctime(end_time))
1003
1004        # Press the power button once a second to unlock the console.
1005        while time.time() < end_time:
1006            self._servo.power_short_press()
1007            time.sleep(1)
1008
1009        # If the last power button press left the AP powered off, and it was on
1010        # before, turn it back on.
1011        time.sleep(self.faft_config.shutdown)
1012        if ensure_ap_on and not self.ap_is_on():
1013            logging.info('AP is off. Pressing the power button to turn it on')
1014            self._servo.power_short_press()
1015            logging.debug('Pressing PP to turn back on')
1016
1017
1018    def gettime(self):
1019        """Get the current cr50 system time"""
1020        result = self.send_safe_command_get_output('gettime', [' = (.*) s'])
1021        return float(result[0][1])
1022
1023
1024    def servo_dts_mode_is_valid(self):
1025        """Returns True if cr50 registers change in servo dts mode."""
1026        # This is to test that Cr50 actually recognizes the change in ccd state
1027        # We cant do that with tests using ccd, because the cr50 communication
1028        # goes down once ccd is enabled.
1029        if not self._servo.dts_mode_is_safe():
1030            return False
1031
1032        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
1033        dts_start = self._servo.get_dts_mode()
1034        try:
1035            # Verify both ccd enable and disable
1036            self.ccd_disable(raise_error=True)
1037            self.ccd_enable(raise_error=True)
1038            rv = True
1039        except Exception as e:
1040            logging.info(e)
1041            rv = False
1042        self._servo.set_dts_mode(dts_start)
1043        self.wait_for_stable_ccd_state(ccd_start, 60, True)
1044        logging.info('Test setup does%s support servo DTS mode',
1045                '' if rv else 'n\'t')
1046        return rv
1047
1048
1049    def wait_until_update_is_allowed(self):
1050        """Wait until cr50 will be able to accept an update.
1051
1052        Cr50 rejects any attempt to update if it has been less than 60 seconds
1053        since it last recovered from deep sleep or came up from reboot. This
1054        will wait until cr50 gettime shows a time greater than 60.
1055        """
1056        if self.get_active_version_info()[2]:
1057            logging.info("Running DBG image. Don't need to wait for update.")
1058            return
1059        cr50_time = self.gettime()
1060        if cr50_time < 60:
1061            sleep_time = 61 - cr50_time
1062            logging.info('Cr50 has been up for %ds waiting %ds before update',
1063                         cr50_time, sleep_time)
1064            time.sleep(sleep_time)
1065
1066
1067    def tpm_is_enabled(self):
1068        """Query the current TPM mode.
1069
1070        @return:  True if TPM is enabled, False otherwise.
1071        """
1072        result = self.send_command_retry_get_output('sysinfo',
1073                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1]
1074        logging.debug(result)
1075
1076        return result.lower() == 'enabled'
1077
1078
1079    def get_keyladder_state(self):
1080        """Get the status of H1 Key Ladder.
1081
1082        @return: The keyladder state string. prod or dev both mean enabled.
1083        """
1084        result = self.send_command_retry_get_output('sysinfo',
1085                ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'],
1086                safe=True)[0][1]
1087        logging.debug(result)
1088        return result
1089
1090
1091    def keyladder_is_disabled(self):
1092        """Get the status of H1 Key Ladder.
1093
1094        @return: True if H1 Key Ladder is disabled. False otherwise.
1095        """
1096        return self.get_keyladder_state() == 'disabled'
1097
1098
1099    def get_sleepmask(self):
1100        """Returns the sleepmask as an int"""
1101        rv = self.send_command_retry_get_output('sleepmask',
1102                ['sleep mask: (\S{8})\s+'], safe=True)[0][1]
1103        logging.info('sleepmask %s', rv)
1104        return int(rv, 16)
1105
1106
1107    def get_ccdstate(self):
1108        """Return a dictionary of the ccdstate once it's done debouncing"""
1109        for i in range(self.CCDSTATE_MAX_RETRY_COUNT):
1110            rv = self.send_command_retry_get_output('ccdstate',
1111                    ['ccdstate(.*)>'], safe=True, compare_output=True)[0][0]
1112
1113            # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or
1114            # 'unknown' may appear transiently. 'debouncing' should transition
1115            # to 'on' or 'off' within 1 second, and 'unknown' should do so
1116            # within 20 seconds.
1117            if 'debouncing' not in rv and 'unknown' not in rv:
1118                break
1119            time.sleep(self.SHORT_WAIT)
1120        ccdstate = {}
1121        for line in rv.splitlines():
1122            line = line.strip()
1123            if ':' in line:
1124                k, v = line.split(':', 1)
1125                k = k.strip()
1126                v = v.strip()
1127                if '(' in v:
1128                    ccdstate[k + ' full'] = v
1129                    v = v.split('(')[0].strip()
1130                ccdstate[k] = v
1131        logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate))
1132        return ccdstate
1133
1134
1135    def ap_is_on(self):
1136        """Get the power state of the AP.
1137
1138        @return: True if the AP is on; False otherwise.
1139        """
1140        ap_state = self.get_ccdstate()['AP']
1141        if ap_state.lower() == 'on':
1142            return True
1143        elif ap_state.lower() == 'off':
1144            return False
1145        else:
1146            raise error.TestFail('Read unusable AP state from ccdstate: %r' %
1147                                 ap_state)
1148
1149
1150    def gpioget(self, signal_name):
1151        """Get the current state of the signal
1152
1153        @return an integer 1 or 0 based on the gpioget value
1154        """
1155        result = self.send_command_retry_get_output('gpioget',
1156                    ['(0|1)[ \S]*%s' % signal_name], safe=True)
1157        return int(result[0][1])
1158
1159
1160    def batt_pres_is_reset(self):
1161        """Returns True if batt pres is reset to always follow batt pres"""
1162        follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state()
1163        return follow_bp and follow_bp_atboot
1164
1165
1166    def get_batt_pres_state(self):
1167        """Get the current and atboot battery presence state
1168
1169        The atboot setting cannot really be determined now if it is set to
1170        follow battery presence. It is likely to remain the same after reboot,
1171        but who knows. If the third element of the tuple is True, the last
1172        element will not be that useful
1173
1174        @return: a tuple of the current battery presence state
1175                 (True if current state is to follow batt presence,
1176                  True if battery is connected,
1177                  True if current state is to follow batt presence atboot,
1178                  True if battery is connected atboot)
1179        """
1180        # bpforce is added in 4.16. If the image doesn't have the command, cr50
1181        # always follows battery presence. In these images 'gpioget BATT_PRES_L'
1182        # accurately represents the battery presence state, because it can't be
1183        # overidden.
1184        if not self.has_command('bpforce'):
1185            batt_pres = not bool(self.gpioget('BATT_PRES_L'))
1186            return (True, batt_pres, True, batt_pres)
1187
1188        # The bpforce command is very similar to the wp command. It just
1189        # substitutes 'connected' for 'enabled' and 'disconnected' for
1190        # 'disabled'.
1191        rv = self.send_command_retry_get_output('bpforce',
1192                ['batt pres: (forced )?(con|dis).*at boot: (forced )?'
1193                 '(follow|discon|con)'], safe=True)[0]
1194        _, forced, connected, _, atboot = rv
1195        logging.info(rv)
1196        return (not forced, connected == 'con', atboot == 'follow',
1197                atboot == 'con')
1198
1199
1200    def set_batt_pres_state(self, state, atboot):
1201        """Override the battery presence state.
1202
1203        @param state: a string of the battery presence setting: 'connected',
1204                  'disconnected', or 'follow_batt_pres'
1205        @param atboot: True if we're overriding battery presence atboot
1206        """
1207        cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '')
1208        logging.info('running %r', cmd)
1209        self.send_command(cmd)
1210
1211
1212    def dump_nvmem(self):
1213        """Print nvmem objects."""
1214        rv = self.send_safe_command_get_output('dump_nvmem',
1215                                               ['dump_nvmem(.*)>'])[0][1]
1216        logging.info('NVMEM OUTPUT:\n%s', rv)
1217
1218
1219    def get_reset_cause(self):
1220        """Returns the reset flags for the last reset."""
1221        rv = self.send_command_retry_get_output('sysinfo',
1222                ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1]
1223        logging.info('reset cause: %s', rv)
1224        return int(rv, 16)
1225
1226
1227    def was_reset(self, reset_type):
1228        """Returns 1 if the reset type is found in the reset_cause.
1229
1230        @param reset_type: reset name in string type.
1231        """
1232        reset_cause = self.get_reset_cause()
1233        reset_flag = self.RESET_FLAGS[reset_type]
1234        return bool(reset_cause & reset_flag)
1235
1236
1237    def get_devid(self):
1238        """Returns the cr50 serial number."""
1239        return self.send_command_retry_get_output('sysinfo',
1240                ['DEV_ID:\s+(0x[0-9a-f]{8} 0x[0-9a-f]{8})'])[0][1]
1241
1242
1243    def get_serial(self):
1244        """Returns the cr50 serial number."""
1245        serial = self.get_devid().replace('0x', '').replace(' ', '-').upper()
1246        logging.info('CCD serial: %s', serial)
1247        return serial
1248
1249    def check_boot_mode(self, mode_exp='NORMAL'):
1250        """Query the boot mode to Cr50, and compare it against mode_exp.
1251
1252        Args:
1253            mode_exp: expecting boot mode. It should be either 'NORMAL'
1254                      or 'NO_BOOT'.
1255        Returns:
1256            True if the boot mode matches mode_exp.
1257            False, otherwise.
1258        Raises:
1259            TestError: Input parameter is not valid.
1260        """
1261
1262        if mode_exp not in ['NORMAL', 'NO_BOOT']:
1263            raise error.TestError('parameter, mode_exp is not valid: %s' %
1264                                  mode_exp)
1265        rv = self.send_command_retry_get_output('ec_comm',
1266                ['boot_mode\s*:\s*(NORMAL|NO_BOOT)'], safe=True)
1267        return mode_exp == rv[0][1]
1268
1269    def get_reset_count(self):
1270        """Returns the cr50 reset count"""
1271        return self.send_command_retry_get_output('sysinfo',
1272                                                  ['Reset count: (\d+)'],
1273                                                  safe=True)[0][1]
1274
1275    def check_servo_monitor(self):
1276        """Returns True if cr50 can detect servo connect/disconnect"""
1277        orig_dts = self._servo.get('servo_dts_mode')
1278        # Detach ccd so EC uart won't interfere with servo detection
1279        self._servo.set_dts_mode('off')
1280        self._servo.set('ec_uart_en', 'off')
1281        time.sleep(self.SHORT_WAIT)
1282        if self.get_ccdstate()['Servo'] != 'disconnected':
1283            self._servo.set_dts_mode(orig_dts)
1284            return False
1285
1286        self._servo.set('ec_uart_en', 'on')
1287        time.sleep(self.SHORT_WAIT)
1288        if self.get_ccdstate()['Servo'] != 'connected':
1289            self._servo.set_dts_mode(orig_dts)
1290            return False
1291        self._servo.set_dts_mode(orig_dts)
1292        return True
1293
1294    def fips_crypto_allowed(self):
1295        """Return 1 if fips crypto is enabled."""
1296        if not self.has_command('fips'):
1297            return 0
1298
1299        rv = self.send_command_retry_get_output('fips', [self.FIPS_RE])
1300        logging.info('FIPS: %r', rv)
1301        _, approved, allowed = rv[0]
1302        if int(approved == '') != int(allowed):
1303            raise error.TestFail('Approved does not match allowed %r' % rv)
1304        return int(allowed)
1305
1306    def unlock_is_supported(self):
1307        """Returns True if GSC supports the ccd unlock state."""
1308        return True
1309
1310    def cap_is_always_on(self, cap):
1311        """Returns True if the capability is set to Always"""
1312        rv = self.send_command_retry_get_output('ccd',
1313                                                [cap + self.CAP_FORMAT])[0]
1314        # The third field could be Default or "Always". If it's Default,
1315        # "Always" must show up in the third field.
1316        return self.CAP_ALWAYS in rv[2] or self.CAP_ALWAYS in rv[3]
1317
1318    def servo_drv_enabled(self):
1319        """Check if the caps  are accessible on boards wigh gsc controls."""
1320        if not self._servo.main_device_uses_gsc_drv():
1321            return True
1322        for cap in self.SERVO_DRV_CAPS:
1323            # If any capability isn't accessible, return False.
1324            if not self.cap_is_always_on(cap):
1325                return False
1326        return True
1327
1328    def enable_servo_control_caps(self):
1329        """Set all servo control capabilities to Always."""
1330        # Nothing do do if servo doesn't use gsc for any controls.
1331        if not self._servo.main_device_uses_gsc_drv():
1332            return
1333        logging.info('Setting servo caps to Always')
1334        self.send_command('ccd testlab open')
1335        for cap in self.SERVO_DRV_CAPS:
1336            self.send_command('ccd set %s Always' % cap)
1337        return self.servo_drv_enabled()
1338
1339    def ccd_reset_factory(self):
1340        """Enable factory mode."""
1341        self.send_command('ccd reset factory')
1342
1343    def ccd_reset(self, servo_en=True):
1344        """Reset ccd capabilities."""
1345        servo_uses_gsc = self._servo.main_device_uses_gsc_drv()
1346        # If testlab mode is enabled, capabilities can be restored. It's
1347        # ok to reset ccd.
1348        if not servo_en and servo_uses_gsc and not self.testlab_is_on():
1349            raise error.TestError(
1350                    'Board uses ccd drivers. Enable testlab mode '
1351                    'before ccd reset')
1352        self.send_command('ccd reset')
1353        if servo_en:
1354            self.enable_servo_control_caps()
1355
1356    def check_for_console_errors(self, desc):
1357        """Check cr50 uart output for errors.
1358
1359        Use the logs captured during firmware_test cleanup to check for cr50
1360        errors. Flash operation issues aren't obvious unless you check the logs.
1361        All flash op errors print do_flash_op and it isn't printed during normal
1362        operation. Open the cr50 uart file and count the number of times this is
1363        printed. Log the number of errors.
1364        """
1365        self._servo.record_uart_capture()
1366        cr50_uart_file = self._servo.get_uart_logfile('cr50')
1367        if not cr50_uart_file:
1368            logging.info('There is not a cr50 uart file')
1369            return
1370
1371        flash_error_count = 0
1372        usb_error_count = 0
1373        watchdog_count = 0
1374        with open(cr50_uart_file, 'r') as f:
1375            for line in f:
1376                if self.FLASH_OP_ERROR_MSG in line:
1377                    flash_error_count += 1
1378                if self.USB_ERROR in line:
1379                    usb_error_count += 1
1380                if self.WATCHDOG_RST in line:
1381                    watchdog_count += 1
1382
1383        # Log any flash operation errors.
1384        logging.info('do_flash_op count: %d', flash_error_count)
1385        logging.info('usb error count: %d', usb_error_count)
1386        logging.info('watchdog count: %d', watchdog_count)
1387        if watchdog_count:
1388            raise error.TestFail('Found %r %d times in logs after %s' %
1389                                 (self.WATCHDOG_RST, watchdog_count, desc))
1390
1391    def ap_ro_version_is_supported(self, version):
1392        """Returns True if GSC supports the given version."""
1393        return version in self.AP_RO_VERSIONS
1394
1395    def ap_ro_supported(self):
1396        """Returns True if the hash is saved and AP RO is supported."""
1397        return self.send_command_retry_get_output(
1398                'ap_ro_info', [self.AP_RO_SUPPORTED_RE])[0][2] == 'yes'
1399
1400    def get_ap_ro_info(self):
1401        """Returns a dictionary of the AP RO info.
1402
1403        Get the ap_ro_info output. Convert it to a usable dictionary.
1404
1405        Returns:
1406            A dictionary with the following key value pairs.
1407                'reason': String of unsupported reason or None if ap ro is
1408                          supported.
1409                'hash': 64 char hash or None if it isn't supported.
1410                'supported': bool whether AP RO verification is supported.
1411                'result': int of the AP RO verification result.
1412        """
1413        # Cr50 prints different output based on whether ap ro verification is
1414        # supported.
1415        if self.ap_ro_supported():
1416            output = self.AP_RO_SAVED_OUTPUT
1417        else:
1418            output = self.AP_RO_UNSUPPORTED_OUTPUT
1419        # The reason and hash output is optional. Make sure it's in the
1420        # dictionary even if it isn't in the output.
1421        info = {'hash': None, 'reason': None}
1422        rv = self.send_command_retry_get_output('ap_ro_info',
1423                                                output,
1424                                                compare_output=True)
1425        for _, k, v in rv:
1426            # Make key more usable.
1427            if k == 'ap_ro_check_unsupported':
1428                k = 'reason'
1429            # Convert digit strings to ints
1430            if v.isdigit():
1431                v = int(v)
1432            # Convert yes or no to bool
1433            if k == 'supported':
1434                v = v == 'yes'
1435            info[k] = v
1436        return info
1437