• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Provides stubs for os, sys and subprocess for testing
6
7This test allows one to test code that itself uses os, sys, and subprocess.
8"""
9
10import ntpath
11import os
12import posixpath
13import re
14import shlex
15import sys
16
17
18class Override(object):
19  def __init__(self, base_module, module_list):
20    stubs = {'cloud_storage': CloudStorageModuleStub,
21             'open': OpenFunctionStub,
22             'os': OsModuleStub,
23             'perf_control': PerfControlModuleStub,
24             'raw_input': RawInputFunctionStub,
25             'subprocess': SubprocessModuleStub,
26             'sys': SysModuleStub,
27             'thermal_throttle': ThermalThrottleModuleStub,
28             'logging': LoggingStub,
29    }
30    self.adb_commands = None
31    self.os = None
32    self.subprocess = None
33    self.sys = None
34
35    self._base_module = base_module
36    self._overrides = {}
37
38    for module_name in module_list:
39      self._overrides[module_name] = getattr(base_module, module_name, None)
40      setattr(self, module_name, stubs[module_name]())
41      setattr(base_module, module_name, getattr(self, module_name))
42
43    if self.os and self.sys:
44      self.os.path.sys = self.sys
45
46  def __del__(self):
47    assert not len(self._overrides)
48
49  def Restore(self):
50    for module_name, original_module in self._overrides.iteritems():
51      if original_module is None:
52        # This will happen when we override built-in functions, like open.
53        # If we don't delete the attribute, we will shadow the built-in
54        # function with an attribute set to None.
55        delattr(self._base_module, module_name)
56      else:
57        setattr(self._base_module, module_name, original_module)
58    self._overrides = {}
59
60
61class AdbDevice(object):
62
63  def __init__(self):
64    self.has_root = False
65    self.needs_su = False
66    self.shell_command_handlers = {}
67    self.mock_content = []
68    self.system_properties = {}
69    if self.system_properties.get('ro.product.cpu.abi') == None:
70      self.system_properties['ro.product.cpu.abi'] = 'armeabi-v7a'
71
72  def HasRoot(self):
73    return self.has_root
74
75  def NeedsSU(self):
76    return self.needs_su
77
78  def RunShellCommand(self, args, **kwargs):
79    del kwargs  # unused
80    if isinstance(args, basestring):
81      args = shlex.split(args)
82    handler = self.shell_command_handlers[args[0]]
83    return handler(args)
84
85  def FileExists(self, _):
86    return False
87
88  def ReadFile(self, device_path, as_root=False):
89    del device_path, as_root  # unused
90    return self.mock_content
91
92  def GetProp(self, property_name):
93    return self.system_properties[property_name]
94
95  def SetProp(self, property_name, property_value):
96    self.system_properties[property_name] = property_value
97
98
99class CloudStorageModuleStub(object):
100  PUBLIC_BUCKET = 'chromium-telemetry'
101  PARTNER_BUCKET = 'chrome-partner-telemetry'
102  INTERNAL_BUCKET = 'chrome-telemetry'
103  BUCKET_ALIASES = {
104    'public': PUBLIC_BUCKET,
105    'partner': PARTNER_BUCKET,
106    'internal': INTERNAL_BUCKET,
107  }
108
109  # These are used to test for CloudStorage errors.
110  INTERNAL_PERMISSION = 2
111  PARTNER_PERMISSION = 1
112  PUBLIC_PERMISSION = 0
113  # Not logged in.
114  CREDENTIALS_ERROR_PERMISSION = -1
115
116  class NotFoundError(Exception):
117    pass
118
119  class CloudStorageError(Exception):
120    pass
121
122  class PermissionError(CloudStorageError):
123    pass
124
125  class CredentialsError(CloudStorageError):
126    pass
127
128  def __init__(self):
129    self.default_remote_paths = {CloudStorageModuleStub.INTERNAL_BUCKET:{},
130                                 CloudStorageModuleStub.PARTNER_BUCKET:{},
131                                 CloudStorageModuleStub.PUBLIC_BUCKET:{}}
132    self.remote_paths = self.default_remote_paths
133    self.local_file_hashes = {}
134    self.local_hash_files = {}
135    self.permission_level = CloudStorageModuleStub.INTERNAL_PERMISSION
136    self.downloaded_files = []
137
138  def SetPermissionLevelForTesting(self, permission_level):
139    self.permission_level = permission_level
140
141  def CheckPermissionLevelForBucket(self, bucket):
142    if bucket == CloudStorageModuleStub.PUBLIC_BUCKET:
143      return
144    elif (self.permission_level ==
145          CloudStorageModuleStub.CREDENTIALS_ERROR_PERMISSION):
146      raise CloudStorageModuleStub.CredentialsError()
147    elif bucket == CloudStorageModuleStub.PARTNER_BUCKET:
148      if self.permission_level < CloudStorageModuleStub.PARTNER_PERMISSION:
149        raise CloudStorageModuleStub.PermissionError()
150    elif bucket == CloudStorageModuleStub.INTERNAL_BUCKET:
151      if self.permission_level < CloudStorageModuleStub.INTERNAL_PERMISSION:
152        raise CloudStorageModuleStub.PermissionError()
153    elif bucket not in self.remote_paths:
154      raise CloudStorageModuleStub.NotFoundError()
155
156  def SetRemotePathsForTesting(self, remote_path_dict=None):
157    if not remote_path_dict:
158      self.remote_paths = self.default_remote_paths
159      return
160    self.remote_paths = remote_path_dict
161
162  def GetRemotePathsForTesting(self):
163    if not self.remote_paths:
164      self.remote_paths = self.default_remote_paths
165    return self.remote_paths
166
167  # Set a dictionary of data files and their "calculated" hashes.
168  def SetCalculatedHashesForTesting(self, calculated_hash_dictionary):
169    self.local_file_hashes = calculated_hash_dictionary
170
171  def GetLocalDataFiles(self):
172    return self.local_file_hashes.keys()
173
174  # Set a dictionary of hash files and the hashes they should contain.
175  def SetHashFileContentsForTesting(self, hash_file_dictionary):
176    self.local_hash_files = hash_file_dictionary
177
178  def GetLocalHashFiles(self):
179    return self.local_hash_files.keys()
180
181  def ChangeRemoteHashForTesting(self, bucket, remote_path, new_hash):
182    self.remote_paths[bucket][remote_path] = new_hash
183
184  def List(self, bucket):
185    if not bucket or not bucket in self.remote_paths:
186      bucket_error = ('Incorrect bucket specified, correct buckets:' +
187                      str(self.remote_paths))
188      raise CloudStorageModuleStub.CloudStorageError(bucket_error)
189    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
190    return list(self.remote_paths[bucket].keys())
191
192  def Exists(self, bucket, remote_path):
193    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
194    return remote_path in self.remote_paths[bucket]
195
196  def Insert(self, bucket, remote_path, local_path):
197    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
198    if not local_path in self.GetLocalDataFiles():
199      file_path_error = 'Local file path does not exist'
200      raise CloudStorageModuleStub.CloudStorageError(file_path_error)
201    self.remote_paths[bucket][remote_path] = (
202      CloudStorageModuleStub.CalculateHash(self, local_path))
203    return remote_path
204
205  def GetHelper(self, bucket, remote_path, local_path, only_if_changed):
206    CloudStorageModuleStub.CheckPermissionLevelForBucket(self, bucket)
207    if not remote_path in self.remote_paths[bucket]:
208      if only_if_changed:
209        return False
210      raise CloudStorageModuleStub.NotFoundError('Remote file does not exist.')
211    remote_hash = self.remote_paths[bucket][remote_path]
212    local_hash = self.local_file_hashes[local_path]
213    if only_if_changed and remote_hash == local_hash:
214      return False
215    self.downloaded_files.append(remote_path)
216    self.local_file_hashes[local_path] = remote_hash
217    self.local_hash_files[local_path + '.sha1'] = remote_hash
218    return remote_hash
219
220  def Get(self, bucket, remote_path, local_path):
221    return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
222                                            local_path, False)
223
224  def GetIfChanged(self, local_path, bucket=None):
225    remote_path = os.path.basename(local_path)
226    if bucket:
227      return CloudStorageModuleStub.GetHelper(self, bucket, remote_path,
228                                              local_path, True)
229    result = CloudStorageModuleStub.GetHelper(
230        self, self.PUBLIC_BUCKET, remote_path, local_path, True)
231    if not result:
232      result = CloudStorageModuleStub.GetHelper(
233          self, self.PARTNER_BUCKET, remote_path, local_path, True)
234    if not result:
235      result = CloudStorageModuleStub.GetHelper(
236          self, self.INTERNAL_BUCKET, remote_path, local_path, True)
237    return result
238
239  def GetFilesInDirectoryIfChanged(self, directory, bucket):
240    if os.path.dirname(directory) == directory: # If in the root dir.
241      raise ValueError('Trying to serve root directory from HTTP server.')
242    for dirpath, _, filenames in os.walk(directory):
243      for filename in filenames:
244        path, extension = os.path.splitext(
245            os.path.join(dirpath, filename))
246        if extension != '.sha1':
247          continue
248        self.GetIfChanged(path, bucket)
249
250  def CalculateHash(self, file_path):
251    return self.local_file_hashes[file_path]
252
253  def ReadHash(self, hash_path):
254    return self.local_hash_files[hash_path]
255
256
257class LoggingStub(object):
258  def __init__(self):
259    self.warnings = []
260    self.errors = []
261
262  def info(self, msg, *args):
263    pass
264
265  def error(self, msg, *args):
266    self.errors.append(msg % args)
267
268  def warning(self, msg, *args):
269    self.warnings.append(msg % args)
270
271  def warn(self, msg, *args):
272    self.warning(msg, *args)
273
274
275class OpenFunctionStub(object):
276  class FileStub(object):
277    def __init__(self, data):
278      self._data = data
279
280    def __enter__(self):
281      return self
282
283    def __exit__(self, *args):
284      pass
285
286    def read(self, size=None):
287      if size:
288        return self._data[:size]
289      else:
290        return self._data
291
292    def write(self, data):
293      self._data.write(data)
294
295    def close(self):
296      pass
297
298  def __init__(self):
299    self.files = {}
300
301  def __call__(self, name, *args, **kwargs):
302    return OpenFunctionStub.FileStub(self.files[name])
303
304
305class OsModuleStub(object):
306  class OsEnvironModuleStub(object):
307    def get(self, _):
308      return None
309
310  class OsPathModuleStub(object):
311    def __init__(self, sys_module):
312      self.sys = sys_module
313      self.files = []
314      self.dirs = []
315
316    def exists(self, path):
317      return path in self.files
318
319    def isfile(self, path):
320      return path in self.files
321
322    def isdir(self, path):
323      return path in self.dirs
324
325    def join(self, *paths):
326      def IsAbsolutePath(path):
327        if self.sys.platform.startswith('win'):
328          return re.match('[a-zA-Z]:\\\\', path)
329        else:
330          return path.startswith('/')
331
332      # Per Python specification, if any component is an absolute path,
333      # discard previous components.
334      for index, path in reversed(list(enumerate(paths))):
335        if IsAbsolutePath(path):
336          paths = paths[index:]
337          break
338
339      if self.sys.platform.startswith('win'):
340        tmp = os.path.join(*paths)
341        return tmp.replace('/', '\\')
342      else:
343        tmp = os.path.join(*paths)
344        return tmp.replace('\\', '/')
345
346    def basename(self, path):
347      if self.sys.platform.startswith('win'):
348        return ntpath.basename(path)
349      else:
350        return posixpath.basename(path)
351
352    @staticmethod
353    def abspath(path):
354      return os.path.abspath(path)
355
356    @staticmethod
357    def expanduser(path):
358      return os.path.expanduser(path)
359
360    @staticmethod
361    def dirname(path):
362      return os.path.dirname(path)
363
364    @staticmethod
365    def realpath(path):
366      return os.path.realpath(path)
367
368    @staticmethod
369    def split(path):
370      return os.path.split(path)
371
372    @staticmethod
373    def splitext(path):
374      return os.path.splitext(path)
375
376    @staticmethod
377    def splitdrive(path):
378      return os.path.splitdrive(path)
379
380  X_OK = os.X_OK
381
382  sep = os.sep
383  pathsep = os.pathsep
384
385  def __init__(self, sys_module=sys):
386    self.path = OsModuleStub.OsPathModuleStub(sys_module)
387    self.environ = OsModuleStub.OsEnvironModuleStub()
388    self.display = ':0'
389    self.local_app_data = None
390    self.sys_path = None
391    self.program_files = None
392    self.program_files_x86 = None
393    self.devnull = os.devnull
394    self._directory = {}
395
396  def access(self, path, _):
397    return path in self.path.files
398
399  def getenv(self, name, value=None):
400    if name == 'DISPLAY':
401      env = self.display
402    elif name == 'LOCALAPPDATA':
403      env = self.local_app_data
404    elif name == 'PATH':
405      env = self.sys_path
406    elif name == 'PROGRAMFILES':
407      env = self.program_files
408    elif name == 'PROGRAMFILES(X86)':
409      env = self.program_files_x86
410    else:
411      raise NotImplementedError('Unsupported getenv')
412    return env if env else value
413
414  def chdir(self, path):
415    pass
416
417  def walk(self, top):
418    for dir_name in self._directory:
419      yield top, dir_name, self._directory[dir_name]
420
421
422class PerfControlModuleStub(object):
423  class PerfControlStub(object):
424    def __init__(self, adb):
425      pass
426
427  def __init__(self):
428    self.PerfControl = PerfControlModuleStub.PerfControlStub
429
430
431class RawInputFunctionStub(object):
432  def __init__(self):
433    self.input = ''
434
435  def __call__(self, name, *args, **kwargs):
436    return self.input
437
438
439class SubprocessModuleStub(object):
440  class PopenStub(object):
441    def __init__(self):
442      self.communicate_result = ('', '')
443      self.returncode_result = 0
444
445    def __call__(self, args, **kwargs):
446      return self
447
448    def communicate(self):
449      return self.communicate_result
450
451    @property
452    def returncode(self):
453      return self.returncode_result
454
455  def __init__(self):
456    self.Popen = SubprocessModuleStub.PopenStub()
457    self.PIPE = None
458
459  def call(self, *args, **kwargs):
460    pass
461
462
463class SysModuleStub(object):
464  def __init__(self):
465    self.platform = ''
466
467
468class ThermalThrottleModuleStub(object):
469  class ThermalThrottleStub(object):
470    def __init__(self, adb):
471      pass
472
473  def __init__(self):
474    self.ThermalThrottle = ThermalThrottleModuleStub.ThermalThrottleStub
475