# Copyright (c) 2010 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module to provide interface to OS services."""
import logging
import os
import re
import struct

from autotest_lib.client.cros.faft.utils import shell_wrapper

# Magic string expected in the first 8 bytes of a firmware/kernel vblock.
_VBLOCK_MAGIC = b'CHROMEOS'

# Trailing partition suffix of a device name: digits, optionally preceded by
# 'p' (as in /dev/sda3 or /dev/mmcblk0p3).
_PARTITION_SUFFIX_RE = re.compile(r'p?[0-9]+$')


def _unpack_vblock_header(blob, header_format):
    """Unpack a three-field vblock header and validate its magic string.

    @param blob: bytes-like object holding the vblock.
    @param header_format: struct format describing three fields, the first
                          of which is the magic string.
    @return: the third unpacked field, or None (after logging an error) if
             the magic string does not match.
    @raise struct.error: if blob is too short for header_format.
    """
    magic, _, value = struct.unpack_from(header_format, blob)
    if magic != _VBLOCK_MAGIC:
        # Lazy %-formatting: the message is only built if it is emitted.
        logging.error('Incorrect magic string %s', magic)
        return None
    return value


class OSInterfaceError(Exception):
    """OS interface specific exception."""
    pass


class Crossystem(object):
    """A wrapper for the crossystem utility.

    Reading an attribute runs `crossystem <name>`; assigning an attribute
    runs `crossystem "<name>=<value>"`.
    """

    # Code dedicated for user triggering recovery mode through crossystem.
    USER_RECOVERY_REQUEST_CODE = '193'

    def __init__(self, os_if):
        """Init the instance.

        @param os_if: object used to run the crossystem shell commands; it
                      must provide run_shell_command() and
                      run_shell_command_get_output().
        """
        self.os_if = os_if

    def __getattr__(self, name):
        """Retrieve a crossystem attribute.

        Attempt to access crossystemobject.name will invoke `crossystem name'
        and return the first element of the command output as the value.

        @raise AttributeError: for private/dunder names and for 'os_if'.
        """
        # __getattr__ is only reached when normal lookup fails. Private and
        # dunder names are probes from Python machinery (copy, pickle,
        # hasattr, ...), not crossystem parameters, and a missing 'os_if'
        # would otherwise recurse forever through self.os_if below.
        if name.startswith('_') or name == 'os_if':
            raise AttributeError(name)
        return self.os_if.run_shell_command_get_output(
            'crossystem %s' % name)[0]

    def __setattr__(self, name, value):
        """Set a crossystem attribute.

        'os_if' is stored on the instance; any other name is written to the
        device with `crossystem "name=value"`.
        """
        if name in ('os_if', ):
            self.__dict__[name] = value
        else:
            self.os_if.run_shell_command(
                'crossystem "%s=%s"' % (name, value), modifies_device=True)

    def request_recovery(self):
        """Request recovery mode next time the target reboots."""
        # Routed through __setattr__, i.e. sets crossystem recovery_request.
        self.recovery_request = self.USER_RECOVERY_REQUEST_CODE


class OSInterface(object):
    """An object to encapsulate OS services functions."""

    def __init__(self, state_dir=None, test_mode=False):
        """Object initialization (side effect: creates the state_dir)

        @param state_dir: the name of the directory to use for storing state.
                          The contents of this directory persist over system
                          restarts and power cycles.
        @param test_mode: if true, skip (and just log) any shell call
                          marked with modifies_device=True
        """

        # We keep the state of FAFT test in a permanent directory over reboots.
        if state_dir is None:
            state_dir = '/usr/local/tmp/faft'

        self.state_dir = state_dir
        self.test_mode = test_mode

        self.shell = shell_wrapper.LocalShell(self)
        self.host_shell = None

        self.create_dir(self.state_dir)

        self.cs = Crossystem(self)

    def run_shell_command(self, cmd, block=True, modifies_device=False):
        """Run a shell command.

        @param cmd: the command to run
        @param block: if True (default), wait for command to finish
        @param modifies_device: If True and running in test mode, just log
                                the command, but don't actually run it.
                                This should be set for RPC commands that alter
                                the OS or firmware in some persistent way.

        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
        """
        if self.test_mode and modifies_device:
            logging.info('[SKIPPED] %s', cmd)
        else:
            self.shell.run_command(cmd, block=block)

    def run_shell_command_check_output(self, cmd, success_token):
        """Run shell command and check its stdout for a string."""
        return self.shell.run_command_check_output(cmd, success_token)

    def run_shell_command_get_result(self, cmd, ignore_status=False):
        """Run shell command and get a CmdResult object as a result.

        @param cmd: the command to run
        @param ignore_status: if True, do not raise CmdError, even if rc != 0.
        @rtype: autotest_lib.client.common_lib.utils.CmdResult
        @raise autotest_lib.client.common_lib.error.CmdError: if command fails
        """
        return self.shell.run_command_get_result(cmd, ignore_status)

    def run_shell_command_get_status(self, cmd):
        """Run shell command and return its return code."""
        return self.shell.run_command_get_status(cmd)

    def run_shell_command_get_output(self, cmd, include_stderr=False):
        """Run shell command and return its console output.

        NOTE(review): callers in this file index the result with [0], so it
        is presumably a sequence of output lines - confirm in shell_wrapper.
        """
        return self.shell.run_command_get_output(cmd, include_stderr)

    def read_file(self, path):
        """Read the content of the file."""
        return self.shell.read_file(path)

    def write_file(self, path, data):
        """Write the data to the file."""
        self.shell.write_file(path, data)

    def append_file(self, path, data):
        """Append the data to the file."""
        self.shell.append_file(path, data)

    # NOTE: the helpers below interpolate paths into shell commands without
    # quoting, so the shell expands globs and splits on whitespace.

    def path_exists(self, path):
        """Return True if the path exists on DUT."""
        cmd = 'test -e %s' % path
        return self.run_shell_command_get_status(cmd) == 0

    def is_dir(self, path):
        """Return True if the path is a directory."""
        cmd = 'test -d %s' % path
        return self.run_shell_command_get_status(cmd) == 0

    def create_dir(self, path):
        """Create a new directory (including missing parents)."""
        cmd = 'mkdir -p %s' % path
        return self.run_shell_command(cmd)

    def create_temp_file(self, prefix):
        """Create a temporary file under /tmp with a prefix.

        @return: the first element of the `mktemp` output.
        """
        tmp_path = '/tmp'
        cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix)
        return self.run_shell_command_get_output(cmd)[0]

    def copy_file(self, from_path, to_path):
        """Copy the file."""
        cmd = 'cp -f %s %s' % (from_path, to_path)
        return self.run_shell_command(cmd)

    def copy_dir(self, from_path, to_path):
        """Copy the directory."""
        cmd = 'cp -rf %s %s' % (from_path, to_path)
        return self.run_shell_command(cmd)

    def remove_file(self, path):
        """Remove the file."""
        cmd = 'rm -f %s' % path
        return self.run_shell_command(cmd)

    def remove_dir(self, path):
        """Remove the directory."""
        cmd = 'rm -rf %s' % path
        return self.run_shell_command(cmd)

    def get_file_size(self, path):
        """Get the size of the file, as an int parsed from `stat -c %s`."""
        cmd = 'stat -c %%s %s' % path
        return int(self.run_shell_command_get_output(cmd)[0])

    def target_hosted(self):
        """Return True if running on DUT.

        The check looks for 'chromeos' or 'chromiumos' (case-insensitive) in
        the first line of /etc/lsb-release.
        """
        with open('/etc/lsb-release', 'r') as lsb_release:
            # readline() returns '' for an empty file, which simply does not
            # match, instead of raising IndexError like readlines()[0].
            signature = lsb_release.readline()
        return bool(re.search(r'chrom(ium|e)os', signature, re.IGNORECASE))

    def state_dir_file(self, file_name):
        """Get a full path of a file in the state directory."""
        return os.path.join(self.state_dir, file_name)

    def is_removable_device(self, device):
        """Check if a certain storage device is removable.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Returns True if the device is removable, False if not.
        """
        if not self.target_hosted():
            return False

        # The third '/'-separated component is the device name for paths of
        # the form /dev/<name>; drop its partition suffix (if any).
        base_dev = self.strip_part(device.split('/')[2])
        removable = int(self.read_file('/sys/block/%s/removable' % base_dev))

        return removable == 1

    def get_internal_disk(self, device):
        """Get the internal disk by given the current disk.

        If device is removable device, internal disk is decided by which kind
        of device (arm or x86). Otherwise, return device itself.

        device - a string, file name of a storage device or a device partition
                 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]).

        Return internal kernel disk.
        """
        if not self.is_removable_device(device):
            return self.strip_part(device)

        for candidate in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'):
            if not self.path_exists(candidate):
                continue
            devicetype = ('/sys/block/%s/device/type' %
                          candidate.split('/')[2])
            # Accept the candidate unless sysfs reports its type as 'SD'.
            if (not self.path_exists(devicetype)
                        or self.read_file(devicetype).strip() != 'SD'):
                return candidate
        return '/dev/sda'

    def get_root_part(self):
        """Return a string, the name of root device with partition number"""
        return self.run_shell_command_get_output('rootdev -s')[0]

    def get_root_dev(self):
        """Return a string, the name of root device without partition number"""
        return self.strip_part(self.get_root_part())

    def join_part(self, dev, part):
        """Return a concatenated string of device and partition number

        @param dev: device name, e.g. '/dev/sda' or '/dev/mmcblk0'.
        @param part: partition number, as a string or an int.
        """
        part = str(part)
        # A device name ending in a digit needs a 'p' separator so the
        # partition number stays distinguishable (mmcblk0 + 3 -> mmcblk0p3).
        if dev.endswith(tuple('0123456789')):
            return dev + 'p' + part
        return dev + part

    def strip_part(self, dev_with_part):
        """Return a stripped string without partition number

        NOTE: the input must carry a partition number; a bare device whose
        name ends in a digit (e.g. /dev/mmcblk0) would lose that digit.
        """
        return _PARTITION_SUFFIX_RE.sub('', dev_with_part)

    def retrieve_body_version(self, blob):
        """Given a blob, retrieve body version.

        Currently works for both, firmware and kernel blobs. Returns -1 in
        case the version can not be retrieved reliably.
        """
        kb_size = _unpack_vblock_header(blob, '<8s8sQ')
        if kb_size is None:
            return -1  # This could be a corrupted version case.

        # The preamble is read at offset kb_size, the third header field.
        _, version = struct.unpack_from('<40sQ', blob, kb_size)
        return version

    def retrieve_datakey_version(self, blob):
        """Given a blob, retrieve firmware data key version.

        Currently works for both, firmware and kernel blobs. Returns -1 in
        case the version can not be retrieved reliably.
        """
        version = _unpack_vblock_header(blob, '<8s96sQ')
        if version is None:
            return -1  # This could be a corrupted version case.
        return version

    def retrieve_kernel_subkey_version(self, blob):
        """Given a blob, retrieve kernel subkey version.

        It is in firmware vblock's preamble. Returns -1 if the magic string
        of the blob is incorrect.
        """
        kb_size = _unpack_vblock_header(blob, '<8s8sQ')
        if kb_size is None:
            return -1

        _, version = struct.unpack_from('<72sQ', blob, kb_size)
        return version

    def retrieve_preamble_flags(self, blob):
        """Given a blob, retrieve preamble flags if available.

        It only works for firmware. If the version of preamble header is less
        than 2.1, no preamble flags supported, just returns 0. Returns -1 if
        the magic string of the blob is incorrect.
        """
        kb_size = _unpack_vblock_header(blob, '<8s8sQ')
        if kb_size is None:
            return -1  # This could be a corrupted version case.

        _, ver, subver, _, flags = struct.unpack_from('<32sII64sI', blob,
                                                      kb_size)

        if ver > 2 or (ver == 2 and subver >= 1):
            return flags
        return 0  # Returns 0 if preamble flags not available.

    def read_partition(self, partition, size):
        """Read the requested partition, up to size bytes.

        The data is staged in 'part.tmp' inside the state directory, which is
        removed again even when copying or reading fails.
        """
        tmp_file = self.state_dir_file('part.tmp')
        try:
            self.run_shell_command(
                'dd if=%s of=%s bs=1 count=%d' % (partition, tmp_file, size))
            return self.read_file(tmp_file)
        finally:
            self.remove_file(tmp_file)