import abc
import builtins
import contextlib
import errno
import functools
import importlib
from importlib import machinery, util, invalidate_caches
from importlib.abc import ResourceReader
import io
import marshal
import os
import os.path
from pathlib import Path, PurePath
from test import support
from test.support import import_helper
from test.support import os_helper
import unittest
import sys
import tempfile
import types

from . import data01
from . import zipdata01


# Names of one module that is built in and one that is not; either may stay
# None when the running interpreter offers no suitable candidate.
BUILTINS = types.SimpleNamespace()
BUILTINS.good_name = None
BUILTINS.bad_name = None
if 'errno' in sys.builtin_module_names:
    BUILTINS.good_name = 'errno'
if 'importlib' not in sys.builtin_module_names:
    BUILTINS.bad_name = 'importlib'

# Location details of the extension module named by EXTENSIONS.name.  The
# remaining attributes stay None when no matching file is found on sys.path.
EXTENSIONS = types.SimpleNamespace()
EXTENSIONS.path = None
EXTENSIONS.ext = None
EXTENSIONS.filename = None
EXTENSIONS.file_path = None
EXTENSIONS.name = '_testcapi'


def _extension_details():
    """Fill in EXTENSIONS from the first matching file found on sys.path.

    Every sys.path entry is combined with every extension suffix known to
    importlib.machinery; the search stops at the first existing file.
    """
    for path in sys.path:
        for ext in machinery.EXTENSION_SUFFIXES:
            filename = EXTENSIONS.name + ext
            file_path = os.path.join(path, filename)
            if os.path.exists(file_path):
                EXTENSIONS.path = path
                EXTENSIONS.ext = ext
                EXTENSIONS.filename = filename
                EXTENSIONS.file_path = file_path
                return


_extension_details()


def import_importlib(module_name):
    """Import a module from importlib both w/ and w/o _frozen_importlib.

    Returns a dict with the keys 'Frozen' and 'Source'.  The 'Source' copy is
    imported with the frozen bootstrap modules blocked; when *module_name* is
    a submodule, 'importlib' itself is freshly imported as well.
    """
    fresh = ('importlib',) if '.' in module_name else ()
    frozen = import_helper.import_fresh_module(module_name)
    source = import_helper.import_fresh_module(
        module_name, fresh=fresh,
        blocked=('_frozen_importlib', '_frozen_importlib_external'))
    return {'Frozen': frozen, 'Source': source}


def specialize_class(cls, kind, base=None, **kwargs):
    """Create the *kind*-specific ('Frozen' or 'Source') variant of *cls*.

    The new class is named '<kind>_<cls.__name__>' and derives from
    (cls, base).  *base* defaults to unittest.TestCase; when it is not a
    class it is treated as a mapping and indexed by *kind*.  Every keyword
    argument is a mapping as well: its entry for *kind* becomes a class
    attribute of the same name.
    """
    # XXX Support passing in submodule names--load (and cache) them?
    # That would clean up the test modules a bit more.
    if base is None:
        base = unittest.TestCase
    elif not isinstance(base, type):
        base = base[kind]
    name = '{}_{}'.format(kind, cls.__name__)
    bases = (cls, base)
    specialized = types.new_class(name, bases)
    specialized.__module__ = cls.__module__
    specialized._NAME = cls.__name__
    specialized._KIND = kind
    for attr, values in kwargs.items():
        setattr(specialized, attr, values[kind])
    return specialized


def split_frozen(cls, base=None, **kwargs):
    """Return the (frozen, source) pair of specializations of *cls*."""
    frozen = specialize_class(cls, 'Frozen', base, **kwargs)
    source = specialize_class(cls, 'Source', base, **kwargs)
    return frozen, source


def test_both(test_class, base=None, **kwargs):
    """Alias of split_frozen()."""
    return split_frozen(test_class, base, **kwargs)


CASE_INSENSITIVE_FS = True
# Windows is the only OS that is *always* case-insensitive
# (OS X *can* be case-sensitive).
if sys.platform not in ('win32', 'cygwin'):
    # Probe the file system: look this very file up under a differently
    # cased name and see whether it is still found.
    changed_name = __file__.upper()
    if changed_name == __file__:
        changed_name = __file__.lower()
    if not os.path.exists(changed_name):
        CASE_INSENSITIVE_FS = False

source_importlib = import_importlib('importlib')['Source']
# staticmethod() keeps the functions from being bound as methods when they
# are installed as class attributes (e.g. through specialize_class()).
__import__ = {'Frozen': staticmethod(builtins.__import__),
              'Source': staticmethod(source_importlib.__import__)}


def case_insensitive_tests(test):
    """Class decorator that nullifies tests requiring a case-insensitive
    file system."""
    return unittest.skipIf(not CASE_INSENSITIVE_FS,
                           "requires a case-insensitive filesystem")(test)


def submodule(parent, name, pkg_dir, content=''):
    """Write '<name>.py' containing *content* into *pkg_dir*.

    Returns a 2-tuple of the dotted module name ('<parent>.<name>') and the
    path of the file that was written.
    """
    path = os.path.join(pkg_dir, name + '.py')
    with open(path, 'w', encoding='utf-8') as subfile:
        subfile.write(content)
    return '{}.{}'.format(parent, name), path


def get_code_from_pyc(pyc_path):
    """Reads a pyc file and returns the unmarshalled code object within.

    No header validation is performed.
    """
    with open(pyc_path, 'rb') as pyc_f:
        # Skip the 16 leading bytes that precede the marshalled data.
        pyc_f.seek(16)
        return marshal.load(pyc_f)


@contextlib.contextmanager
def uncache(*names):
    """Uncache a module from sys.modules.

    The named modules are removed from sys.modules on entry and again on
    exit.  A basic sanity check is performed to prevent uncaching modules
    that either cannot/shouldn't be uncached; such a name raises ValueError.

    """
    for name in names:
        if name in ('sys', 'marshal', 'imp'):
            raise ValueError("cannot uncache {0}".format(name))
        sys.modules.pop(name, None)
    try:
        yield
    finally:
        for name in names:
            sys.modules.pop(name, None)


@contextlib.contextmanager
def temp_module(name, content='', *, pkg=False):
    """Create an importable module or package in a temporary cwd.

    The temporary directory is put on sys.path, and *name* plus every module
    already in sys.modules sharing its top-level name is uncached for the
    duration.  Yields the module's location without any file extension.

    With pkg=True a directory is created and *content* goes into its
    __init__.py; content=None then writes no __init__.py at all (a namespace
    package).  For a plain module, content=None is treated as ''.
    """
    conflicts = [n for n in sys.modules if n.partition('.')[0] == name]
    with os_helper.temp_cwd(None) as cwd:
        with uncache(name, *conflicts):
            with import_helper.DirsOnSysPath(cwd):
                invalidate_caches()

                location = os.path.join(cwd, name)
                if pkg:
                    modpath = os.path.join(location, '__init__.py')
                    # Relative path is fine: cwd is the temporary directory.
                    os.mkdir(name)
                else:
                    modpath = location + '.py'
                    if content is None:
                        # Make sure the module file gets created.
                        content = ''
                if content is not None:
                    # not a namespace package
                    with open(modpath, 'w', encoding='utf-8') as modfile:
                        modfile.write(content)
                yield location


@contextlib.contextmanager
def import_state(**kwargs):
    """Context manager to manage the various importers and stored state in the
    sys module.

    Accepted keywords are 'meta_path', 'path', 'path_hooks' and
    'path_importer_cache'; an attribute that is not passed is replaced by an
    empty list/dict.  The previous values are restored on exit.  Any other
    keyword raises ValueError before sys is touched.

    The 'modules' attribute is not supported as the interpreter state stores a
    pointer to the dict that the interpreter uses internally;
    reassigning to sys.modules does not have the desired effect.

    """
    defaults = (('meta_path', []), ('path', []),
                ('path_hooks', []),
                ('path_importer_cache', {}))
    unknown = set(kwargs).difference(attr for attr, _ in defaults)
    if unknown:
        raise ValueError(
            'unrecognized arguments: {0}'.format(', '.join(sorted(unknown))))
    originals = {}
    try:
        for attr, default in defaults:
            originals[attr] = getattr(sys, attr)
            setattr(sys, attr, kwargs.get(attr, default))
        yield
    finally:
        for attr, value in originals.items():
            setattr(sys, attr, value)


class _ImporterMock:

    """Base class to help with creating importer mocks.

    One module object is created per name.  A name ending in '.__init__'
    denotes a package: the module is registered under the name without that
    suffix and is given a __path__.  *module_code* optionally maps import
    names to callables that the loader methods run when loading.

    Used as a context manager, the mocked names are kept out of sys.modules
    on entry and removed again on exit.
    """

    def __init__(self, *names, module_code=None):
        if module_code is None:
            module_code = {}
        self.modules = {}
        self.module_code = {}
        for name in names:
            if not name.endswith('.__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            if '.' not in name:
                package = None
            elif import_name == name:
                # A plain submodule: its package is the parent name.
                package = name.rsplit('.', 1)[0]
            else:
                # A package is its own __package__.
                package = import_name
            module = types.ModuleType(import_name)
            module.__loader__ = self
            module.__file__ = '<mock __file__>'
            module.__package__ = package
            module.attr = name
            if import_name != name:
                module.__path__ = ['<mock __path__>']
            self.modules[import_name] = module
            if import_name in module_code:
                self.module_code[import_name] = module_code[import_name]

    def __getitem__(self, name):
        return self.modules[name]

    def __enter__(self):
        self._uncache = uncache(*self.modules.keys())
        self._uncache.__enter__()
        return self

    def __exit__(self, *exc_info):
        self._uncache.__exit__(None, None, None)


class mock_modules(_ImporterMock):

    """Importer mock using PEP 302 APIs."""

    def find_module(self, fullname, path=None):
        if fullname not in self.modules:
            return None
        else:
            return self

    def load_module(self, fullname):
        if fullname not in self.modules:
            raise ImportError
        else:
            sys.modules[fullname] = self.modules[fullname]
            if fullname in self.module_code:
                try:
                    self.module_code[fullname]()
                except Exception:
                    # Do not leave a half-initialized module behind.
                    del sys.modules[fullname]
                    raise
            return self.modules[fullname]


class mock_spec(_ImporterMock):

    """Importer mock using PEP 451 APIs."""

    def find_spec(self, fullname, path=None, parent=None):
        try:
            module = self.modules[fullname]
        except KeyError:
            return None
        spec = util.spec_from_file_location(
            fullname, module.__file__, loader=self,
            submodule_search_locations=getattr(module, '__path__', None))
        return spec

    def create_module(self, spec):
        if spec.name not in self.modules:
            raise ImportError
        return self.modules[spec.name]

    def exec_module(self, module):
        # Look the code up separately from running it so that a KeyError
        # raised *by* the module code propagates instead of being mistaken
        # for "no code registered".
        code = self.module_code.get(module.__spec__.name)
        if code is not None:
            code()


def writes_bytecode_files(fxn):
    """Decorator to protect sys.dont_write_bytecode from mutation and to
    disable tests that require it to be set to False.

    When sys.dont_write_bytecode is already true at decoration time the test
    is replaced by a function that does nothing and returns None.  Otherwise
    the flag is forced to False around the call and restored afterwards.
    """
    if sys.dont_write_bytecode:
        return lambda *args, **kwargs: None

    @functools.wraps(fxn)
    def wrapper(*args, **kwargs):
        original = sys.dont_write_bytecode
        sys.dont_write_bytecode = False
        try:
            return fxn(*args, **kwargs)
        finally:
            sys.dont_write_bytecode = original
    return wrapper


def ensure_bytecode_path(bytecode_path):
    """Ensure that the __pycache__ directory for PEP 3147 pyc file exists.

    :param bytecode_path: File system path to PEP 3147 pyc file.
    """
    try:
        os.mkdir(os.path.dirname(bytecode_path))
    except FileExistsError:
        # Already there; any other OSError propagates.
        pass


@contextlib.contextmanager
def temporary_pycache_prefix(prefix):
    """Adjust and restore sys.pycache_prefix."""
    _orig_prefix = sys.pycache_prefix
    sys.pycache_prefix = prefix
    try:
        yield
    finally:
        sys.pycache_prefix = _orig_prefix


@contextlib.contextmanager
def create_modules(*names):
    """Temporarily create each named module with an attribute (named 'attr')
    that contains the name passed into the context manager that caused the
    creation of the module.

    All files are created in a temporary directory returned by
    tempfile.mkdtemp(). While the context manager is active sys.path is
    replaced by a list holding only that directory (see import_state()).
    When the context manager exits the whole temporary directory is removed.

    The yielded mapping associates every passed name with the path of its
    source file; the key '.root' holds the temporary directory.

    No magic is performed when creating packages! This means that if you create
    a module within a package you must also create the package's __init__ as
    well.

    """
    source = 'attr = {0!r}'
    mapping = {}
    state_manager = None
    uncache_manager = None
    # Created outside the try block: if this fails there is nothing to clean
    # up, and the finally clause must not trip over an unbound temp_dir.
    temp_dir = tempfile.mkdtemp()
    try:
        mapping['.root'] = temp_dir
        import_names = set()
        for name in names:
            if not name.endswith('__init__'):
                import_name = name
            else:
                import_name = name[:-len('.__init__')]
            import_names.add(import_name)
            sys.modules.pop(import_name, None)
            name_parts = name.split('.')
            file_path = temp_dir
            # Create the package directories leading up to the module file.
            for directory in name_parts[:-1]:
                file_path = os.path.join(file_path, directory)
                if not os.path.exists(file_path):
                    os.mkdir(file_path)
            file_path = os.path.join(file_path, name_parts[-1] + '.py')
            with open(file_path, 'w', encoding='utf-8') as file:
                file.write(source.format(name))
            mapping[name] = file_path
        uncache_manager = uncache(*import_names)
        uncache_manager.__enter__()
        state_manager = import_state(path=[temp_dir])
        state_manager.__enter__()
        yield mapping
    finally:
        # Unwind in reverse order of setup.
        if state_manager is not None:
            state_manager.__exit__(None, None, None)
        if uncache_manager is not None:
            uncache_manager.__exit__(None, None, None)
        os_helper.rmtree(temp_dir)


def mock_path_hook(*entries, importer):
    """A mock sys.path_hooks entry.

    The returned hook hands out *importer* for any of *entries* and raises
    ImportError for every other path entry.
    """
    def hook(entry):
        if entry not in entries:
            raise ImportError
        return importer
    return hook


class CASEOKTestBase:

    def caseok_env_changed(self, *, should_exist):
        """Skip the test unless PYTHONCASEOK's presence in the environment
        seen by importlib's bootstrap code matches *should_exist*."""
        # NOTE(review): self.importlib is not defined here; presumably the
        # concrete test classes provide it -- confirm against the callers.
        possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK'
        if any(x in self.importlib._bootstrap_external._os.environ
               for x in possibilities) != should_exist:
            self.skipTest('os.environ changes not reflected in _os.environ')


def create_package(file, path, is_package=True, contents=()):
    """Return a fake module named 'testingpackage' backed by a ResourceReader.

    *file* is what the reader's open_resource() returns and *path* what its
    resource_path() returns; passing an exception instance for either makes
    the corresponding call raise it instead.  *contents* lists the resource
    names reported by the reader.  The reader remembers the most recently
    requested resource name in its _path attribute.
    """
    class Reader(ResourceReader):
        def get_resource_reader(self, package):
            return self

        def open_resource(self, path):
            self._path = path
            if isinstance(file, Exception):
                raise file
            else:
                return file

        def resource_path(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            else:
                return path

        def is_resource(self, path_):
            self._path = path_
            if isinstance(path, Exception):
                raise path
            # Only entries without a '/' (top-level names) can match.
            for entry in contents:
                parts = entry.split('/')
                if len(parts) == 1 and parts[0] == path_:
                    return True
            return False

        def contents(self):
            if isinstance(path, Exception):
                raise path
            yield from contents

    name = 'testingpackage'
    # Unfortunately importlib.util.module_from_spec() was not introduced until
    # Python 3.5.
    module = types.ModuleType(name)
    loader = Reader()
    spec = machinery.ModuleSpec(
        name, loader,
        origin='does-not-exist',
        is_package=is_package)
    module.__spec__ = spec
    module.__loader__ = loader
    return module


class CommonResourceTests(abc.ABC):
    """Tests shared by the resource-access APIs.

    Subclasses implement execute() to call the API under test with the given
    package and resource path.
    """

    @abc.abstractmethod
    def execute(self, package, path):
        raise NotImplementedError

    def test_package_name(self):
        # Passing in the package name should succeed.
        self.execute(data01.__name__, 'utf-8.file')

    def test_package_object(self):
        # Passing in the package itself should succeed.
        self.execute(data01, 'utf-8.file')

    def test_string_path(self):
        # Passing in a string for the path should succeed.
        path = 'utf-8.file'
        self.execute(data01, path)

    @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support')
    def test_pathlib_path(self):
        # Passing in a pathlib.PurePath object for the path should succeed.
        path = PurePath('utf-8.file')
        self.execute(data01, path)

    def test_absolute_path(self):
        # An absolute path is a ValueError.
        path = Path(__file__)
        full_path = path.parent/'utf-8.file'
        with self.assertRaises(ValueError):
            self.execute(data01, full_path)

    def test_relative_path(self):
        # A relative path is a ValueError.
        with self.assertRaises(ValueError):
            self.execute(data01, '../data01/utf-8.file')

    def test_importing_module_as_side_effect(self):
        # The anchor package can already be imported.
        del sys.modules[data01.__name__]
        self.execute(data01.__name__, 'utf-8.file')

    def test_non_package_by_name(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            self.execute(__name__, 'utf-8.file')

    def test_non_package_by_package(self):
        # The anchor package cannot be a module.
        with self.assertRaises(TypeError):
            module = sys.modules['test.test_importlib.util']
            self.execute(module, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_opener(self):
        bytes_data = io.BytesIO(b'Hello, world!')
        package = create_package(file=bytes_data, path=FileNotFoundError())
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2')
    def test_resource_path(self):
        bytes_data = io.BytesIO(b'Hello, world!')
        path = __file__
        package = create_package(file=bytes_data, path=path)
        self.execute(package, 'utf-8.file')
        self.assertEqual(package.__loader__._path, 'utf-8.file')

    def test_useless_loader(self):
        package = create_package(file=FileNotFoundError(),
                                 path=FileNotFoundError())
        with self.assertRaises(FileNotFoundError):
            self.execute(package, 'utf-8.file')


class ZipSetupBase:
    """Mixin that makes the 'ziptestdata' package importable from the zip
    file located next to ZIP_MODULE, for the lifetime of the test class."""

    # Module whose directory contains 'ziptestdata.zip'; set by subclasses.
    ZIP_MODULE = None

    @classmethod
    def setUpClass(cls):
        data_path = Path(cls.ZIP_MODULE.__file__)
        data_dir = data_path.parent
        cls._zip_path = str(data_dir / 'ziptestdata.zip')
        sys.path.append(cls._zip_path)
        cls.data = importlib.import_module('ziptestdata')

    @classmethod
    def tearDownClass(cls):
        # Every step is independent so that one piece of state that is
        # already gone cannot prevent the remaining cleanup.
        try:
            sys.path.remove(cls._zip_path)
        except ValueError:
            pass

        sys.path_importer_cache.pop(cls._zip_path, None)
        sys.modules.pop(cls.data.__name__, None)

        for attr in ('data', '_zip_path'):
            try:
                delattr(cls, attr)
            except AttributeError:
                pass

    def setUp(self):
        modules = import_helper.modules_setup()
        self.addCleanup(import_helper.modules_cleanup, *modules)


class ZipSetup(ZipSetupBase):
    ZIP_MODULE = zipdata01  # type: ignore