• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import builtins
2import contextlib
3import errno
4import functools
5import importlib
6from importlib import machinery, util, invalidate_caches
7import os
8import os.path
9from test import support
10import unittest
11import sys
12import tempfile
13import types
14
15
# Names of modules for tests that need a built-in module (good_name) and a
# module that is not built in (bad_name).  Each attribute stays None when
# the corresponding check against sys.builtin_module_names does not hold.
BUILTINS = types.SimpleNamespace(
    good_name='errno' if 'errno' in sys.builtin_module_names else None,
    bad_name=('importlib'
              if 'importlib' not in sys.builtin_module_names else None),
)
23
# Location details for the extension module named by EXTENSIONS.name.
# Everything except 'name' stays None unless _extension_details() finds a
# matching file on sys.path.
EXTENSIONS = types.SimpleNamespace(
    path=None,
    ext=None,
    filename=None,
    file_path=None,
    name='_testcapi',
)


def _extension_details():
    """Record on EXTENSIONS where the extension module file lives.

    Every sys.path entry is combined with every suffix in
    machinery.EXTENSION_SUFFIXES; the first candidate file that exists
    wins.  When nothing matches, EXTENSIONS is left untouched.
    """
    candidates = (
        (directory, suffix)
        for directory in sys.path
        for suffix in machinery.EXTENSION_SUFFIXES
    )
    for directory, suffix in candidates:
        filename = EXTENSIONS.name + suffix
        file_path = os.path.join(directory, filename)
        if not os.path.exists(file_path):
            continue
        EXTENSIONS.path = directory
        EXTENSIONS.ext = suffix
        EXTENSIONS.filename = filename
        EXTENSIONS.file_path = file_path
        return


_extension_details()
45
46
def import_importlib(module_name):
    """Import module_name twice and return both copies in a dict.

    'Frozen' maps to a plain fresh import of the module.  'Source' maps to
    a fresh import performed with '_frozen_importlib' and
    '_frozen_importlib_external' blocked; for dotted names 'importlib'
    itself is also imported fresh.
    """
    blocked_names = ('_frozen_importlib', '_frozen_importlib_external')
    fresh_names = ('importlib',) if '.' in module_name else ()
    variants = {}
    variants['Frozen'] = support.import_fresh_module(module_name)
    variants['Source'] = support.import_fresh_module(
        module_name, fresh=fresh_names, blocked=blocked_names)
    return variants
54
55
def specialize_class(cls, kind, base=None, **kwargs):
    """Create the '<kind>_<cls name>' specialization of a test class.

    The new class derives from cls and from a second base: unittest.TestCase
    when base is None, base itself when it is a class, otherwise base[kind].
    Each keyword argument must be a mapping indexed by kind; the selected
    value is set on the new class under the keyword's name.  The result
    also carries _NAME (the original class name) and _KIND, and reports
    cls's module as its own.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        parent = unittest.TestCase
    elif isinstance(base, type):
        parent = base
    else:
        parent = base[kind]
    specialized = types.new_class(
        '{0}_{1}'.format(kind, cls.__name__), (cls, parent))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr in kwargs:
        setattr(specialized, attr, kwargs[attr][kind])
    return specialized
73
74
def split_frozen(cls, base=None, **kwargs):
    """Return the ('Frozen', 'Source') specializations of cls as a pair.

    Arguments are forwarded unchanged to specialize_class() for each kind.
    """
    frozen, source = (specialize_class(cls, kind, base, **kwargs)
                      for kind in ('Frozen', 'Source'))
    return frozen, source
79
80
def test_both(test_class, base=None, **kwargs):
    """Return what split_frozen() produces for test_class (thin alias)."""
    return split_frozen(test_class, base=base, **kwargs)
83
84
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive), so everywhere else probe the file system:
# look for this very file under a differently-cased spelling of its path.
CASE_INSENSITIVE_FS = True
if sys.platform not in ('win32', 'cygwin'):
    changed_name = __file__.upper()
    if changed_name == __file__:
        # Path was already all upper case; try the lower-case spelling.
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
94
# The 'Source' flavour of importlib as produced by import_importlib(), and
# the two __import__ implementations keyed by kind.  The staticmethod
# wrappers are presumably there so the callables can be stored as class
# attributes without being bound to instances.
source_importlib = import_importlib('importlib')['Source']
__import__ = dict(
    Frozen=staticmethod(builtins.__import__),
    Source=staticmethod(source_importlib.__import__),
)
98
99
def case_insensitive_tests(test):
    """Class decorator that skips tests requiring a case-insensitive
    file system when CASE_INSENSITIVE_FS is false."""
    requires_insensitive_fs = unittest.skipUnless(
        CASE_INSENSITIVE_FS, "requires a case-insensitive filesystem")
    return requires_insensitive_fs(test)
105
106
def submodule(parent, name, pkg_dir, content=''):
    """Write a submodule source file and describe it.

    Creates '<name>.py' inside pkg_dir containing content and returns a
    (dotted module name, file path) pair, where the dotted name is
    '<parent>.<name>'.
    """
    filename = name + '.py'
    path = os.path.join(pkg_dir, filename)
    with open(path, 'w') as subfile:
        subfile.write(content)
    dotted_name = '{0}.{1}'.format(parent, name)
    return dotted_name, path
112
113
@contextlib.contextmanager
def uncache(*names):
    """Uncache modules from sys.modules.

    Each name is removed from sys.modules (if present) on entry and again
    on exit, so whatever the body imports under those names does not leak
    out of the block.

    A basic sanity check is performed to prevent uncaching modules that
    cannot or should not be uncached: ValueError is raised if any name is
    'sys', 'marshal' or 'imp'.  The check covers all names before
    sys.modules is touched, so a rejected call leaves the cache unchanged.

    """
    # Validate first: raising part-way through the removal loop would leave
    # the earlier names already deleted from sys.modules.
    for name in names:
        if name in ('sys', 'marshal', 'imp'):
            raise ValueError(
                "cannot uncache {0}".format(name))
    for name in names:
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)
138
139
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create an importable module or package in a temporary directory.

    Inside the block the current directory is a temporary directory that
    is also on sys.path, and name plus every cached module whose top-level
    name equals name is uncached.  The yielded value is the module's
    location inside that directory, without any file extension.

    With pkg false, '<name>.py' is written (content=None is treated as an
    empty file).  With pkg true, a directory called name is created and
    '__init__.py' is written into it unless content is None.
    """
    conflicts = [cached for cached in sys.modules
                 if cached.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            os.mkdir(name)
            modpath = os.path.join(location, '__init__.py')
        else:
            modpath = location + '.py'
            if content is None:
                # Make sure the module file gets created.
                content = ''
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
162
163
@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    Accepted keywords are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'.  For the duration of the block each of those sys
    attributes is set to the given value, or to a fresh empty list/dict
    when not given; the previous values are restored on exit.  Any other
    keyword raises ValueError.

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    originals = {}
    try:
        # Built per call so every use gets its own empty containers.
        defaults = (
            ('meta_path', []),
            ('path', []),
            ('path_hooks', []),
            ('path_importer_cache', {}),
        )
        for attr, default in defaults:
            originals[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.pop(attr, default))
        if kwargs:
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)
193
194
class _ImporterMock:

    """Base class to help with creating importer mocks.

    Instances hold ready-made module objects in 'modules' (keyed by import
    name) and optional callables in 'module_code'.  Used as a context
    manager, an instance uncaches all of its module names on entry and
    exit.
    """

    def __init__(self, *names, module_code={}):
        """Build one mock module per name.

        A name ending in '.__init__' describes a package: it is registered
        under the name without that suffix and receives a __path__.
        module_code maps import names to callables; only entries matching
        a created module are kept.
        """
        self.modules = {}
        self.module_code = {}
        init_suffix = '.__init__'
        for name in names:
            is_package = name.endswith(init_suffix)
            import_name = name[:-len(init_suffix)] if is_package else name
            if '.' not in name:
                package = None
            elif is_package:
                package = import_name
            else:
                package = name.rsplit('.', 1)[0]
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            # Record the name exactly as requested, suffix included.
            module.attr = name
            if is_package:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        """Return the mock module registered under name."""
        return self.modules[name]

    def __enter__(self):
        """Uncache every mocked module name and return the mock itself."""
        self._uncache = uncache(*self.modules.keys())
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        """Leave the uncache() context entered by __enter__."""
        self._uncache.__exit__(None, None, None)
234
235
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return this mock as the loader for known modules, else None."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Install the mock module in sys.modules and return it.

        Raises ImportError for unknown names.  If module code is registered
        for fullname it is run after the module is installed; should it
        raise, the module is removed from sys.modules again and the
        exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        sys.modules[fullname] = self.modules[fullname]
        if fullname in self.module_code:
            run_code = self.module_code[fullname]
            try:
                run_code()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
258
259
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for a mocked module, or None for unknown names.

        The spec uses this mock as loader, the module's __file__ as
        location and the module's __path__ (if any) as submodule search
        locations.
        """
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        """Return the pre-built mock module named by spec.

        Raises ImportError when spec.name is not a mocked module.
        """
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        """Run the module code registered for module, if there is any.

        Exceptions raised by the module code propagate to the caller, as
        they do in mock_modules.load_module().
        """
        try:
            run_code = self.module_code[module.__spec__.name]
        except KeyError:
            # No code registered for this module: nothing to execute.
            return
        # Called outside the try block so that a KeyError raised by the
        # module code itself is not mistaken for "no code registered".
        run_code()
284
285
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    If sys.dont_write_bytecode is already true when the decorator is
    applied, the test is marked as skipped via unittest.skip() so the skip
    shows up in the test report instead of counting as a silent pass.
    Otherwise the wrapped function runs with sys.dont_write_bytecode forced
    to False and the previous value is restored afterwards, even if the
    function raises.
    """
    if sys.dont_write_bytecode:
        return unittest.skip("relies on writing bytecode")(fxn)
    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
    return wrapper
301
302
def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    Only the immediate parent directory of bytecode_path is created; an
    already existing path is not an error, any other OSError propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    cache_dir = os.path.dirname(bytecode_path)
    try:
        os.mkdir(cache_dir)
    except FileExistsError:
        # Already there (the EEXIST case); nothing left to do.
        pass
313
314
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). Inside the block the import state is swapped via
    import_state(path=[<temp dir>]), so sys.path holds only that directory
    and the named modules are uncached. When the context manager exits the
    import state is restored and the whole temporary directory is removed.

    The yielded mapping goes from each given name to the path of its source
    file, plus the key '.root' for the temporary directory itself. A name
    ending in '.__init__' is uncached under its package name.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    init_suffix = '.__init__'
    # Created before the try block so the cleanup below can never hit an
    # unbound temp_dir when mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping = {'.root': temp_dir}
        import_names = set()
        for name in names:
            # Test for the same suffix that gets stripped, so names that
            # merely end in '__init__' are not truncated.
            if name.endswith(init_suffix):
                import_names.add(name[:-len(init_suffix)])
            else:
                import_names.add(name)
            name_parts = name.split('.')
            file_path = temp_dir
            for directory in name_parts[:-1]:
                file_path = os.path.join(file_path, directory)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
            file_path = os.path.join(file_path, name_parts[-1] + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        # uncache() drops the names from sys.modules on entry and on exit;
        # import_state() is entered last and therefore left first.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
371
372
def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    The returned hook hands back importer for any path entry listed in
    entries and raises ImportError for everything else.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook
380
381
class CASEOKTestBase:

    def caseok_env_changed(self, *, should_exist):
        """Skip the test if _os.environ disagrees with should_exist.

        Looks for PYTHONCASEOK (as bytes or str key) in the environ mapping
        of self.importlib._bootstrap_external._os and calls self.skipTest()
        when its presence differs from should_exist.
        """
        environ = self.importlib._bootstrap_external._os.environ
        present = b'PYTHONCASEOK' in environ or 'PYTHONCASEOK' in environ
        if present != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')
389