• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import io
3
4from . import _common
5from ._common import as_file, files
6from .abc import ResourceReader
7from contextlib import suppress
8from importlib.abc import ResourceLoader
9from importlib.machinery import ModuleSpec
10from io import BytesIO, TextIOWrapper
11from pathlib import Path
12from types import ModuleType
13from typing import ContextManager, Iterable, Union
14from typing import cast
15from typing.io import BinaryIO, TextIO
16from collections.abc import Sequence
17from functools import singledispatch
18
19
20__all__ = [
21    'Package',
22    'Resource',
23    'ResourceReader',
24    'as_file',
25    'contents',
26    'files',
27    'is_resource',
28    'open_binary',
29    'open_text',
30    'path',
31    'read_binary',
32    'read_text',
33]
34
35
36Package = Union[str, ModuleType]
37Resource = Union[str, os.PathLike]
38
39
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Return a file-like object opened for binary reading of the resource.

    If the package provides a resource reader, the reader opens the
    resource.  Otherwise every candidate package directory is tried in
    order: first as a regular file on the file system, then through the
    loader's ``get_data()`` when the loader offers that method.

    Raises FileNotFoundError if no candidate location yields the resource.
    """
    resource = _common.normalize_path(resource)
    package = _common.get_package(package)
    reader = _common.get_resource_reader(package)
    if reader is not None:
        return reader.open_resource(resource)
    spec = cast(ModuleSpec, package.__spec__)
    # Using pathlib doesn't work well here due to the lack of 'strict'
    # argument for pathlib.Path.resolve() prior to Python 3.6.
    if spec.submodule_search_locations is not None:
        paths = spec.submodule_search_locations
    elif spec.origin is not None:
        paths = [os.path.dirname(os.path.abspath(spec.origin))]
    else:
        # The spec carries no location information at all.  Search nothing
        # so the FileNotFoundError below is raised rather than an
        # UnboundLocalError on 'paths'.
        paths = []

    for package_path in paths:
        full_path = os.path.join(package_path, resource)
        try:
            return open(full_path, mode='rb')
        except OSError:
            # Not readable as a plain file; fall back to the loader.  A
            # loader without get_data() is skipped, and an OSError from
            # get_data() means "not here" so the next path is tried.
            loader = cast(ResourceLoader, spec.loader)
            data = None
            if hasattr(spec.loader, 'get_data'):
                with suppress(OSError):
                    data = loader.get_data(full_path)
            if data is not None:
                return BytesIO(data)

    raise FileNotFoundError(f'{resource!r} resource not found in {spec.name!r}')
72
73
def open_text(
    package: Package,
    resource: Resource,
    encoding: str = 'utf-8',
    errors: str = 'strict',
) -> TextIO:
    """Return a file-like object opened for text reading of the resource.

    The resource is opened in binary mode through open_binary() and then
    wrapped in a TextIOWrapper using the given 'encoding' and 'errors'.
    """
    binary_stream = open_binary(package, resource)
    return TextIOWrapper(binary_stream, encoding=encoding, errors=errors)
84
85
def read_binary(package: Package, resource: Resource) -> bytes:
    """Return the full contents of the resource as bytes.

    The stream obtained from open_binary() is used as a context manager
    around the read.
    """
    with open_binary(package=package, resource=resource) as stream:
        return stream.read()
90
91
def read_text(
    package: Package,
    resource: Resource,
    encoding: str = 'utf-8',
    errors: str = 'strict',
) -> str:
    """Return the resource contents decoded to a string.

    'encoding' and 'errors' carry the same meaning as the corresponding
    arguments of bytes.decode().
    """
    with open_text(
        package, resource, encoding=encoding, errors=errors
    ) as text_stream:
        return text_stream.read()
105
106
def path(
    package: Package,
    resource: Resource,
) -> 'ContextManager[Path]':
    """Provide a context manager yielding a file system path to the resource.

    When the resource is not already a standalone file on the file system,
    a temporary file is created for it; such a file is removed when the
    context manager exits (it is not an error if the file has already been
    deleted by then).
    """
    reader = _common.get_resource_reader(_common.get_package(package))
    if reader:
        # The package has its own reader: let it locate or materialize
        # the resource.
        return _path_from_reader(reader, _common.normalize_path(resource))
    # No reader: go through the package's files() object instead.
    package_files = _common.files(package)
    target = package_files.joinpath(_common.normalize_path(resource))
    return _common.as_file(target)
127
128
def _path_from_reader(reader, resource):
    """Obtain a path for 'resource' through 'reader'.

    Prefer the path reported by the reader itself; when that lookup yields
    nothing truthy, fall back to copying the opened resource.
    """
    direct = _path_from_resource_path(reader, resource)
    if direct:
        return direct
    return _path_from_open_resource(reader, resource)
133
134
def _path_from_resource_path(reader, resource):
    """Return reader.resource_path(resource) as a Path, or None.

    None is returned when the lookup raises FileNotFoundError; any other
    exception propagates to the caller.
    """
    try:
        located = Path(reader.resource_path(resource))
    except FileNotFoundError:
        return None
    return located
138
139
def _path_from_open_resource(reader, resource):
    """Buffer the resource in memory and hand it to _common._tempfile().

    The bytes are read from reader.open_resource(resource) and passed on
    as a callable (the buffer's read method), with 'resource' as the
    temporary file's suffix.  Returns whatever _common._tempfile() returns
    (presumably a context manager yielding the temporary path -- confirm
    in _common).
    """
    # Close the reader's stream as soon as its bytes are buffered instead
    # of leaving it open until garbage collection.
    with reader.open_resource(resource) as stream:
        saved = io.BytesIO(stream.read())
    return _common._tempfile(saved.read, suffix=resource)
143
144
def is_resource(package: Package, name: str) -> bool:
    """Report whether 'name' is a resource inside 'package'.

    Directories are *not* resources.
    """
    package = _common.get_package(package)
    # The normalized value is discarded; the call is kept for whatever
    # checking normalize_path() applies to 'name' (presumably it rejects
    # invalid names -- confirm in _common).
    _common.normalize_path(name)
    reader = _common.get_resource_reader(package)
    if reader is not None:
        return reader.is_resource(name)
    if name not in set(contents(package)):
        return False
    candidate = _common.from_package(package) / name
    return candidate.is_file()
159
160
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of the entry names found in 'package'.

    Entries are not necessarily resources: directories in particular are
    not.  Call `is_resource()` on an entry to find out whether it is one.
    """
    package = _common.get_package(package)
    reader = _common.get_resource_reader(package)
    if reader is not None:
        return _ensure_sequence(reader.contents())
    root = _common.from_package(package)
    if not root.is_dir():
        return []
    return [entry.name for entry in root.iterdir()]
176
177
@singledispatch
def _ensure_sequence(iterable):
    """Return 'iterable' as a sequence.

    This default implementation copies the items into a new list; the
    overload registered for Sequence returns its argument unchanged.
    """
    materialized = list(iterable)
    return materialized


@_ensure_sequence.register(Sequence)
def _(iterable):
    # Already a Sequence: hand the very same object back, no copy.
    return iterable
186