• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""A module to provide interface to OS services."""
5
6import datetime
7import os
8import re
9import struct
10
11import shell_wrapper
12
13
14class OSInterfaceError(Exception):
15    """OS interface specific exception."""
16    pass
17
18
19class Crossystem(object):
20    """A wrapper for the crossystem utility."""
21
22    # Code dedicated for user triggering recovery mode through crossystem.
23    USER_RECOVERY_REQUEST_CODE = '193'
24
25    def __init__(self, os_if):
26        """Init the instance. If running on Mario - adjust the map."""
27        self.os_if = os_if
28
29    def __getattr__(self, name):
30        """
31        Retrieve a crosssystem attribute.
32
33        Attempt to access crossystemobject.name will invoke `crossystem name'
34        and return the stdout as the value.
35        """
36        return self.os_if.run_shell_command_get_output(
37                'crossystem %s' % name)[0]
38
39    def __setattr__(self, name, value):
40        if name in ('os_if', ):
41            self.__dict__[name] = value
42        else:
43            self.os_if.run_shell_command(
44                    'crossystem "%s=%s"' % (name, value), modifies_device=True)
45
46    def request_recovery(self):
47        """Request recovery mode next time the target reboots."""
48
49        self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE)
50
51
52class OSInterface(object):
53    """An object to encapsulate OS services functions."""
54
55    def __init__(self, state_dir=None, log_file=None, test_mode=False):
56        """Object initialization (side effect: creates the state_dir)
57
58        @param state_dir: the name of the directory to use for storing state.
59                            The contents of this directory persist over system
60                            restarts and power cycles.
61        @param log_file: the name of the log file kept in the state directory.
62        @param test_mode: if true, skip (and just log) any shell call
63                          marked with modifies_device=True
64        """
65
66        # We keep the state of FAFT test in a permanent directory over reboots.
67        if state_dir is None:
68            state_dir = '/usr/local/tmp/faft'
69
70        if log_file is None:
71            log_file = 'faft_client.log'
72
73        if not os.path.isabs(log_file):
74            log_file = os.path.join(state_dir, log_file)
75
76        self.state_dir = state_dir
77        self.log_file = log_file
78        self.test_mode = test_mode
79
80        self.shell = shell_wrapper.LocalShell(self)
81        self.host_shell = None
82
83        self.create_dir(self.state_dir)
84
85        self.cs = Crossystem(self)
86
87    def run_shell_command(self, cmd, block=True, modifies_device=False):
88        """Run a shell command.
89
90        @param cmd: the command to run
91        @param block: if True (default), wait for command to finish
92        @param modifies_device: If True and running in test mode, just log
93                                the command, but don't actually run it.
94                                This should be set for RPC commands that alter
95                                the OS or firmware in some persistent way.
96
97        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
98        """
99        if self.test_mode and modifies_device:
100            self.log('[SKIPPED] %s' % cmd)
101        else:
102            self.shell.run_command(cmd, block=block)
103
104    def run_shell_command_check_output(self, cmd, success_token):
105        """Run shell command and check its stdout for a string."""
106        return self.shell.run_command_check_output(cmd, success_token)
107
108    def run_shell_command_get_result(self, cmd, ignore_status=False):
109        """Run shell command and get a CmdResult object as a result.
110
111        @param cmd: the command to run
112        @param ignore_status: if True, do not raise CmdError, even if rc != 0.
113        @rtype: autotest_lib.client.common_lib.utils.CmdResult
114        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
115        """
116        return self.shell.run_command_get_result(cmd, ignore_status)
117
118    def run_shell_command_get_status(self, cmd):
119        """Run shell command and return its return code."""
120        return self.shell.run_command_get_status(cmd)
121
122    def run_shell_command_get_output(self, cmd, include_stderr=False):
123        """Run shell command and return its console output."""
124        return self.shell.run_command_get_output(cmd, include_stderr)
125
126    def read_file(self, path):
127        """Read the content of the file."""
128        return self.shell.read_file(path)
129
130    def write_file(self, path, data):
131        """Write the data to the file."""
132        self.shell.write_file(path, data)
133
134    def append_file(self, path, data):
135        """Append the data to the file."""
136        self.shell.append_file(path, data)
137
138    def path_exists(self, path):
139        """Return True if the path exists on DUT."""
140        cmd = 'test -e %s' % path
141        return self.run_shell_command_get_status(cmd) == 0
142
143    def is_dir(self, path):
144        """Return True if the path is a directory."""
145        cmd = 'test -d %s' % path
146        return self.run_shell_command_get_status(cmd) == 0
147
148    def create_dir(self, path):
149        """Create a new directory."""
150        cmd = 'mkdir -p %s' % path
151        return self.run_shell_command(cmd)
152
153    def create_temp_file(self, prefix):
154        """Create a temporary file with a prefix."""
155        tmp_path = '/tmp'
156        cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix)
157        return self.run_shell_command_get_output(cmd)[0]
158
159    def copy_file(self, from_path, to_path):
160        """Copy the file."""
161        cmd = 'cp -f %s %s' % (from_path, to_path)
162        return self.run_shell_command(cmd)
163
164    def copy_dir(self, from_path, to_path):
165        """Copy the directory."""
166        cmd = 'cp -rf %s %s' % (from_path, to_path)
167        return self.run_shell_command(cmd)
168
169    def remove_file(self, path):
170        """Remove the file."""
171        cmd = 'rm -f %s' % path
172        return self.run_shell_command(cmd)
173
174    def remove_dir(self, path):
175        """Remove the directory."""
176        cmd = 'rm -rf %s' % path
177        return self.run_shell_command(cmd)
178
179    def get_file_size(self, path):
180        """Get the size of the file."""
181        cmd = 'stat -c %%s %s' % path
182        return int(self.run_shell_command_get_output(cmd)[0])
183
184    def target_hosted(self):
185        """Return True if running on DUT."""
186        with open('/etc/lsb-release', 'r') as lsb_release:
187            signature = lsb_release.readlines()[0]
188        return bool(re.search(r'chrom(ium|e)os', signature, re.IGNORECASE))
189
190    def state_dir_file(self, file_name):
191        """Get a full path of a file in the state directory."""
192        return os.path.join(self.state_dir, file_name)
193
194    def log(self, text):
195        """Write text to the log file and print it on the screen, if enabled.
196
197        The entire log (maintained across reboots) can be found in
198        self.log_file.
199        """
200        if not self.log_file or not os.path.exists(self.state_dir):
201            # Called before environment was initialized, ignore.
202            return
203
204        timestamp = datetime.datetime.strftime(datetime.datetime.now(),
205                                               '%I:%M:%S %p:')
206
207        with open(self.log_file, 'a') as log_f:
208            log_f.write('%s %s\n' % (timestamp, text))
209            log_f.flush()
210            os.fdatasync(log_f.fileno())
211
212    def is_removable_device(self, device):
213        """Check if a certain storage device is removable.
214
215        device - a string, file name of a storage device or a device partition
216                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).
217
218        Returns True if the device is removable, False if not.
219        """
220        if not self.target_hosted():
221            return False
222
223        # Drop trailing digit(s) and letter(s) (if any)
224        base_dev = self.strip_part(device.split('/')[2])
225        removable = int(self.read_file('/sys/block/%s/removable' % base_dev))
226
227        return removable == 1
228
229    def get_internal_disk(self, device):
230        """Get the internal disk by given the current disk.
231
232        If device is removable device, internal disk is decided by which kind
233        of divice (arm or x86). Otherwise, return device itself.
234
235        device - a string, file name of a storage device or a device partition
236                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).
237
238        Return internal kernel disk.
239        """
240        if self.is_removable_device(device):
241            for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'):
242                if self.path_exists(p):
243                    return p
244            return '/dev/sda'
245        else:
246            return self.strip_part(device)
247
248    def get_root_part(self):
249        """Return a string, the name of root device with partition number"""
250        return self.run_shell_command_get_output('rootdev -s')[0]
251
252    def get_root_dev(self):
253        """Return a string, the name of root device without partition number"""
254        return self.strip_part(self.get_root_part())
255
256    def join_part(self, dev, part):
257        """Return a concatenated string of device and partition number"""
258        if dev.endswith(tuple(str(i) for i in range(0, 10))):
259            return dev + 'p' + part
260        else:
261            return dev + part
262
263    def strip_part(self, dev_with_part):
264        """Return a stripped string without partition number"""
265        dev_name_stripper = re.compile('p?[0-9]+$')
266        return dev_name_stripper.sub('', dev_with_part)
267
268    def retrieve_body_version(self, blob):
269        """Given a blob, retrieve body version.
270
271        Currently works for both, firmware and kernel blobs. Returns '-1' in
272        case the version can not be retrieved reliably.
273        """
274        header_format = '<8s8sQ'
275        preamble_format = '<40sQ'
276        magic, _, kb_size = struct.unpack_from(header_format, blob)
277
278        if magic != 'CHROMEOS':
279            return -1  # This could be a corrupted version case.
280
281        _, version = struct.unpack_from(preamble_format, blob, kb_size)
282        return version
283
284    def retrieve_datakey_version(self, blob):
285        """Given a blob, retrieve firmware data key version.
286
287        Currently works for both, firmware and kernel blobs. Returns '-1' in
288        case the version can not be retrieved reliably.
289        """
290        header_format = '<8s96sQ'
291        magic, _, version = struct.unpack_from(header_format, blob)
292        if magic != 'CHROMEOS':
293            return -1  # This could be a corrupted version case.
294        return version
295
296    def retrieve_kernel_subkey_version(self, blob):
297        """Given a blob, retrieve kernel subkey version.
298
299        It is in firmware vblock's preamble.
300        """
301
302        header_format = '<8s8sQ'
303        preamble_format = '<72sQ'
304        magic, _, kb_size = struct.unpack_from(header_format, blob)
305
306        if magic != 'CHROMEOS':
307            return -1
308
309        _, version = struct.unpack_from(preamble_format, blob, kb_size)
310        return version
311
312    def retrieve_preamble_flags(self, blob):
313        """Given a blob, retrieve preamble flags if available.
314
315        It only works for firmware. If the version of preamble header is less
316        than 2.1, no preamble flags supported, just returns 0.
317        """
318        header_format = '<8s8sQ'
319        preamble_format = '<32sII64sI'
320        magic, _, kb_size = struct.unpack_from(header_format, blob)
321
322        if magic != 'CHROMEOS':
323            return -1  # This could be a corrupted version case.
324
325        _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob,
326                                                      kb_size)
327
328        if ver > 2 or (ver == 2 and subver >= 1):
329            return flags
330        else:
331            return 0  # Returns 0 if preamble flags not available.
332
333    def read_partition(self, partition, size):
334        """Read the requested partition, up to size bytes."""
335        tmp_file = self.state_dir_file('part.tmp')
336        self.run_shell_command(
337                'dd if=%s of=%s bs=1 count=%d' % (partition, tmp_file, size))
338        data = self.read_file(tmp_file)
339        self.remove_file(tmp_file)
340        return data
341