1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""A module to provide interface to OS services.""" 5 6import datetime 7import os 8import re 9import struct 10 11import shell_wrapper 12 13 14class OSInterfaceError(Exception): 15 """OS interface specific exception.""" 16 pass 17 18 19class Crossystem(object): 20 """A wrapper for the crossystem utility.""" 21 22 # Code dedicated for user triggering recovery mode through crossystem. 23 USER_RECOVERY_REQUEST_CODE = '193' 24 25 def __init__(self, os_if): 26 """Init the instance. If running on Mario - adjust the map.""" 27 self.os_if = os_if 28 29 def __getattr__(self, name): 30 """ 31 Retrieve a crosssystem attribute. 32 33 Attempt to access crossystemobject.name will invoke `crossystem name' 34 and return the stdout as the value. 35 """ 36 return self.os_if.run_shell_command_get_output( 37 'crossystem %s' % name)[0] 38 39 def __setattr__(self, name, value): 40 if name in ('os_if', ): 41 self.__dict__[name] = value 42 else: 43 self.os_if.run_shell_command( 44 'crossystem "%s=%s"' % (name, value), modifies_device=True) 45 46 def request_recovery(self): 47 """Request recovery mode next time the target reboots.""" 48 49 self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE) 50 51 52class OSInterface(object): 53 """An object to encapsulate OS services functions.""" 54 55 def __init__(self, state_dir=None, log_file=None, test_mode=False): 56 """Object initialization (side effect: creates the state_dir) 57 58 @param state_dir: the name of the directory to use for storing state. 59 The contents of this directory persist over system 60 restarts and power cycles. 61 @param log_file: the name of the log file kept in the state directory. 62 @param test_mode: if true, skip (and just log) any shell call 63 marked with modifies_device=True 64 """ 65 66 # We keep the state of FAFT test in a permanent directory over reboots. 67 if state_dir is None: 68 state_dir = '/usr/local/tmp/faft' 69 70 if log_file is None: 71 log_file = 'faft_client.log' 72 73 if not os.path.isabs(log_file): 74 log_file = os.path.join(state_dir, log_file) 75 76 self.state_dir = state_dir 77 self.log_file = log_file 78 self.test_mode = test_mode 79 80 self.shell = shell_wrapper.LocalShell(self) 81 self.host_shell = None 82 83 self.create_dir(self.state_dir) 84 85 self.cs = Crossystem(self) 86 87 def run_shell_command(self, cmd, block=True, modifies_device=False): 88 """Run a shell command. 89 90 @param cmd: the command to run 91 @param block: if True (default), wait for command to finish 92 @param modifies_device: If True and running in test mode, just log 93 the command, but don't actually run it. 94 This should be set for RPC commands that alter 95 the OS or firmware in some persistent way. 96 97 @raise autotest_lib.client.common_lib.error.CmdError: if command fails 98 """ 99 if self.test_mode and modifies_device: 100 self.log('[SKIPPED] %s' % cmd) 101 else: 102 self.shell.run_command(cmd, block=block) 103 104 def run_shell_command_check_output(self, cmd, success_token): 105 """Run shell command and check its stdout for a string.""" 106 return self.shell.run_command_check_output(cmd, success_token) 107 108 def run_shell_command_get_result(self, cmd, ignore_status=False): 109 """Run shell command and get a CmdResult object as a result. 110 111 @param cmd: the command to run 112 @param ignore_status: if True, do not raise CmdError, even if rc != 0. 113 @rtype: autotest_lib.client.common_lib.utils.CmdResult 114 @raise autotest_lib.client.common_lib.error.CmdError: if command fails 115 """ 116 return self.shell.run_command_get_result(cmd, ignore_status) 117 118 def run_shell_command_get_status(self, cmd): 119 """Run shell command and return its return code.""" 120 return self.shell.run_command_get_status(cmd) 121 122 def run_shell_command_get_output(self, cmd, include_stderr=False): 123 """Run shell command and return its console output.""" 124 return self.shell.run_command_get_output(cmd, include_stderr) 125 126 def read_file(self, path): 127 """Read the content of the file.""" 128 return self.shell.read_file(path) 129 130 def write_file(self, path, data): 131 """Write the data to the file.""" 132 self.shell.write_file(path, data) 133 134 def append_file(self, path, data): 135 """Append the data to the file.""" 136 self.shell.append_file(path, data) 137 138 def path_exists(self, path): 139 """Return True if the path exists on DUT.""" 140 cmd = 'test -e %s' % path 141 return self.run_shell_command_get_status(cmd) == 0 142 143 def is_dir(self, path): 144 """Return True if the path is a directory.""" 145 cmd = 'test -d %s' % path 146 return self.run_shell_command_get_status(cmd) == 0 147 148 def create_dir(self, path): 149 """Create a new directory.""" 150 cmd = 'mkdir -p %s' % path 151 return self.run_shell_command(cmd) 152 153 def create_temp_file(self, prefix): 154 """Create a temporary file with a prefix.""" 155 tmp_path = '/tmp' 156 cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix) 157 return self.run_shell_command_get_output(cmd)[0] 158 159 def copy_file(self, from_path, to_path): 160 """Copy the file.""" 161 cmd = 'cp -f %s %s' % (from_path, to_path) 162 return self.run_shell_command(cmd) 163 164 def copy_dir(self, from_path, to_path): 165 """Copy the directory.""" 166 cmd = 'cp -rf %s %s' % (from_path, to_path) 167 return self.run_shell_command(cmd) 168 169 def remove_file(self, path): 170 """Remove the file.""" 171 cmd = 'rm -f %s' % path 172 return self.run_shell_command(cmd) 173 174 def remove_dir(self, path): 175 """Remove the directory.""" 176 cmd = 'rm -rf %s' % path 177 return self.run_shell_command(cmd) 178 179 def get_file_size(self, path): 180 """Get the size of the file.""" 181 cmd = 'stat -c %%s %s' % path 182 return int(self.run_shell_command_get_output(cmd)[0]) 183 184 def target_hosted(self): 185 """Return True if running on DUT.""" 186 with open('/etc/lsb-release', 'r') as lsb_release: 187 signature = lsb_release.readlines()[0] 188 return bool(re.search(r'chrom(ium|e)os', signature, re.IGNORECASE)) 189 190 def state_dir_file(self, file_name): 191 """Get a full path of a file in the state directory.""" 192 return os.path.join(self.state_dir, file_name) 193 194 def log(self, text): 195 """Write text to the log file and print it on the screen, if enabled. 196 197 The entire log (maintained across reboots) can be found in 198 self.log_file. 199 """ 200 if not self.log_file or not os.path.exists(self.state_dir): 201 # Called before environment was initialized, ignore. 202 return 203 204 timestamp = datetime.datetime.strftime(datetime.datetime.now(), 205 '%I:%M:%S %p:') 206 207 with open(self.log_file, 'a') as log_f: 208 log_f.write('%s %s\n' % (timestamp, text)) 209 log_f.flush() 210 os.fdatasync(log_f.fileno()) 211 212 def is_removable_device(self, device): 213 """Check if a certain storage device is removable. 214 215 device - a string, file name of a storage device or a device partition 216 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 217 218 Returns True if the device is removable, False if not. 219 """ 220 if not self.target_hosted(): 221 return False 222 223 # Drop trailing digit(s) and letter(s) (if any) 224 base_dev = self.strip_part(device.split('/')[2]) 225 removable = int(self.read_file('/sys/block/%s/removable' % base_dev)) 226 227 return removable == 1 228 229 def get_internal_disk(self, device): 230 """Get the internal disk by given the current disk. 231 232 If device is removable device, internal disk is decided by which kind 233 of divice (arm or x86). Otherwise, return device itself. 234 235 device - a string, file name of a storage device or a device partition 236 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 237 238 Return internal kernel disk. 239 """ 240 if self.is_removable_device(device): 241 for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'): 242 if self.path_exists(p): 243 return p 244 return '/dev/sda' 245 else: 246 return self.strip_part(device) 247 248 def get_root_part(self): 249 """Return a string, the name of root device with partition number""" 250 return self.run_shell_command_get_output('rootdev -s')[0] 251 252 def get_root_dev(self): 253 """Return a string, the name of root device without partition number""" 254 return self.strip_part(self.get_root_part()) 255 256 def join_part(self, dev, part): 257 """Return a concatenated string of device and partition number""" 258 if dev.endswith(tuple(str(i) for i in range(0, 10))): 259 return dev + 'p' + part 260 else: 261 return dev + part 262 263 def strip_part(self, dev_with_part): 264 """Return a stripped string without partition number""" 265 dev_name_stripper = re.compile('p?[0-9]+$') 266 return dev_name_stripper.sub('', dev_with_part) 267 268 def retrieve_body_version(self, blob): 269 """Given a blob, retrieve body version. 270 271 Currently works for both, firmware and kernel blobs. Returns '-1' in 272 case the version can not be retrieved reliably. 273 """ 274 header_format = '<8s8sQ' 275 preamble_format = '<40sQ' 276 magic, _, kb_size = struct.unpack_from(header_format, blob) 277 278 if magic != 'CHROMEOS': 279 return -1 # This could be a corrupted version case. 280 281 _, version = struct.unpack_from(preamble_format, blob, kb_size) 282 return version 283 284 def retrieve_datakey_version(self, blob): 285 """Given a blob, retrieve firmware data key version. 286 287 Currently works for both, firmware and kernel blobs. Returns '-1' in 288 case the version can not be retrieved reliably. 289 """ 290 header_format = '<8s96sQ' 291 magic, _, version = struct.unpack_from(header_format, blob) 292 if magic != 'CHROMEOS': 293 return -1 # This could be a corrupted version case. 294 return version 295 296 def retrieve_kernel_subkey_version(self, blob): 297 """Given a blob, retrieve kernel subkey version. 298 299 It is in firmware vblock's preamble. 300 """ 301 302 header_format = '<8s8sQ' 303 preamble_format = '<72sQ' 304 magic, _, kb_size = struct.unpack_from(header_format, blob) 305 306 if magic != 'CHROMEOS': 307 return -1 308 309 _, version = struct.unpack_from(preamble_format, blob, kb_size) 310 return version 311 312 def retrieve_preamble_flags(self, blob): 313 """Given a blob, retrieve preamble flags if available. 314 315 It only works for firmware. If the version of preamble header is less 316 than 2.1, no preamble flags supported, just returns 0. 317 """ 318 header_format = '<8s8sQ' 319 preamble_format = '<32sII64sI' 320 magic, _, kb_size = struct.unpack_from(header_format, blob) 321 322 if magic != 'CHROMEOS': 323 return -1 # This could be a corrupted version case. 324 325 _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob, 326 kb_size) 327 328 if ver > 2 or (ver == 2 and subver >= 1): 329 return flags 330 else: 331 return 0 # Returns 0 if preamble flags not available. 332 333 def read_partition(self, partition, size): 334 """Read the requested partition, up to size bytes.""" 335 tmp_file = self.state_dir_file('part.tmp') 336 self.run_shell_command( 337 'dd if=%s of=%s bs=1 count=%d' % (partition, tmp_file, size)) 338 data = self.read_file(tmp_file) 339 self.remove_file(tmp_file) 340 return data 341