1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4"""A module to provide interface to OS services.""" 5import datetime 6import errno 7import logging 8import os 9import re 10import struct 11 12import shell_wrapper 13 14 15class OSInterfaceError(Exception): 16 """OS interface specific exception.""" 17 pass 18 19 20class Crossystem(object): 21 """A wrapper for the crossystem utility.""" 22 23 # Code dedicated for user triggering recovery mode through crossystem. 24 USER_RECOVERY_REQUEST_CODE = '193' 25 26 def __init__(self, os_if): 27 """Init the instance. If running on Mario - adjust the map.""" 28 self.os_if = os_if 29 30 def __getattr__(self, name): 31 """ 32 Retrieve a crosssystem attribute. 33 34 Attempt to access crossystemobject.name will invoke `crossystem name' 35 and return the stdout as the value. 36 """ 37 return self.os_if.run_shell_command_get_output( 38 'crossystem %s' % name)[0] 39 40 def __setattr__(self, name, value): 41 if name in ('os_if', ): 42 self.__dict__[name] = value 43 else: 44 self.os_if.run_shell_command( 45 'crossystem "%s=%s"' % (name, value), modifies_device=True) 46 47 def request_recovery(self): 48 """Request recovery mode next time the target reboots.""" 49 50 self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE) 51 52 53class OSInterface(object): 54 """An object to encapsulate OS services functions.""" 55 56 def __init__(self, state_dir=None, log_file=None, test_mode=False): 57 """Object initialization (side effect: creates the state_dir) 58 59 @param state_dir: the name of the directory to use for storing state. 60 The contents of this directory persist over system 61 restarts and power cycles. 62 @param log_file: the name of the log file kept in the state directory. 63 @param test_mode: if true, skip (and just log) any shell call 64 marked with modifies_device=True 65 """ 66 67 # We keep the state of FAFT test in a permanent directory over reboots. 68 if state_dir is None: 69 state_dir = '/usr/local/tmp/faft' 70 71 if log_file is None: 72 log_file = 'faft_client.log' 73 74 if not os.path.isabs(log_file): 75 log_file = os.path.join(state_dir, log_file) 76 77 self.state_dir = state_dir 78 self.log_file = log_file 79 self.test_mode = test_mode 80 81 self._use_log_file = False 82 83 self.shell = shell_wrapper.LocalShell(self) 84 self.host_shell = None 85 86 self.create_dir(self.state_dir) 87 88 self.cs = Crossystem(self) 89 90 def run_shell_command(self, cmd, block=True, modifies_device=False): 91 """Run a shell command. 92 93 @param cmd: the command to run 94 @param block: if True (default), wait for command to finish 95 @param modifies_device: If True and running in test mode, just log 96 the command, but don't actually run it. 97 This should be set for RPC commands that alter 98 the OS or firmware in some persistent way. 99 100 @raise autotest_lib.client.common_lib.error.CmdError: if command fails 101 """ 102 if self.test_mode and modifies_device: 103 self.log('[SKIPPED] %s' % cmd) 104 else: 105 self.shell.run_command(cmd, block=block) 106 107 def run_shell_command_check_output(self, cmd, success_token): 108 """Run shell command and check its stdout for a string.""" 109 return self.shell.run_command_check_output(cmd, success_token) 110 111 def run_shell_command_get_result(self, cmd, ignore_status=False): 112 """Run shell command and get a CmdResult object as a result. 113 114 @param cmd: the command to run 115 @param ignore_status: if True, do not raise CmdError, even if rc != 0. 116 @rtype: autotest_lib.client.common_lib.utils.CmdResult 117 @raise autotest_lib.client.common_lib.error.CmdError: if command fails 118 """ 119 return self.shell.run_command_get_result(cmd, ignore_status) 120 121 def run_shell_command_get_status(self, cmd): 122 """Run shell command and return its return code.""" 123 return self.shell.run_command_get_status(cmd) 124 125 def run_shell_command_get_output(self, cmd, include_stderr=False): 126 """Run shell command and return its console output.""" 127 return self.shell.run_command_get_output(cmd, include_stderr) 128 129 def read_file(self, path): 130 """Read the content of the file.""" 131 return self.shell.read_file(path) 132 133 def write_file(self, path, data): 134 """Write the data to the file.""" 135 self.shell.write_file(path, data) 136 137 def append_file(self, path, data): 138 """Append the data to the file.""" 139 self.shell.append_file(path, data) 140 141 def path_exists(self, path): 142 """Return True if the path exists on DUT.""" 143 cmd = 'test -e %s' % path 144 return self.run_shell_command_get_status(cmd) == 0 145 146 def is_dir(self, path): 147 """Return True if the path is a directory.""" 148 cmd = 'test -d %s' % path 149 return self.run_shell_command_get_status(cmd) == 0 150 151 def create_dir(self, path): 152 """Create a new directory.""" 153 cmd = 'mkdir -p %s' % path 154 return self.run_shell_command(cmd) 155 156 def create_temp_file(self, prefix): 157 """Create a temporary file with a prefix.""" 158 tmp_path = '/tmp' 159 cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix) 160 return self.run_shell_command_get_output(cmd)[0] 161 162 def copy_file(self, from_path, to_path): 163 """Copy the file.""" 164 cmd = 'cp -f %s %s' % (from_path, to_path) 165 return self.run_shell_command(cmd) 166 167 def copy_dir(self, from_path, to_path): 168 """Copy the directory.""" 169 cmd = 'cp -rf %s %s' % (from_path, to_path) 170 return self.run_shell_command(cmd) 171 172 def remove_file(self, path): 173 """Remove the file.""" 174 cmd = 'rm -f %s' % path 175 return self.run_shell_command(cmd) 176 177 def remove_dir(self, path): 178 """Remove the directory.""" 179 cmd = 'rm -rf %s' % path 180 return self.run_shell_command(cmd) 181 182 def get_file_size(self, path): 183 """Get the size of the file.""" 184 cmd = 'stat -c %%s %s' % path 185 return int(self.run_shell_command_get_output(cmd)[0]) 186 187 def target_hosted(self): 188 """Return True if running on DUT.""" 189 with open('/etc/lsb-release', 'r') as lsb_release: 190 signature = lsb_release.readlines()[0] 191 return bool(re.search(r'chrom(ium|e)os', signature, re.IGNORECASE)) 192 193 def state_dir_file(self, file_name): 194 """Get a full path of a file in the state directory.""" 195 return os.path.join(self.state_dir, file_name) 196 197 def log(self, text): 198 """Write text to the log file and print it on the screen, if enabled. 199 200 The entire log (kept across reboots) can be found in self.log_file. 201 """ 202 if not self._use_log_file: 203 # Called during init, during shutdown, or after a log write fails. 204 logging.info('%s', text) 205 return 206 207 timestamp = datetime.datetime.strftime(datetime.datetime.now(), 208 '%I:%M:%S %p:') 209 210 try: 211 with open(self.log_file, 'a') as log_f: 212 log_f.write('%s %s\n' % (timestamp, text)) 213 log_f.flush() 214 os.fdatasync(log_f.fileno()) 215 except EnvironmentError: 216 logging.info('%s', text) 217 logging.warn("Couldn't write RPC Log: %s", self.log_file, 218 exc_info=True) 219 # Report error only once. 220 self._use_log_file = False 221 222 def start_file_logging(self): 223 """Create and start using using the log file (or report failure)""" 224 if self._use_log_file: 225 return 226 227 try: 228 229 with open(self.log_file, 'a'): 230 self._use_log_file = True 231 232 # log to stderr, showing the filename (extra newline to add a gap) 233 logging.debug('Begin RPC Log: %s\n', self.log_file) 234 235 # log into the file, to indicate the start time 236 self.log('Begin RPC Log: %s (this file)' % self.log_file) 237 238 except EnvironmentError: 239 logging.warn("Couldn't write RPC Log: %s", self.log_file, 240 exc_info=True) 241 self._use_log_file = False 242 243 def stop_file_logging(self): 244 """Stop using the log file (switch back to stderr).""" 245 if not self._use_log_file: 246 return 247 248 # log to the file, to indicate when done (extra newline to add a gap) 249 self.log('End RPC Log.\n') 250 251 self._use_log_file = False 252 253 # log to stderr, to tie timestamps together 254 logging.debug('End RPC Log.') 255 256 def remove_log_file(self): 257 """Delete the log file.""" 258 if not self.test_mode: 259 # Test mode shouldn't be able to actually remove the log. 260 try: 261 os.remove(self.log_file) 262 except EnvironmentError as e: 263 if e.errno != errno.ENOENT: 264 self.log("Could not remove log file: %s" % e) 265 266 def dump_log(self, remove_log=False): 267 """Dump the log file. 268 269 @param remove_log: Remove the log file after dump 270 @return: String of the log file content. 271 """ 272 if remove_log and not self.test_mode: 273 # Make sure "end RPC log" is printed before grabbing the log 274 self.stop_file_logging() 275 276 try: 277 with open(self.log_file, 'r') as f: 278 log = f.read() 279 except EnvironmentError as e: 280 log = '<%s>' % e 281 282 if remove_log and not self.test_mode: 283 self.remove_log_file() 284 return log 285 286 def is_removable_device(self, device): 287 """Check if a certain storage device is removable. 288 289 device - a string, file name of a storage device or a device partition 290 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 291 292 Returns True if the device is removable, False if not. 293 """ 294 if not self.target_hosted(): 295 return False 296 297 # Drop trailing digit(s) and letter(s) (if any) 298 base_dev = self.strip_part(device.split('/')[2]) 299 removable = int(self.read_file('/sys/block/%s/removable' % base_dev)) 300 301 return removable == 1 302 303 def get_internal_disk(self, device): 304 """Get the internal disk by given the current disk. 305 306 If device is removable device, internal disk is decided by which kind 307 of divice (arm or x86). Otherwise, return device itself. 308 309 device - a string, file name of a storage device or a device partition 310 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 311 312 Return internal kernel disk. 313 """ 314 if self.is_removable_device(device): 315 for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'): 316 if self.path_exists(p): 317 devicetype = '/sys/block/%s/device/type' % p.split('/')[2] 318 if (not self.path_exists(devicetype) 319 or self.read_file(devicetype).strip() != 'SD'): 320 return p 321 return '/dev/sda' 322 else: 323 return self.strip_part(device) 324 325 def get_root_part(self): 326 """Return a string, the name of root device with partition number""" 327 return self.run_shell_command_get_output('rootdev -s')[0] 328 329 def get_root_dev(self): 330 """Return a string, the name of root device without partition number""" 331 return self.strip_part(self.get_root_part()) 332 333 def join_part(self, dev, part): 334 """Return a concatenated string of device and partition number""" 335 if dev.endswith(tuple(str(i) for i in range(0, 10))): 336 return dev + 'p' + part 337 else: 338 return dev + part 339 340 def strip_part(self, dev_with_part): 341 """Return a stripped string without partition number""" 342 dev_name_stripper = re.compile('p?[0-9]+$') 343 return dev_name_stripper.sub('', dev_with_part) 344 345 def retrieve_body_version(self, blob): 346 """Given a blob, retrieve body version. 347 348 Currently works for both, firmware and kernel blobs. Returns '-1' in 349 case the version can not be retrieved reliably. 350 """ 351 header_format = '<8s8sQ' 352 preamble_format = '<40sQ' 353 magic, _, kb_size = struct.unpack_from(header_format, blob) 354 355 if magic != 'CHROMEOS': 356 return -1 # This could be a corrupted version case. 357 358 _, version = struct.unpack_from(preamble_format, blob, kb_size) 359 return version 360 361 def retrieve_datakey_version(self, blob): 362 """Given a blob, retrieve firmware data key version. 363 364 Currently works for both, firmware and kernel blobs. Returns '-1' in 365 case the version can not be retrieved reliably. 366 """ 367 header_format = '<8s96sQ' 368 magic, _, version = struct.unpack_from(header_format, blob) 369 if magic != 'CHROMEOS': 370 return -1 # This could be a corrupted version case. 371 return version 372 373 def retrieve_kernel_subkey_version(self, blob): 374 """Given a blob, retrieve kernel subkey version. 375 376 It is in firmware vblock's preamble. 377 """ 378 379 header_format = '<8s8sQ' 380 preamble_format = '<72sQ' 381 magic, _, kb_size = struct.unpack_from(header_format, blob) 382 383 if magic != 'CHROMEOS': 384 return -1 385 386 _, version = struct.unpack_from(preamble_format, blob, kb_size) 387 return version 388 389 def retrieve_preamble_flags(self, blob): 390 """Given a blob, retrieve preamble flags if available. 391 392 It only works for firmware. If the version of preamble header is less 393 than 2.1, no preamble flags supported, just returns 0. 394 """ 395 header_format = '<8s8sQ' 396 preamble_format = '<32sII64sI' 397 magic, _, kb_size = struct.unpack_from(header_format, blob) 398 399 if magic != 'CHROMEOS': 400 return -1 # This could be a corrupted version case. 401 402 _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob, 403 kb_size) 404 405 if ver > 2 or (ver == 2 and subver >= 1): 406 return flags 407 else: 408 return 0 # Returns 0 if preamble flags not available. 409 410 def read_partition(self, partition, size): 411 """Read the requested partition, up to size bytes.""" 412 tmp_file = self.state_dir_file('part.tmp') 413 self.run_shell_command( 414 'dd if=%s of=%s bs=1 count=%d' % (partition, tmp_file, size)) 415 data = self.read_file(tmp_file) 416 self.remove_file(tmp_file) 417 return data 418