1import os 2import tempfile 3 4from . import abc as resources_abc 5from contextlib import contextmanager, suppress 6from importlib import import_module 7from importlib.abc import ResourceLoader 8from io import BytesIO, TextIOWrapper 9from pathlib import Path 10from types import ModuleType 11from typing import Iterable, Iterator, Optional, Set, Union # noqa: F401 12from typing import cast 13from typing.io import BinaryIO, TextIO 14from zipimport import ZipImportError 15 16 17__all__ = [ 18 'Package', 19 'Resource', 20 'contents', 21 'is_resource', 22 'open_binary', 23 'open_text', 24 'path', 25 'read_binary', 26 'read_text', 27 ] 28 29 30Package = Union[str, ModuleType] 31Resource = Union[str, os.PathLike] 32 33 34def _get_package(package) -> ModuleType: 35 """Take a package name or module object and return the module. 36 37 If a name, the module is imported. If the passed or imported module 38 object is not a package, raise an exception. 39 """ 40 if hasattr(package, '__spec__'): 41 if package.__spec__.submodule_search_locations is None: 42 raise TypeError('{!r} is not a package'.format( 43 package.__spec__.name)) 44 else: 45 return package 46 else: 47 module = import_module(package) 48 if module.__spec__.submodule_search_locations is None: 49 raise TypeError('{!r} is not a package'.format(package)) 50 else: 51 return module 52 53 54def _normalize_path(path) -> str: 55 """Normalize a path by ensuring it is a string. 56 57 If the resulting string contains path separators, an exception is raised. 58 """ 59 parent, file_name = os.path.split(path) 60 if parent: 61 raise ValueError('{!r} must be only a file name'.format(path)) 62 else: 63 return file_name 64 65 66def _get_resource_reader( 67 package: ModuleType) -> Optional[resources_abc.ResourceReader]: 68 # Return the package's loader if it's a ResourceReader. We can't use 69 # a issubclass() check here because apparently abc.'s __subclasscheck__() 70 # hook wants to create a weak reference to the object, but 71 # zipimport.zipimporter does not support weak references, resulting in a 72 # TypeError. That seems terrible. 73 spec = package.__spec__ 74 if hasattr(spec.loader, 'get_resource_reader'): 75 return cast(resources_abc.ResourceReader, 76 spec.loader.get_resource_reader(spec.name)) 77 return None 78 79 80def _check_location(package): 81 if package.__spec__.origin is None or not package.__spec__.has_location: 82 raise FileNotFoundError(f'Package has no location {package!r}') 83 84 85def open_binary(package: Package, resource: Resource) -> BinaryIO: 86 """Return a file-like object opened for binary reading of the resource.""" 87 resource = _normalize_path(resource) 88 package = _get_package(package) 89 reader = _get_resource_reader(package) 90 if reader is not None: 91 return reader.open_resource(resource) 92 _check_location(package) 93 absolute_package_path = os.path.abspath(package.__spec__.origin) 94 package_path = os.path.dirname(absolute_package_path) 95 full_path = os.path.join(package_path, resource) 96 try: 97 return open(full_path, mode='rb') 98 except OSError: 99 # Just assume the loader is a resource loader; all the relevant 100 # importlib.machinery loaders are and an AttributeError for 101 # get_data() will make it clear what is needed from the loader. 102 loader = cast(ResourceLoader, package.__spec__.loader) 103 data = None 104 if hasattr(package.__spec__.loader, 'get_data'): 105 with suppress(OSError): 106 data = loader.get_data(full_path) 107 if data is None: 108 package_name = package.__spec__.name 109 message = '{!r} resource not found in {!r}'.format( 110 resource, package_name) 111 raise FileNotFoundError(message) 112 else: 113 return BytesIO(data) 114 115 116def open_text(package: Package, 117 resource: Resource, 118 encoding: str = 'utf-8', 119 errors: str = 'strict') -> TextIO: 120 """Return a file-like object opened for text reading of the resource.""" 121 resource = _normalize_path(resource) 122 package = _get_package(package) 123 reader = _get_resource_reader(package) 124 if reader is not None: 125 return TextIOWrapper(reader.open_resource(resource), encoding, errors) 126 _check_location(package) 127 absolute_package_path = os.path.abspath(package.__spec__.origin) 128 package_path = os.path.dirname(absolute_package_path) 129 full_path = os.path.join(package_path, resource) 130 try: 131 return open(full_path, mode='r', encoding=encoding, errors=errors) 132 except OSError: 133 # Just assume the loader is a resource loader; all the relevant 134 # importlib.machinery loaders are and an AttributeError for 135 # get_data() will make it clear what is needed from the loader. 136 loader = cast(ResourceLoader, package.__spec__.loader) 137 data = None 138 if hasattr(package.__spec__.loader, 'get_data'): 139 with suppress(OSError): 140 data = loader.get_data(full_path) 141 if data is None: 142 package_name = package.__spec__.name 143 message = '{!r} resource not found in {!r}'.format( 144 resource, package_name) 145 raise FileNotFoundError(message) 146 else: 147 return TextIOWrapper(BytesIO(data), encoding, errors) 148 149 150def read_binary(package: Package, resource: Resource) -> bytes: 151 """Return the binary contents of the resource.""" 152 resource = _normalize_path(resource) 153 package = _get_package(package) 154 with open_binary(package, resource) as fp: 155 return fp.read() 156 157 158def read_text(package: Package, 159 resource: Resource, 160 encoding: str = 'utf-8', 161 errors: str = 'strict') -> str: 162 """Return the decoded string of the resource. 163 164 The decoding-related arguments have the same semantics as those of 165 bytes.decode(). 166 """ 167 resource = _normalize_path(resource) 168 package = _get_package(package) 169 with open_text(package, resource, encoding, errors) as fp: 170 return fp.read() 171 172 173@contextmanager 174def path(package: Package, resource: Resource) -> Iterator[Path]: 175 """A context manager providing a file path object to the resource. 176 177 If the resource does not already exist on its own on the file system, 178 a temporary file will be created. If the file was created, the file 179 will be deleted upon exiting the context manager (no exception is 180 raised if the file was deleted prior to the context manager 181 exiting). 182 """ 183 resource = _normalize_path(resource) 184 package = _get_package(package) 185 reader = _get_resource_reader(package) 186 if reader is not None: 187 try: 188 yield Path(reader.resource_path(resource)) 189 return 190 except FileNotFoundError: 191 pass 192 else: 193 _check_location(package) 194 # Fall-through for both the lack of resource_path() *and* if 195 # resource_path() raises FileNotFoundError. 196 package_directory = Path(package.__spec__.origin).parent 197 file_path = package_directory / resource 198 if file_path.exists(): 199 yield file_path 200 else: 201 with open_binary(package, resource) as fp: 202 data = fp.read() 203 # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' 204 # blocks due to the need to close the temporary file to work on 205 # Windows properly. 206 fd, raw_path = tempfile.mkstemp() 207 try: 208 os.write(fd, data) 209 os.close(fd) 210 yield Path(raw_path) 211 finally: 212 try: 213 os.remove(raw_path) 214 except FileNotFoundError: 215 pass 216 217 218def is_resource(package: Package, name: str) -> bool: 219 """True if 'name' is a resource inside 'package'. 220 221 Directories are *not* resources. 222 """ 223 package = _get_package(package) 224 _normalize_path(name) 225 reader = _get_resource_reader(package) 226 if reader is not None: 227 return reader.is_resource(name) 228 try: 229 package_contents = set(contents(package)) 230 except (NotADirectoryError, FileNotFoundError): 231 return False 232 if name not in package_contents: 233 return False 234 # Just because the given file_name lives as an entry in the package's 235 # contents doesn't necessarily mean it's a resource. Directories are not 236 # resources, so let's try to find out if it's a directory or not. 237 path = Path(package.__spec__.origin).parent / name 238 return path.is_file() 239 240 241def contents(package: Package) -> Iterable[str]: 242 """Return an iterable of entries in 'package'. 243 244 Note that not all entries are resources. Specifically, directories are 245 not considered resources. Use `is_resource()` on each entry returned here 246 to check if it is a resource or not. 247 """ 248 package = _get_package(package) 249 reader = _get_resource_reader(package) 250 if reader is not None: 251 return reader.contents() 252 # Is the package a namespace package? By definition, namespace packages 253 # cannot have resources. We could use _check_location() and catch the 254 # exception, but that's extra work, so just inline the check. 255 elif package.__spec__.origin is None or not package.__spec__.has_location: 256 return () 257 else: 258 package_directory = Path(package.__spec__.origin).parent 259 return os.listdir(package_directory) 260