1import abc 2import builtins 3import contextlib 4import errno 5import functools 6import importlib 7from importlib import machinery, util, invalidate_caches 8from importlib.abc import ResourceReader 9import io 10import os 11import os.path 12from pathlib import Path, PurePath 13from test import support 14import unittest 15import sys 16import tempfile 17import types 18 19from . import data01 20from . import zipdata01 21 22 23BUILTINS = types.SimpleNamespace() 24BUILTINS.good_name = None 25BUILTINS.bad_name = None 26if 'errno' in sys.builtin_module_names: 27 BUILTINS.good_name = 'errno' 28if 'importlib' not in sys.builtin_module_names: 29 BUILTINS.bad_name = 'importlib' 30 31EXTENSIONS = types.SimpleNamespace() 32EXTENSIONS.path = None 33EXTENSIONS.ext = None 34EXTENSIONS.filename = None 35EXTENSIONS.file_path = None 36EXTENSIONS.name = '_testcapi' 37 38def _extension_details(): 39 global EXTENSIONS 40 for path in sys.path: 41 for ext in machinery.EXTENSION_SUFFIXES: 42 filename = EXTENSIONS.name + ext 43 file_path = os.path.join(path, filename) 44 if os.path.exists(file_path): 45 EXTENSIONS.path = path 46 EXTENSIONS.ext = ext 47 EXTENSIONS.filename = filename 48 EXTENSIONS.file_path = file_path 49 return 50 51_extension_details() 52 53 54def import_importlib(module_name): 55 """Import a module from importlib both w/ and w/o _frozen_importlib.""" 56 fresh = ('importlib',) if '.' in module_name else () 57 frozen = support.import_fresh_module(module_name) 58 source = support.import_fresh_module(module_name, fresh=fresh, 59 blocked=('_frozen_importlib', '_frozen_importlib_external')) 60 return {'Frozen': frozen, 'Source': source} 61 62 63def specialize_class(cls, kind, base=None, **kwargs): 64 # XXX Support passing in submodule names--load (and cache) them? 65 # That would clean up the test modules a bit more. 66 if base is None: 67 base = unittest.TestCase 68 elif not isinstance(base, type): 69 base = base[kind] 70 name = '{}_{}'.format(kind, cls.__name__) 71 bases = (cls, base) 72 specialized = types.new_class(name, bases) 73 specialized.__module__ = cls.__module__ 74 specialized._NAME = cls.__name__ 75 specialized._KIND = kind 76 for attr, values in kwargs.items(): 77 value = values[kind] 78 setattr(specialized, attr, value) 79 return specialized 80 81 82def split_frozen(cls, base=None, **kwargs): 83 frozen = specialize_class(cls, 'Frozen', base, **kwargs) 84 source = specialize_class(cls, 'Source', base, **kwargs) 85 return frozen, source 86 87 88def test_both(test_class, base=None, **kwargs): 89 return split_frozen(test_class, base, **kwargs) 90 91 92CASE_INSENSITIVE_FS = True 93# Windows is the only OS that is *always* case-insensitive 94# (OS X *can* be case-sensitive). 95if sys.platform not in ('win32', 'cygwin'): 96 changed_name = __file__.upper() 97 if changed_name == __file__: 98 changed_name = __file__.lower() 99 if not os.path.exists(changed_name): 100 CASE_INSENSITIVE_FS = False 101 102source_importlib = import_importlib('importlib')['Source'] 103__import__ = {'Frozen': staticmethod(builtins.__import__), 104 'Source': staticmethod(source_importlib.__import__)} 105 106 107def case_insensitive_tests(test): 108 """Class decorator that nullifies tests requiring a case-insensitive 109 file system.""" 110 return unittest.skipIf(not CASE_INSENSITIVE_FS, 111 "requires a case-insensitive filesystem")(test) 112 113 114def submodule(parent, name, pkg_dir, content=''): 115 path = os.path.join(pkg_dir, name + '.py') 116 with open(path, 'w') as subfile: 117 subfile.write(content) 118 return '{}.{}'.format(parent, name), path 119 120 121@contextlib.contextmanager 122def uncache(*names): 123 """Uncache a module from sys.modules. 124 125 A basic sanity check is performed to prevent uncaching modules that either 126 cannot/shouldn't be uncached. 127 128 """ 129 for name in names: 130 if name in ('sys', 'marshal', 'imp'): 131 raise ValueError( 132 "cannot uncache {0}".format(name)) 133 try: 134 del sys.modules[name] 135 except KeyError: 136 pass 137 try: 138 yield 139 finally: 140 for name in names: 141 try: 142 del sys.modules[name] 143 except KeyError: 144 pass 145 146 147@contextlib.contextmanager 148def temp_module(name, content='', *, pkg=False): 149 conflicts = [n for n in sys.modules if n.partition('.')[0] == name] 150 with support.temp_cwd(None) as cwd: 151 with uncache(name, *conflicts): 152 with support.DirsOnSysPath(cwd): 153 invalidate_caches() 154 155 location = os.path.join(cwd, name) 156 if pkg: 157 modpath = os.path.join(location, '__init__.py') 158 os.mkdir(name) 159 else: 160 modpath = location + '.py' 161 if content is None: 162 # Make sure the module file gets created. 163 content = '' 164 if content is not None: 165 # not a namespace package 166 with open(modpath, 'w') as modfile: 167 modfile.write(content) 168 yield location 169 170 171@contextlib.contextmanager 172def import_state(**kwargs): 173 """Context manager to manage the various importers and stored state in the 174 sys module. 175 176 The 'modules' attribute is not supported as the interpreter state stores a 177 pointer to the dict that the interpreter uses internally; 178 reassigning to sys.modules does not have the desired effect. 179 180 """ 181 originals = {} 182 try: 183 for attr, default in (('meta_path', []), ('path', []), 184 ('path_hooks', []), 185 ('path_importer_cache', {})): 186 originals[attr] = getattr(sys, attr) 187 if attr in kwargs: 188 new_value = kwargs[attr] 189 del kwargs[attr] 190 else: 191 new_value = default 192 setattr(sys, attr, new_value) 193 if len(kwargs): 194 raise ValueError( 195 'unrecognized arguments: {0}'.format(kwargs.keys())) 196 yield 197 finally: 198 for attr, value in originals.items(): 199 setattr(sys, attr, value) 200 201 202class _ImporterMock: 203 204 """Base class to help with creating importer mocks.""" 205 206 def __init__(self, *names, module_code={}): 207 self.modules = {} 208 self.module_code = {} 209 for name in names: 210 if not name.endswith('.__init__'): 211 import_name = name 212 else: 213 import_name = name[:-len('.__init__')] 214 if '.' not in name: 215 package = None 216 elif import_name == name: 217 package = name.rsplit('.', 1)[0] 218 else: 219 package = import_name 220 module = types.ModuleType(import_name) 221 module.__loader__ = self 222 module.__file__ = '<mock __file__>' 223 module.__package__ = package 224 module.attr = name 225 if import_name != name: 226 module.__path__ = ['<mock __path__>'] 227 self.modules[import_name] = module 228 if import_name in module_code: 229 self.module_code[import_name] = module_code[import_name] 230 231 def __getitem__(self, name): 232 return self.modules[name] 233 234 def __enter__(self): 235 self._uncache = uncache(*self.modules.keys()) 236 self._uncache.__enter__() 237 return self 238 239 def __exit__(self, *exc_info): 240 self._uncache.__exit__(None, None, None) 241 242 243class mock_modules(_ImporterMock): 244 245 """Importer mock using PEP 302 APIs.""" 246 247 def find_module(self, fullname, path=None): 248 if fullname not in self.modules: 249 return None 250 else: 251 return self 252 253 def load_module(self, fullname): 254 if fullname not in self.modules: 255 raise ImportError 256 else: 257 sys.modules[fullname] = self.modules[fullname] 258 if fullname in self.module_code: 259 try: 260 self.module_code[fullname]() 261 except Exception: 262 del sys.modules[fullname] 263 raise 264 return self.modules[fullname] 265 266 267class mock_spec(_ImporterMock): 268 269 """Importer mock using PEP 451 APIs.""" 270 271 def find_spec(self, fullname, path=None, parent=None): 272 try: 273 module = self.modules[fullname] 274 except KeyError: 275 return None 276 spec = util.spec_from_file_location( 277 fullname, module.__file__, loader=self, 278 submodule_search_locations=getattr(module, '__path__', None)) 279 return spec 280 281 def create_module(self, spec): 282 if spec.name not in self.modules: 283 raise ImportError 284 return self.modules[spec.name] 285 286 def exec_module(self, module): 287 try: 288 self.module_code[module.__spec__.name]() 289 except KeyError: 290 pass 291 292 293def writes_bytecode_files(fxn): 294 """Decorator to protect sys.dont_write_bytecode from mutation and to skip 295 tests that require it to be set to False.""" 296 if sys.dont_write_bytecode: 297 return lambda *args, **kwargs: None 298 @functools.wraps(fxn) 299 def wrapper(*args, **kwargs): 300 original = sys.dont_write_bytecode 301 sys.dont_write_bytecode = False 302 try: 303 to_return = fxn(*args, **kwargs) 304 finally: 305 sys.dont_write_bytecode = original 306 return to_return 307 return wrapper 308 309 310def ensure_bytecode_path(bytecode_path): 311 """Ensure that the __pycache__ directory for PEP 3147 pyc file exists. 312 313 :param bytecode_path: File system path to PEP 3147 pyc file. 314 """ 315 try: 316 os.mkdir(os.path.dirname(bytecode_path)) 317 except OSError as error: 318 if error.errno != errno.EEXIST: 319 raise 320 321 322@contextlib.contextmanager 323def create_modules(*names): 324 """Temporarily create each named module with an attribute (named 'attr') 325 that contains the name passed into the context manager that caused the 326 creation of the module. 327 328 All files are created in a temporary directory returned by 329 tempfile.mkdtemp(). This directory is inserted at the beginning of 330 sys.path. When the context manager exits all created files (source and 331 bytecode) are explicitly deleted. 332 333 No magic is performed when creating packages! This means that if you create 334 a module within a package you must also create the package's __init__ as 335 well. 336 337 """ 338 source = 'attr = {0!r}' 339 created_paths = [] 340 mapping = {} 341 state_manager = None 342 uncache_manager = None 343 try: 344 temp_dir = tempfile.mkdtemp() 345 mapping['.root'] = temp_dir 346 import_names = set() 347 for name in names: 348 if not name.endswith('__init__'): 349 import_name = name 350 else: 351 import_name = name[:-len('.__init__')] 352 import_names.add(import_name) 353 if import_name in sys.modules: 354 del sys.modules[import_name] 355 name_parts = name.split('.') 356 file_path = temp_dir 357 for directory in name_parts[:-1]: 358 file_path = os.path.join(file_path, directory) 359 if not os.path.exists(file_path): 360 os.mkdir(file_path) 361 created_paths.append(file_path) 362 file_path = os.path.join(file_path, name_parts[-1] + '.py') 363 with open(file_path, 'w') as file: 364 file.write(source.format(name)) 365 created_paths.append(file_path) 366 mapping[name] = file_path 367 uncache_manager = uncache(*import_names) 368 uncache_manager.__enter__() 369 state_manager = import_state(path=[temp_dir]) 370 state_manager.__enter__() 371 yield mapping 372 finally: 373 if state_manager is not None: 374 state_manager.__exit__(None, None, None) 375 if uncache_manager is not None: 376 uncache_manager.__exit__(None, None, None) 377 support.rmtree(temp_dir) 378 379 380def mock_path_hook(*entries, importer): 381 """A mock sys.path_hooks entry.""" 382 def hook(entry): 383 if entry not in entries: 384 raise ImportError 385 return importer 386 return hook 387 388 389class CASEOKTestBase: 390 391 def caseok_env_changed(self, *, should_exist): 392 possibilities = b'PYTHONCASEOK', 'PYTHONCASEOK' 393 if any(x in self.importlib._bootstrap_external._os.environ 394 for x in possibilities) != should_exist: 395 self.skipTest('os.environ changes not reflected in _os.environ') 396 397 398def create_package(file, path, is_package=True, contents=()): 399 class Reader(ResourceReader): 400 def get_resource_reader(self, package): 401 return self 402 403 def open_resource(self, path): 404 self._path = path 405 if isinstance(file, Exception): 406 raise file 407 else: 408 return file 409 410 def resource_path(self, path_): 411 self._path = path_ 412 if isinstance(path, Exception): 413 raise path 414 else: 415 return path 416 417 def is_resource(self, path_): 418 self._path = path_ 419 if isinstance(path, Exception): 420 raise path 421 for entry in contents: 422 parts = entry.split('/') 423 if len(parts) == 1 and parts[0] == path_: 424 return True 425 return False 426 427 def contents(self): 428 if isinstance(path, Exception): 429 raise path 430 # There's no yield from in baseball, er, Python 2. 431 for entry in contents: 432 yield entry 433 434 name = 'testingpackage' 435 # Unforunately importlib.util.module_from_spec() was not introduced until 436 # Python 3.5. 437 module = types.ModuleType(name) 438 loader = Reader() 439 spec = machinery.ModuleSpec( 440 name, loader, 441 origin='does-not-exist', 442 is_package=is_package) 443 module.__spec__ = spec 444 module.__loader__ = loader 445 return module 446 447 448class CommonResourceTests(abc.ABC): 449 @abc.abstractmethod 450 def execute(self, package, path): 451 raise NotImplementedError 452 453 def test_package_name(self): 454 # Passing in the package name should succeed. 455 self.execute(data01.__name__, 'utf-8.file') 456 457 def test_package_object(self): 458 # Passing in the package itself should succeed. 459 self.execute(data01, 'utf-8.file') 460 461 def test_string_path(self): 462 # Passing in a string for the path should succeed. 463 path = 'utf-8.file' 464 self.execute(data01, path) 465 466 @unittest.skipIf(sys.version_info < (3, 6), 'requires os.PathLike support') 467 def test_pathlib_path(self): 468 # Passing in a pathlib.PurePath object for the path should succeed. 469 path = PurePath('utf-8.file') 470 self.execute(data01, path) 471 472 def test_absolute_path(self): 473 # An absolute path is a ValueError. 474 path = Path(__file__) 475 full_path = path.parent/'utf-8.file' 476 with self.assertRaises(ValueError): 477 self.execute(data01, full_path) 478 479 def test_relative_path(self): 480 # A reative path is a ValueError. 481 with self.assertRaises(ValueError): 482 self.execute(data01, '../data01/utf-8.file') 483 484 def test_importing_module_as_side_effect(self): 485 # The anchor package can already be imported. 486 del sys.modules[data01.__name__] 487 self.execute(data01.__name__, 'utf-8.file') 488 489 def test_non_package_by_name(self): 490 # The anchor package cannot be a module. 491 with self.assertRaises(TypeError): 492 self.execute(__name__, 'utf-8.file') 493 494 def test_non_package_by_package(self): 495 # The anchor package cannot be a module. 496 with self.assertRaises(TypeError): 497 module = sys.modules['test.test_importlib.util'] 498 self.execute(module, 'utf-8.file') 499 500 @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2') 501 def test_resource_opener(self): 502 bytes_data = io.BytesIO(b'Hello, world!') 503 package = create_package(file=bytes_data, path=FileNotFoundError()) 504 self.execute(package, 'utf-8.file') 505 self.assertEqual(package.__loader__._path, 'utf-8.file') 506 507 @unittest.skipIf(sys.version_info < (3,), 'No ResourceReader in Python 2') 508 def test_resource_path(self): 509 bytes_data = io.BytesIO(b'Hello, world!') 510 path = __file__ 511 package = create_package(file=bytes_data, path=path) 512 self.execute(package, 'utf-8.file') 513 self.assertEqual(package.__loader__._path, 'utf-8.file') 514 515 def test_useless_loader(self): 516 package = create_package(file=FileNotFoundError(), 517 path=FileNotFoundError()) 518 with self.assertRaises(FileNotFoundError): 519 self.execute(package, 'utf-8.file') 520 521 522class ZipSetupBase: 523 ZIP_MODULE = None 524 525 @classmethod 526 def setUpClass(cls): 527 data_path = Path(cls.ZIP_MODULE.__file__) 528 data_dir = data_path.parent 529 cls._zip_path = str(data_dir / 'ziptestdata.zip') 530 sys.path.append(cls._zip_path) 531 cls.data = importlib.import_module('ziptestdata') 532 533 @classmethod 534 def tearDownClass(cls): 535 try: 536 sys.path.remove(cls._zip_path) 537 except ValueError: 538 pass 539 540 try: 541 del sys.path_importer_cache[cls._zip_path] 542 del sys.modules[cls.data.__name__] 543 except KeyError: 544 pass 545 546 try: 547 del cls.data 548 del cls._zip_path 549 except AttributeError: 550 pass 551 552 def setUp(self): 553 modules = support.modules_setup() 554 self.addCleanup(support.modules_cleanup, *modules) 555 556 557class ZipSetup(ZipSetupBase): 558 ZIP_MODULE = zipdata01 # type: ignore 559