• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import abc
2import builtins
3import contextlib
4import errno
5import functools
6import importlib
7from importlib import machinery, util, invalidate_caches
8from importlib.abc import ResourceReader
9import io
10import marshal
11import os
12import os.path
13from pathlib import Path, PurePath
14from test import support
15from test.support import import_helper
16from test.support import os_helper
17import unittest
18import sys
19import tempfile
20import types
21
22from . import data01
23from . import zipdata01
24
25
26BUILTINS = types.SimpleNamespace()
27BUILTINS.good_name = None
28BUILTINS.bad_name = None
29if 'errno' in sys.builtin_module_names:
30    BUILTINS.good_name = 'errno'
31if 'importlib' not in sys.builtin_module_names:
32    BUILTINS.bad_name = 'importlib'
33
34EXTENSIONS = types.SimpleNamespace()
35EXTENSIONS.path = None
36EXTENSIONS.ext = None
37EXTENSIONS.filename = None
38EXTENSIONS.file_path = None
39EXTENSIONS.name = '_testcapi'
40
41def _extension_details():
42    global EXTENSIONS
43    for path in sys.path:
44        for ext in machinery.EXTENSION_SUFFIXES:
45            filename = EXTENSIONS.name + ext
46            file_path = os.path.join(path, filename)
47            if os.path.exists(file_path):
48                EXTENSIONS.path = path
49                EXTENSIONS.ext = ext
50                EXTENSIONS.filename = filename
51                EXTENSIONS.file_path = file_path
52                return
53
54_extension_details()
55
56
57def import_importlib(module_name):
58    """Import a module from importlib both w/ and w/o _frozen_importlib."""
59    fresh = ('importlib',) if '.' in module_name else ()
60    frozen = import_helper.import_fresh_module(module_name)
61    source = import_helper.import_fresh_module(module_name, fresh=fresh,
62                                         blocked=('_frozen_importlib', '_frozen_importlib_external'))
63    return {'Frozen': frozen, 'Source': source}
64
65
66def specialize_class(cls, kind, base=None, **kwargs):
67    # XXX Support passing in submodule names--load (and cache) them?
68    # That would clean up the test modules a bit more.
69    if base is None:
70        base = unittest.TestCase
71    elif not isinstance(base, type):
72        base = base[kind]
73    name = '{}_{}'.format(kind, cls.__name__)
74    bases = (cls, base)
75    specialized = types.new_class(name, bases)
76    specialized.__module__ = cls.__module__
77    specialized._NAME = cls.__name__
78    specialized._KIND = kind
79    for attr, values in kwargs.items():
80        value = values[kind]
81        setattr(specialized, attr, value)
82    return specialized
83
84
85def split_frozen(cls, base=None, **kwargs):
86    frozen = specialize_class(cls, 'Frozen', base, **kwargs)
87    source = specialize_class(cls, 'Source', base, **kwargs)
88    return frozen, source
89
90
91def test_both(test_class, base=None, **kwargs):
92    return split_frozen(test_class, base, **kwargs)
93
94
95CASE_INSENSITIVE_FS = True
96# Windows is the only OS that is *always* case-insensitive
97# (OS X *can* be case-sensitive).
98if sys.platform not in ('win32', 'cygwin'):
99    changed_name = __file__.upper()
100    if changed_name == __file__:
101        changed_name = __file__.lower()
102    if not os.path.exists(changed_name):
103        CASE_INSENSITIVE_FS = False
104
105source_importlib = import_importlib('importlib')['Source']
106__import__ = {'Frozen': staticmethod(builtins.__import__),
107              'Source': staticmethod(source_importlib.__import__)}
108
109
110def case_insensitive_tests(test):
111    """Class decorator that nullifies tests requiring a case-insensitive
112    file system."""
113    return unittest.skipIf(not CASE_INSENSITIVE_FS,
114                            "requires a case-insensitive filesystem")(test)
115
116
117def submodule(parent, name, pkg_dir, content=''):
118    path = os.path.join(pkg_dir, name + '.py')
119    with open(path, 'w', encoding='utf-8') as subfile:
120        subfile.write(content)
121    return '{}.{}'.format(parent, name), path
122
123
124def get_code_from_pyc(pyc_path):
125    """Reads a pyc file and returns the unmarshalled code object within.
126
127    No header validation is performed.
128    """
129    with open(pyc_path, 'rb') as pyc_f:
130        pyc_f.seek(16)
131        return marshal.load(pyc_f)
132
133
134@contextlib.contextmanager
135def uncache(*names):
136    """Uncache a module from sys.modules.
137
138    A basic sanity check is performed to prevent uncaching modules that either
139    cannot/shouldn't be uncached.
140
141    """
142    for name in names:
143        if name in ('sys', 'marshal', 'imp'):
144            raise ValueError(
145                "cannot uncache {0}".format(name))
146        try:
147            del sys.modules[name]
148        except KeyError:
149            pass
150    try:
151        yield
152    finally:
153        for name in names:
154            try:
155                del sys.modules[name]
156            except KeyError:
157                pass
158
159
160@contextlib.contextmanager
161def temp_module(name, content='', *, pkg=False):
162    conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
163    with os_helper.temp_cwd(None) as cwd:
164        with uncache(name, *conflicts):
165            with import_helper.DirsOnSysPath(cwd):
166                invalidate_caches()
167
168                location = os.path.join(cwd, name)
169                if pkg:
170                    modpath = os.path.join(location, '__init__.py')
171                    os.mkdir(name)
172                else:
173                    modpath = location + '.py'
174                    if content is None:
175                        # Make sure the module file gets created.
176                        content = ''
177                if content is not None:
178                    # not a namespace package
179                    with open(modpath, 'w', encoding='utf-8') as modfile:
180                        modfile.write(content)
181                yield location
182
183
184@contextlib.contextmanager
185def import_state(**kwargs):
186    """Context manager to manage the various importers and stored state in the
187    sys module.
188
189    The 'modules' attribute is not supported as the interpreter state stores a
190    pointer to the dict that the interpreter uses internally;
191    reassigning to sys.modules does not have the desired effect.
192
193    """
194    originals = {}
195    try:
196        for attr, default in (('meta_path', []), ('path', []),
197                              ('path_hooks', []),
198                              ('path_importer_cache', {})):
199            originals[attr] = getattr(sys, attr)
200            if attr in kwargs:
201                new_value = kwargs[attr]
202                del kwargs[attr]
203            else:
204                new_value = default
205            setattr(sys, attr, new_value)
206        if len(kwargs):
207            raise ValueError(
208                    'unrecognized arguments: {0}'.format(kwargs.keys()))
209        yield
210    finally:
211        for attr, value in originals.items():
212            setattr(sys, attr, value)
213
214
215class _ImporterMock:
216
217    """Base class to help with creating importer mocks."""
218
219    def __init__(self, *names, module_code={}):
220        self.modules = {}
221        self.module_code = {}
222        for name in names:
223            if not name.endswith('.__init__'):
224                import_name = name
225            else:
226                import_name = name[:-len('.__init__')]
227            if '.' not in name:
228                package = None
229            elif import_name == name:
230                package = name.rsplit('.', 1)[0]
231            else:
232                package = import_name
233            module = types.ModuleType(import_name)
234            module.__loader__ = self
235            module.__file__ = '<mock __file__>'
236            module.__package__ = package
237            module.attr = name
238            if import_name != name:
239                module.__path__ = ['<mock __path__>']
240            self.modules[import_name] = module
241            if import_name in module_code:
242                self.module_code[import_name] = module_code[import_name]
243
244    def __getitem__(self, name):
245        return self.modules[name]
246
247    def __enter__(self):
248        self._uncache = uncache(*self.modules.keys())
249        self._uncache.__enter__()
250        return self
251
252    def __exit__(self, *exc_info):
253        self._uncache.__exit__(None, None, None)
254
255
256class mock_modules(_ImporterMock):
257
258    """Importer mock using PEP 302 APIs."""
259
260    def find_module(self, fullname, path=None):
261        if fullname not in self.modules:
262            return None
263        else:
264            return self
265
266    def load_module(self, fullname):
267        if fullname not in self.modules:
268            raise ImportError
269        else:
270            sys.modules[fullname] = self.modules[fullname]
271            if fullname in self.module_code:
272                try:
273                    self.module_code[fullname]()
274                except Exception:
275                    del sys.modules[fullname]
276                    raise
277            return self.modules[fullname]
278
279
280class mock_spec(_ImporterMock):
281
282    """Importer mock using PEP 451 APIs."""
283
284    def find_spec(self, fullname, path=None, parent=None):
285        try:
286            module = self.modules[fullname]
287        except KeyError:
288            return None
289        spec = util.spec_from_file_location(
290                fullname, module.__file__, loader=self,
291                submodule_search_locations=getattr(module, '__path__', None))
292        return spec
293
294    def create_module(self, spec):
295        if spec.name not in self.modules:
296            raise ImportError
297        return self.modules[spec.name]
298
299    def exec_module(self, module):
300        try:
301            self.module_code[module.__spec__.name]()
302        except KeyError:
303            pass
304
305
306def writes_bytecode_files(fxn):
307    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
308    tests that require it to be set to False."""
309    if sys.dont_write_bytecode:
310        return lambda *args, **kwargs: None
311    @functools.wraps(fxn)
312    def wrapper(*args, **kwargs):
313        original = sys.dont_write_bytecode
314        sys.dont_write_bytecode = False
315        try:
316            to_return = fxn(*args, **kwargs)
317        finally:
318            sys.dont_write_bytecode = original
319        return to_return
320    return wrapper
321
322
323def ensure_bytecode_path(bytecode_path):
324    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.
325
326    :param bytecode_path: File system path to PEP 3147 pyc file.
327    """
328    try:
329        os.mkdir(os.path.dirname(bytecode_path))
330    except OSError as error:
331        if error.errno != errno.EEXIST:
332            raise
333
334
335@contextlib.contextmanager
336def temporary_pycache_prefix(prefix):
337    """Adjust and restore sys.pycache_prefix."""
338    _orig_prefix = sys.pycache_prefix
339    sys.pycache_prefix = prefix
340    try:
341        yield
342    finally:
343        sys.pycache_prefix = _orig_prefix
344
345
346@contextlib.contextmanager
347def create_modules(*names):
348    """Temporarily create each named module with an attribute (named 'attr')
349    that contains the name passed into the context manager that caused the
350    creation of the module.
351
352    All files are created in a temporary directory returned by
353    tempfile.mkdtemp(). This directory is inserted at the beginning of
354    sys.path. When the context manager exits all created files (source and
355    bytecode) are explicitly deleted.
356
357    No magic is performed when creating packages! This means that if you create
358    a module within a package you must also create the package's __init__ as
359    well.
360
361    """
362    source = 'attr = {0!r}'
363    created_paths = []
364    mapping = {}
365    state_manager = None
366    uncache_manager = None
367    try:
368        temp_dir = tempfile.mkdtemp()
369        mapping['.root'] = temp_dir
370        import_names = set()
371        for name in names:
372            if not name.endswith('__init__'):
373                import_name = name
374            else:
375                import_name = name[:-len('.__init__')]
376            import_names.add(import_name)
377            if import_name in sys.modules:
378                del sys.modules[import_name]
379            name_parts = name.split('.')
380            file_path = temp_dir
381            for directory in name_parts[:-1]:
382                file_path = os.path.join(file_path, directory)
383                if not os.path.exists(file_path):
384                    os.mkdir(file_path)
385                    created_paths.append(file_path)
386            file_path = os.path.join(file_path, name_parts[-1] + '.py')
387            with open(file_path, 'w', encoding='utf-8') as file:
388                file.write(source.format(name))
389            created_paths.append(file_path)
390            mapping[name] = file_path
391        uncache_manager = uncache(*import_names)
392        uncache_manager.__enter__()
393        state_manager = import_state(path=[temp_dir])
394        state_manager.__enter__()
395        yield mapping
396    finally:
397        if state_manager is not None:
398            state_manager.__exit__(None, None, None)
399        if uncache_manager is not None:
400            uncache_manager.__exit__(None, None, None)
401        os_helper.rmtree(temp_dir)
402
403
404def mock_path_hook(*entries, importer):
405    """A mock sys.path_hooks entry."""
406    def hook(entry):
407        if entry not in entries:
408            raise ImportError
409        return importer
410    return hook
411
412
413class CASEOKTestBase:
414
415    def caseok_env_changed(self, *, should_exist):
416        possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
417        if any(x in self.importlib._bootstrap_external._os.environ
418                    for x in possibilities) != should_exist:
419            self.skipTest('os.environ changes not reflected in _os.environ')
420
421
422def create_package(file, path, is_package=True, contents=()):
423    class Reader(ResourceReader):
424        def get_resource_reader(self, package):
425            return self
426
427        def open_resource(self, path):
428            self._path = path
429            if isinstance(file, Exception):
430                raise file
431            else:
432                return file
433
434        def resource_path(self, path_):
435            self._path = path_
436            if isinstance(path, Exception):
437                raise path
438            else:
439                return path
440
441        def is_resource(self, path_):
442            self._path = path_
443            if isinstance(path, Exception):
444                raise path
445            for entry in contents:
446                parts = entry.split('/')
447                if len(parts) == 1 and parts[0] == path_:
448                    return True
449            return False
450
451        def contents(self):
452            if isinstance(path, Exception):
453                raise path
454            # There's no yield from in baseball, er, Python 2.
455            for entry in contents:
456                yield entry
457
458    name = 'testingpackage'
459    # Unfortunately importlib.util.module_from_spec() was not introduced until
460    # Python 3.5.
461    module = types.ModuleType(name)
462    loader = Reader()
463    spec = machinery.ModuleSpec(
464        name, loader,
465        origin='does-not-exist',
466        is_package=is_package)
467    module.__spec__ = spec
468    module.__loader__ = loader
469    return module
470
471
472class CommonResourceTests(abc.ABC):
473    @abc.abstractmethod
474    def execute(self, package, path):
475        raise NotImplementedError
476
477    def test_package_name(self):
478        # Passing in the package name should succeed.
479        self.execute(data01.__name__, 'utf-8.file')
480
481    def test_package_object(self):
482        # Passing in the package itself should succeed.
483        self.execute(data01, 'utf-8.file')
484
485    def test_string_path(self):
486        # Passing in a string for the path should succeed.
487        path = 'utf-8.file'
488        self.execute(data01, path)
489
490    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
491    def test_pathlib_path(self):
492        # Passing in a pathlib.PurePath object for the path should succeed.
493        path = PurePath('utf-8.file')
494        self.execute(data01, path)
495
496    def test_absolute_path(self):
497        # An absolute path is a ValueError.
498        path = Path(__file__)
499        full_path = path.parent/'utf-8.file'
500        with self.assertRaises(ValueError):
501            self.execute(data01, full_path)
502
503    def test_relative_path(self):
504        # A relative path is a ValueError.
505        with self.assertRaises(ValueError):
506            self.execute(data01, '../data01/utf-8.file')
507
508    def test_importing_module_as_side_effect(self):
509        # The anchor package can already be imported.
510        del sys.modules[data01.__name__]
511        self.execute(data01.__name__, 'utf-8.file')
512
513    def test_non_package_by_name(self):
514        # The anchor package cannot be a module.
515        with self.assertRaises(TypeError):
516            self.execute(__name__, 'utf-8.file')
517
518    def test_non_package_by_package(self):
519        # The anchor package cannot be a module.
520        with self.assertRaises(TypeError):
521            module = sys.modules['test.test_importlib.util']
522            self.execute(module, 'utf-8.file')
523
524    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
525    def test_resource_opener(self):
526        bytes_data = io.BytesIO(b'Hello, world!')
527        package = create_package(file=bytes_data, path=FileNotFoundError())
528        self.execute(package, 'utf-8.file')
529        self.assertEqual(package.__loader__._path, 'utf-8.file')
530
531    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
532    def test_resource_path(self):
533        bytes_data = io.BytesIO(b'Hello, world!')
534        path = __file__
535        package = create_package(file=bytes_data, path=path)
536        self.execute(package, 'utf-8.file')
537        self.assertEqual(package.__loader__._path, 'utf-8.file')
538
539    def test_useless_loader(self):
540        package = create_package(file=FileNotFoundError(),
541                                 path=FileNotFoundError())
542        with self.assertRaises(FileNotFoundError):
543            self.execute(package, 'utf-8.file')
544
545
546class ZipSetupBase:
547    ZIP_MODULE = None
548
549    @classmethod
550    def setUpClass(cls):
551        data_path = Path(cls.ZIP_MODULE.__file__)
552        data_dir = data_path.parent
553        cls._zip_path = str(data_dir / 'ziptestdata.zip')
554        sys.path.append(cls._zip_path)
555        cls.data = importlib.import_module('ziptestdata')
556
557    @classmethod
558    def tearDownClass(cls):
559        try:
560            sys.path.remove(cls._zip_path)
561        except ValueError:
562            pass
563
564        try:
565            del sys.path_importer_cache[cls._zip_path]
566            del sys.modules[cls.data.__name__]
567        except KeyError:
568            pass
569
570        try:
571            del cls.data
572            del cls._zip_path
573        except AttributeError:
574            pass
575
576    def setUp(self):
577        modules = import_helper.modules_setup()
578        self.addCleanup(import_helper.modules_cleanup, *modules)
579
580
581class ZipSetup(ZipSetupBase):
582    ZIP_MODULE = zipdata01                          # type: ignore
583