• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import abc
2import builtins
3import contextlib
4import errno
5import functools
6import importlib
7from importlib import machinery, util, invalidate_caches
8from importlib.abc import ResourceReader
9import io
10import os
11import os.path
12from pathlib import Path, PurePath
13from test import support
14import unittest
15import sys
16import tempfile
17import types
18
19from . import data01
20from . import zipdata01
21
22
# good_name is set only when 'errno' is compiled in as a built-in module;
# bad_name is set only when 'importlib' is *not* a built-in module.  Either
# attribute is None when its condition does not hold.
BUILTINS = types.SimpleNamespace(
    good_name='errno' if 'errno' in sys.builtin_module_names else None,
    bad_name=('importlib' if 'importlib' not in sys.builtin_module_names
              else None),
)
30
# Details about the extension module called ``name``.  Every other attribute
# starts out as None.
EXTENSIONS = types.SimpleNamespace(
    path=None,
    ext=None,
    filename=None,
    file_path=None,
    name='_testcapi',
)
37
def _extension_details():
    """Locate the EXTENSIONS.name extension module on sys.path.

    Every sys.path entry is combined with every suffix in
    machinery.EXTENSION_SUFFIXES.  For the first candidate file that exists,
    its directory, suffix, file name and full path are stored on EXTENSIONS.
    When no candidate exists, EXTENSIONS is left untouched.
    """
    for directory in sys.path:
        for suffix in machinery.EXTENSION_SUFFIXES:
            candidate = EXTENSIONS.name + suffix
            full_path = os.path.join(directory, candidate)
            if not os.path.exists(full_path):
                continue
            EXTENSIONS.path = directory
            EXTENSIONS.ext = suffix
            EXTENSIONS.filename = candidate
            EXTENSIONS.file_path = full_path
            return


# Fill in EXTENSIONS once, when this module is imported.
_extension_details()
52
53
def import_importlib(module_name):
    """Import *module_name* both with and without _frozen_importlib.

    Returns a dict with two entries, each produced by
    support.import_fresh_module():

    * 'Frozen' -- imported with no extra arguments;
    * 'Source' -- imported with '_frozen_importlib' and
      '_frozen_importlib_external' blocked and, for dotted names, with
      'importlib' listed as a module to import fresh as well.
    """
    blocked = ('_frozen_importlib', '_frozen_importlib_external')
    fresh = ('importlib',) if '.' in module_name else ()
    frozen_module = support.import_fresh_module(module_name)
    source_module = support.import_fresh_module(
        module_name, fresh=fresh, blocked=blocked)
    return {'Frozen': frozen_module, 'Source': source_module}
61
62
def specialize_class(cls, kind, base=None, **kwargs):
    """Create the *kind*-specific subclass of the test class *cls*.

    The new class is named '<kind>_<cls.__name__>', inherits from *cls* and
    from a base class, and reports the same __module__ as *cls*.  The base is
    unittest.TestCase when *base* is None, *base* itself when it is a class,
    and ``base[kind]`` otherwise.

    The class attributes _NAME (the original class name) and _KIND are set,
    and every extra keyword argument ``attr=values`` sets the class attribute
    *attr* to ``values[kind]``.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        base = unittest.TestCase
    elif not isinstance(base, type):
        base = base[kind]
    specialized = types.new_class(f'{kind}_{cls.__name__}', (cls, base))
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr, per_kind in kwargs.items():
        setattr(specialized, attr, per_kind[kind])
    return specialized
80
81
def split_frozen(cls, base=None, **kwargs):
    """Return the (frozen, source) pair of specializations of *cls*.

    *base* and the keyword arguments are forwarded unchanged to
    specialize_class() for the 'Frozen' and then the 'Source' kind.
    """
    return tuple(specialize_class(cls, kind, base, **kwargs)
                 for kind in ('Frozen', 'Source'))
86
87
def test_both(test_class, base=None, **kwargs):
    """Return what split_frozen() returns for the same arguments."""
    return split_frozen(test_class, base=base, **kwargs)
90
91
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
CASE_INSENSITIVE_FS = True
if sys.platform not in ('win32', 'cygwin'):
    # Probe the file system: look this file up under a differently-cased
    # spelling of its path and see whether that spelling exists too.
    changed_name = __file__.upper()
    if changed_name == __file__:
        changed_name = __file__.lower()
    CASE_INSENSITIVE_FS = os.path.exists(changed_name)
101
# The 'Source' entry returned by import_importlib(), i.e. importlib imported
# while the frozen bootstrap modules are blocked.
source_importlib = import_importlib('importlib')['Source']
# __import__ implementations keyed by kind ('Frozen'/'Source').  Note that this
# rebinds the module-level name __import__ to a dict.  The staticmethod()
# wrappers presumably keep the callables from being bound as methods when a
# value is installed as a class attribute -- confirm against the callers.
__import__ = {'Frozen': staticmethod(builtins.__import__),
              'Source': staticmethod(source_importlib.__import__)}
105
106
def case_insensitive_tests(test):
    """Class decorator that nullifies tests requiring a case-insensitive
    file system.

    *test* is wrapped with unittest.skipIf(), skipping it whenever
    CASE_INSENSITIVE_FS is false.
    """
    skip_decorator = unittest.skipIf(
        not CASE_INSENSITIVE_FS, "requires a case-insensitive filesystem")
    return skip_decorator(test)
112
113
def submodule(parent, name, pkg_dir, content=''):
    """Write the source file for the submodule *name* into *pkg_dir*.

    The file '<name>.py' is created (or overwritten) with *content*.  Returns
    a tuple of the dotted name '<parent>.<name>' and the path of the file.
    """
    source_path = os.path.join(pkg_dir, name + '.py')
    with open(source_path, 'w') as source_file:
        source_file.write(content)
    return f'{parent}.{name}', source_path
119
120
@contextlib.contextmanager
def uncache(*names):
    """Uncache modules from sys.modules.

    Each name in *names* is removed from sys.modules (if present) when the
    context is entered and again when it is left, so nothing imported under
    those names inside the block stays cached.

    A basic sanity check is performed to prevent uncaching modules that either
    cannot/shouldn't be uncached: 'sys', 'marshal' and 'imp' raise ValueError.
    Names listed before the offending one have already been removed by then.

    """
    protected = ('sys', 'marshal', 'imp')
    for module_name in names:
        if module_name in protected:
            raise ValueError("cannot uncache {0}".format(module_name))
        sys.modules.pop(module_name, None)
    try:
        yield
    finally:
        for module_name in names:
            sys.modules.pop(module_name, None)
145
146
@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create a temporary importable module or package called *name*.

    While the context is active the process runs inside the directory
    provided by support.temp_cwd(), that directory is handed to
    support.DirsOnSysPath(), and *name* plus every cached module whose
    top-level name is *name* is uncached.  The path yielded is
    ``os.path.join(cwd, name)``.

    With pkg=False the file '<name>.py' is always written (content=None is
    treated as empty).  With pkg=True a directory is created and its
    '__init__.py' is written unless content is None, which leaves a
    namespace package.
    """
    conflicts = [cached for cached in sys.modules
                 if cached.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd, \
            uncache(name, *conflicts), \
            support.DirsOnSysPath(cwd):
        invalidate_caches()

        location = os.path.join(cwd, name)
        if pkg:
            modpath = os.path.join(location, '__init__.py')
            # Relative path: relies on the working directory being *cwd*.
            os.mkdir(name)
        else:
            modpath = location + '.py'
            # Make sure the module file gets created.
            content = '' if content is None else content
        if content is not None:
            # not a namespace package
            with open(modpath, 'w') as modfile:
                modfile.write(content)
        yield location
169
170
@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    The supported keyword arguments are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'.  Each of those sys attributes is replaced by the
    value given, or by a fresh empty list/dict when it is not given, and is
    restored when the context is left.  Any other keyword raises ValueError
    (after which the original values are restored as well).

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = (('meta_path', []), ('path', []),
                ('path_hooks', []), ('path_importer_cache', {}))
    originals = {}
    try:
        for attr, default in defaults:
            originals[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.pop(attr, default))
        # Whatever is left over was not one of the supported names.
        if kwargs:
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)
200
201
202class _ImporterMock:
203
204    """Base class to help with creating importer mocks."""
205
206    def __init__(self, *names, module_code={}):
207        self.modules = {}
208        self.module_code = {}
209        for name in names:
210            if not name.endswith('.__init__'):
211                import_name = name
212            else:
213                import_name = name[:-len('.__init__')]
214            if '.' not in name:
215                package = None
216            elif import_name == name:
217                package = name.rsplit('.', 1)[0]
218            else:
219                package = import_name
220            module = types.ModuleType(import_name)
221            module.__loader__ = self
222            module.__file__ = '<mock __file__>'
223            module.__package__ = package
224            module.attr = name
225            if import_name != name:
226                module.__path__ = ['<mock __path__>']
227            self.modules[import_name] = module
228            if import_name in module_code:
229                self.module_code[import_name] = module_code[import_name]
230
231    def __getitem__(self, name):
232        return self.modules[name]
233
234    def __enter__(self):
235        self._uncache = uncache(*self.modules.keys())
236        self._uncache.__enter__()
237        return self
238
239    def __exit__(self, *exc_info):
240        self._uncache.__exit__(None, None, None)
241
242
class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        """Return this mock as the loader for *fullname*, or None if the name
        is not one of the mocked modules."""
        return self if fullname in self.modules else None

    def load_module(self, fullname):
        """Install the mocked module *fullname* in sys.modules and return it.

        Raises ImportError for an unknown name.  If code was registered for
        the module it is called after the module has been cached; when that
        call raises, the module is removed from sys.modules again and the
        exception propagates.
        """
        if fullname not in self.modules:
            raise ImportError
        sys.modules[fullname] = self.modules[fullname]
        if fullname in self.module_code:
            run_code = self.module_code[fullname]
            try:
                run_code()
            except Exception:
                del sys.modules[fullname]
                raise
        return self.modules[fullname]
265
266
class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        """Return a spec for the mocked module *fullname*, or None when the
        name is not one of the mocked modules.

        The spec uses this mock as its loader, the module's mock __file__ as
        its location and the module's __path__ (if any) as its submodule
        search locations.
        """
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        """Return the pre-built mocked module for *spec*; raise ImportError
        when the spec names an unknown module."""
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        """Call the code registered for *module*, if there is any."""
        try:
            code = self.module_code[module.__spec__.name]
        except KeyError:
            # No code registered for this module: nothing to execute.
            return
        # Called outside the try block so that a KeyError raised by the
        # module code itself propagates instead of being mistaken for
        # "no code registered" and silently swallowed.
        code()
291
292
def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False.

    If sys.dont_write_bytecode is true when the decorator is applied, *fxn* is
    replaced by a function that accepts any arguments, does nothing and
    returns None.  Otherwise the wrapper sets the flag to False for the
    duration of each call and restores the previous value afterwards, even
    when *fxn* raises.
    """
    if sys.dont_write_bytecode:
        return lambda *args, **kwargs: None

    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        saved_flag = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = saved_flag
    return wrapper
308
309
def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    Only the directory directly containing *bytecode_path* is created; an
    already existing entry at that location is not an error, any other
    OSError propagates.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    pycache_dir = os.path.dirname(bytecode_path)
    try:
        os.mkdir(pycache_dir)
    except OSError as error:
        if error.errno == errno.EEXIST:
            return
        raise
320
321
@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
    """Adjust and restore sys.pycache_prefix.

    sys.pycache_prefix is set to *prefix* for the duration of the context and
    reset to its previous value on exit, whether or not the block raised.
    """
    previous = sys.pycache_prefix
    sys.pycache_prefix = prefix
    try:
        yield
    finally:
        sys.pycache_prefix = previous
331
332
@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). While the context is active that directory is the
    only entry of sys.path (the rest of the import state is reset by
    import_state()) and the created modules are uncached. When the context
    manager exits the whole temporary directory is removed.

    The yielded mapping maps each name to the path of its source file, plus
    the key '.root' to the temporary directory. A name ending in '.__init__'
    creates the __init__ module of a package.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    init_suffix = '.__init__'
    mapping = {}
    # Created before the try block so that the cleanup below never refers to
    # an unbound name (masking the real error) if mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping['.root'] = temp_dir
        import_names = set()
        for name in names:
            # Test for the same suffix that is stripped, so that names merely
            # ending in '__init__' are not truncated by mistake.
            if name.endswith(init_suffix):
                import_name = name[:-len(init_suffix)]
            else:
                import_name = name
            import_names.add(import_name)
            sys.modules.pop(import_name, None)
            *packages, leaf = name.split('.')
            directory = temp_dir
            for package in packages:
                directory = os.path.join(directory, package)
                if not os.path.exists(directory):
                    os.mkdir(directory)
            file_path = os.path.join(directory, leaf + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        # Exited in reverse order: the import state is restored first, then
        # the modules are uncached again -- even if the restore step fails.
        with uncache(*import_names), import_state(path=[temp_dir]):
            yield mapping
    finally:
        support.rmtree(temp_dir)
389
390
def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    The returned hook hands out *importer* for any path entry listed in
    *entries* and raises ImportError for every other entry.
    """
    def hook(entry):
        if entry in entries:
            return importer
        raise ImportError
    return hook
398
399
class CASEOKTestBase:

    def caseok_env_changed(self, *, should_exist):
        # Skip the running test when the presence of PYTHONCASEOK (looked up
        # both as bytes and as str) in the environ mapping used by
        # self.importlib._bootstrap_external differs from *should_exist*.
        environ = self.importlib._bootstrap_external._os.environ
        present = False
        for key in (b'PYTHONCASEOK', 'PYTHONCASEOK'):
            if key in environ:
                present = True
                break
        if present != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')
407
408
def create_package(file, path, is_package=True, contents=()):
    """Return a fake module named 'testingpackage' backed by a stub reader.

    The module's loader is a ResourceReader whose methods answer from the
    arguments given here:

    * open_resource() returns *file*, or raises it if it is an exception;
    * resource_path() returns *path*, or raises it if it is an exception;
    * is_resource() raises *path* if it is an exception, otherwise reports
      whether the name is an entry of *contents* that has no '/' in it;
    * contents() is a generator that raises *path* if it is an exception and
      otherwise yields the entries of *contents*.

    The reader records the most recently requested name in ``_path``.
    *is_package* is passed on to the module's ModuleSpec.
    """
    class Reader(ResourceReader):
        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            if isinstance(file, Exception):
                raise file
            return file

        def resource_path(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            return path

        def is_resource(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            # Only entries without a directory part can match.
            return any(entry.split('/') == [path_] for entry in contents)

        def contents(self):
            if isinstance(path, Exception):
                raise path
            yield from contents

    package_name = 'testingpackage'
    reader = Reader()
    # Built by hand rather than with importlib.util.module_from_spec().
    package = types.ModuleType(package_name)
    package.__spec__ = machinery.ModuleSpec(
        package_name, reader,
        origin='does-not-exist',
        is_package=is_package)
    package.__loader__ = reader
    return package
457
458
class CommonResourceTests(abc.ABC):
    # Shared test cases for an operation on a (package, resource path) pair.
    # Concrete test classes supply the operation by implementing execute().
    # Comments are used instead of docstrings on the test methods because
    # unittest shows a test's docstring in place of its name in verbose output.

    @abc.abstractmethod
    def execute(self, package, path):
        # Perform the operation under test for *path* inside *package*.
        raise NotImplementedError

    def test_package_name(self):
        # Passing in the package name should succeed.
        self.execute(data01.__name__, 'utf-8.file')

    def test_package_object(self):
        # Passing in the package itself should succeed.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # Passing in a string for the path should succeed.
        path = 'utf-8.file'
        self.execute(data01, path)

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # Passing in a pathlib.PurePath object for the path should succeed.
        path = PurePath('utf-8.file')
        self.execute(data01, path)

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        path = Path(__file__)
        full_path = path.parent/'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, full_path)

    def test_relative_path(self):
        # A relative path is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # The anchor package can already be imported.
        # It is dropped from sys.modules first and then referred to by name
        # only (presumably so that execute() has to import it again).
        del sys.modules[data01.__name__]
        self.execute(data01.__name__, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a module.
        # __name__ is the name of this (non-package) module.
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a module.
        # Same as above, but passing the module object rather than its name.
        with self.assertRaises(TypeError):
            module = sys.modules['test.test_importlib.util']
            self.execute(module, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        # The reader can open the resource but raises FileNotFoundError when
        # asked for a path; the requested name must still reach the reader.
        bytes_data = io.BytesIO(b'Hello, world!')
        package = create_package(file=bytes_data, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        # The reader reports a real file system path (this file) for the
        # resource; the requested name must still reach the reader.
        bytes_data = io.BytesIO(b'Hello, world!')
        path = __file__
        package = create_package(file=bytes_data, path=path)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        # A reader that can neither open the resource nor give a path for it
        # makes the operation fail with FileNotFoundError.
        package = create_package(file=FileNotFoundError(),
                                 path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')
531
532
class ZipSetupBase:
    """Mixin that makes the 'ziptestdata' package importable from a zip file.

    Subclasses set ZIP_MODULE to a module; the archive 'ziptestdata.zip' is
    looked up in the directory containing that module's file.  For the
    lifetime of the test class the archive is on sys.path and the imported
    package is available as ``cls.data``.
    """
    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        try:
            sys.path.remove(cls._zip_path)
        except ValueError:
            pass

        # Each piece of state is dropped independently: a missing importer
        # cache entry must not leave the imported module behind in
        # sys.modules (and a missing 'data' must not leave '_zip_path').
        sys.path_importer_cache.pop(cls._zip_path, None)
        sys.modules.pop(cls.data.__name__, None)

        for attr in ('data', '_zip_path'):
            try:
                delattr(cls, attr)
            except AttributeError:
                pass

    def setUp(self):
        # Snapshot the loaded modules and restore them after each test.
        modules = support.modules_setup()
        self.addCleanup(support.modules_cleanup, *modules)
566
567
class ZipSetup(ZipSetupBase):
    # Use the 'ziptestdata.zip' archive located in the same directory as the
    # zipdata01 package's module file.
    ZIP_MODULE = zipdata01                          # type: ignore
570