• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import functools
6import logging
7import pprint
8import re
9import time
10
11from autotest_lib.client.bin import utils
12from autotest_lib.client.common_lib import error
13from autotest_lib.client.common_lib.cros import cr50_utils
14from autotest_lib.server.cros.servo import chrome_ec
15
16
def dts_control_command(func):
    """Decorate a method so it only runs when dts mode control is supported.

    The decorated method is called normally when
    instance._servo.dts_mode_is_valid() returns a true value. Otherwise the
    call is skipped, a message is logged, and None is returned.

    @param func: the method to guard. Its first argument must be an object
                 with a _servo attribute.
    @return: the guarded method.
    """
    @functools.wraps(func)
    def wrapper(instance, *args, **kwargs):
        """Ignore those functions if dts mode control is not supported."""
        if instance._servo.dts_mode_is_valid():
            return func(instance, *args, **kwargs)
        # __name__ is available on python 2 and python 3 functions. func_name
        # only exists in python 2.
        logging.info('Servo setup does not support DTS mode. ignoring %s',
                     func.__name__)
        return None
    return wrapper
27
28
29class ChromeCr50(chrome_ec.ChromeConsole):
30    """Manages control of a Chrome Cr50.
31
32    We control the Chrome Cr50 via the console of a Servo board. Chrome Cr50
33    provides many interfaces to set and get its behavior via console commands.
34    This class is to abstract these interfaces.
35    """
36    PROD_RW_KEYIDS = ['0x87b73b67', '0xde88588d']
37    PROD_RO_KEYIDS = ['0xaa66150f']
38    OPEN = 'open'
39    UNLOCK = 'unlock'
40    LOCK = 'lock'
41    # The amount of time you need to show physical presence.
42    PP_SHORT = 15
43    PP_LONG = 300
44    CCD_PASSWORD_RATE_LIMIT = 3
45    IDLE_COUNT = 'count: (\d+)\s'
46    SHORT_WAIT = 3
47    # The version has four groups: the partition, the header version, debug
48    # descriptor and then version string.
49    # There are two partitions A and B. The active partition is marked with a
50    # '*'. If it is a debug image '/DBG' is added to the version string. If the
51    # image has been corrupted, the version information will be replaced with
52    # 'Error'.
53    # So the output may look something like this.
54    #   RW_A:    0.0.21/cr50_v1.1.6133-fd788b
55    #   RW_B:  * 0.0.22/DBG/cr50_v1.1.6138-b9f0b1d
56    # Or like this if the region was corrupted.
57    #   RW_A:  * 0.0.21/cr50_v1.1.6133-fd788b
58    #   RW_B:    Error
59    VERSION_FORMAT = '\nRW_(A|B): +%s +(\d+\.\d+\.\d+|Error)(/DBG)?(\S+)?\s'
60    INACTIVE_VERSION = VERSION_FORMAT % ''
61    ACTIVE_VERSION = VERSION_FORMAT % '\*'
62    # Following lines of the version output may print the image board id
63    # information. eg.
64    # BID A:   5a5a4146:ffffffff:00007f00 Yes
65    # BID B:   00000000:00000000:00000000 Yes
66    # Use the first group from ACTIVE_VERSION to match the active board id
67    # partition.
68    BID_ERROR = 'read_board_id: failed'
69    BID_FORMAT = ':\s+[a-f0-9:]+ '
70    ACTIVE_BID = r'%s.*(\1%s|%s.*>)' % (ACTIVE_VERSION, BID_FORMAT,
71            BID_ERROR)
72    WAKE_CHAR = '\n\n'
73    WAKE_RESPONSE = ['(>|Console is enabled)']
74    START_UNLOCK_TIMEOUT = 20
75    GETTIME = ['= (\S+)']
76    FWMP_LOCKED_PROD = ["Managed device console can't be unlocked"]
77    FWMP_LOCKED_DBG = ['Ignoring FWMP unlock setting']
78    MAX_RETRY_COUNT = 5
79    CCDSTATE_MAX_RETRY_COUNT = 20
80    START_STR = ['(.*Console is enabled;)']
81    REBOOT_DELAY_WITH_CCD = 60
82    REBOOT_DELAY_WITH_FLEX = 3
83    ON_STRINGS = ['enable', 'enabled', 'on']
84    CONSERVATIVE_CCD_WAIT = 10
85    CCD_SHORT_PRESSES = 5
86    CAP_IS_ACCESSIBLE = 0
87    CAP_SETTING = 1
88    CAP_REQ = 2
89    GET_CAP_TRIES = 5
90    # Regex to match the valid capability settings.
91    CAP_STATES = '(Always|Default|IfOpened|UnlessLocked)'
92    # List of all cr50 ccd capabilities. Same order of 'ccd' output
93    CAP_NAMES = [
94        'UartGscRxAPTx', 'UartGscTxAPRx', 'UartGscRxECTx', 'UartGscTxECRx',
95        'FlashAP', 'FlashEC', 'OverrideWP', 'RebootECAP', 'GscFullConsole',
96        'UnlockNoReboot', 'UnlockNoShortPP', 'OpenNoTPMWipe', 'OpenNoLongPP',
97        'BatteryBypassPP', 'UpdateNoTPMWipe', 'I2C', 'FlashRead',
98        'OpenNoDevMode', 'OpenFromUSB'
99    ]
100    # There are two capability formats. Match both.
101    #  UartGscRxECTx   Y 3=IfOpened
102    #  or
103    #  UartGscRxECTx   Y 0=Default (Always)
104    # Make sure the last word is at the end of the line. The next line will
105    # start with some whitespace, so account for that too.
106    CAP_FORMAT = '\s+(Y|-) \d\=%s( \(%s\))?[\r\n]+\s*' % (CAP_STATES,
107                                                          CAP_STATES)
    # Name each group, so we can use groupdict to extract all useful information
    # from the ccd output.
110    CCD_FORMAT = [
111        '(State: (?P<State>Opened|Locked|Unlocked))',
112        '(Password: (?P<Password>set|none))',
113        '(Flags: (?P<Flags>\S*))',
114        '(Capabilities:.*(?P<Capabilities>%s))' %
115                (CAP_FORMAT.join(CAP_NAMES) + CAP_FORMAT),
116        '(TPM:(?P<TPM>[ \S]*)\r)',
117    ]
118
119    # CR50 Board Properties as defined in platform/ec/board/cr50/scratch-reg1.h
120    BOARD_PROP = {
121           'BOARD_SLAVE_CONFIG_SPI'      : 1 << 0,
122           'BOARD_SLAVE_CONFIG_I2C'      : 1 << 1,
123           'BOARD_NEEDS_SYS_RST_PULL_UP' : 1 << 5,
124           'BOARD_USE_PLT_RESET'         : 1 << 6,
125           'BOARD_WP_ASSERTED'           : 1 << 8,
126           'BOARD_FORCING_WP'            : 1 << 9,
127           'BOARD_NO_RO_UART'            : 1 << 10,
128           'BOARD_CCD_STATE_MASK'        : 3 << 11,
129           'BOARD_DEEP_SLEEP_DISABLED'   : 1 << 13,
130           'BOARD_DETECT_AP_WITH_UART'   : 1 << 14,
131           'BOARD_ITE_EC_SYNC_NEEDED'    : 1 << 15,
132           'BOARD_WP_DISABLE_DELAY'      : 1 << 16,
133           'BOARD_CLOSED_SOURCE_SET1'    : 1 << 17,
134           'BOARD_CLOSED_LOOP_RESET'     : 1 << 18,
135           'BOARD_NO_INA_SUPPORT'        : 1 << 19,
136           'BOARD_ALLOW_CHANGE_TPM_MODE' : 1 << 20,
137    }
138
139    # CR50 reset flags as defined in platform ec_commands.h. These are only the
140    # flags used by cr50.
141    RESET_FLAGS = {
142           'RESET_FLAG_OTHER'            : 1 << 0,
143           'RESET_FLAG_BROWNOUT'         : 1 << 2,
144           'RESET_FLAG_POWER_ON'         : 1 << 3,
145           'RESET_FLAG_SOFT'             : 1 << 5,
146           'RESET_FLAG_HIBERNATE'        : 1 << 6,
147           'RESET_FLAG_RTC_ALARM'        : 3 << 7,
148           'RESET_FLAG_WAKE_PIN'         : 1 << 8,
149           'RESET_FLAG_HARD'             : 1 << 11,
150           'RESET_FLAG_USB_RESUME'       : 1 << 14,
151           'RESET_FLAG_RDD'              : 1 << 15,
152           'RESET_FLAG_RBOX'             : 1 << 16,
153           'RESET_FLAG_SECURITY'         : 1 << 17,
154    }
155
156    def __init__(self, servo, faft_config):
157        """Initializes a ChromeCr50 object.
158
159        @param servo: A servo object.
160        @param faft_config: A faft config object.
161        """
162        super(ChromeCr50, self).__init__(servo, 'cr50_uart')
163        self.faft_config = faft_config
164
165
166    def wake_cr50(self):
167        """Wake up cr50 by sending some linebreaks and wait for the response"""
168        logging.debug(super(ChromeCr50, self).send_command_get_output(
169                self.WAKE_CHAR, self.WAKE_RESPONSE))
170
171
172    def send_command(self, commands):
173        """Send command through UART.
174
175        Cr50 will drop characters input to the UART when it resumes from sleep.
176        If servo is not using ccd, send some dummy characters before sending the
177        real command to make sure cr50 is awake.
178
179        @param commands: the command string to send to cr50
180        """
181        if self._servo.main_device_is_flex():
182            self.wake_cr50()
183        super(ChromeCr50, self).send_command(commands)
184
185
186    def set_cap(self, cap, setting):
187        """Set the capability to setting
188
189        @param cap: The capability string
190        @param setting: The setting to set the capability to.
191        """
192        self.set_caps({ cap : setting })
193
194
195    def set_caps(self, cap_dict):
196        """Use cap_dict to set all the cap values
197
198        Set all of the capabilities in cap_dict to the correct config.
199
200        @param cap_dict: A dictionary with the capability as key and the desired
201                         setting as values
202        """
203        for cap, config in cap_dict.iteritems():
204            self.send_command('ccd set %s %s' % (cap, config))
205        current_cap_settings = self.get_cap_dict(info=self.CAP_SETTING)
206        for cap, config in cap_dict.iteritems():
207            if (current_cap_settings[cap].lower() !=
208                config.lower()):
209                raise error.TestFail('Failed to set %s to %s' % (cap, config))
210
211
212    def get_cap_overview(self, cap_dict):
213        """Get a basic overview of the capability dictionary
214
215        If all capabilities are set to Default, ccd has been reset to default.
216        If all capabilities are set to Always, ccd is in factory mode.
217
218        @param cap_dict: A dictionary of the capability settings
219        @return: A tuple of the capability overview (in factory mode, is reset)
220        """
221        in_factory_mode = True
222        is_reset = True
223        for cap, cap_info in cap_dict.iteritems():
224            cap_setting = cap_info[self.CAP_SETTING]
225            if cap_setting != 'Always':
226                in_factory_mode = False
227            if cap_setting != 'Default':
228                is_reset = False
229        return in_factory_mode, is_reset
230
231
232    def password_is_reset(self):
233        """Returns True if the password is cleared"""
234        return self.get_ccd_info()['Password'] == 'none'
235
236
237    def ccd_is_reset(self):
238        """Returns True if the ccd is reset
239
240        The password must be cleared, write protect and battery presence must
241        follow battery presence, and all capabilities must be Always
242        """
243        return (self.password_is_reset() and self.wp_is_reset() and
244                self.batt_pres_is_reset() and
245                self.get_cap_overview(self.get_cap_dict())[1])
246
247
248    def wp_is_reset(self):
249        """Returns True if wp is reset to follow batt pres at all times"""
250        follow_batt_pres, _, follow_batt_pres_atboot, _ = self.get_wp_state()
251        return follow_batt_pres and follow_batt_pres_atboot
252
253
254    def get_wp_state(self):
255        """Get the current write protect and atboot state
256
257        The atboot setting cannot really be determined now if it is set to
258        follow battery presence. It is likely to remain the same after reboot,
259        but who knows. If the third element of the tuple is True, the last
260        element will not be that useful
261
262        @return: a tuple with the current write protect state
263                (True if current state is to follow batt presence,
264                 True if write protect is enabled,
265                 True if current state is to follow batt presence atboot,
266                 True if write protect is enabled atboot)
267        """
268        rv = self.send_command_retry_get_output('wp',
269                ['Flash WP: (forced )?(enabled|disabled).*at boot: (forced )?'
270                 '(follow|enabled|disabled)'], safe=True)[0]
271        _, forced, enabled, _, atboot = rv
272        logging.debug(rv)
273        return (not forced, enabled =='enabled',
274                atboot == 'follow', atboot == 'enabled')
275
276
277    def in_dev_mode(self):
278        """Return True if cr50 thinks the device is in dev mode"""
279        return 'dev_mode' in self.get_ccd_info()['TPM']
280
281
282    def get_ccd_info(self):
283        """Get the current ccd state.
284
285        Take the output of 'ccd' and convert it to a dictionary.
286
287        @return: A dictionary with the ccd state name as the key and setting as
288                 value.
289        """
290        matched_output = None
291        original_timeout = float(self._servo.get('cr50_uart_timeout'))
292        # Change the console timeout to 10s, it may take longer than 3s to read
293        # ccd info
294        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
295        for i in range(self.GET_CAP_TRIES):
296          try:
297            # If some ccd output is dropped and the output doesn't match the
298            # expected ccd output format, send_command_get_output will wait the
299            # full CONSERVATIVE_CCD_WAIT even though ccd is done printing. Use
300            # re to search the command output instead of
301            # send_safe_command_get_output, so we don't have to wait the full
302            # timeout if output is dropped.
303            rv = self.send_command_retry_get_output('ccd', ['ccd.*>'],
304                    safe=True)[0]
305            matched_output = re.search('.*'.join(self.CCD_FORMAT), rv,
306                                       re.DOTALL)
307            if matched_output:
308                break
309            logging.info('try %d: could not match ccd output %s', i, rv)
310          except Exception, e:
311            logging.info('try %d got error %s', i, str(e))
312
313        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
314        if not matched_output:
315            raise error.TestFail('Could not get ccd output')
316        logging.info('Current CCD settings:\n%s',
317                     pprint.pformat(matched_output.groupdict()))
318        return matched_output.groupdict()
319
320
321    def get_cap(self, cap):
322        """Returns the capabilitiy from the capability dictionary"""
323        return self.get_cap_dict()[cap]
324
325
326    def get_cap_dict(self, info=None):
327        """Get the current ccd capability settings.
328
329        The capability may be using the 'Default' setting. That doesn't say much
330        about the ccd state required to use the capability. Return all ccd
331        information in the cap_dict
332        [is accessible, setting, requirement]
333
334        @param info: Only fill the cap_dict with the requested information:
335                     CAP_IS_ACCESSIBLE, CAP_SETTING, or CAP_REQ
336        @return: A dictionary with the capability as the key a list of the
337                 current settings as the value [is_accessible, setting,
338                 requirement]
339        """
340        # Add whitespace at the end, so we can still match the last line.
341        cap_info_str = self.get_ccd_info()['Capabilities'] + '\r\n'
342        cap_settings = re.findall('(\S+) ' + self.CAP_FORMAT,
343                                  cap_info_str)
344        caps = {}
345        for cap, accessible, setting, _, required in cap_settings:
346            # If there's only 1 value after =, then the setting is the
347            # requirement.
348            if not required:
349                required = setting
350            cap_info = [accessible == 'Y', setting, required]
351            if info is not None:
352                caps[cap] = cap_info[info]
353            else:
354                caps[cap] = cap_info
355        logging.debug(pprint.pformat(caps))
356        return caps
357
358
359    def send_command_get_output(self, command, regexp_list):
360        """Send command through UART and wait for response.
361
362        Cr50 will drop characters input to the UART when it resumes from sleep.
363        If servo is not using ccd, send some dummy characters before sending the
364        real command to make sure cr50 is awake.
365
366        @param command: the command to send
367        @param regexp_list: The list of regular expressions to match in the
368                            command output
369        @return: A list of matched output
370        """
371        if self._servo.main_device_is_flex():
372            self.wake_cr50()
373
374        # We have started prepending '\n' to separate cr50 console junk from
375        # the real command. If someone is just searching for .*>, then they will
376        # only get the output from the first '\n' we added. Raise an error to
377        # change the test to look for something more specific ex command.*>.
378        # cr50 will print the command in the output, so that is an easy way to
379        # modify '.*>' to match the real command output.
380        if '.*>' in regexp_list:
381            raise error.TestError('Send more specific regexp %r %r' % (command,
382                    regexp_list))
383
384        # prepend \n to separate the command from any junk that may have been
385        # sent to the cr50 uart.
386        command = '\n' + command
387        return super(ChromeCr50, self).send_command_get_output(command,
388                                                               regexp_list)
389
390
391    def send_safe_command_get_output(self, command, regexp_list,
392            channel_mask=0x1):
393        """Restrict the console channels while sending console commands.
394
395        @param command: the command to send
396        @param regexp_list: The list of regular expressions to match in the
397                            command output
398        @param channel_mask: The mask to pass to 'chan' prior to running the
399                             command, indicating which channels should remain
400                             enabled (0x1 is command output)
401        @return: A list of matched output
402        """
403        self.send_command('chan save')
404        self.send_command('chan 0x%x' % channel_mask)
405        try:
406            rv = self.send_command_get_output(command, regexp_list)
407        finally:
408            self.send_command('chan restore')
409        return rv
410
411
412    def send_command_retry_get_output(self, command, regexp_list, safe=False,
413                                      compare_output=False):
414        """Retry the command 5 times if you get a timeout or drop some output
415
416
417        @param command: the command string
418        @param regexp_list: the regex to search for
419        @param safe: use send_safe_command_get_output if True otherwise use
420                     send_command_get_output
421        @param compare_output: look for reproducible output
422        """
423        send_command = (self.send_safe_command_get_output if safe else
424                        self.send_command_get_output)
425        err = 'no consistent output' if compare_output else 'unknown'
426        past_rv = []
427        for i in range(self.MAX_RETRY_COUNT):
428            try:
429                rv = send_command(command, regexp_list)
430                if not compare_output or rv in past_rv:
431                    return rv
432                if past_rv:
433                    logging.debug('%d %s not in %s', i, rv, past_rv)
434                past_rv.append(rv)
435            except Exception, e:
436                err = str(e)
437                logging.info('attempt %d %r: %s', i, command, str(e))
438        if compare_output:
439            logging.info('No consistent output for %r %s', command,
440                         pprint.pformat(past_rv))
441        raise error.TestError('Issue sending %r command: %s' % (command, err))
442
443
444    def get_deep_sleep_count(self):
445        """Get the deep sleep count from the idle task"""
446        result = self.send_command_retry_get_output('idle', [self.IDLE_COUNT],
447                                                    safe=True)
448        return int(result[0][1])
449
450
451    def clear_deep_sleep_count(self):
452        """Clear the deep sleep count"""
453        self.send_command('idle c')
454        if self.get_deep_sleep_count():
455            raise error.TestFail("Could not clear deep sleep count")
456
457
458    def get_board_properties(self):
459        """Get information from the version command"""
460        rv = self.send_command_retry_get_output('brdprop',
461                ['properties = (\S+)\s'], safe=True)
462        return int(rv[0][1], 16)
463
464
465    def uses_board_property(self, prop_name):
466        """Returns 1 if the given property is set, or 0 otherwise
467
468        @param prop_name: a property name in string type.
469        """
470        brdprop = self.get_board_properties()
471        prop = self.BOARD_PROP[prop_name]
472        return bool(brdprop & prop)
473
474
475    def has_command(self, cmd):
476        """Returns 1 if cr50 has the command 0 if it doesn't"""
477        try:
478            self.send_safe_command_get_output('help', [cmd])
479        except:
480            logging.info("Image does not include '%s' command", cmd)
481            return 0
482        return 1
483
484
485    def reboot(self):
486        """Reboot Cr50 and wait for cr50 to reset"""
487        self.wait_for_reboot(cmd='reboot')
488
489
490    def _uart_wait_for_reboot(self, cmd='\n', timeout=60):
491        """Use uart to wait for cr50 to reboot.
492
493        If a command is given run it and wait for cr50 to reboot. Monitor
494        the cr50 uart to detect the reset. Wait up to timeout seconds
495        for the reset.
496
497        @param cmd: the command to run to reset cr50.
498        @param timeout: seconds to wait to detect the reboot.
499        """
500        original_timeout = float(self._servo.get('cr50_uart_timeout'))
501        # Change the console timeout to timeout, so we wait at least that long
502        # for cr50 to print the start string.
503        self._servo.set_nocheck('cr50_uart_timeout', timeout)
504        try:
505            self.send_command_get_output(cmd, self.START_STR)
506            logging.debug('Detected cr50 reboot')
507        except error.TestFail, e:
508            logging.debug('Failed to detect cr50 reboot')
509        # Reset the timeout.
510        self._servo.set_nocheck('cr50_uart_timeout', original_timeout)
511
512
513    def wait_for_reboot(self, cmd='\n', timeout=60):
514        """Wait for cr50 to reboot
515
516        Run the cr50 reset command. Wait for cr50 to reset and reenable ccd if
517        necessary.
518
519        @param cmd: the command to run to reset cr50.
520        @param timeout: seconds to wait to detect the reboot.
521        """
522        if self._servo.main_device_is_ccd():
523            self.send_command(cmd)
524            # Cr50 USB is reset when it reboots. Wait for the CCD connection to
525            # go down to detect the reboot.
526            self.wait_for_ccd_disable(timeout, raise_error=False)
527            self.ccd_enable()
528        else:
529            self._uart_wait_for_reboot(cmd, timeout)
530
531        # On most devices, a Cr50 reset will cause an AP reset. Force this to
532        # happen on devices where the AP is left down.
533        if not self.faft_config.ap_up_after_cr50_reboot:
534            logging.info('Resetting DUT after Cr50 reset')
535            self._servo.get_power_state_controller().reset()
536
537
538    def set_board_id(self, chip_bid, chip_flags):
539        """Set the chip board id type and flags."""
540        self.send_command('bid 0x%x 0x%x' % (chip_bid, chip_flags))
541
542
543    def get_board_id(self):
544        """Get the chip board id type and flags.
545
546        bid_type_inv will be '' if the bid output doesn't show it. If no board
547        id type inv is shown, then board id is erased will just check the type
548        and flags.
549
550        @returns a tuple (A string of bid_type:bid_type_inv:bid_flags,
551                          True if board id is erased)
552        """
553        bid = self.send_command_retry_get_output('bid',
554                    ['Board ID: (\S{8}):?(|\S{8}), flags (\S{8})\s'],
555                    safe=True)[0][1:]
556        bid_str = ':'.join(bid)
557        bid_is_erased =  set(bid).issubset({'', 'ffffffff'})
558        logging.info('chip board id: %s', bid_str)
559        logging.info('chip board id is erased: %s',
560                     'yes' if bid_is_erased else 'no')
561        return bid_str, bid_is_erased
562
563
564    def eraseflashinfo(self, retries=10):
565        """Run eraseflashinfo.
566
567        @returns True if the board id is erased
568        """
569        for i in range(retries):
570            # The console could drop characters while matching 'eraseflashinfo'.
571            # Retry if the command times out. It's ok to run eraseflashinfo
572            # multiple times.
573            rv = self.send_command_retry_get_output(
574                    'eraseflashinfo', ['eraseflashinfo(.*)>'])[0][1].strip()
575            logging.info('eraseflashinfo output: %r', rv)
576            bid_erased = self.get_board_id()[1]
577            eraseflashinfo_issue = 'Busy' in rv or 'do_flash_op' in rv
578            if not eraseflashinfo_issue and bid_erased:
579                break
580            logging.info('Retrying eraseflashinfo')
581        return bid_erased
582
583
584    def rollback(self):
585        """Set the reset counter high enough to force a rollback and reboot."""
586        if not self.has_command('rollback'):
587            raise error.TestError("need image with 'rollback'")
588
589        inactive_partition = self.get_inactive_version_info()[0]
590
591        self.wait_for_reboot(cmd='rollback')
592
593        running_partition = self.get_active_version_info()[0]
594        if inactive_partition != running_partition:
595            raise error.TestError("Failed to rollback to inactive image")
596
597
598    def rolledback(self):
599        """Returns true if cr50 just rolled back"""
600        return 'Rollback detected' in self.send_command_retry_get_output(
601                'sysinfo', ['sysinfo.*>'], safe=True)[0]
602
603
604    def get_version_info(self, regexp):
605        """Get information from the version command"""
606        return self.send_command_retry_get_output('ver', [regexp],
607                                                  safe=True)[0][1::]
608
609
610    def get_inactive_version_info(self):
611        """Get the active partition, version, and hash"""
612        return self.get_version_info(self.INACTIVE_VERSION)
613
614
615    def get_active_version_info(self):
616        """Get the active partition, version, and hash"""
617        return self.get_version_info(self.ACTIVE_VERSION)
618
619
620    def using_prod_rw_keys(self):
621        """Returns True if the RW keyid is prod"""
622        rv = self.send_command_retry_get_output('sysinfo',
623                ['RW keyid:\s+(0x[0-9a-f]{8})'], safe=True)[0][1]
624        logging.info('RW Keyid: 0x%s', rv)
625        return rv in self.PROD_RW_KEYIDS
626
627
628    def get_active_board_id_str(self):
629        """Get the running image board id.
630
631        @return: The board id string or None if the image does not support board
632                 id or the image is not board id locked.
633        """
634        # Getting the board id from the version console command is only
635        # supported in board id locked images .22 and above. Any image that is
636        # board id locked will have support for getting the image board id.
637        #
638        # If board id is not supported on the device, return None. This is
639        # still expected on all current non board id locked release images.
640        try:
641            version_info = self.get_version_info(self.ACTIVE_BID)
642        except error.TestFail, e:
643            logging.info(str(e))
644            logging.info('Cannot use the version to get the board id')
645            return None
646
647        if self.BID_ERROR in version_info[4]:
648            raise error.TestError(version_info)
649        bid = version_info[4].split()[1]
650        return cr50_utils.GetBoardIdInfoString(bid)
651
652
653    def get_version(self):
654        """Get the RW version"""
655        return self.get_active_version_info()[1].strip()
656
657
658    def ccd_is_enabled(self):
659        """Return True if ccd is enabled.
660
661        If the test is running through ccd, return the ccd_state value. If
662        a flex cable is being used, use the CCD_MODE_L gpio setting to determine
663        if Cr50 has ccd enabled.
664
665        @return: 'off' or 'on' based on whether the cr50 console is working.
666        """
667        if self._servo.main_device_is_ccd():
668            return self._servo.get('ccd_state') == 'on'
669        else:
670            return not bool(self.gpioget('CCD_MODE_L'))
671
672
673    @dts_control_command
674    def wait_for_stable_ccd_state(self, state, timeout, raise_error):
675        """Wait up to timeout seconds for CCD to be 'on' or 'off'
676
677        Verify ccd is off or on and remains in that state for 3 seconds.
678
679        @param state: a string either 'on' or 'off'.
680        @param timeout: time in seconds to wait
681        @param raise_error: Raise TestFail if the value is state is not reached.
682        @raise TestFail: if ccd never reaches the specified state
683        """
684        wait_for_enable = state == 'on'
685        logging.info("Wait until ccd is %s", 'on' if wait_for_enable else 'off')
686        enabled = utils.wait_for_value(self.ccd_is_enabled, wait_for_enable,
687                                       timeout_sec=timeout)
688        if enabled != wait_for_enable:
689            error_msg = ("timed out before detecting ccd '%s'" %
690                         ('on' if wait_for_enable else 'off'))
691            if raise_error:
692                raise error.TestFail(error_msg)
693            logging.warning(error_msg)
694        else:
695            # Make sure the state doesn't change.
696            enabled = utils.wait_for_value(self.ccd_is_enabled, not enabled,
697                                           timeout_sec=self.SHORT_WAIT)
698            if enabled != wait_for_enable:
699                error_msg = ("CCD switched %r after briefly being %r" %
700                             ('on' if enabled else 'off', state))
701                if raise_error:
702                    raise error.TestFail(error_msg)
703                logging.info(error_msg)
704        logging.info("ccd is %r", 'on' if enabled else 'off')
705
706
707    @dts_control_command
708    def wait_for_ccd_disable(self, timeout=60, raise_error=True):
709        """Wait for the cr50 console to stop working"""
710        self.wait_for_stable_ccd_state('off', timeout, raise_error)
711
712
713    @dts_control_command
714    def wait_for_ccd_enable(self, timeout=60, raise_error=False):
715        """Wait for the cr50 console to start working"""
716        self.wait_for_stable_ccd_state('on', timeout, raise_error)
717
718
719    @dts_control_command
720    def ccd_disable(self, raise_error=True):
721        """Change the values of the CC lines to disable CCD"""
722        logging.info("disable ccd")
723        self._servo.set_dts_mode('off')
724        self.wait_for_ccd_disable(raise_error=raise_error)
725
726
727    @dts_control_command
728    def ccd_enable(self, raise_error=False):
729        """Reenable CCD and reset servo interfaces"""
730        logging.info("reenable ccd")
731        self._servo.set_dts_mode('on')
732        # If the test is actually running with ccd, wait for USB communication
733        # to come up after reset.
734        if self._servo.main_device_is_ccd():
735            time.sleep(self._servo.USB_DETECTION_DELAY)
736        self.wait_for_ccd_enable(raise_error=raise_error)
737
738
739    def _level_change_req_pp(self, level):
740        """Returns True if setting the level will require physical presence"""
741        testlab_pp = level != 'testlab open' and 'testlab' in level
742        # If the level is open and the ccd capabilities say physical presence
743        # is required, then physical presence will be required.
744        open_pp = (level == 'open' and
745                   not self.get_cap('OpenNoLongPP')[self.CAP_IS_ACCESSIBLE])
746        return testlab_pp or open_pp
747
748
749    def _state_to_bool(self, state):
750        """Converts the state string to True or False"""
751        # TODO(mruthven): compare to 'on' once servo is up to date in the lab
752        return state.lower() in self.ON_STRINGS
753
754
755    def testlab_is_on(self):
756        """Returns True of testlab mode is on"""
757        return self._state_to_bool(self._servo.get('cr50_testlab'))
758
759
760    def set_ccd_testlab(self, state):
761        """Set the testlab mode
762
763        @param state: the desired testlab mode string: 'on' or 'off'
764        @raise TestFail: if testlab mode was not changed
765        """
766        if self._servo.main_device_is_ccd():
767            raise error.TestError('Cannot set testlab mode with CCD. Use flex '
768                    'cable instead.')
769        if not self.faft_config.has_powerbutton:
770            raise error.TestError('No power button on device')
771
772        request_on = self._state_to_bool(state)
773        testlab_on = self.testlab_is_on()
774        request_str = 'on' if request_on else 'off'
775
776        if testlab_on == request_on:
777            logging.info('ccd testlab already set to %s', request_str)
778            return
779
780        original_level = self.get_ccd_level()
781
782        # We can only change the testlab mode when the device is open. If
783        # testlab mode is already enabled, we can go directly to open using 'ccd
784        # testlab open'. This will save 5 minutes, because we can skip the
785        # physical presence check.
786        if testlab_on:
787            self.send_command('ccd testlab open')
788        else:
789            self.set_ccd_level('open')
790
791        # Set testlab mode
792        rv = self.send_command_get_output('ccd testlab %s' % request_str,
793                ['ccd.*>'])[0]
794        if 'Access Denied' in rv:
795            raise error.TestFail("'ccd %s' %s" % (request_str, rv))
796
797        # Press the power button once a second for 15 seconds.
798        self.run_pp(self.PP_SHORT)
799
800        self.set_ccd_level(original_level)
801        if request_on != self.testlab_is_on():
802            raise error.TestFail('Failed to set ccd testlab to %s' % state)
803
804
805    def get_ccd_level(self):
806        """Returns the current ccd privilege level"""
807        return self._servo.get('cr50_ccd_level').lower()
808
809
810    def set_ccd_level(self, level, password=''):
811        """Set the Cr50 CCD privilege level.
812
813        @param level: a string of the ccd privilege level: 'open', 'lock', or
814                      'unlock'.
815        @param password: send the ccd command with password. This will still
816                         require the same physical presence.
817        @raise TestFail: if the level couldn't be set
818        """
819        # TODO(mruthven): add support for CCD password
820        level = level.lower()
821
822        if level == self.get_ccd_level():
823            logging.info('CCD privilege level is already %s', level)
824            return
825
826        if 'testlab' in level:
827            raise error.TestError("Can't change testlab mode using "
828                "ccd_set_level")
829
830        testlab_on = self._state_to_bool(self._servo.get('cr50_testlab'))
831        batt_is_disconnected = self.get_batt_pres_state()[1]
832        req_pp = self._level_change_req_pp(level)
833        has_pp = not self._servo.main_device_is_ccd()
834        dbg_en = 'DBG' in self._servo.get('cr50_version')
835
836        if req_pp and not has_pp:
837            raise error.TestError("Can't change privilege level to '%s' "
838                "without physical presence." % level)
839
840        if not testlab_on and not has_pp:
841            raise error.TestError("Wont change privilege level without "
842                "physical presence or testlab mode enabled")
843
844        original_timeout = float(self._servo.get('cr50_uart_timeout'))
845        # Change the console timeout to CONSERVATIVE_CCD_WAIT, running 'ccd' may
846        # take more than 3 seconds.
847        self._servo.set_nocheck('cr50_uart_timeout', self.CONSERVATIVE_CCD_WAIT)
848        # Start the unlock process.
849
850        if level == 'open' or level == 'unlock':
851            logging.info('waiting %d seconds, the minimum time between'
852                         ' ccd password attempts',
853                         self.CCD_PASSWORD_RATE_LIMIT)
854            time.sleep(self.CCD_PASSWORD_RATE_LIMIT)
855
856        try:
857            cmd = 'ccd %s%s' % (level, (' ' + password) if password else '')
858            # ccd command outputs on the rbox, ccd, and console channels,
859            # respectively. Cr50 uses these channels to print relevant ccd
860            # information.
861            # Restrict all other channels.
862            ccd_output_channels = 0x20000 | 0x8 | 0x1
863            rv = self.send_safe_command_get_output(
864                    cmd, [cmd + '(.*)>'],
865                    channel_mask=ccd_output_channels)[0][1]
866        finally:
867            self._servo.set('cr50_uart_timeout', original_timeout)
868        logging.info(rv)
869        if 'ccd_open denied: fwmp' in rv:
870            raise error.TestFail('FWMP disabled %r: %s' % (cmd, rv))
871        if 'Access Denied' in rv:
872            raise error.TestFail("%r %s" % (cmd, rv))
873        if 'Busy' in rv:
874            raise error.TestFail("cr50 is too busy to run %r: %s" % (cmd, rv))
875
876        # Press the power button once a second, if we need physical presence.
877        if req_pp and batt_is_disconnected:
878            # DBG images have shorter unlock processes
879            self.run_pp(self.PP_SHORT if dbg_en else self.PP_LONG)
880
881        if level != self.get_ccd_level():
882            raise error.TestFail('Could not set privilege level to %s' % level)
883
884        logging.info('Successfully set CCD privelege level to %s', level)
885
886
887    def run_pp(self, unlock_timeout):
888        """Press the power button a for unlock_timeout seconds.
889
890        This will press the power button many more times than it needs to be
891        pressed. Cr50 doesn't care if you press it too often. It just cares that
892        you press the power button at least once within the detect interval.
893
894        For privilege level changes you need to press the power button 5 times
895        in the short interval and then 4 times within the long interval.
896        Short Interval
897        100msec < power button press < 5 seconds
898        Long Interval
899        60s < power button press < 300s
900
901        For testlab enable/disable you must press the power button 5 times
902        spaced between 100msec and 5 seconds apart.
903        """
904        ap_on_before = self.ap_is_on()
905
906        end_time = time.time() + unlock_timeout
907
908        logging.info('Pressing power button for %ds to unlock the console.',
909                     unlock_timeout)
910        logging.info('The process should end at %s', time.ctime(end_time))
911
912        # Press the power button once a second to unlock the console.
913        while time.time() < end_time:
914            self._servo.power_short_press()
915            time.sleep(1)
916
917        # If the last power button press left the AP powered off, and it was on
918        # before, turn it back on.
919        time.sleep(self.faft_config.shutdown)
920        ap_on_after = self.ap_is_on()
921        logging.debug('During run_pp, AP %s -> %s',
922                'on' if ap_on_before else 'off',
923                'on' if ap_on_after else 'off')
924        if ap_on_before and not ap_on_after:
925            self._servo.power_short_press()
926            logging.debug('Pressing PP to turn back on')
927
928
929    def gettime(self):
930        """Get the current cr50 system time"""
931        result = self.send_safe_command_get_output('gettime', [' = (.*) s'])
932        return float(result[0][1])
933
934
935    def servo_dts_mode_is_valid(self):
936        """Returns True if cr50 registers change in servo dts mode."""
937        # This is to test that Cr50 actually recognizes the change in ccd state
938        # We cant do that with tests using ccd, because the cr50 communication
939        # goes down once ccd is enabled.
940        if not self._servo.dts_mode_is_safe():
941            return False
942
943        ccd_start = 'on' if self.ccd_is_enabled() else 'off'
944        dts_start = self._servo.get_dts_mode()
945        try:
946            # Verify both ccd enable and disable
947            self.ccd_disable(raise_error=True)
948            self.ccd_enable(raise_error=True)
949            rv = True
950        except Exception, e:
951            logging.info(e)
952            rv = False
953        self._servo.set_dts_mode(dts_start)
954        self.wait_for_stable_ccd_state(ccd_start, 60, True)
955        logging.info('Test setup does%s support servo DTS mode',
956                '' if rv else 'n\'t')
957        return rv
958
959
960    def wait_until_update_is_allowed(self):
961        """Wait until cr50 will be able to accept an update.
962
963        Cr50 rejects any attempt to update if it has been less than 60 seconds
964        since it last recovered from deep sleep or came up from reboot. This
965        will wait until cr50 gettime shows a time greater than 60.
966        """
967        if self.get_active_version_info()[2]:
968            logging.info("Running DBG image. Don't need to wait for update.")
969            return
970        cr50_time = self.gettime()
971        if cr50_time < 60:
972            sleep_time = 61 - cr50_time
973            logging.info('Cr50 has been up for %ds waiting %ds before update',
974                         cr50_time, sleep_time)
975            time.sleep(sleep_time)
976
977
978    def tpm_is_enabled(self):
979        """Query the current TPM mode.
980
981        @return:  True if TPM is enabled, False otherwise.
982        """
983        result = self.send_command_retry_get_output('sysinfo',
984                ['(?i)TPM\s+MODE:\s+(enabled|disabled)'], safe=True)[0][1]
985        logging.debug(result)
986
987        return result.lower() == 'enabled'
988
989
990    def get_keyladder_state(self):
991        """Get the status of H1 Key Ladder.
992
993        @return: The keyladder state string. prod or dev both mean enabled.
994        """
995        result = self.send_command_retry_get_output('sysinfo',
996                ['(?i)Key\s+Ladder:\s+(enabled|prod|dev|disabled)'],
997                safe=True)[0][1]
998        logging.debug(result)
999        return result
1000
1001
1002    def keyladder_is_disabled(self):
1003        """Get the status of H1 Key Ladder.
1004
1005        @return: True if H1 Key Ladder is disabled. False otherwise.
1006        """
1007        return self.get_keyladder_state() == 'disabled'
1008
1009
1010    def get_sleepmask(self):
1011        """Returns the sleepmask as an int"""
1012        rv = self.send_command_retry_get_output('sleepmask',
1013                ['sleep mask: (\S{8})\s+'], safe=True)[0][1]
1014        logging.info('sleepmask %s', rv)
1015        return int(rv, 16)
1016
1017
1018    def get_ccdstate(self):
1019        """Return a dictionary of the ccdstate once it's done debouncing"""
1020        for i in range(self.CCDSTATE_MAX_RETRY_COUNT):
1021            rv = self.send_command_retry_get_output('ccdstate',
1022                    ['ccdstate(.*)>'], safe=True)[0][0]
1023
1024            # Look for a line like 'AP: on' or 'AP: off'. 'debouncing' or
1025            # 'unknown' may appear transiently. 'debouncing' should transition
1026            # to 'on' or 'off' within 1 second, and 'unknown' should do so
1027            # within 20 seconds.
1028            if 'debouncing' not in rv and 'unknown' not in rv:
1029                break
1030            time.sleep(self.SHORT_WAIT)
1031        ccdstate = {}
1032        for line in rv.splitlines():
1033            line = line.strip()
1034            if ':' in line:
1035                k, v = line.split(':', 1)
1036                ccdstate[k.strip()] = v.strip()
1037        logging.info('Current CCD state:\n%s', pprint.pformat(ccdstate))
1038        return ccdstate
1039
1040
1041    def ap_is_on(self):
1042        """Get the power state of the AP.
1043
1044        @return: True if the AP is on; False otherwise.
1045        """
1046        ap_state = self.get_ccdstate()['AP']
1047        if ap_state == 'on':
1048            return True
1049        elif ap_state == 'off':
1050            return False
1051        else:
1052            raise error.TestFail('Read unusable AP state from ccdstate: "%s"',
1053                                 ap_state)
1054
1055
1056    def gpioget(self, signal_name):
1057        """Get the current state of the signal
1058
1059        @return an integer 1 or 0 based on the gpioget value
1060        """
1061        result = self.send_command_retry_get_output('gpioget',
1062                    ['(0|1)[ \S]*%s' % signal_name], safe=True)
1063        return int(result[0][1])
1064
1065
1066    def batt_pres_is_reset(self):
1067        """Returns True if batt pres is reset to always follow batt pres"""
1068        follow_bp, _, follow_bp_atboot, _ = self.get_batt_pres_state()
1069        return follow_bp and follow_bp_atboot
1070
1071
1072    def get_batt_pres_state(self):
1073        """Get the current and atboot battery presence state
1074
1075        The atboot setting cannot really be determined now if it is set to
1076        follow battery presence. It is likely to remain the same after reboot,
1077        but who knows. If the third element of the tuple is True, the last
1078        element will not be that useful
1079
1080        @return: a tuple of the current battery presence state
1081                 (True if current state is to follow batt presence,
1082                  True if battery is connected,
1083                  True if current state is to follow batt presence atboot,
1084                  True if battery is connected atboot)
1085        """
1086        # bpforce is added in 4.16. If the image doesn't have the command, cr50
1087        # always follows battery presence. In these images 'gpioget BATT_PRES_L'
1088        # accurately represents the battery presence state, because it can't be
1089        # overidden.
1090        if not self.has_command('bpforce'):
1091            batt_pres = not bool(self.gpioget('BATT_PRES_L'))
1092            return (True, batt_pres, True, batt_pres)
1093
1094        # The bpforce command is very similar to the wp command. It just
1095        # substitutes 'connected' for 'enabled' and 'disconnected' for
1096        # 'disabled'.
1097        rv = self.send_command_retry_get_output('bpforce',
1098                ['batt pres: (forced )?(con|dis).*at boot: (forced )?'
1099                 '(follow|discon|con)'], safe=True)[0]
1100        _, forced, connected, _, atboot = rv
1101        logging.info(rv)
1102        return (not forced, connected == 'con', atboot == 'follow',
1103                atboot == 'con')
1104
1105
1106    def set_batt_pres_state(self, state, atboot):
1107        """Override the battery presence state.
1108
1109        @param state: a string of the battery presence setting: 'connected',
1110                  'disconnected', or 'follow_batt_pres'
1111        @param atboot: True if we're overriding battery presence atboot
1112        """
1113        cmd = 'bpforce %s%s' % (state, ' atboot' if atboot else '')
1114        logging.info('running %r', cmd)
1115        self.send_command(cmd)
1116
1117
1118    def dump_nvmem(self):
1119        """Print nvmem objects."""
1120        rv = self.send_safe_command_get_output('dump_nvmem',
1121                                               ['dump_nvmem(.*)>'])[0][1]
1122        logging.info('NVMEM OUTPUT:\n%s', rv)
1123
1124
1125    def get_reset_cause(self):
1126        """Returns the reset flags for the last reset."""
1127        rv = self.send_command_retry_get_output('sysinfo',
1128                ['Reset flags:\s+0x([0-9a-f]{8})\s'], compare_output=True)[0][1]
1129        logging.info('reset cause: %s', rv)
1130        return int(rv, 16)
1131
1132
1133    def was_reset(self, reset_type):
1134        """Returns 1 if the reset type is found in the reset_cause.
1135
1136        @param reset_type: reset name in string type.
1137        """
1138        reset_cause = self.get_reset_cause()
1139        reset_flag = self.RESET_FLAGS[reset_type]
1140        return bool(reset_cause & reset_flag)
1141