1import os 2import io 3 4from . import _common 5from ._common import as_file, files 6from .abc import ResourceReader 7from contextlib import suppress 8from importlib.abc import ResourceLoader 9from importlib.machinery import ModuleSpec 10from io import BytesIO, TextIOWrapper 11from pathlib import Path 12from types import ModuleType 13from typing import ContextManager, Iterable, Union 14from typing import cast 15from typing.io import BinaryIO, TextIO 16from collections.abc import Sequence 17from functools import singledispatch 18 19 20__all__ = [ 21 'Package', 22 'Resource', 23 'ResourceReader', 24 'as_file', 25 'contents', 26 'files', 27 'is_resource', 28 'open_binary', 29 'open_text', 30 'path', 31 'read_binary', 32 'read_text', 33] 34 35 36Package = Union[str, ModuleType] 37Resource = Union[str, os.PathLike] 38 39 40def open_binary(package: Package, resource: Resource) -> BinaryIO: 41 """Return a file-like object opened for binary reading of the resource.""" 42 resource = _common.normalize_path(resource) 43 package = _common.get_package(package) 44 reader = _common.get_resource_reader(package) 45 if reader is not None: 46 return reader.open_resource(resource) 47 spec = cast(ModuleSpec, package.__spec__) 48 # Using pathlib doesn't work well here due to the lack of 'strict' 49 # argument for pathlib.Path.resolve() prior to Python 3.6. 50 if spec.submodule_search_locations is not None: 51 paths = spec.submodule_search_locations 52 elif spec.origin is not None: 53 paths = [os.path.dirname(os.path.abspath(spec.origin))] 54 55 for package_path in paths: 56 full_path = os.path.join(package_path, resource) 57 try: 58 return open(full_path, mode='rb') 59 except OSError: 60 # Just assume the loader is a resource loader; all the relevant 61 # importlib.machinery loaders are and an AttributeError for 62 # get_data() will make it clear what is needed from the loader. 63 loader = cast(ResourceLoader, spec.loader) 64 data = None 65 if hasattr(spec.loader, 'get_data'): 66 with suppress(OSError): 67 data = loader.get_data(full_path) 68 if data is not None: 69 return BytesIO(data) 70 71 raise FileNotFoundError(f'{resource!r} resource not found in {spec.name!r}') 72 73 74def open_text( 75 package: Package, 76 resource: Resource, 77 encoding: str = 'utf-8', 78 errors: str = 'strict', 79) -> TextIO: 80 """Return a file-like object opened for text reading of the resource.""" 81 return TextIOWrapper( 82 open_binary(package, resource), encoding=encoding, errors=errors 83 ) 84 85 86def read_binary(package: Package, resource: Resource) -> bytes: 87 """Return the binary contents of the resource.""" 88 with open_binary(package, resource) as fp: 89 return fp.read() 90 91 92def read_text( 93 package: Package, 94 resource: Resource, 95 encoding: str = 'utf-8', 96 errors: str = 'strict', 97) -> str: 98 """Return the decoded string of the resource. 99 100 The decoding-related arguments have the same semantics as those of 101 bytes.decode(). 102 """ 103 with open_text(package, resource, encoding, errors) as fp: 104 return fp.read() 105 106 107def path( 108 package: Package, 109 resource: Resource, 110) -> 'ContextManager[Path]': 111 """A context manager providing a file path object to the resource. 112 113 If the resource does not already exist on its own on the file system, 114 a temporary file will be created. If the file was created, the file 115 will be deleted upon exiting the context manager (no exception is 116 raised if the file was deleted prior to the context manager 117 exiting). 118 """ 119 reader = _common.get_resource_reader(_common.get_package(package)) 120 return ( 121 _path_from_reader(reader, _common.normalize_path(resource)) 122 if reader 123 else _common.as_file( 124 _common.files(package).joinpath(_common.normalize_path(resource)) 125 ) 126 ) 127 128 129def _path_from_reader(reader, resource): 130 return _path_from_resource_path(reader, resource) or _path_from_open_resource( 131 reader, resource 132 ) 133 134 135def _path_from_resource_path(reader, resource): 136 with suppress(FileNotFoundError): 137 return Path(reader.resource_path(resource)) 138 139 140def _path_from_open_resource(reader, resource): 141 saved = io.BytesIO(reader.open_resource(resource).read()) 142 return _common._tempfile(saved.read, suffix=resource) 143 144 145def is_resource(package: Package, name: str) -> bool: 146 """True if 'name' is a resource inside 'package'. 147 148 Directories are *not* resources. 149 """ 150 package = _common.get_package(package) 151 _common.normalize_path(name) 152 reader = _common.get_resource_reader(package) 153 if reader is not None: 154 return reader.is_resource(name) 155 package_contents = set(contents(package)) 156 if name not in package_contents: 157 return False 158 return (_common.from_package(package) / name).is_file() 159 160 161def contents(package: Package) -> Iterable[str]: 162 """Return an iterable of entries in 'package'. 163 164 Note that not all entries are resources. Specifically, directories are 165 not considered resources. Use `is_resource()` on each entry returned here 166 to check if it is a resource or not. 167 """ 168 package = _common.get_package(package) 169 reader = _common.get_resource_reader(package) 170 if reader is not None: 171 return _ensure_sequence(reader.contents()) 172 transversable = _common.from_package(package) 173 if transversable.is_dir(): 174 return list(item.name for item in transversable.iterdir()) 175 return [] 176 177 178@singledispatch 179def _ensure_sequence(iterable): 180 return list(iterable) 181 182 183@_ensure_sequence.register(Sequence) 184def _(iterable): 185 return iterable 186