• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5import re
6import logging
7
8from autotest_lib.client.common_lib import error
9
10
class FAFTCheckers(object):
    """Class that contains FAFT checkers.

    Every checker obtains the current state through the ``faft_client``
    object supplied by the FAFT framework and returns True/False depending on
    whether that state matches the caller's expectation.
    """
    version = 1

    # One line of crossystem output: "<name> = <value> # <comment>".
    # The trailing "# <comment>" part is mandatory; lines without it are
    # rejected by _parse_crossystem_output().
    _CROSSYSTEM_LINE_RE = re.compile(r"^([^ =]*) *= *(.*[^ ]) *# [^#]*$")

    # Line of `ectool version` output that names the running firmware copy.
    _EC_COPY_RE = re.compile(r"Firmware copy: (.*)")

    def __init__(self, faft_framework):
        """Remember the framework objects used by the checkers.

        @param faft_framework: The framework object.  It must provide the
                               attributes faft_client and faft_config;
                               root_part_checker() additionally reads its
                               ROOTFS_MAP attribute.
        """
        self.faft_framework = faft_framework
        self.faft_client = faft_framework.faft_client
        self.faft_config = faft_framework.faft_config
        # Queried once here; fw_tries_checker() uses it to pick which
        # crossystem fields to check.
        self.fw_vboot2 = self.faft_client.system.get_fw_vboot2()

    def _parse_crossystem_output(self, lines):
        """Parse the crossystem output into a dict.

        Each line must look like ``name = value # comment``, e.g.::

            arch          = x86    # Platform architecture
            cros_debug    = 1      # OS should allow debug

        which is parsed into ``{'arch': 'x86', 'cros_debug': '1'}``.  A line
        that does not follow this format (for example ``arch=x86``, which has
        no trailing comment) or a key that appears twice is an error.

        @param lines: The list of crossystem output strings.
        @return: A dict which contains the crossystem keys/values.
        @raise TestError: If wrong format in crossystem output, or if a key
                          is duplicated.
        """
        parsed = {}
        for line in lines:
            matched = self._CROSSYSTEM_LINE_RE.match(line.strip())
            if not matched:
                raise error.TestError("Failed to parse crossystem output: %s"
                                      % line)
            name, value = matched.group(1), matched.group(2)
            if name in parsed:
                raise error.TestError("Duplicated crossystem key: %s" % name)
            parsed[name] = value
        return parsed

    def crossystem_checker(self, expected_dict, suppress_logging=False):
        """Check the crossystem values matched.

        Given an expected_dict which describes the expected crossystem values,
        this function checks whether the current crossystem values match.
        An expected value is either a str (the value must be equal) or a
        tuple of str (the value must be one of the tuple's elements).

        @param expected_dict: A dict which contains the expected values.
        @param suppress_logging: True to suppress the per-key info messages.
                                 Warnings about missing keys or unsupported
                                 expected-value types are still logged.
        @return: True if the crossystem value matched; otherwise, False.
        """
        succeed = True
        lines = self.faft_client.system.run_shell_command_get_output(
                'crossystem')
        got_dict = self._parse_crossystem_output(lines)
        for key, expected in expected_dict.items():
            if key not in got_dict:
                logging.warning('Expected key %r not in crossystem result',
                                key)
                succeed = False
                continue
            actual = got_dict[key]
            if isinstance(expected, str):
                matched = (actual == expected)
                noun = 'value'
            elif isinstance(expected, tuple):
                # Expected value is a tuple of possible actual values.
                matched = (actual in expected)
                noun = 'values'
            else:
                logging.warning('The expected value of %r is neither a str '
                                'nor a tuple: %r', key, expected)
                succeed = False
                continue
            if matched:
                message = ('Expected %r %s %r == real value %r' % (
                           key, noun, expected, actual))
            else:
                message = ('Expected %r %s %r but got %r' % (
                           key, noun, expected, actual))
                succeed = False
            if not suppress_logging:
                logging.info(message)
        return succeed

    def mode_checker(self, mode):
        """Check the current system in the given mode.

        @param mode: A string of mode, one of 'normal', 'dev', or 'rec'.
        @return: True if the system in the given mode; otherwise, False.
        @raise NotImplementedError: If the mode is not one of the above.
        """
        expected_by_mode = {
            'normal': {'devsw_boot': '0', 'mainfw_type': 'normal'},
            'dev': {'devsw_boot': '1', 'mainfw_type': 'developer'},
            'rec': {'mainfw_type': 'recovery'},
        }
        if mode not in expected_by_mode:
            raise NotImplementedError('The given mode %s not supported' % mode)
        return self.crossystem_checker(expected_by_mode[mode],
                                       suppress_logging=True)

    def fw_tries_checker(self,
                         expected_mainfw_act,
                         expected_fw_tried=True,
                         expected_try_count=0):
        """Check the current FW booted and try_count

        Mainly for dealing with the vboot1-specific flags fwb_tries and
        tried_fwb fields in crossystem.  In vboot2, fwb_tries is meaningless and
        is ignored while tried_fwb is translated into fw_try_count.

        @param expected_mainfw_act: A string of expected firmware, 'A', 'B', or
                       None if don't care.  The comparison is
                       case-insensitive.  When None, neither mainfw_act nor
                       tried_fwb (which is derived from it) is checked.
        @param expected_fw_tried: True if tried expected FW at last boot.
                       This means that mainfw_act=A,tried_fwb=0 or
                       mainfw_act=B,tried_fwb=1. Set to False if want to
                       check the opposite case for the mainfw_act.  This
                       check is only performed in vboot1 as tried_fwb is
                       never set in vboot2.
        @param expected_try_count: Number of times to try a FW slot.

        @return: True if the correct boot firmware fields matched.  Otherwise,
                       False.
        """
        crossystem_dict = {}

        # Normalize once so that the mainfw_act and tried_fwb expectations
        # are always derived from the same (upper-cased) value.
        mainfw_act = None
        if expected_mainfw_act is not None:
            mainfw_act = expected_mainfw_act.upper()
            crossystem_dict['mainfw_act'] = mainfw_act

        if self.fw_vboot2:
            crossystem_dict['fw_try_count'] = str(expected_try_count)
        else:
            if mainfw_act is not None:
                tried_fwb_val = (mainfw_act == 'B')
                if not expected_fw_tried:
                    tried_fwb_val = not tried_fwb_val
                crossystem_dict['tried_fwb'] = '1' if tried_fwb_val else '0'
            crossystem_dict['fwb_tries'] = str(expected_try_count)
        return self.crossystem_checker(crossystem_dict)

    def vdat_flags_checker(self, mask, value):
        """Check the flags from VbSharedData matched.

        This function checks that the masked flags from VbSharedData, as
        reported by 'crossystem vdat_flags', match the given value.

        @param mask: A bitmask of flags to be matched.
        @param value: An expected value.
        @return: True if the flags matched; otherwise, False.
        """
        lines = self.faft_client.system.run_shell_command_get_output(
                    'crossystem vdat_flags')
        # The first output line is parsed as a hexadecimal number.
        vdat_flags = int(lines[0], 16)
        if (vdat_flags & mask) == value:
            return True
        logging.info("Expected vdat_flags 0x%x mask 0x%x but got 0x%x",
                     value, mask, vdat_flags)
        return False

    def dev_boot_usb_checker(self, dev_boot_usb=True, kernel_key_hash=False):
        """Check the current boot is from a developer USB (Ctrl-U trigger).

        @param dev_boot_usb: True to expect an USB boot;
                             False to expect an internal device boot.
        @param kernel_key_hash: True to expect an USB boot with kernkey_vfy
                                value as 'hash';
                                False to expect kernkey_vfy value as 'sig'.
        @return: True if the current boot device matched; otherwise, False.
        @raise AssertionError: If kernel_key_hash is True while dev_boot_usb
                               is False.
        """
        # Raised explicitly (rather than via `assert`) so that the check is
        # not stripped when Python runs with -O.
        if kernel_key_hash and not dev_boot_usb:
            raise AssertionError(("Invalid condition "
                "dev_boot_usb_checker(%s, %s). kernel_key_hash should not be "
                "True in internal disk boot.") % (dev_boot_usb,
                                                  kernel_key_hash))
        # kernkey_vfy value will be 'sig', when device booted in internal
        # disk or booted in USB image signed with SSD key(Ctrl-U trigger).
        expected_kernkey_vfy = 'hash' if kernel_key_hash else 'sig'
        if not self.crossystem_checker({'mainfw_type': 'developer',
                                        'kernkey_vfy': expected_kernkey_vfy}):
            # The boot device is only queried when the crossystem check
            # passes.
            return False
        return (self.faft_client.system.is_removable_device_boot() ==
                dev_boot_usb)

    def root_part_checker(self, expected_part):
        """Check the partition number of the root device matched.

        @param expected_part: A key of the framework's ROOTFS_MAP selecting
                              the expected root partition.
        @return: True if the current root partition number matched;
                 otherwise, False.
        """
        # Only the last character of the root partition name is compared.
        part = self.faft_client.system.get_root_part()[-1]
        expected = self.faft_framework.ROOTFS_MAP[expected_part]
        if expected == part:
            return True
        logging.info("Expected root part %s but got %s", expected, part)
        return False

    def ec_act_copy_checker(self, expected_copy):
        """Check the EC running firmware copy matches.

        The running copy is taken from the first 'Firmware copy: ...' line of
        the 'ectool version' output.

        @param expected_copy: A string containing 'RO', 'A', or 'B' indicating
                              the expected copy of EC running firmware.
        @return: True if the current EC running copy matches; otherwise, False
                 (also when no 'Firmware copy' line is found).
        """
        cmd = 'ectool version'
        lines = self.faft_client.system.run_shell_command_get_output(cmd)
        for line in lines:
            matched = self._EC_COPY_RE.match(line)
            if not matched:
                continue
            actual_copy = matched.group(1)
            if actual_copy == expected_copy:
                return True
            logging.info("Expected EC in %s but now in %s",
                         expected_copy, actual_copy)
            return False
        logging.info("Wrong output format of '%s':\n%s", cmd, '\n'.join(lines))
        return False
248