import os
import pathlib
import tempfile
import functools
import contextlib
import types
import importlib
import inspect
import warnings
import itertools

from typing import Union, Optional, cast
from .abc import ResourceReader, Traversable

# A package may be given either as an imported module or by its name.
Package = Union[types.ModuleType, str]
Anchor = Package


def package_to_anchor(func):
    """
    Replace 'package' parameter as 'anchor' and warn about the change.

    Other errors should fall through.

    >>> files('a', 'b')
    Traceback (most recent call last):
    TypeError: files() takes from 0 to 1 positional arguments but 2 were given

    Remove this compatibility in Python 3.14.
    """
    # Private sentinel: ``None`` is a meaningful value for ``anchor``, so it
    # cannot be used to detect "argument not supplied".
    undefined = object()

    @functools.wraps(func)
    def wrapper(anchor=undefined, package=undefined):
        if package is not undefined:
            if anchor is not undefined:
                # Both were supplied: forward both positionally so that
                # ``func`` itself raises the TypeError shown in the doctest.
                return func(anchor, package)
            warnings.warn(
                "First parameter to files is renamed to 'anchor'",
                DeprecationWarning,
                # Attribute the warning to the caller of the wrapper.
                stacklevel=2,
            )
            return func(package)
        elif anchor is undefined:
            # Nothing supplied: let ``func`` apply its own default.
            return func()
        return func(anchor)

    return wrapper


@package_to_anchor
def files(anchor: Optional[Anchor] = None) -> Traversable:
    """
    Get a Traversable resource for an anchor.

    ``anchor`` may be a module object, a module name, or ``None``; see
    ``resolve`` for how each form is turned into a module.
    """
    return from_package(resolve(anchor))


def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]:
    """
    Return the package's loader if it's a ResourceReader.

    Returns ``None`` when the loader found on ``package.__spec__`` has no
    ``get_resource_reader`` attribute.
    """
    # We can't use an issubclass() check here because apparently abc's
    # __subclasscheck__() hook wants to create a weak reference to the
    # object, but zipimport.zipimporter does not support weak references,
    # resulting in a TypeError.  That seems terrible.
    spec = package.__spec__
    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
    if reader is None:
        return None
    return reader(spec.name)  # type: ignore


@functools.singledispatch
def resolve(cand: Optional[Anchor]) -> types.ModuleType:
    """
    Resolve an anchor to a module.

    This fallback handles any type without a more specific registration and
    returns the candidate unchanged; the cast only informs type checkers.
    """
    return cast(types.ModuleType, cand)


@resolve.register
def _(cand: str) -> types.ModuleType:
    # A string anchor is a module name: import it.
    return importlib.import_module(cand)


@resolve.register
def _(cand: None) -> types.ModuleType:
    # No anchor given: use the module name found in the caller's globals.
    return resolve(_infer_caller().f_globals['__name__'])


def _infer_caller():
    """
    Walk the stack and find the frame of the first caller not in this module.
    """

    def is_this_file(frame_info):
        # Compare against the filename recorded for this function's own
        # frame (stack[0]) rather than ``__file__``: frame filenames come
        # from the code object and need not equal ``__file__`` (for example
        # when the module is loaded from a compiled file only), in which
        # case frames from this module would not be recognised and skipped.
        return frame_info.filename == stack[0].filename

    def is_wrapper(frame_info):
        return frame_info.function == 'wrapper'

    stack = inspect.stack()
    not_this_file = itertools.filterfalse(is_this_file, stack)
    # also exclude 'wrapper' due to singledispatch in the call stack
    callers = itertools.filterfalse(is_wrapper, not_this_file)
    return next(callers).frame


def from_package(package: types.ModuleType):
    """
    Return a Traversable object for the given package.

    The package's spec is wrapped with ``wrap_spec`` and the result of the
    resource reader's ``files()`` is returned.
    """
    # deferred for performance (python/cpython#109829)
    from ._adapters import wrap_spec

    spec = wrap_spec(package)
    reader = spec.loader.get_resource_reader(spec.name)
    return reader.files()


@contextlib.contextmanager
def _tempfile(
    reader,
    suffix='',
    # gh-93353: Keep a reference to call os.remove() in late Python
    # finalization.
    *,
    _os_remove=os.remove,
):
    """
    Write the bytes returned by ``reader()`` to a new temporary file and
    yield its path as a ``pathlib.Path``.

    The file is removed when the context exits; a file that has already
    disappeared by then is ignored.
    """
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on Windows
    # properly.
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    try:
        try:
            os.write(fd, reader())
        finally:
            # Close the descriptor whether or not the write succeeded.
            os.close(fd)
        # Drop the reference to the reader before handing control back.
        del reader
        yield pathlib.Path(raw_path)
    finally:
        try:
            _os_remove(raw_path)
        except FileNotFoundError:
            pass


def _temp_file(path):
    """
    Return a context manager that materialises ``path`` as a temporary file,
    using ``path.name`` as the file name suffix.
    """
    return _tempfile(path.read_bytes, suffix=path.name)


def _is_present_dir(path: Traversable) -> bool:
    """
    Some Traversables implement ``is_dir()`` to raise an
    exception (i.e. ``FileNotFoundError``) when the
    directory doesn't exist. This function wraps that call
    to always return a boolean and only return True
    if there's a dir and it exists.
    """
    with contextlib.suppress(FileNotFoundError):
        return path.is_dir()
    # Reached only when is_dir() raised FileNotFoundError.
    return False


@functools.singledispatch
def as_file(path):
    """
    Given a Traversable object, return that object as a
    path on the local file system in a context manager.

    An existing directory is replicated as a temporary directory tree;
    anything else is written out as a single temporary file.
    """
    return _temp_dir(path) if _is_present_dir(path) else _temp_file(path)


@as_file.register(pathlib.Path)
@contextlib.contextmanager
def _(path):
    """
    Degenerate behavior for pathlib.Path objects.

    The path is yielded unchanged; nothing is copied or cleaned up.
    """
    yield path


@contextlib.contextmanager
def _temp_path(dir: tempfile.TemporaryDirectory):
    """
    Wrap tempfile.TemporaryDirectory to return a pathlib object.
    """
    with dir as result:
        yield pathlib.Path(result)


@contextlib.contextmanager
def _temp_dir(path):
    """
    Given a traversable dir, recursively replicate the whole tree
    to the file system in a context manager.

    Yields the path of the replicated directory, which is created inside a
    fresh temporary directory.
    """
    # Internal sanity check; as_file() only dispatches here for directories.
    assert path.is_dir()
    with _temp_path(tempfile.TemporaryDirectory()) as temp_dir:
        yield _write_contents(temp_dir, path)


def _write_contents(target, source):
    """
    Recursively copy ``source`` into the directory ``target``.

    Returns the path created for ``source`` itself, i.e.
    ``target / source.name``.
    """
    child = target.joinpath(source.name)
    if source.is_dir():
        child.mkdir()
        for item in source.iterdir():
            _write_contents(child, item)
    else:
        child.write_bytes(source.read_bytes())
    return child