1import os 2import pathlib 3import tempfile 4import functools 5import contextlib 6import types 7import importlib 8 9from typing import Union, Any, Optional 10from .abc import ResourceReader, Traversable 11 12from ._adapters import wrap_spec 13 14Package = Union[types.ModuleType, str] 15 16 17def files(package): 18 # type: (Package) -> Traversable 19 """ 20 Get a Traversable resource from a package 21 """ 22 return from_package(get_package(package)) 23 24 25def normalize_path(path): 26 # type: (Any) -> str 27 """Normalize a path by ensuring it is a string. 28 29 If the resulting string contains path separators, an exception is raised. 30 """ 31 str_path = str(path) 32 parent, file_name = os.path.split(str_path) 33 if parent: 34 raise ValueError(f'{path!r} must be only a file name') 35 return file_name 36 37 38def get_resource_reader(package): 39 # type: (types.ModuleType) -> Optional[ResourceReader] 40 """ 41 Return the package's loader if it's a ResourceReader. 42 """ 43 # We can't use 44 # a issubclass() check here because apparently abc.'s __subclasscheck__() 45 # hook wants to create a weak reference to the object, but 46 # zipimport.zipimporter does not support weak references, resulting in a 47 # TypeError. That seems terrible. 48 spec = package.__spec__ 49 reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore 50 if reader is None: 51 return None 52 return reader(spec.name) # type: ignore 53 54 55def resolve(cand): 56 # type: (Package) -> types.ModuleType 57 return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand) 58 59 60def get_package(package): 61 # type: (Package) -> types.ModuleType 62 """Take a package name or module object and return the module. 63 64 Raise an exception if the resolved module is not a package. 65 """ 66 resolved = resolve(package) 67 if wrap_spec(resolved).submodule_search_locations is None: 68 raise TypeError(f'{package!r} is not a package') 69 return resolved 70 71 72def from_package(package): 73 """ 74 Return a Traversable object for the given package. 75 76 """ 77 spec = wrap_spec(package) 78 reader = spec.loader.get_resource_reader(spec.name) 79 return reader.files() 80 81 82@contextlib.contextmanager 83def _tempfile(reader, suffix=''): 84 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' 85 # blocks due to the need to close the temporary file to work on Windows 86 # properly. 87 fd, raw_path = tempfile.mkstemp(suffix=suffix) 88 try: 89 os.write(fd, reader()) 90 os.close(fd) 91 del reader 92 yield pathlib.Path(raw_path) 93 finally: 94 try: 95 os.remove(raw_path) 96 except FileNotFoundError: 97 pass 98 99 100@functools.singledispatch 101def as_file(path): 102 """ 103 Given a Traversable object, return that object as a 104 path on the local file system in a context manager. 105 """ 106 return _tempfile(path.read_bytes, suffix=path.name) 107 108 109@as_file.register(pathlib.Path) 110@contextlib.contextmanager 111def _(path): 112 """ 113 Degenerate behavior for pathlib.Path objects. 114 """ 115 yield path 116