• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import pathlib
3import tempfile
4import functools
5import contextlib
6import types
7import importlib
8
9from typing import Union, Any, Optional
10from .abc import ResourceReader, Traversable
11
12from ._adapters import wrap_spec
13
14Package = Union[types.ModuleType, str]
15
16
17def files(package):
18    # type: (Package) -> Traversable
19    """
20    Get a Traversable resource from a package
21    """
22    return from_package(get_package(package))
23
24
25def normalize_path(path):
26    # type: (Any) -> str
27    """Normalize a path by ensuring it is a string.
28
29    If the resulting string contains path separators, an exception is raised.
30    """
31    str_path = str(path)
32    parent, file_name = os.path.split(str_path)
33    if parent:
34        raise ValueError(f'{path!r} must be only a file name')
35    return file_name
36
37
38def get_resource_reader(package):
39    # type: (types.ModuleType) -> Optional[ResourceReader]
40    """
41    Return the package's loader if it's a ResourceReader.
42    """
43    # We can't use
44    # a issubclass() check here because apparently abc.'s __subclasscheck__()
45    # hook wants to create a weak reference to the object, but
46    # zipimport.zipimporter does not support weak references, resulting in a
47    # TypeError.  That seems terrible.
48    spec = package.__spec__
49    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
50    if reader is None:
51        return None
52    return reader(spec.name)  # type: ignore
53
54
55def resolve(cand):
56    # type: (Package) -> types.ModuleType
57    return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
58
59
60def get_package(package):
61    # type: (Package) -> types.ModuleType
62    """Take a package name or module object and return the module.
63
64    Raise an exception if the resolved module is not a package.
65    """
66    resolved = resolve(package)
67    if wrap_spec(resolved).submodule_search_locations is None:
68        raise TypeError(f'{package!r} is not a package')
69    return resolved
70
71
72def from_package(package):
73    """
74    Return a Traversable object for the given package.
75
76    """
77    spec = wrap_spec(package)
78    reader = spec.loader.get_resource_reader(spec.name)
79    return reader.files()
80
81
82@contextlib.contextmanager
83def _tempfile(reader, suffix=''):
84    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
85    # blocks due to the need to close the temporary file to work on Windows
86    # properly.
87    fd, raw_path = tempfile.mkstemp(suffix=suffix)
88    try:
89        os.write(fd, reader())
90        os.close(fd)
91        del reader
92        yield pathlib.Path(raw_path)
93    finally:
94        try:
95            os.remove(raw_path)
96        except FileNotFoundError:
97            pass
98
99
100@functools.singledispatch
101def as_file(path):
102    """
103    Given a Traversable object, return that object as a
104    path on the local file system in a context manager.
105    """
106    return _tempfile(path.read_bytes, suffix=path.name)
107
108
109@as_file.register(pathlib.Path)
110@contextlib.contextmanager
111def _(path):
112    """
113    Degenerate behavior for pathlib.Path objects.
114    """
115    yield path
116