import abc
import builtins
import contextlib
import errno
import functools
import importlib
from importlib import machinery, util, invalidate_caches
from importlib.abc import ResourceReader
import io
import marshal
import os
import os.path
from pathlib import Path, PurePath
from test import support
import unittest
import sys
import tempfile
import types

from . import data01
from . import zipdata01


# Name of a module that is built in (good_name) and of one that is not
# (bad_name); either stays None when the check below does not hold.
BUILTINS = types.SimpleNamespace()
BUILTINS.good_name = None
BUILTINS.bad_name = None
if 'errno' in sys.builtin_module_names:
    BUILTINS.good_name = 'errno'
if 'importlib' not in sys.builtin_module_names:
    BUILTINS.bad_name = 'importlib'

# Location details of the extension module named EXTENSIONS.name; everything
# except the name stays None until _extension_details() finds the file.
EXTENSIONS = types.SimpleNamespace()
EXTENSIONS.path = None
EXTENSIONS.ext = None
EXTENSIONS.filename = None
EXTENSIONS.file_path = None
EXTENSIONS.name = '_testcapi'


def _extension_details():
    """Search sys.path for the EXTENSIONS.name extension module.

    Every suffix in machinery.EXTENSION_SUFFIXES is tried in every sys.path
    entry.  On the first existing file the path, ext, filename and file_path
    attributes of EXTENSIONS are filled in; if nothing is found they are left
    untouched.
    """
    for path in sys.path:
        for ext in machinery.EXTENSION_SUFFIXES:
            filename = EXTENSIONS.name + ext
            file_path = os.path.join(path, filename)
            if os.path.exists(file_path):
                EXTENSIONS.path = path
                EXTENSIONS.ext = ext
                EXTENSIONS.filename = filename
                EXTENSIONS.file_path = file_path
                return


_extension_details()


def import_importlib(module_name):
    """Import a module from importlib both w/ and w/o _frozen_importlib.

    Returns a dict with the keys 'Frozen' and 'Source' mapping to the two
    freshly imported module objects.
    """
    # For a submodule, 'importlib' itself must be re-imported as well.
    fresh = ('importlib',) if '.' in module_name else ()
    frozen = support.import_fresh_module(module_name)
    source = support.import_fresh_module(module_name, fresh=fresh,
                    blocked=('_frozen_importlib', '_frozen_importlib_external'))
    return {'Frozen': frozen, 'Source': source}


def specialize_class(cls, kind, base=None, **kwargs):
    """Create a subclass of *cls* specialized for *kind*.

    *base* is the second base class: unittest.TestCase when None, used as-is
    when it is a class, otherwise indexed with *kind*.  Each keyword argument
    is a mapping indexed with *kind*; the selected value is set as a class
    attribute of the same name.  The new class is named '<kind>_<cls name>'
    and records the original name and the kind in _NAME and _KIND.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        base = unittest.TestCase
    elif not isinstance(base, type):
        base = base[kind]
    name = '{}_{}'.format(kind, cls.__name__)
    bases = (cls, base)
    specialized = types.new_class(name, bases)
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr, values in kwargs.items():
        value = values[kind]
        setattr(specialized, attr, value)
    return specialized


def split_frozen(cls, base=None, **kwargs):
    """Return the ('Frozen', 'Source') specializations of *cls* as a tuple."""
    frozen = specialize_class(cls, 'Frozen', base, **kwargs)
    source = specialize_class(cls, 'Source', base, **kwargs)
    return frozen, source


def test_both(test_class, base=None, **kwargs):
    """Alias of split_frozen() taking the class as *test_class*."""
    return split_frozen(test_class, base, **kwargs)


CASE_INSENSITIVE_FS = True
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
if sys.platform not in ('win32', 'cygwin'):
    # Probe the file system: look this file up under a differently-cased name.
    changed_name = __file__.upper()
    if changed_name == __file__:
        changed_name = __file__.lower()
    if not os.path.exists(changed_name):
        CASE_INSENSITIVE_FS = False

source_importlib = import_importlib('importlib')['Source']
# The __import__ implementation to use for each kind of specialized test.
__import__ = {'Frozen': staticmethod(builtins.__import__),
              'Source': staticmethod(source_importlib.__import__)}


def case_insensitive_tests(test):
    """Class decorator that nullifies tests requiring a case-insensitive
    file system."""
    return unittest.skipIf(not CASE_INSENSITIVE_FS,
                            "requires a case-insensitive filesystem")(test)


def submodule(parent, name, pkg_dir, content=''):
    """Write '<name>.py' with *content* into *pkg_dir*.

    Returns a tuple of the dotted name '<parent>.<name>' and the file path.
    """
    path = os.path.join(pkg_dir, name + '.py')
    with open(path, 'w') as subfile:
        subfile.write(content)
    return '{}.{}'.format(parent, name), path


def get_code_from_pyc(pyc_path):
    """Reads a pyc file and returns the unmarshalled code object within.

    No header validation is performed.
    """
    with open(pyc_path, 'rb') as pyc_f:
        # Jump over the first 16 bytes (the unvalidated header).
        pyc_f.seek(16)
        return marshal.load(pyc_f)


@contextlib.contextmanager
def uncache(*names):
    """Uncache a module from sys.modules.

    A basic sanity check is performed to prevent uncaching modules that either
    cannot/shouldn't be uncached.

    The names are removed from sys.modules both on entry and on exit.  A
    ValueError is raised for 'sys', 'marshal' and 'imp'.

    """
    for name in names:
        if name in ('sys', 'marshal', 'imp'):
            raise ValueError(
                "cannot uncache {0}".format(name))
        try:
            del sys.modules[name]
        except KeyError:
            pass
    try:
        yield
    finally:
        for name in names:
            try:
                del sys.modules[name]
            except KeyError:
                pass


@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create module or package *name* in a temporary working directory.

    The directory is placed on sys.path, and *name* together with every
    already-imported module sharing its top-level name is uncached for the
    duration.  With pkg=True and content=None no __init__.py is written (a
    namespace package).  Yields the module location without any extension.
    """
    conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
    with support.temp_cwd(None) as cwd:
        with uncache(name, *conflicts):
            with support.DirsOnSysPath(cwd):
                invalidate_caches()

                location = os.path.join(cwd, name)
                if pkg:
                    modpath = os.path.join(location, '__init__.py')
                    os.mkdir(name)
                else:
                    modpath = location + '.py'
                    if content is None:
                        # Make sure the module file gets created.
                        content = ''
                if content is not None:
                    # not a namespace package
                    with open(modpath, 'w') as modfile:
                        modfile.write(content)
                yield location


@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    Accepted keywords are meta_path, path, path_hooks and path_importer_cache;
    any of them not given is replaced by an empty list/dict.  Unknown keywords
    raise ValueError.  The original values are restored on exit.

    """
    originals = {}
    try:
        for attr, default in (('meta_path', []), ('path', []),
                              ('path_hooks', []),
                              ('path_importer_cache', {})):
            originals[attr] = getattr(sys, attr)
            if attr in kwargs:
                new_value = kwargs[attr]
                del kwargs[attr]
            else:
                new_value = default
            setattr(sys, attr, new_value)
        # Anything left over was not one of the supported attributes.
        if kwargs:
            raise ValueError(
                    'unrecognized arguments: {0}'.format(kwargs.keys()))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)


class _ImporterMock:

    """Base class to help with creating importer mocks."""

    def __init__(self, *names, module_code=None):
        """Create one mock module per entry of *names*.

        A name ending in '.__init__' creates a package (it gets a __path__)
        whose import name is the part before that suffix.  *module_code*
        optionally maps import names to callables; entries for created
        modules are copied into self.module_code.
        """
        # None instead of a shared mutable {} default.
        if module_code is None:
            module_code = {}
        self.modules = {}
        self.module_code = {}
        for name in names:
            if not name.endswith('.__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            if '.' not in name:
                package = None
            elif import_name == name:
                package = name.rsplit('.', 1)[0]
            else:
                package = import_name
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            module.attr = name
            if import_name != name:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        return self.modules[name]

    def __enter__(self):
        # Keep the mocked names out of sys.modules while the mock is active.
        self._uncache = uncache(*self.modules.keys())
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        self._uncache.__exit__(None, None, None)


class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        if fullname not in self.modules:
            return None
        else:
            return self

    def load_module(self, fullname):
        if fullname not in self.modules:
            raise ImportError
        else:
            sys.modules[fullname] = self.modules[fullname]
            if fullname in self.module_code:
                try:
                    self.module_code[fullname]()
                except Exception:
                    # Do not leave a half-initialized module behind.
                    del sys.modules[fullname]
                    raise
            return self.modules[fullname]


class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
                fullname, module.__file__, loader=self,
                submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        # Guard only the lookup: a KeyError raised by the module code itself
        # must propagate instead of being mistaken for "no code registered".
        try:
            code = self.module_code[module.__spec__.name]
        except KeyError:
            return
        code()


def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to skip
    tests that require it to be set to False."""
    if sys.dont_write_bytecode:
        return lambda *args, **kwargs: None
    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            to_return = fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
        return to_return
    return wrapper


def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    try:
        os.mkdir(os.path.dirname(bytecode_path))
    except OSError as error:
        # An already existing directory is fine; anything else is not.
        if error.errno != errno.EEXIST:
            raise


@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
    """Adjust and restore sys.pycache_prefix."""
    _orig_prefix = sys.pycache_prefix
    sys.pycache_prefix = prefix
    try:
        yield
    finally:
        sys.pycache_prefix = _orig_prefix


@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). This directory is inserted at the beginning of
    sys.path. When the context manager exits all created files (source and
    bytecode) are explicitly deleted.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    The yielded mapping goes from each given name to the path of its source
    file, plus the key '.root' for the temporary directory itself.

    """
    source = 'attr = {0!r}'
    created_paths = []
    mapping = {}
    state_manager = None
    uncache_manager = None
    # Created outside the try block so the cleanup below never runs with
    # temp_dir unbound when mkdtemp() itself fails.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping['.root'] = temp_dir
        import_names = set()
        for name in names:
            # Test for the same suffix that is stripped below.
            if not name.endswith('.__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            import_names.add(import_name)
            if import_name in sys.modules:
                del sys.modules[import_name]
            name_parts = name.split('.')
            file_path = temp_dir
            for directory in name_parts[:-1]:
                file_path = os.path.join(file_path, directory)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
                    created_paths.append(file_path)
            file_path = os.path.join(file_path, name_parts[-1] + '.py')
            with open(file_path, 'w') as file:
                file.write(source.format(name))
            created_paths.append(file_path)
            mapping[name] = file_path
        uncache_manager = uncache(*import_names)
        uncache_manager.__enter__()
        state_manager = import_state(path=[temp_dir])
        state_manager.__enter__()
        yield mapping
    finally:
        if state_manager is not None:
            state_manager.__exit__(None, None, None)
        if uncache_manager is not None:
            uncache_manager.__exit__(None, None, None)
        support.rmtree(temp_dir)


def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry."""
    def hook(entry):
        if entry not in entries:
            raise ImportError
        return importer
    return hook


class CASEOKTestBase:

    def caseok_env_changed(self, *, should_exist):
        """Skip the test unless PYTHONCASEOK's presence in the bootstrap's
        _os.environ matches *should_exist*."""
        possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
        if any(x in self.importlib._bootstrap_external._os.environ
                    for x in possibilities) != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')


def create_package(file, path, is_package=True, contents=()):
    """Return a module 'testingpackage' whose loader is a mock ResourceReader.

    *file* is what open_resource() returns and *path* is what resource_path()
    returns; if either is an exception instance it is raised instead.
    *contents* lists the entries reported by contents() and is_resource().
    The reader records the most recently requested path in its _path
    attribute.
    """
    class Reader(ResourceReader):
        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            if isinstance(file, Exception):
                raise file
            else:
                return file

        def resource_path(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            else:
                return path

        def is_resource(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            # Only top-level entries (no '/') count as resources.
            for entry in contents:
                parts = entry.split('/')
                if len(parts) == 1 and parts[0] == path_:
                    return True
            return False

        def contents(self):
            if isinstance(path, Exception):
                raise path
            yield from contents

    name = 'testingpackage'
    # Unfortunately importlib.util.module_from_spec() was not introduced until
    # Python 3.5.
    module = types.ModuleType(name)
    loader = Reader()
    spec = machinery.ModuleSpec(
        name, loader,
        origin='does-not-exist',
        is_package=is_package)
    module.__spec__ = spec
    module.__loader__ = loader
    return module


class CommonResourceTests(abc.ABC):
    """Tests shared by the resource APIs; subclasses supply execute()."""

    @abc.abstractmethod
    def execute(self, package, path):
        raise NotImplementedError

    def test_package_name(self):
        # Passing in the package name should succeed.
        self.execute(data01.__name__, 'utf-8.file')

    def test_package_object(self):
        # Passing in the package itself should succeed.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # Passing in a string for the path should succeed.
        path = 'utf-8.file'
        self.execute(data01, path)

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # Passing in a pathlib.PurePath object for the path should succeed.
        path = PurePath('utf-8.file')
        self.execute(data01, path)

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        path = Path(__file__)
        full_path = path.parent/'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, full_path)

    def test_relative_path(self):
        # A relative path is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # The anchor package can already be imported.
        del sys.modules[data01.__name__]
        self.execute(data01.__name__, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            module = sys.modules['test.test_importlib.util']
            self.execute(module, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        bytes_data = io.BytesIO(b'Hello, world!')
        package = create_package(file=bytes_data, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        bytes_data = io.BytesIO(b'Hello, world!')
        path = __file__
        package = create_package(file=bytes_data, path=path)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        package = create_package(file=FileNotFoundError(),
                                 path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')


class ZipSetupBase:
    """Fixture that imports 'ziptestdata' from the zip file located next to
    ZIP_MODULE and undoes the related import state afterwards."""

    # Module whose directory contains 'ziptestdata.zip'; set by subclasses.
    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        with contextlib.suppress(ValueError):
            sys.path.remove(cls._zip_path)

        # Each piece of state is cleaned up independently so that one missing
        # entry does not leave the remaining ones behind.
        sys.path_importer_cache.pop(cls._zip_path, None)
        sys.modules.pop(cls.data.__name__, None)

        for attr in ('data', '_zip_path'):
            with contextlib.suppress(AttributeError):
                delattr(cls, attr)

    def setUp(self):
        # Restore sys.modules to its pre-test state once the test finishes.
        modules = support.modules_setup()
        self.addCleanup(support.modules_cleanup, *modules)


class ZipSetup(ZipSetupBase):
    ZIP_MODULE = zipdata01  # type: ignore