1# Copyright 2012 The Chromium Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5"""Provides stubs for os, sys and subprocess for testing 6 7This test allows one to test code that itself uses os, sys, and subprocess. 8""" 9 10import ntpath 11import os 12import posixpath 13import re 14import shlex 15import sys 16 17 18class Override(object): 19 def __init__(self, base_module, module_list): 20 stubs = {'cloud_storage': CloudStorageModuleStub, 21 'open': OpenFunctionStub, 22 'os': OsModuleStub, 23 'perf_control': PerfControlModuleStub, 24 'raw_input': RawInputFunctionStub, 25 'subprocess': SubprocessModuleStub, 26 'sys': SysModuleStub, 27 'thermal_throttle': ThermalThrottleModuleStub, 28 'logging': LoggingStub, 29 } 30 self.adb_commands = None 31 self.os = None 32 self.subprocess = None 33 self.sys = None 34 35 self._base_module = base_module 36 self._overrides = {} 37 38 for module_name in module_list: 39 self._overrides[module_name] = getattr(base_module, module_name, None) 40 setattr(self, module_name, stubs[module_name]()) 41 setattr(base_module, module_name, getattr(self, module_name)) 42 43 if self.os and self.sys: 44 self.os.path.sys = self.sys 45 46 def __del__(self): 47 assert not len(self._overrides) 48 49 def Restore(self): 50 for module_name, original_module in self._overrides.iteritems(): 51 if original_module is None: 52 # This will happen when we override built-in functions, like open. 53 # If we don't delete the attribute, we will shadow the built-in 54 # function with an attribute set to None. 55 delattr(self._base_module, module_name) 56 else: 57 setattr(self._base_module, module_name, original_module) 58 self._overrides = {} 59 60 61class AdbDevice(object): 62 63 def __init__(self): 64 self.has_root = False 65 self.needs_su = False 66 self.shell_command_handlers = {} 67 self.mock_content = [] 68 self.system_properties = {} 69 if self.system_properties.get('ro.product.cpu.abi') == None: 70 self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a' 71 72 def HasRoot(self): 73 return self.has_root 74 75 def NeedsSU(self): 76 return self.needs_su 77 78 def RunShellCommand(self, args, **kwargs): 79 del kwargs # unused 80 if isinstance(args, basestring): 81 args = shlex.split(args) 82 handler = self.shell_command_handlers[args[0]] 83 return handler(args) 84 85 def FileExists(self, _): 86 return False 87 88 def ReadFile(self, device_path, as_root=False): 89 del device_path, as_root # unused 90 return self.mock_content 91 92 def GetProp(self, property_name): 93 return self.system_properties[property_name] 94 95 def SetProp(self, property_name, property_value): 96 self.system_properties[property_name] = property_value 97 98 99class CloudStorageModuleStub(object): 100 PUBLIC_BUCKET = 'chromium-telemetry' 101 PARTNER_BUCKET = 'chrome-partner-telemetry' 102 INTERNAL_BUCKET = 'chrome-telemetry' 103 BUCKET_ALIASES = { 104 'public': PUBLIC_BUCKET, 105 'partner': PARTNER_BUCKET, 106 'internal': INTERNAL_BUCKET, 107 } 108 109 # These are used to test for CloudStorage errors. 110 INTERNAL_PERMISSION = 2 111 PARTNER_PERMISSION = 1 112 PUBLIC_PERMISSION = 0 113 # Not logged in. 114 CREDENTIALS_ERROR_PERMISSION = -1 115 116 class NotFoundError(Exception): 117 pass 118 119 class CloudStorageError(Exception): 120 pass 121 122 class PermissionError(CloudStorageError): 123 pass 124 125 class CredentialsError(CloudStorageError): 126 pass 127 128 def __init__(self): 129 self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{}, 130 CloudStorageModuleStub.PARTNER_BUCKET:{}, 131 CloudStorageModuleStub.PUBLIC_BUCKET:{}} 132 self.remote_paths = self.default_remote_paths 133 self.local_file_hashes = {} 134 self.local_hash_files = {} 135 self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION 136 self.downloaded_files = [] 137 138 def SetPermissionLevelForTesting(self, permission_level): 139 self.permission_level = permission_level 140 141 def CheckPermissionLevelForBucket(self, bucket): 142 if bucket == CloudStorageModuleStub.PUBLIC_BUCKET: 143 return 144 elif (self.permission_level == 145 CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION): 146 raise CloudStorageModuleStub.CredentialsError() 147 elif bucket == CloudStorageModuleStub.PARTNER_BUCKET: 148 if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION: 149 raise CloudStorageModuleStub.PermissionError() 150 elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET: 151 if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION: 152 raise CloudStorageModuleStub.PermissionError() 153 elif bucket not in self.remote_paths: 154 raise CloudStorageModuleStub.NotFoundError() 155 156 def SetRemotePathsForTesting(self, remote_path_dict=None): 157 if not remote_path_dict: 158 self.remote_paths = self.default_remote_paths 159 return 160 self.remote_paths = remote_path_dict 161 162 def GetRemotePathsForTesting(self): 163 if not self.remote_paths: 164 self.remote_paths = self.default_remote_paths 165 return self.remote_paths 166 167 # Set a dictionary of data files and their "calculated" hashes. 168 def SetCalculatedHashesForTesting(self, calculated_hash_dictionary): 169 self.local_file_hashes = calculated_hash_dictionary 170 171 def GetLocalDataFiles(self): 172 return self.local_file_hashes.keys() 173 174 # Set a dictionary of hash files and the hashes they should contain. 175 def SetHashFileContentsForTesting(self, hash_file_dictionary): 176 self.local_hash_files = hash_file_dictionary 177 178 def GetLocalHashFiles(self): 179 return self.local_hash_files.keys() 180 181 def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash): 182 self.remote_paths[bucket][remote_path] = new_hash 183 184 def List(self, bucket): 185 if not bucket or not bucket in self.remote_paths: 186 bucket_error = ('Incorrect bucket specified, correct buckets:' + 187 str(self.remote_paths)) 188 raise CloudStorageModuleStub.CloudStorageError(bucket_error) 189 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 190 return list(self.remote_paths[bucket].keys()) 191 192 def Exists(self, bucket, remote_path): 193 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 194 return remote_path in self.remote_paths[bucket] 195 196 def Insert(self, bucket, remote_path, local_path): 197 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 198 if not local_path in self.GetLocalDataFiles(): 199 file_path_error = 'Local file path does not exist' 200 raise CloudStorageModuleStub.CloudStorageError(file_path_error) 201 self.remote_paths[bucket][remote_path] = ( 202 CloudStorageModuleStub.CalculateHash(self, local_path)) 203 return remote_path 204 205 def GetHelper(self, bucket, remote_path, local_path, only_if_changed): 206 CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket) 207 if not remote_path in self.remote_paths[bucket]: 208 if only_if_changed: 209 return False 210 raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.') 211 remote_hash = self.remote_paths[bucket][remote_path] 212 local_hash = self.local_file_hashes[local_path] 213 if only_if_changed and remote_hash == local_hash: 214 return False 215 self.downloaded_files.append(remote_path) 216 self.local_file_hashes[local_path] = remote_hash 217 self.local_hash_files[local_path + '.sha1'] = remote_hash 218 return remote_hash 219 220 def Get(self, bucket, remote_path, local_path): 221 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 222 local_path, False) 223 224 def GetIfChanged(self, local_path, bucket=None): 225 remote_path = os.path.basename(local_path) 226 if bucket: 227 return CloudStorageModuleStub.GetHelper(self, bucket, remote_path, 228 local_path, True) 229 result = CloudStorageModuleStub.GetHelper( 230 self, self.PUBLIC_BUCKET, remote_path, local_path, True) 231 if not result: 232 result = CloudStorageModuleStub.GetHelper( 233 self, self.PARTNER_BUCKET, remote_path, local_path, True) 234 if not result: 235 result = CloudStorageModuleStub.GetHelper( 236 self, self.INTERNAL_BUCKET, remote_path, local_path, True) 237 return result 238 239 def GetFilesInDirectoryIfChanged(self, directory, bucket): 240 if os.path.dirname(directory) == directory: # If in the root dir. 241 raise ValueError('Trying to serve root directory from HTTP server.') 242 for dirpath, _, filenames in os.walk(directory): 243 for filename in filenames: 244 path, extension = os.path.splitext( 245 os.path.join(dirpath, filename)) 246 if extension != '.sha1': 247 continue 248 self.GetIfChanged(path, bucket) 249 250 def CalculateHash(self, file_path): 251 return self.local_file_hashes[file_path] 252 253 def ReadHash(self, hash_path): 254 return self.local_hash_files[hash_path] 255 256 257class LoggingStub(object): 258 def __init__(self): 259 self.warnings = [] 260 self.errors = [] 261 262 def info(self, msg, *args): 263 pass 264 265 def error(self, msg, *args): 266 self.errors.append(msg % args) 267 268 def warning(self, msg, *args): 269 self.warnings.append(msg % args) 270 271 def warn(self, msg, *args): 272 self.warning(msg, *args) 273 274 275class OpenFunctionStub(object): 276 class FileStub(object): 277 def __init__(self, data): 278 self._data = data 279 280 def __enter__(self): 281 return self 282 283 def __exit__(self, *args): 284 pass 285 286 def read(self, size=None): 287 if size: 288 return self._data[:size] 289 else: 290 return self._data 291 292 def write(self, data): 293 self._data.write(data) 294 295 def close(self): 296 pass 297 298 def __init__(self): 299 self.files = {} 300 301 def __call__(self, name, *args, **kwargs): 302 return OpenFunctionStub.FileStub(self.files[name]) 303 304 305class OsModuleStub(object): 306 class OsEnvironModuleStub(object): 307 def get(self, _): 308 return None 309 310 class OsPathModuleStub(object): 311 def __init__(self, sys_module): 312 self.sys = sys_module 313 self.files = [] 314 self.dirs = [] 315 316 def exists(self, path): 317 return path in self.files 318 319 def isfile(self, path): 320 return path in self.files 321 322 def isdir(self, path): 323 return path in self.dirs 324 325 def join(self, *paths): 326 def IsAbsolutePath(path): 327 if self.sys.platform.startswith('win'): 328 return re.match('[a-zA-Z]:\\\\', path) 329 else: 330 return path.startswith('/') 331 332 # Per Python specification, if any component is an absolute path, 333 # discard previous components. 334 for index, path in reversed(list(enumerate(paths))): 335 if IsAbsolutePath(path): 336 paths = paths[index:] 337 break 338 339 if self.sys.platform.startswith('win'): 340 tmp = os.path.join(*paths) 341 return tmp.replace('/', '\\') 342 else: 343 tmp = os.path.join(*paths) 344 return tmp.replace('\\', '/') 345 346 def basename(self, path): 347 if self.sys.platform.startswith('win'): 348 return ntpath.basename(path) 349 else: 350 return posixpath.basename(path) 351 352 @staticmethod 353 def abspath(path): 354 return os.path.abspath(path) 355 356 @staticmethod 357 def expanduser(path): 358 return os.path.expanduser(path) 359 360 @staticmethod 361 def dirname(path): 362 return os.path.dirname(path) 363 364 @staticmethod 365 def realpath(path): 366 return os.path.realpath(path) 367 368 @staticmethod 369 def split(path): 370 return os.path.split(path) 371 372 @staticmethod 373 def splitext(path): 374 return os.path.splitext(path) 375 376 @staticmethod 377 def splitdrive(path): 378 return os.path.splitdrive(path) 379 380 X_OK = os.X_OK 381 382 sep = os.sep 383 pathsep = os.pathsep 384 385 def __init__(self, sys_module=sys): 386 self.path = OsModuleStub.OsPathModuleStub(sys_module) 387 self.environ = OsModuleStub.OsEnvironModuleStub() 388 self.display = ':0' 389 self.local_app_data = None 390 self.sys_path = None 391 self.program_files = None 392 self.program_files_x86 = None 393 self.devnull = os.devnull 394 self._directory = {} 395 396 def access(self, path, _): 397 return path in self.path.files 398 399 def getenv(self, name, value=None): 400 if name == 'DISPLAY': 401 env = self.display 402 elif name == 'LOCALAPPDATA': 403 env = self.local_app_data 404 elif name == 'PATH': 405 env = self.sys_path 406 elif name == 'PROGRAMFILES': 407 env = self.program_files 408 elif name == 'PROGRAMFILES(X86)': 409 env = self.program_files_x86 410 else: 411 raise NotImplementedError('Unsupported getenv') 412 return env if env else value 413 414 def chdir(self, path): 415 pass 416 417 def walk(self, top): 418 for dir_name in self._directory: 419 yield top, dir_name, self._directory[dir_name] 420 421 422class PerfControlModuleStub(object): 423 class PerfControlStub(object): 424 def __init__(self, adb): 425 pass 426 427 def __init__(self): 428 self.PerfControl = PerfControlModuleStub.PerfControlStub 429 430 431class RawInputFunctionStub(object): 432 def __init__(self): 433 self.input = '' 434 435 def __call__(self, name, *args, **kwargs): 436 return self.input 437 438 439class SubprocessModuleStub(object): 440 class PopenStub(object): 441 def __init__(self): 442 self.communicate_result = ('', '') 443 self.returncode_result = 0 444 445 def __call__(self, args, **kwargs): 446 return self 447 448 def communicate(self): 449 return self.communicate_result 450 451 @property 452 def returncode(self): 453 return self.returncode_result 454 455 def __init__(self): 456 self.Popen = SubprocessModuleStub.PopenStub() 457 self.PIPE = None 458 459 def call(self, *args, **kwargs): 460 pass 461 462 463class SysModuleStub(object): 464 def __init__(self): 465 self.platform = '' 466 467 468class ThermalThrottleModuleStub(object): 469 class ThermalThrottleStub(object): 470 def __init__(self, adb): 471 pass 472 473 def __init__(self): 474 self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub 475