# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """A module to provide interface to OS services.""" import datetime import os import re import struct import shell_wrapper class OSInterfaceError(Exception): """OS interface specific exception.""" pass class Crossystem(object): """A wrapper for the crossystem utility.""" # Code dedicated for user triggering recovery mode through crossystem. USER_RECOVERY_REQUEST_CODE = '193' def init(self, os_if): """Init the instance. If running on Mario - adjust the map.""" self.os_if = os_if def __getattr__(self, name): """ Retrieve a crosssystem attribute. Attempt to access crossystemobject.name will invoke `crossystem name' and return the stdout as the value. """ return self.os_if.run_shell_command_get_output( 'crossystem %s' % name)[0] def __setattr__(self, name, value): if name in ('os_if',): self.__dict__[name] = value else: self.os_if.run_shell_command('crossystem "%s=%s"' % (name, value)) def request_recovery(self): """Request recovery mode next time the target reboots.""" self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE) class OSInterface(object): """An object to encapsulate OS services functions.""" ANDROID_TESTER_FILE = '/mnt/stateful_partition/.android_faft_tester' def __init__(self): """Object construction time initialization.""" self.state_dir = None self.log_file = None self.cs = Crossystem() self.is_android = os.path.isfile(self.ANDROID_TESTER_FILE) if self.is_android: self.shell = shell_wrapper.AdbShell() self.host_shell = shell_wrapper.LocalShell() else: self.shell = shell_wrapper.LocalShell() self.host_shell = None def init(self, state_dir=None, log_file=None): """Initialize the OS interface object. Args: state_dir - a string, the name of the directory (as defined by the caller). The contents of this directory persist over system restarts and power cycles. log_file - a string, the name of the log file kept in the state directory. Default argument values support unit testing. """ self.cs.init(self) self.state_dir = state_dir if self.state_dir: if not os.path.exists(self.state_dir): try: os.mkdir(self.state_dir) except OSError, err: raise OSInterfaceError(err) if log_file: if log_file[0] == '/': self.log_file = log_file else: self.log_file = os.path.join(state_dir, log_file) # Initialize the shell. Should be after creating the log file. self.shell.init(self) if self.host_shell: self.host_shell.init(self) def has_host(self): """Return True if a host is connected to DUT.""" return self.is_android def run_shell_command(self, cmd): """Run a shell command.""" self.shell.run_command(cmd) def run_shell_command_get_status(self, cmd): """Run shell command and return its return code.""" return self.shell.run_command_get_status(cmd) def run_shell_command_get_output(self, cmd): """Run shell command and return its console output.""" return self.shell.run_command_get_output(cmd) def run_host_shell_command(self, cmd, block=True): """Run a shell command on the host.""" if self.host_shell: self.host_shell.run_command(cmd, block) else: raise OSInterfaceError('There is no host for DUT.') def run_host_shell_command_get_status(self, cmd): """Run shell command and return its return code on the host.""" if self.host_shell: return self.host_shell.run_command_get_status(cmd) else: raise OSInterfaceError('There is no host for DUT.') def run_host_shell_command_get_output(self, cmd): """Run shell command and return its console output.""" if self.host_shell: return self.host_shell.run_command_get_output(cmd) else: raise OSInterfaceError('There is no host for DUT.') def read_file(self, path): """Read the content of the file.""" return self.shell.read_file(path) def write_file(self, path, data): """Write the data to the file.""" self.shell.write_file(path, data) def append_file(self, path, data): """Append the data to the file.""" self.shell.append_file(path, data) def path_exists(self, path): """Return True if the path exists on DUT.""" cmd = 'test -e %s' % path return self.run_shell_command_get_status(cmd) == 0 def is_dir(self, path): """Return True if the path is a directory.""" cmd = 'test -d %s' % path return self.run_shell_command_get_status(cmd) == 0 def create_dir(self, path): """Create a new directory.""" cmd = 'mkdir -p %s' % path return self.run_shell_command(cmd) def create_temp_file(self, prefix): """Create a temporary file with a prefix.""" if self.is_android: tmp_path = '/data/local/tmp' else: tmp_path = '/tmp' cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix) return self.run_shell_command_get_output(cmd)[0] def copy_file(self, from_path, to_path): """Copy the file.""" cmd = 'cp -f %s %s' % (from_path, to_path) return self.run_shell_command(cmd) def copy_dir(self, from_path, to_path): """Copy the directory.""" cmd = 'cp -rf %s %s' % (from_path, to_path) return self.run_shell_command(cmd) def remove_file(self, path): """Remove the file.""" cmd = 'rm -f %s' % path return self.run_shell_command(cmd) def remove_dir(self, path): """Remove the directory.""" cmd = 'rm -rf %s' % path return self.run_shell_command(cmd) def get_file_size(self, path): """Get the size of the file.""" cmd = 'stat -c %%s %s' % path return int(self.run_shell_command_get_output(cmd)[0]) def target_hosted(self): """Return True if running on DUT.""" if self.is_android: return True signature = open('/etc/lsb-release', 'r').readlines()[0] return re.search(r'chrom(ium|e)os', signature, re.IGNORECASE) != None def state_dir_file(self, file_name): """Get a full path of a file in the state directory.""" return os.path.join(self.state_dir, file_name) def wait_for_device(self, timeout): """Wait for an Android device to be connected.""" return self.shell.wait_for_device(timeout) def wait_for_no_device(self, timeout): """Wait for no Android device to be connected (offline).""" return self.shell.wait_for_no_device(timeout) def log(self, text): """Write text to the log file and print it on the screen, if enabled. The entire log (maintained across reboots) can be found in self.log_file. """ if not self.log_file or not os.path.exists(self.state_dir): # Called before environment was initialized, ignore. return timestamp = datetime.datetime.strftime( datetime.datetime.now(), '%I:%M:%S %p:') with open(self.log_file, 'a') as log_f: log_f.write('%s %s\n' % (timestamp, text)) log_f.flush() os.fdatasync(log_f) def is_removable_device(self, device): """Check if a certain storage device is removable. device - a string, file name of a storage device or a device partition (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). Returns True if the device is removable, False if not. """ if self.is_android: return False if not self.target_hosted(): return False # Drop trailing digit(s) and letter(s) (if any) base_dev = self.strip_part(device.split('/')[2]) removable = int(self.read_file('/sys/block/%s/removable' % base_dev)) return removable == 1 def get_internal_disk(self, device): """Get the internal disk by given the current disk. If device is removable device, internal disk is decided by which kind of divice (arm or x86). Otherwise, return device itself. device - a string, file name of a storage device or a device partition (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). Return internal kernel disk. """ if self.is_removable_device(device): for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'): if self.path_exists(p): return p return '/dev/sda' else: return self.strip_part(device) def get_root_part(self): """Return a string, the name of root device with partition number""" # FIXME(waihong): Android doesn't support dual kernel/root and misses # the related tools. Just return something that not break the existing # code. if self.is_android: return '/dev/mmcblk0p3' else: return self.run_shell_command_get_output('rootdev -s')[0] def get_root_dev(self): """Return a string, the name of root device without partition number""" return self.strip_part(self.get_root_part()) def join_part(self, dev, part): """Return a concatenated string of device and partition number""" if dev.endswith(tuple(str(i) for i in range(0, 10))): return dev + 'p' + part else: return dev + part def strip_part(self, dev_with_part): """Return a stripped string without partition number""" dev_name_stripper = re.compile('p?[0-9]+$') return dev_name_stripper.sub('', dev_with_part) def retrieve_body_version(self, blob): """Given a blob, retrieve body version. Currently works for both, firmware and kernel blobs. Returns '-1' in case the version can not be retrieved reliably. """ header_format = '<8s8sQ' preamble_format = '<40sQ' magic, _, kb_size = struct.unpack_from(header_format, blob) if magic != 'CHROMEOS': return -1 # This could be a corrupted version case. _, version = struct.unpack_from(preamble_format, blob, kb_size) return version def retrieve_datakey_version(self, blob): """Given a blob, retrieve firmware data key version. Currently works for both, firmware and kernel blobs. Returns '-1' in case the version can not be retrieved reliably. """ header_format = '<8s96sQ' magic, _, version = struct.unpack_from(header_format, blob) if magic != 'CHROMEOS': return -1 # This could be a corrupted version case. return version def retrieve_kernel_subkey_version(self, blob): """Given a blob, retrieve kernel subkey version. It is in firmware vblock's preamble. """ header_format = '<8s8sQ' preamble_format = '<72sQ' magic, _, kb_size = struct.unpack_from(header_format, blob) if magic != 'CHROMEOS': return -1 _, version = struct.unpack_from(preamble_format, blob, kb_size) return version def retrieve_preamble_flags(self, blob): """Given a blob, retrieve preamble flags if available. It only works for firmware. If the version of preamble header is less than 2.1, no preamble flags supported, just returns 0. """ header_format = '<8s8sQ' preamble_format = '<32sII64sI' magic, _, kb_size = struct.unpack_from(header_format, blob) if magic != 'CHROMEOS': return -1 # This could be a corrupted version case. _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob, kb_size) if ver > 2 or (ver == 2 and subver >= 1): return flags else: return 0 # Returns 0 if preamble flags not available. def read_partition(self, partition, size): """Read the requested partition, up to size bytes.""" tmp_file = self.state_dir_file('part.tmp') self.run_shell_command('dd if=%s of=%s bs=1 count=%d' % ( partition, tmp_file, size)) data = self.read_file(tmp_file) self.remove_file(tmp_file) return data