import os

from . import abc as resources_abc
from . import _common
from ._common import as_file
from contextlib import contextmanager, suppress
from importlib import import_module
from importlib.abc import ResourceLoader
from io import BytesIO, TextIOWrapper
from pathlib import Path
from types import ModuleType
from typing import ContextManager, Iterable, Optional, Union
from typing import cast
# BinaryIO/TextIO are imported from ``typing`` itself: the ``typing.io``
# pseudo-submodule is deprecated and absent from newer Python releases.
from typing import BinaryIO, TextIO


__all__ = [
    'Package',
    'Resource',
    'as_file',
    'contents',
    'files',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


Package = Union[str, ModuleType]
Resource = Union[str, os.PathLike]


def _resolve(name) -> ModuleType:
    """If name is a string, resolve to a module.

    Anything that already carries a ``__spec__`` attribute is returned
    unchanged; everything else is handed to ``import_module()``.
    """
    if hasattr(name, '__spec__'):
        return name
    return import_module(name)


def _get_package(package) -> ModuleType:
    """Take a package name or module object and return the module.

    If a name, the module is imported.  If the resolved module
    object is not a package, raise an exception.

    Raises:
        TypeError: the module's spec has no ``submodule_search_locations``.
    """
    module = _resolve(package)
    if module.__spec__.submodule_search_locations is None:
        raise TypeError(f'{package!r} is not a package')
    return module


def _normalize_path(path) -> str:
    """Normalize a path by ensuring it is a bare file name.

    If the path contains a parent directory component, an exception is
    raised.

    Raises:
        ValueError: ``os.path.split()`` finds a non-empty parent in *path*.
    """
    parent, file_name = os.path.split(path)
    if parent:
        raise ValueError(f'{path!r} must be only a file name')
    return file_name


def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by the package's loader, if any.

    Returns ``None`` when the loader has no ``get_resource_reader``
    attribute.
    """
    # Return the package's loader if it's a ResourceReader.  We can't use
    # an issubclass() check here because apparently abc's __subclasscheck__()
    # hook wants to create a weak reference to the object, but
    # zipimport.zipimporter does not support weak references, resulting in a
    # TypeError.  That seems terrible.
    spec = package.__spec__
    if hasattr(spec.loader, 'get_resource_reader'):
        return cast(resources_abc.ResourceReader,
                    spec.loader.get_resource_reader(spec.name))
    return None


def _check_location(package):
    """Raise FileNotFoundError if the package's spec reports no location."""
    if package.__spec__.origin is None or not package.__spec__.has_location:
        raise FileNotFoundError(f'Package has no location {package!r}')


def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    Lookup order: the loader's resource reader (when it offers one), then a
    plain file next to the package's ``__spec__.origin``, then the loader's
    ``get_data()`` (when it has one).

    Raises:
        FileNotFoundError: none of the fallbacks could supply the resource.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    # A spec without an origin gets a placeholder so that the path
    # arithmetic below still works.
    absolute_package_path = os.path.abspath(
        package.__spec__.origin or 'non-existent file')
    package_path = os.path.dirname(absolute_package_path)
    full_path = os.path.join(package_path, resource)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # The file is not directly on disk; ask the loader for the bytes
        # instead, provided it exposes get_data().  An OSError from
        # get_data() is treated the same as the resource being absent.
        loader = cast(ResourceLoader, package.__spec__.loader)
        data = None
        if hasattr(package.__spec__.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is None:
            package_name = package.__spec__.name
            message = '{!r} resource not found in {!r}'.format(
                resource, package_name)
            raise FileNotFoundError(message)
        return BytesIO(data)


def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    The binary stream from ``open_binary()`` is wrapped in a
    ``TextIOWrapper`` using the given *encoding* and *errors*.
    """
    return TextIOWrapper(
        open_binary(package, resource), encoding=encoding, errors=errors)


def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the binary contents of the resource."""
    with open_binary(package, resource) as fp:
        return fp.read()


def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Return the decoded string of the resource.

    The decoding-related arguments have the same semantics as those of
    bytes.decode().
    """
    with open_text(package, resource, encoding, errors) as fp:
        return fp.read()


def files(package: Package) -> resources_abc.Traversable:
    """
    Get a Traversable resource from a package
    """
    return _common.from_package(_get_package(package))


def path(
        package: Package, resource: Resource,
        ) -> 'ContextManager[Path]':
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).
    """
    reader = _get_resource_reader(_get_package(package))
    return (
        _path_from_reader(reader, resource)
        if reader else
        _common.as_file(files(package).joinpath(_normalize_path(resource)))
        )


@contextmanager
def _path_from_reader(reader, resource):
    """Yield a path for *resource* as served by *reader*.

    The reader's ``resource_path()`` is tried first.  If that raises
    FileNotFoundError, the resource is opened with ``open_resource()`` and
    handed to ``_common._tempfile()``, whose result is yielded instead.
    """
    norm_resource = _normalize_path(resource)
    # Only the lookup itself is guarded.  The yield must stay outside the
    # handler: an exception raised by the caller's ``with`` body is thrown
    # into this generator at the yield, and swallowing a FileNotFoundError
    # there would let the generator run on to a second yield.
    try:
        direct_path = Path(reader.resource_path(norm_resource))
    except FileNotFoundError:
        pass
    else:
        yield direct_path
        return
    opener_reader = reader.open_resource(norm_resource)
    try:
        with _common._tempfile(
                opener_reader.read, suffix=norm_resource) as res:
            yield res
    finally:
        # Release the stream obtained from the reader once the temporary
        # file is no longer in use.
        opener_reader.close()


def is_resource(package: Package, name: str) -> bool:
    """True if 'name' is a resource inside 'package'.

    Directories are *not* resources.
    """
    package = _get_package(package)
    # Called for validation only: raises ValueError if 'name' has a parent
    # directory component.
    _normalize_path(name)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    if name not in contents(package):
        return False
    return (_common.from_package(package) / name).is_file()


def contents(package: Package) -> Iterable[str]:
    """Return an iterable of entries in 'package'.

    Note that not all entries are resources.  Specifically, directories are
    not considered resources.  Use `is_resource()` on each entry returned here
    to check if it is a resource or not.
    """
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        return reader.contents()
    # Is the package a namespace package?  By definition, namespace packages
    # cannot have resources.
    namespace = (
        package.__spec__.origin is None or
        package.__spec__.origin == 'namespace'
        )
    if namespace or not package.__spec__.has_location:
        return ()
    return [item.name for item in _common.from_package(package).iterdir()]