import os
import tempfile

from . import abc as resources_abc
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import Iterable, Iterator, Optional, Set, Union  # noqa: F401
from typing import cast
# BinaryIO and TextIO come straight from ``typing``; the ``typing.io``
# pseudo-namespace is deprecated and missing from newer Python releases.
from typing import BinaryIO, TextIO
from zipimport import ZipImportError


__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]


def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name, the module is imported. If the passed or imported module
    object is not a package, raise TypeError.
    """
    if hasattr(package, '__spec__'):
        # Already a module object; only packages have search locations.
        if package.__spec__.submodule_search_locations is None:
            raise TypeError('{!r} is not a package'.format(
                package.__spec__.name))
        return package
    module = import_module(package)
    if module.__spec__.submodule_search_locations is None:
        raise TypeError('{!r} is not a package'.format(package))
    return module


def _normalize_path(path) -> str:
    """Normalize a path by ensuring it is a string.

    If the resulting string contains path separators, ValueError is raised.
    """
    parent, file_name = os.path.split(path)
    if parent:
        raise ValueError('{!r} must be only a file name'.format(path))
    return file_name


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the ResourceReader for *package*, or None if it has none."""
    # Return the package's loader if it's a ResourceReader. We can't use
    # an issubclass() check here because apparently abc's __subclasscheck__()
    # hook wants to create a weak reference to the object, but
    # zipimport.zipimporter does not support weak references, resulting in a
    # TypeError. That seems terrible.
    spec = package.__spec__
    if hasattr(spec.loader, 'get_resource_reader'):
        return cast(resources_abc.ResourceReader,
                    spec.loader.get_resource_reader(spec.name))
    return None


def _check_location(package):
    """Raise FileNotFoundError if *package* has no file-system location."""
    if package.__spec__.origin is None or not package.__spec__.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')


def _read_from_loader(package: ModuleType, resource: str,
                      full_path: str) -> bytes:
    """Return the bytes of *resource* using the package loader's get_data().

    This is the fallback used when the resource could not be opened directly
    from the file system. FileNotFoundError is raised if the loader has no
    get_data() method, if get_data() fails with OSError, or if it returns
    None.
    """
    # The cast only informs type checkers; loaders that lack get_data() are
    # simply treated as being unable to supply the resource.
    loader = cast(ResourceLoader, package.__spec__.loader)
    data = None
    if hasattr(loader, 'get_data'):
        with suppress(OSError):
            data = loader.get_data(full_path)
    if data is None:
        message = '{!r} resource not found in {!r}'.format(
            resource, package.__spec__.name)
        raise FileNotFoundError(message)
    return data


def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # Not directly readable from disk; ask the loader for the data.
        return BytesIO(_read_from_loader(package, resource, full_path))


def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(resource), encoding, errors)
    _check_location(package)
    absolute_package_path = os.path.abspath(package.__spec__.origin)
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Not directly readable from disk; ask the loader for the data.
        data = _read_from_loader(package, resource, full_path)
        return TextIOWrapper(BytesIO(data), encoding, errors)


def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource."""
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_binary(package, resource) as fp:
        return fp.read()


def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()


@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Guard only the resource_path() call. The yield must stay outside
        # the try block: an exception raised in the caller's ``with`` body is
        # re-raised at the yield, and a FileNotFoundError from there must
        # propagate rather than be mistaken for "reader has no path".
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
        else:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of resource_path() *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
    else:
        with open_binary(package, resource) as fp:
            data = fp.read()
        # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
        # blocks due to the need to close the temporary file to work on
        # Windows properly.
        fd, raw_path = tempfile.mkstemp()
        try:
            # The file object takes ownership of fd, writes all of the data
            # and closes the descriptor even if the write fails. It is
            # closed before the path is handed out.
            with os.fdopen(fd, 'wb') as tmp:
                tmp.write(data)
            yield Path(raw_path)
        finally:
            with suppress(FileNotFoundError):
                os.remove(raw_path)


def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called for validation only: raises ValueError if name has a directory
    # component.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    try:
        package_contents = set(contents(package))
    except (NotADirectoryError, FileNotFoundError):
        return False
    if name not in package_contents:
        return False
    # Just because the given file_name lives as an entry in the package's
    # contents doesn't necessarily mean it's a resource. Directories are not
    # resources, so let's try to find out if it's a directory or not.
    path = Path(package.__spec__.origin).parent / name
    return path.is_file()


def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources. Specifically, directories are
    not considered resources. Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package? By definition, namespace packages
    # cannot have resources. We could use _check_location() and catch the
    # exception, but that's extra work, so just inline the check.
    elif package.__spec__.origin is None or not package.__spec__.has_location:
        return ()
    else:
        package_directory = Path(package.__spec__.origin).parent
        return os.listdir(package_directory)


# Private implementation of ResourceReader and get_resource_reader() called
# from zipimport.c. Don't use these directly! We're implementing these in
# Python because 1) it's easier, 2) zipimport may get rewritten in Python
# itself at some point, so doing this all in C would be difficult and a waste
# of effort.

class _ZipImportResourceReader(resources_abc.ResourceReader):
    """Private class used to support ZipImport.get_resource_reader().

    This class is allowed to reference all the innards and private parts of
    the zipimporter.
    """

    def __init__(self, zipimporter, fullname):
        self.zipimporter = zipimporter
        self.fullname = fullname

    def open_resource(self, resource):
        """Return a BytesIO with the resource's data from the archive.

        Raise FileNotFoundError if the zipimporter cannot supply the data.
        """
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{resource}'
        try:
            return BytesIO(self.zipimporter.get_data(path))
        except OSError:
            raise FileNotFoundError(path)

    def resource_path(self, resource):
        """Always raise FileNotFoundError; zipped resources have no path."""
        # All resources are in the zip file, so there is no path to the file.
        # Raising FileNotFoundError tells the higher level API to extract the
        # binary data and create a temporary file.
        raise FileNotFoundError

    def is_resource(self, name):
        """Return True if data for *name* can be read from the archive."""
        # Maybe we could do better, but if we can get the data, it's a
        # resource. Otherwise it isn't.
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{name}'
        try:
            self.zipimporter.get_data(path)
        except OSError:
            return False
        return True

    def contents(self):
        """Yield the names of the direct entries of the package.

        Each file or subdirectory located directly inside the package is
        yielded exactly once.
        """
        # This is a bit convoluted, because fullname will be a module path,
        # but _files is a list of file names relative to the top of the
        # archive's namespace. We want to compare file paths to find all the
        # names of things inside the module represented by fullname. So we
        # turn the module path of fullname into a file path relative to the
        # top of the archive, and then we iterate through _files looking for
        # names inside that "directory".
        fullname_path = Path(self.zipimporter.get_filename(self.fullname))
        relative_path = fullname_path.relative_to(self.zipimporter.archive)
        # Don't forget that fullname names a package, so its path will include
        # __init__.py, which we want to ignore.
        assert relative_path.name == '__init__.py'
        package_path = relative_path.parent
        seen = set()
        for filename in self.zipimporter._files:
            try:
                relative = Path(filename).relative_to(package_path)
            except ValueError:
                # Not located inside this package.
                continue
            parts = relative.parts
            if not parts:
                # The entry is the package directory itself, not a member.
                continue
            # The first component is the direct child of the package: the
            # file's own name, or the top-most subdirectory for anything
            # nested deeper (however many levels down it is).
            entry = parts[0]
            if entry not in seen:
                seen.add(entry)
                yield entry


# Called from zipimport.c
def _zipimport_get_resource_reader(zipimporter, fullname):
    """Return a resource reader for *fullname*, or None if not a package."""
    try:
        if not zipimporter.is_package(fullname):
            return None
    except ZipImportError:
        return None
    return _ZipImportResourceReader(zipimporter, fullname)