• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module to provide interface to OS services."""
5import logging
6import os
7import re
8import struct
9
10from autotest_lib.client.cros.faft.utils import shell_wrapper
11
12
class OSInterfaceError(Exception):
    """Exception raised for failures specific to the OS interface."""
16
17
class Crossystem(object):
    """A wrapper for the crossystem utility.

    Reading an attribute that is not otherwise defined on the instance runs
    `crossystem <name>`; assigning to any attribute other than 'os_if' runs
    `crossystem "<name>=<value>"`. Both go through the OS interface object
    supplied at construction time.
    """

    # Code dedicated for user triggering recovery mode through crossystem.
    USER_RECOVERY_REQUEST_CODE = '193'

    def __init__(self, os_if):
        """Init the instance.

        @param os_if: object used to run the shell commands; it must provide
                      run_shell_command() and run_shell_command_get_output().
        """
        self.os_if = os_if

    def __getattr__(self, name):
        """
        Retrieve a crossystem attribute.

        Attempt to access crossystemobject.name will invoke `crossystem name'
        and return the first element of the command output as the value.

        @param name: the crossystem parameter to read.
        @raise AttributeError: for 'os_if' (when it has not been set yet) and
                               for dunder names, which are never crossystem
                               parameters.
        """
        # __getattr__ is only called when normal lookup fails. If 'os_if' is
        # missing (bare instance, unpickling, copying), reading self.os_if
        # below would re-enter this method forever; dunder probes such as
        # __setstate__ must not be turned into shell commands either.
        if name == 'os_if' or (name.startswith('__') and name.endswith('__')):
            raise AttributeError(name)
        return self.os_if.run_shell_command_get_output(
                'crossystem %s' % name)[0]

    def __setattr__(self, name, value):
        """
        Set a crossystem attribute.

        'os_if' is stored on the instance itself; every other name is written
        to the device with `crossystem "name=value"`, flagged as a command
        that modifies the device.

        @param name: the attribute or crossystem parameter to set.
        @param value: the value to store or to pass to crossystem.
        """
        if name in ('os_if', ):
            self.__dict__[name] = value
        else:
            self.os_if.run_shell_command(
                    'crossystem "%s=%s"' % (name, value), modifies_device=True)

    def request_recovery(self):
        """Request recovery mode next time the target reboots."""
        # Plain assignment is routed through __setattr__ above.
        self.recovery_request = self.USER_RECOVERY_REQUEST_CODE
49
50
class OSInterface(object):
    """An object to encapsulate OS services functions.

    Most methods build a shell command line and run it through self.shell.
    NOTE: paths and device names are interpolated into those command lines
    without any quoting.
    """

    def __init__(self, state_dir=None, test_mode=False):
        """Object initialization (side effect: creates the state_dir)

        @param state_dir: the name of the directory to use for storing state.
                            The contents of this directory persist over system
                            restarts and power cycles.
        @param test_mode: if true, skip (and just log) any shell call
                          marked with modifies_device=True
        """

        # We keep the state of FAFT test in a permanent directory over reboots.
        if state_dir is None:
            state_dir = '/usr/local/tmp/faft'

        self.state_dir = state_dir
        self.test_mode = test_mode

        self.shell = shell_wrapper.LocalShell(self)
        self.host_shell = None

        self.create_dir(self.state_dir)

        self.cs = Crossystem(self)

    def run_shell_command(self, cmd, block=True, modifies_device=False):
        """Run a shell command.

        @param cmd: the command to run
        @param block: if True (default), wait for command to finish
        @param modifies_device: If True and running in test mode, just log
                                the command, but don't actually run it.
                                This should be set for RPC commands that alter
                                the OS or firmware in some persistent way.

        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
        """
        if self.test_mode and modifies_device:
            logging.info('[SKIPPED] %s', cmd)
        else:
            self.shell.run_command(cmd, block=block)

    def run_shell_command_check_output(self, cmd, success_token):
        """Run shell command and check its stdout for a string.

        @param cmd: the command to run
        @param success_token: the string to look for in the output
        """
        return self.shell.run_command_check_output(cmd, success_token)

    def run_shell_command_get_result(self, cmd, ignore_status=False):
        """Run shell command and get a CmdResult object as a result.

        @param cmd: the command to run
        @param ignore_status: if True, do not raise CmdError, even if rc != 0.
        @rtype: autotest_lib.client.common_lib.utils.CmdResult
        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
        """
        return self.shell.run_command_get_result(cmd, ignore_status)

    def run_shell_command_get_status(self, cmd):
        """Run shell command and return its return code."""
        return self.shell.run_command_get_status(cmd)

    def run_shell_command_get_output(self, cmd, include_stderr=False):
        """Run shell command and return its console output.

        @param cmd: the command to run
        @param include_stderr: passed through to the shell wrapper
        """
        return self.shell.run_command_get_output(cmd, include_stderr)

    def read_file(self, path):
        """Read the content of the file."""
        return self.shell.read_file(path)

    def write_file(self, path, data):
        """Write the data to the file."""
        self.shell.write_file(path, data)

    def append_file(self, path, data):
        """Append the data to the file."""
        self.shell.append_file(path, data)

    def path_exists(self, path):
        """Return True if the path exists on DUT."""
        cmd = 'test -e %s' % path
        return self.run_shell_command_get_status(cmd) == 0

    def is_dir(self, path):
        """Return True if the path is a directory."""
        cmd = 'test -d %s' % path
        return self.run_shell_command_get_status(cmd) == 0

    def create_dir(self, path):
        """Create a new directory (including missing parents)."""
        cmd = 'mkdir -p %s' % path
        return self.run_shell_command(cmd)

    def create_temp_file(self, prefix):
        """Create a temporary file under /tmp with a prefix.

        @param prefix: leading part of the file name; mktemp appends six
                       random characters.
        @return: the first line of the mktemp output (the new file's path).
        """
        tmp_path = '/tmp'
        cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix)
        return self.run_shell_command_get_output(cmd)[0]

    def copy_file(self, from_path, to_path):
        """Copy the file."""
        cmd = 'cp -f %s %s' % (from_path, to_path)
        return self.run_shell_command(cmd)

    def copy_dir(self, from_path, to_path):
        """Copy the directory."""
        cmd = 'cp -rf %s %s' % (from_path, to_path)
        return self.run_shell_command(cmd)

    def remove_file(self, path):
        """Remove the file."""
        cmd = 'rm -f %s' % path
        return self.run_shell_command(cmd)

    def remove_dir(self, path):
        """Remove the directory."""
        cmd = 'rm -rf %s' % path
        return self.run_shell_command(cmd)

    def get_file_size(self, path):
        """Get the size of the file, as an integer."""
        cmd = 'stat -c %%s %s' % path
        return int(self.run_shell_command_get_output(cmd)[0])

    def target_hosted(self):
        """Return True if running on DUT.

        The decision is based on the first line of the local
        /etc/lsb-release file; an empty file counts as "not a DUT".
        """
        with open('/etc/lsb-release', 'r') as lsb_release:
            # readline() yields '' for an empty file instead of failing.
            signature = lsb_release.readline()
        return bool(re.search(r'chrom(ium|e)os', signature, re.IGNORECASE))

    def state_dir_file(self, file_name):
        """Get a full path of a file in the state directory."""
        return os.path.join(self.state_dir, file_name)

    def is_removable_device(self, device):
        """Check if a certain storage device is removable.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Returns True if the device is removable, False if not.
        """
        if not self.target_hosted():
            return False

        # Drop trailing digit(s) and letter(s) (if any)
        base_dev = self.strip_part(device.split('/')[2])
        removable = int(self.read_file('/sys/block/%s/removable' % base_dev))

        return removable == 1

    def get_internal_disk(self, device):
        """Get the internal disk by given the current disk.

        If device is removable device, internal disk is decided by which kind
        of device (arm or x86). Otherwise, return device itself.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Return internal kernel disk.
        """
        if not self.is_removable_device(device):
            return self.strip_part(device)

        for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'):
            if not self.path_exists(p):
                continue
            # Skip candidates whose sysfs device type reads 'SD'.
            devicetype = '/sys/block/%s/device/type' % p.split('/')[2]
            if (not self.path_exists(devicetype)
                    or self.read_file(devicetype).strip() != 'SD'):
                return p
        return '/dev/sda'

    def get_root_part(self):
        """Return a string, the name of root device with partition number"""
        return self.run_shell_command_get_output('rootdev -s')[0]

    def get_root_dev(self):
        """Return a string, the name of root device without partition number"""
        return self.strip_part(self.get_root_part())

    def join_part(self, dev, part):
        """Return a concatenated string of device and partition number

        @param dev: device name; when it ends with a digit, a 'p' separator
                    is inserted before the partition number.
        @param part: partition number, as a string or an integer.
        """
        part = str(part)
        if dev.endswith(tuple('0123456789')):
            return dev + 'p' + part
        return dev + part

    def strip_part(self, dev_with_part):
        """Return a stripped string without partition number

        Removes the trailing digits, plus a 'p' right before them if present.
        """
        return re.sub(r'p?[0-9]+$', '', dev_with_part)

    def _preamble_offset(self, blob):
        """Return the preamble offset stored in the blob header.

        The header is unpacked as '<8s8sQ'; the 64-bit field is what the
        retrieve_* methods use as the byte offset of the preamble.

        @return: the offset, or None (after logging an error) when the blob
                 does not start with the b'CHROMEOS' magic.
        @raise struct.error: if the blob is shorter than the header.
        """
        magic, _, kb_size = struct.unpack_from('<8s8sQ', blob)
        if magic != b'CHROMEOS':
            logging.error("Incorrect magic string %s", magic)
            return None
        return kb_size

    def retrieve_body_version(self, blob):
        """Given a blob, retrieve body version.

        Currently works for both, firmware and kernel blobs. Returns '-1' in
        case the version can not be retrieved reliably.

        @raise struct.error: if the blob is too short to be unpacked.
        """
        offset = self._preamble_offset(blob)
        if offset is None:
            return -1  # This could be a corrupted version case.

        _, version = struct.unpack_from('<40sQ', blob, offset)
        return version

    def retrieve_datakey_version(self, blob):
        """Given a blob, retrieve firmware data key version.

        Currently works for both, firmware and kernel blobs. Returns '-1' in
        case the version can not be retrieved reliably.

        @raise struct.error: if the blob is too short to be unpacked.
        """
        # The version is read straight from the header, using a different
        # layout than the one used to locate the preamble.
        magic, _, version = struct.unpack_from('<8s96sQ', blob)
        if magic != b'CHROMEOS':
            logging.error("Incorrect magic string %s", magic)
            return -1  # This could be a corrupted version case.
        return version

    def retrieve_kernel_subkey_version(self, blob):
        """Given a blob, retrieve kernel subkey version.

        It is in firmware vblock's preamble. Returns -1 if the blob does not
        start with the expected magic string.

        @raise struct.error: if the blob is too short to be unpacked.
        """
        offset = self._preamble_offset(blob)
        if offset is None:
            return -1

        _, version = struct.unpack_from('<72sQ', blob, offset)
        return version

    def retrieve_preamble_flags(self, blob):
        """Given a blob, retrieve preamble flags if available.

        It only works for firmware. If the version of preamble header is less
        than 2.1, no preamble flags supported, just returns 0. Returns -1 if
        the blob does not start with the expected magic string.

        @raise struct.error: if the blob is too short to be unpacked.
        """
        offset = self._preamble_offset(blob)
        if offset is None:
            return -1  # This could be a corrupted version case.

        _, ver, subver, _, flags = struct.unpack_from('<32sII64sI', blob,
                                                      offset)

        if ver > 2 or (ver == 2 and subver >= 1):
            return flags
        return 0  # Returns 0 if preamble flags not available.

    def read_partition(self, partition, size):
        """Read the requested partition, up to size bytes.

        The data is copied with dd into a temporary file in the state
        directory, read back, and the temporary file is removed again even
        when copying or reading fails.

        @param partition: the device file to read from.
        @param size: the number of bytes to read (an integer).
        @return: the content returned by read_file() for the copy.
        """
        tmp_file = self.state_dir_file('part.tmp')
        try:
            self.run_shell_command(
                    'dd if=%s of=%s bs=1 count=%d' % (partition, tmp_file, size))
            return self.read_file(tmp_file)
        finally:
            # Do not leave a stale copy behind in the persistent state dir.
            self.remove_file(tmp_file)
321