• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import tempfile
3
4from . import abc as resources_abc
5from contextlib import contextmanager, suppress
6from importlib import import_module
7from importlib.abc import ResourceLoader
8from io import BytesIO, TextIOWrapper
9from pathlib import Path
10from types import ModuleType
11from typing import Iterable, Iterator, Optional, Set, Union   # noqa: F401
12from typing import cast
13from typing.io import BinaryIO, TextIO
14from zipimport import ZipImportError
15
16
# Public names exported by a star-import of this module.
__all__ = [
    'Package',
    'Resource',
    'contents',
    'is_resource',
    'open_binary',
    'open_text',
    'path',
    'read_binary',
    'read_text',
    ]


# A package argument is either a dotted module name or a module object.
Package = Union[str, ModuleType]
# A resource argument is a bare file name, given as a string or a path-like.
Resource = Union[str, os.PathLike]
32
33
34def _get_package(package) -> ModuleType:
35    """Take a package name or module object and return the module.
36
37    If a name, the module is imported.  If the passed or imported module
38    object is not a package, raise an exception.
39    """
40    if hasattr(package, '__spec__'):
41        if package.__spec__.submodule_search_locations is None:
42            raise TypeError('{!r} is not a package'.format(
43                package.__spec__.name))
44        else:
45            return package
46    else:
47        module = import_module(package)
48        if module.__spec__.submodule_search_locations is None:
49            raise TypeError('{!r} is not a package'.format(package))
50        else:
51            return module
52
53
54def _normalize_path(path) -> str:
55    """Normalize a path by ensuring it is a string.
56
57    If the resulting string contains path separators, an exception is raised.
58    """
59    parent, file_name = os.path.split(path)
60    if parent:
61        raise ValueError('{!r} must be only a file name'.format(path))
62    else:
63        return file_name
64
65
def _get_resource_reader(
        package: ModuleType) -> Optional[resources_abc.ResourceReader]:
    """Return the resource reader offered by the package's loader, or None.

    The loader is asked for a reader via get_resource_reader(), keyed by the
    spec's name; loaders without that method yield None.
    """
    # Duck-type on get_resource_reader() rather than doing an issubclass()
    # check: the ABC's __subclasscheck__() hook wants to create a weak
    # reference to the object, but zipimport.zipimporter does not support
    # weak references, which results in a TypeError.
    module_spec = package.__spec__
    if not hasattr(module_spec.loader, 'get_resource_reader'):
        return None
    return cast(resources_abc.ResourceReader,
                module_spec.loader.get_resource_reader(module_spec.name))
78
79
80def _check_location(package):
81    if package.__spec__.origin is None or not package.__spec__.has_location:
82        raise FileNotFoundError(f'Package has no location {package!r}')
83
84
def open_binary(package: Package, resource: Resource) -> BinaryIO:
    """Open *resource* inside *package* for binary reading.

    A resource reader supplied by the package's loader is preferred.
    Without one, the file is opened from the directory holding the
    package's origin; if that fails, the loader's get_data() is tried.

    Raises FileNotFoundError when the package has no location or the
    resource cannot be obtained by any of those means.
    """
    file_name = _normalize_path(resource)
    module = _get_package(package)
    reader = _get_resource_reader(module)
    if reader is not None:
        return reader.open_resource(file_name)
    _check_location(module)
    spec = module.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, file_name)
    try:
        return open(full_path, mode='rb')
    except OSError:
        # Not directly on disk: fall back to the loader's get_data() when it
        # has one.  An OSError from get_data() counts as "not found" too.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return BytesIO(data)
        message = '{!r} resource not found in {!r}'.format(
            file_name, spec.name)
        raise FileNotFoundError(message)
114
115
def open_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> TextIO:
    """Open *resource* inside *package* for text reading.

    *encoding* and *errors* are passed to open() or TextIOWrapper,
    whichever ends up producing the stream.  A resource reader supplied by
    the package's loader is preferred; otherwise the file is opened from
    the directory holding the package's origin, with the loader's
    get_data() as a last resort.

    Raises FileNotFoundError when the package has no location or the
    resource cannot be obtained by any of those means.
    """
    file_name = _normalize_path(resource)
    module = _get_package(package)
    reader = _get_resource_reader(module)
    if reader is not None:
        return TextIOWrapper(reader.open_resource(file_name), encoding, errors)
    _check_location(module)
    spec = module.__spec__
    package_dir = os.path.dirname(os.path.abspath(spec.origin))
    full_path = os.path.join(package_dir, file_name)
    try:
        return open(full_path, mode='r', encoding=encoding, errors=errors)
    except OSError:
        # Not directly on disk: fall back to the loader's get_data() when it
        # has one.  An OSError from get_data() counts as "not found" too.
        loader = cast(ResourceLoader, spec.loader)
        data = None
        if hasattr(spec.loader, 'get_data'):
            with suppress(OSError):
                data = loader.get_data(full_path)
        if data is not None:
            return TextIOWrapper(BytesIO(data), encoding, errors)
        message = '{!r} resource not found in {!r}'.format(
            file_name, spec.name)
        raise FileNotFoundError(message)
148
149
def read_binary(package: Package, resource: Resource) -> bytes:
    """Read *resource* from *package* and return its contents as bytes."""
    file_name = _normalize_path(resource)
    module = _get_package(package)
    with open_binary(module, file_name) as stream:
        payload = stream.read()
    return payload
156
157
def read_text(package: Package,
              resource: Resource,
              encoding: str = 'utf-8',
              errors: str = 'strict') -> str:
    """Read *resource* from *package* and return it decoded as a string.

    *encoding* and *errors* carry the same meaning as the corresponding
    arguments of bytes.decode(); they are forwarded to open_text().
    """
    file_name = _normalize_path(resource)
    module = _get_package(package)
    with open_text(module, file_name, encoding, errors) as stream:
        text = stream.read()
    return text
171
172
@contextmanager
def path(package: Package, resource: Resource) -> Iterator[Path]:
    """A context manager providing a file path object to the resource.

    If the resource does not already exist on its own on the file system,
    a temporary file will be created. If the file was created, the file
    will be deleted upon exiting the context manager (no exception is
    raised if the file was deleted prior to the context manager
    exiting).

    Raises ValueError if *resource* is not a bare file name, TypeError if
    *package* is not a package, and FileNotFoundError if the package has no
    location or the resource cannot be found.
    """
    resource = _normalize_path(resource)
    package = _get_package(package)
    reader = _get_resource_reader(package)
    if reader is not None:
        # Guard only the resource_path() lookup.  The yield must stay outside
        # the try block: otherwise a FileNotFoundError raised by the caller's
        # ``with`` body would be swallowed here and the generator would go on
        # to yield a second time, which contextmanager reports as
        # RuntimeError("generator didn't stop after throw()").
        try:
            reader_path = Path(reader.resource_path(resource))
        except FileNotFoundError:
            pass
        else:
            yield reader_path
            return
    else:
        _check_location(package)
    # Fall-through for both the lack of a resource reader *and* if
    # resource_path() raises FileNotFoundError.
    package_directory = Path(package.__spec__.origin).parent
    file_path = package_directory / resource
    if file_path.exists():
        yield file_path
        return
    with open_binary(package, resource) as fp:
        data = fp.read()
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
    # blocks due to the need to close the temporary file to work on
    # Windows properly.
    fd, raw_path = tempfile.mkstemp()
    try:
        # The file object owns the descriptor: it is closed even when the
        # write fails, and write() keeps going until all of the data is out
        # (a single os.write() call may write only part of it).
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)
        yield Path(raw_path)
    finally:
        # The caller may already have removed the temporary file.
        with suppress(FileNotFoundError):
            os.remove(raw_path)
216
217
def is_resource(package: Package, name: str) -> bool:
    """Tell whether *name* is a resource inside *package*.

    Directories are *not* resources.
    """
    module = _get_package(package)
    # Called purely for validation: raises ValueError if *name* has a
    # directory component.  The return value is not needed.
    _normalize_path(name)
    reader = _get_resource_reader(module)
    if reader is not None:
        return reader.is_resource(name)
    try:
        entries = set(contents(module))
    except (NotADirectoryError, FileNotFoundError):
        return False
    # Being listed among the package's entries is not enough, because
    # directories are listed too; only regular files count as resources.
    return (name in entries
            and (Path(module.__spec__.origin).parent / name).is_file())
239
240
def contents(package: Package) -> Iterable[str]:
    """Return an iterable of the entries found in *package*.

    Not every entry is a resource; directories in particular are listed but
    are not resources.  Call `is_resource()` on an entry to find out.
    """
    module = _get_package(package)
    reader = _get_resource_reader(module)
    if reader is not None:
        return reader.contents()
    spec = module.__spec__
    # A package without a location is a namespace package, which by
    # definition cannot have resources.  The check is inlined instead of
    # calling _check_location() and catching its exception.
    if spec.origin is None or not spec.has_location:
        return ()
    return os.listdir(Path(spec.origin).parent)
260
261
# Private implementation of ResourceReader and get_resource_reader() called
# from zipimport.c.  Don't use these directly!  We're implementing these in
# Python because 1) it's easier, 2) zipimport may get rewritten in Python
# itself at some point, so doing this all in C would be difficult and a waste
# of effort.
267
class _ZipImportResourceReader(resources_abc.ResourceReader):
    """Private class used to support ZipImport.get_resource_reader().

    This class is allowed to reference all the innards and private parts of
    the zipimporter.
    """

    def __init__(self, zipimporter, fullname):
        """Remember the zipimporter and the dotted name of the package."""
        self.zipimporter = zipimporter
        self.fullname = fullname

    def open_resource(self, resource):
        """Return a BytesIO holding the data of *resource*.

        Raises FileNotFoundError if the zipimporter cannot supply the data.
        """
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{resource}'
        try:
            return BytesIO(self.zipimporter.get_data(path))
        except OSError:
            raise FileNotFoundError(path)

    def resource_path(self, resource):
        """Always raise FileNotFoundError."""
        # All resources are in the zip file, so there is no path to the file.
        # Raising FileNotFoundError tells the higher level API to extract the
        # binary data and create a temporary file.
        raise FileNotFoundError

    def is_resource(self, name):
        """Return True if the zipimporter can supply data for *name*."""
        # Maybe we could do better, but if we can get the data, it's a
        # resource.  Otherwise it isn't.
        fullname_as_path = self.fullname.replace('.', '/')
        path = f'{fullname_as_path}/{name}'
        try:
            self.zipimporter.get_data(path)
        except OSError:
            return False
        return True

    def contents(self):
        """Yield the name of each direct entry of the package, once each.

        Files directly inside the package are reported by file name;
        anything deeper is reported as the name of the top-level
        subdirectory that contains it.
        """
        # This is a bit convoluted, because fullname will be a module path,
        # but _files is a list of file names relative to the top of the
        # archive's namespace.  We want to compare file paths to find all the
        # names of things inside the module represented by fullname.  So we
        # turn the module path of fullname into a file path relative to the
        # top of the archive, and then we iterate through _files looking for
        # names inside that "directory".
        fullname_path = Path(self.zipimporter.get_filename(self.fullname))
        relative_path = fullname_path.relative_to(self.zipimporter.archive)
        # Don't forget that fullname names a package, so its path will include
        # __init__.py, which we want to ignore.
        assert relative_path.name == '__init__.py'
        package_path = relative_path.parent
        names_seen = set()
        for filename in self.zipimporter._files:
            try:
                relative = Path(filename).relative_to(package_path)
            except ValueError:
                # Not located inside this package.
                continue
            parts = relative.parts
            if not parts:
                # The entry is the package directory itself.
                continue
            # The first component is the direct child of the package: the
            # file itself, or the top-level subdirectory containing it.
            # Using the immediate parent's name instead would report 'b' for
            # 'a/b/file', which is not an entry of the package.
            entry = parts[0]
            if entry not in names_seen:
                names_seen.add(entry)
                yield entry
334
335
336# Called from zipimport.c
337def _zipimport_get_resource_reader(zipimporter, fullname):
338    try:
339        if not zipimporter.is_package(fullname):
340            return None
341    except ZipImportError:
342        return None
343    return _ZipImportResourceReader(zipimporter, fullname)
344