• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import re
6import logging
7
8from autotest_lib.client.common_lib import error
9from autotest_lib.server.cros import vboot_constants as vboot
10
11
12class FAFTCheckers(object):
13    """Class that contains FAFT checkers."""
14    version = 1
15
16    def __init__(self, faft_framework):
17        self.faft_framework = faft_framework
18        self.faft_client = faft_framework.faft_client
19        self.faft_config = faft_framework.faft_config
20        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()
21
22    def _parse_crossystem_output(self, lines):
23        """Parse the crossystem output into a dict.
24
25        @param lines: The list of crossystem output strings.
26        @return: A dict which contains the crossystem keys/values.
27        @raise TestError: If wrong format in crossystem output.
28
29        >>> seq = FAFTSequence()
30        >>> seq._parse_crossystem_output([ \
31                "arch          = x86    # Platform architecture", \
32                "cros_debug    = 1      # OS should allow debug", \
33            ])
34        {'cros_debug': '1', 'arch': 'x86'}
35        >>> seq._parse_crossystem_output([ \
36                "arch=x86", \
37            ])
38        Traceback (most recent call last):
39            ...
40        TestError: Failed to parse crossystem output: arch=x86
41        >>> seq._parse_crossystem_output([ \
42                "arch          = x86    # Platform architecture", \
43                "arch          = arm    # Platform architecture", \
44            ])
45        Traceback (most recent call last):
46            ...
47        TestError: Duplicated crossystem key: arch
48        """
49        pattern = "^([^ =]*) *= *(.*[^ ]) *# [^#]*$"
50        parsed_list = {}
51        for line in lines:
52            matched = re.match(pattern, line.strip())
53            if not matched:
54                raise error.TestError("Failed to parse crossystem output: %s"
55                                      % line)
56            (name, value) = (matched.group(1), matched.group(2))
57            if name in parsed_list:
58                raise error.TestError("Duplicated crossystem key: %s" % name)
59            parsed_list[name] = value
60        return parsed_list
61
62    def crossystem_checker(self, expected_dict, suppress_logging=False):
63        """Check the crossystem values matched.
64
65        Given an expect_dict which describes the expected crossystem values,
66        this function check the current crossystem values are matched or not.
67
68        @param expected_dict: A dict which contains the expected values.
69        @param suppress_logging: True to suppress any logging messages.
70        @return: True if the crossystem value matched; otherwise, False.
71        """
72        succeed = True
73        lines = self.faft_client.system.run_shell_command_get_output(
74                'crossystem')
75        got_dict = self._parse_crossystem_output(lines)
76        for key in expected_dict:
77            if key not in got_dict:
78                logging.warn('Expected key %r not in crossystem result', key)
79                succeed = False
80                continue
81            if isinstance(expected_dict[key], str):
82                if got_dict[key] != expected_dict[key]:
83                    message = ('Expected %r value %r but got %r' % (
84                               key, expected_dict[key], got_dict[key]))
85                    succeed = False
86                else:
87                    message = ('Expected %r value %r == real value %r' % (
88                               key, expected_dict[key], got_dict[key]))
89
90            elif isinstance(expected_dict[key], tuple):
91                # Expected value is a tuple of possible actual values.
92                if got_dict[key] not in expected_dict[key]:
93                    message = ('Expected %r values %r but got %r' % (
94                               key, expected_dict[key], got_dict[key]))
95                    succeed = False
96                else:
97                    message = ('Expected %r values %r == real value %r' % (
98                               key, expected_dict[key], got_dict[key]))
99            else:
100                logging.warn('The expected value of %r is neither a str nor a '
101                             'dict: %r', key, expected_dict[key])
102                succeed = False
103                continue
104            if not suppress_logging:
105                logging.info(message)
106        return succeed
107
108    def mode_checker(self, mode):
109        """Check the current system in the given mode.
110
111        @param mode: A string of mode, one of 'normal', 'dev', or 'rec'.
112        @return: True if the system in the given mode; otherwise, False.
113        """
114        if mode == 'normal':
115            return self.crossystem_checker(
116                    {'devsw_boot': '0',
117                     'mainfw_type': 'normal'},
118                    suppress_logging=True)
119        elif mode == 'dev':
120            return self.crossystem_checker(
121                    {'devsw_boot': '1',
122                     'mainfw_type': 'developer'},
123                    suppress_logging=True)
124        elif mode == 'rec':
125            return self.crossystem_checker(
126                    {'mainfw_type': 'recovery'},
127                    suppress_logging=True)
128        else:
129            raise NotImplementedError('The given mode %s not supported' % mode)
130
131    def fw_tries_checker(self,
132                         expected_mainfw_act,
133                         expected_fw_tried=True,
134                         expected_try_count=0):
135        """Check the current FW booted and try_count
136
137        Mainly for dealing with the vboot1-specific flags fwb_tries and
138        tried_fwb fields in crossystem.  In vboot2, fwb_tries is meaningless and
139        is ignored while tried_fwb is translated into fw_try_count.
140
141        @param expected_mainfw_act: A string of expected firmware, 'A', 'B', or
142                       None if don't care.
143        @param expected_fw_tried: True if tried expected FW at last boot.
144                       This means that mainfw_act=A,tried_fwb=0 or
145                       mainfw_act=B,tried_fwb=1. Set to False if want to
146                       check the opposite case for the mainfw_act.  This
147                       check is only performed in vboot1 as tried_fwb is
148                       never set in vboot2.
149        @param expected_try_count: Number of times to try a FW slot.
150
151        @return: True if the correct boot firmware fields matched.  Otherwise,
152                       False.
153        """
154        crossystem_dict = {'mainfw_act': expected_mainfw_act.upper()}
155
156        if not self.fw_vboot2:
157            if expected_mainfw_act == 'B':
158                tried_fwb_val = True
159            else:
160                tried_fwb_val = False
161            if not expected_fw_tried:
162                tried_fwb_val = not tried_fwb_val
163            crossystem_dict['tried_fwb'] = '1' if tried_fwb_val else '0'
164
165            crossystem_dict['fwb_tries'] = str(expected_try_count)
166        else:
167            crossystem_dict['fw_try_count'] = str(expected_try_count)
168        return self.crossystem_checker(crossystem_dict)
169
170    def vdat_flags_checker(self, mask, value):
171        """Check the flags from VbSharedData matched.
172
173        This function checks the masked flags from VbSharedData using crossystem
174        are matched the given value.
175
176        @param mask: A bitmask of flags to be matched.
177        @param value: An expected value.
178        @return: True if the flags matched; otherwise, False.
179        """
180        lines = self.faft_client.system.run_shell_command_get_output(
181                    'crossystem vdat_flags')
182        vdat_flags = int(lines[0], 16)
183        if vdat_flags & mask != value:
184            logging.info("Expected vdat_flags 0x%x mask 0x%x but got 0x%x",
185                         value, mask, vdat_flags)
186            return False
187        return True
188
189    def ro_normal_checker(self, expected_fw=None, twostop=False):
190        """Check the current boot uses RO boot.
191
192        @param expected_fw: A string of expected firmware, 'A', 'B', or
193                            None if don't care.
194        @param twostop: True to expect a TwoStop boot; False to expect a RO
195                        boot.
196        @return: True if the currect boot firmware matched and used RO boot;
197                 otherwise, False.
198        """
199        crossystem_dict = {'tried_fwb': '0'}
200        if expected_fw:
201            crossystem_dict['mainfw_act'] = expected_fw.upper()
202        succeed = True
203        if not self.vdat_flags_checker(vboot.VDAT_FLAG_LF_USE_RO_NORMAL,
204                0 if twostop else vboot.VDAT_FLAG_LF_USE_RO_NORMAL):
205            succeed = False
206        if not self.crossystem_checker(crossystem_dict):
207            succeed = False
208        if self.faft_framework.check_ec_capability(suppress_warning=True):
209            expected_ec = ('RW' if twostop else 'RO')
210            if not self.ec_act_copy_checker(expected_ec):
211                succeed = False
212        return succeed
213
214    def dev_boot_usb_checker(self, dev_boot_usb=True, kernel_key_hash=False):
215        """Check the current boot is from a developer USB (Ctrl-U trigger).
216
217        @param dev_boot_usb: True to expect an USB boot;
218                             False to expect an internal device boot.
219        @param kernel_key_hash: True to expect an USB boot with kernkey_vfy
220                                value as 'hash';
221                                False to expect kernkey_vfy value as 'sig'.
222        @return: True if the current boot device matched; otherwise, False.
223        """
224        assert (dev_boot_usb or not kernel_key_hash), ("Invalid condition "
225            "dev_boot_usb_checker(%s, %s). kernel_key_hash should not be "
226            "True in internal disk boot.") % (dev_boot_usb, kernel_key_hash)
227        # kernkey_vfy value will be 'sig', when device booted in internal
228        # disk or booted in USB image signed with SSD key(Ctrl-U trigger).
229        expected_kernkey_vfy = 'sig'
230        if kernel_key_hash:
231            expected_kernkey_vfy = 'hash'
232        return (self.crossystem_checker({'mainfw_type': 'developer',
233                                         'kernkey_vfy':
234                                             expected_kernkey_vfy}) and
235                self.faft_client.system.is_removable_device_boot() ==
236                dev_boot_usb)
237
238    def root_part_checker(self, expected_part):
239        """Check the partition number of the root device matched.
240
241        @param expected_part: A string containing the number of the expected
242                              root partition.
243        @return: True if the currect root  partition number matched;
244                 otherwise, False.
245        """
246        part = self.faft_client.system.get_root_part()[-1]
247        if self.faft_framework.ROOTFS_MAP[expected_part] != part:
248            logging.info("Expected root part %s but got %s",
249                         self.faft_framework.ROOTFS_MAP[expected_part], part)
250            return False
251        return True
252
253    def ec_act_copy_checker(self, expected_copy):
254        """Check the EC running firmware copy matches.
255
256        @param expected_copy: A string containing 'RO', 'A', or 'B' indicating
257                              the expected copy of EC running firmware.
258        @return: True if the current EC running copy matches; otherwise, False.
259        """
260        if self.faft_client.system.has_host():
261            cmd = 'fwtool ec version'
262        else:
263            cmd = 'ectool version'
264        lines = self.faft_client.system.run_shell_command_get_output(cmd)
265        pattern = re.compile("Firmware copy: (.*)")
266        for line in lines:
267            matched = pattern.match(line)
268            if matched:
269                if matched.group(1) == expected_copy:
270                    return True
271                else:
272                    logging.info("Expected EC in %s but now in %s",
273                                 expected_copy, matched.group(1))
274                    return False
275        logging.info("Wrong output format of '%s':\n%s", cmd, '\n'.join(lines))
276        return False
277