1# Copyright (c) 2010 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""A module to provide interface to OS services.""" 6 7import datetime 8import os 9import re 10import struct 11 12import shell_wrapper 13 14 15class OSInterfaceError(Exception): 16 """OS interface specific exception.""" 17 pass 18 19class Crossystem(object): 20 """A wrapper for the crossystem utility.""" 21 22 # Code dedicated for user triggering recovery mode through crossystem. 23 USER_RECOVERY_REQUEST_CODE = '193' 24 25 def init(self, os_if): 26 """Init the instance. If running on Mario - adjust the map.""" 27 self.os_if = os_if 28 29 def __getattr__(self, name): 30 """ 31 Retrieve a crosssystem attribute. 32 33 Attempt to access crossystemobject.name will invoke `crossystem name' 34 and return the stdout as the value. 35 """ 36 return self.os_if.run_shell_command_get_output( 37 'crossystem %s' % name)[0] 38 39 def __setattr__(self, name, value): 40 if name in ('os_if',): 41 self.__dict__[name] = value 42 else: 43 self.os_if.run_shell_command('crossystem "%s=%s"' % (name, value)) 44 45 def request_recovery(self): 46 """Request recovery mode next time the target reboots.""" 47 48 self.__setattr__('recovery_request', self.USER_RECOVERY_REQUEST_CODE) 49 50 51class OSInterface(object): 52 """An object to encapsulate OS services functions.""" 53 54 ANDROID_TESTER_FILE = '/mnt/stateful_partition/.android_faft_tester' 55 56 def __init__(self): 57 """Object construction time initialization.""" 58 self.state_dir = None 59 self.log_file = None 60 self.cs = Crossystem() 61 self.is_android = os.path.isfile(self.ANDROID_TESTER_FILE) 62 if self.is_android: 63 self.shell = shell_wrapper.AdbShell() 64 self.host_shell = shell_wrapper.LocalShell() 65 else: 66 self.shell = shell_wrapper.LocalShell() 67 self.host_shell = None 68 69 70 def init(self, state_dir=None, log_file=None): 71 """Initialize the OS interface object. 72 73 Args: 74 state_dir - a string, the name of the directory (as defined by the 75 caller). The contents of this directory persist over 76 system restarts and power cycles. 77 log_file - a string, the name of the log file kept in the state 78 directory. 79 80 Default argument values support unit testing. 81 """ 82 self.cs.init(self) 83 self.state_dir = state_dir 84 85 if self.state_dir: 86 if not os.path.exists(self.state_dir): 87 try: 88 os.mkdir(self.state_dir) 89 except OSError, err: 90 raise OSInterfaceError(err) 91 if log_file: 92 if log_file[0] == '/': 93 self.log_file = log_file 94 else: 95 self.log_file = os.path.join(state_dir, log_file) 96 97 # Initialize the shell. Should be after creating the log file. 98 self.shell.init(self) 99 if self.host_shell: 100 self.host_shell.init(self) 101 102 def has_host(self): 103 """Return True if a host is connected to DUT.""" 104 return self.is_android 105 106 def run_shell_command(self, cmd): 107 """Run a shell command.""" 108 self.shell.run_command(cmd) 109 110 def run_shell_command_get_status(self, cmd): 111 """Run shell command and return its return code.""" 112 return self.shell.run_command_get_status(cmd) 113 114 def run_shell_command_get_output(self, cmd): 115 """Run shell command and return its console output.""" 116 return self.shell.run_command_get_output(cmd) 117 118 def run_host_shell_command(self, cmd, block=True): 119 """Run a shell command on the host.""" 120 if self.host_shell: 121 self.host_shell.run_command(cmd, block) 122 else: 123 raise OSInterfaceError('There is no host for DUT.') 124 125 def run_host_shell_command_get_status(self, cmd): 126 """Run shell command and return its return code on the host.""" 127 if self.host_shell: 128 return self.host_shell.run_command_get_status(cmd) 129 else: 130 raise OSInterfaceError('There is no host for DUT.') 131 132 def run_host_shell_command_get_output(self, cmd): 133 """Run shell command and return its console output.""" 134 if self.host_shell: 135 return self.host_shell.run_command_get_output(cmd) 136 else: 137 raise OSInterfaceError('There is no host for DUT.') 138 139 def read_file(self, path): 140 """Read the content of the file.""" 141 return self.shell.read_file(path) 142 143 def write_file(self, path, data): 144 """Write the data to the file.""" 145 self.shell.write_file(path, data) 146 147 def append_file(self, path, data): 148 """Append the data to the file.""" 149 self.shell.append_file(path, data) 150 151 def path_exists(self, path): 152 """Return True if the path exists on DUT.""" 153 cmd = 'test -e %s' % path 154 return self.run_shell_command_get_status(cmd) == 0 155 156 def is_dir(self, path): 157 """Return True if the path is a directory.""" 158 cmd = 'test -d %s' % path 159 return self.run_shell_command_get_status(cmd) == 0 160 161 def create_dir(self, path): 162 """Create a new directory.""" 163 cmd = 'mkdir -p %s' % path 164 return self.run_shell_command(cmd) 165 166 def create_temp_file(self, prefix): 167 """Create a temporary file with a prefix.""" 168 if self.is_android: 169 tmp_path = '/data/local/tmp' 170 else: 171 tmp_path = '/tmp' 172 cmd = 'mktemp -p %s %sXXXXXX' % (tmp_path, prefix) 173 return self.run_shell_command_get_output(cmd)[0] 174 175 def copy_file(self, from_path, to_path): 176 """Copy the file.""" 177 cmd = 'cp -f %s %s' % (from_path, to_path) 178 return self.run_shell_command(cmd) 179 180 def copy_dir(self, from_path, to_path): 181 """Copy the directory.""" 182 cmd = 'cp -rf %s %s' % (from_path, to_path) 183 return self.run_shell_command(cmd) 184 185 def remove_file(self, path): 186 """Remove the file.""" 187 cmd = 'rm -f %s' % path 188 return self.run_shell_command(cmd) 189 190 def remove_dir(self, path): 191 """Remove the directory.""" 192 cmd = 'rm -rf %s' % path 193 return self.run_shell_command(cmd) 194 195 def get_file_size(self, path): 196 """Get the size of the file.""" 197 cmd = 'stat -c %%s %s' % path 198 return int(self.run_shell_command_get_output(cmd)[0]) 199 200 def target_hosted(self): 201 """Return True if running on DUT.""" 202 if self.is_android: 203 return True 204 signature = open('/etc/lsb-release', 'r').readlines()[0] 205 return re.search(r'chrom(ium|e)os', signature, re.IGNORECASE) != None 206 207 def state_dir_file(self, file_name): 208 """Get a full path of a file in the state directory.""" 209 return os.path.join(self.state_dir, file_name) 210 211 def wait_for_device(self, timeout): 212 """Wait for an Android device to be connected.""" 213 return self.shell.wait_for_device(timeout) 214 215 def wait_for_no_device(self, timeout): 216 """Wait for no Android device to be connected (offline).""" 217 return self.shell.wait_for_no_device(timeout) 218 219 def log(self, text): 220 """Write text to the log file and print it on the screen, if enabled. 221 222 The entire log (maintained across reboots) can be found in 223 self.log_file. 224 """ 225 if not self.log_file or not os.path.exists(self.state_dir): 226 # Called before environment was initialized, ignore. 227 return 228 229 timestamp = datetime.datetime.strftime( 230 datetime.datetime.now(), '%I:%M:%S %p:') 231 232 with open(self.log_file, 'a') as log_f: 233 log_f.write('%s %s\n' % (timestamp, text)) 234 log_f.flush() 235 os.fdatasync(log_f) 236 237 def is_removable_device(self, device): 238 """Check if a certain storage device is removable. 239 240 device - a string, file name of a storage device or a device partition 241 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 242 243 Returns True if the device is removable, False if not. 244 """ 245 if self.is_android: 246 return False 247 248 if not self.target_hosted(): 249 return False 250 251 # Drop trailing digit(s) and letter(s) (if any) 252 base_dev = self.strip_part(device.split('/')[2]) 253 removable = int(self.read_file('/sys/block/%s/removable' % base_dev)) 254 255 return removable == 1 256 257 def get_internal_disk(self, device): 258 """Get the internal disk by given the current disk. 259 260 If device is removable device, internal disk is decided by which kind 261 of divice (arm or x86). Otherwise, return device itself. 262 263 device - a string, file name of a storage device or a device partition 264 (as in /dev/sda[0-9] or /dev/mmcblk0p[0-9]). 265 266 Return internal kernel disk. 267 """ 268 if self.is_removable_device(device): 269 for p in ('/dev/mmcblk0', '/dev/mmcblk1', '/dev/nvme0n1'): 270 if self.path_exists(p): 271 return p 272 return '/dev/sda' 273 else: 274 return self.strip_part(device) 275 276 def get_root_part(self): 277 """Return a string, the name of root device with partition number""" 278 # FIXME(waihong): Android doesn't support dual kernel/root and misses 279 # the related tools. Just return something that not break the existing 280 # code. 281 if self.is_android: 282 return '/dev/mmcblk0p3' 283 else: 284 return self.run_shell_command_get_output('rootdev -s')[0] 285 286 def get_root_dev(self): 287 """Return a string, the name of root device without partition number""" 288 return self.strip_part(self.get_root_part()) 289 290 def join_part(self, dev, part): 291 """Return a concatenated string of device and partition number""" 292 if dev.endswith(tuple(str(i) for i in range(0, 10))): 293 return dev + 'p' + part 294 else: 295 return dev + part 296 297 def strip_part(self, dev_with_part): 298 """Return a stripped string without partition number""" 299 dev_name_stripper = re.compile('p?[0-9]+$') 300 return dev_name_stripper.sub('', dev_with_part) 301 302 def retrieve_body_version(self, blob): 303 """Given a blob, retrieve body version. 304 305 Currently works for both, firmware and kernel blobs. Returns '-1' in 306 case the version can not be retrieved reliably. 307 """ 308 header_format = '<8s8sQ' 309 preamble_format = '<40sQ' 310 magic, _, kb_size = struct.unpack_from(header_format, blob) 311 312 if magic != 'CHROMEOS': 313 return -1 # This could be a corrupted version case. 314 315 _, version = struct.unpack_from(preamble_format, blob, kb_size) 316 return version 317 318 def retrieve_datakey_version(self, blob): 319 """Given a blob, retrieve firmware data key version. 320 321 Currently works for both, firmware and kernel blobs. Returns '-1' in 322 case the version can not be retrieved reliably. 323 """ 324 header_format = '<8s96sQ' 325 magic, _, version = struct.unpack_from(header_format, blob) 326 if magic != 'CHROMEOS': 327 return -1 # This could be a corrupted version case. 328 return version 329 330 def retrieve_kernel_subkey_version(self, blob): 331 """Given a blob, retrieve kernel subkey version. 332 333 It is in firmware vblock's preamble. 334 """ 335 336 header_format = '<8s8sQ' 337 preamble_format = '<72sQ' 338 magic, _, kb_size = struct.unpack_from(header_format, blob) 339 340 if magic != 'CHROMEOS': 341 return -1 342 343 _, version = struct.unpack_from(preamble_format, blob, kb_size) 344 return version 345 346 def retrieve_preamble_flags(self, blob): 347 """Given a blob, retrieve preamble flags if available. 348 349 It only works for firmware. If the version of preamble header is less 350 than 2.1, no preamble flags supported, just returns 0. 351 """ 352 header_format = '<8s8sQ' 353 preamble_format = '<32sII64sI' 354 magic, _, kb_size = struct.unpack_from(header_format, blob) 355 356 if magic != 'CHROMEOS': 357 return -1 # This could be a corrupted version case. 358 359 _, ver, subver, _, flags = struct.unpack_from(preamble_format, blob, 360 kb_size) 361 362 if ver > 2 or (ver == 2 and subver >= 1): 363 return flags 364 else: 365 return 0 # Returns 0 if preamble flags not available. 366 367 def read_partition(self, partition, size): 368 """Read the requested partition, up to size bytes.""" 369 tmp_file = self.state_dir_file('part.tmp') 370 self.run_shell_command('dd if=%s of=%s bs=1 count=%d' % ( 371 partition, tmp_file, size)) 372 data = self.read_file(tmp_file) 373 self.remove_file(tmp_file) 374 return data 375