• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from contextlib import suppress
2from io import TextIOWrapper
3
4from . import abc
5
6
7class SpecLoaderAdapter:
8    """
9    Adapt a package spec to adapt the underlying loader.
10    """
11
12    def __init__(self, spec, adapter=lambda spec: spec.loader):
13        self.spec = spec
14        self.loader = adapter(spec)
15
16    def __getattr__(self, name):
17        return getattr(self.spec, name)
18
19
20class TraversableResourcesLoader:
21    """
22    Adapt a loader to provide TraversableResources.
23    """
24
25    def __init__(self, spec):
26        self.spec = spec
27
28    def get_resource_reader(self, name):
29        return CompatibilityFiles(self.spec)._native()
30
31
32def _io_wrapper(file, mode='r', *args, **kwargs):
33    if mode == 'r':
34        return TextIOWrapper(file, *args, **kwargs)
35    elif mode == 'rb':
36        return file
37    raise ValueError(f"Invalid mode value '{mode}', only 'r' and 'rb' are supported")
38
39
40class CompatibilityFiles:
41    """
42    Adapter for an existing or non-existent resource reader
43    to provide a compatibility .files().
44    """
45
46    class SpecPath(abc.Traversable):
47        """
48        Path tied to a module spec.
49        Can be read and exposes the resource reader children.
50        """
51
52        def __init__(self, spec, reader):
53            self._spec = spec
54            self._reader = reader
55
56        def iterdir(self):
57            if not self._reader:
58                return iter(())
59            return iter(
60                CompatibilityFiles.ChildPath(self._reader, path)
61                for path in self._reader.contents()
62            )
63
64        def is_file(self):
65            return False
66
67        is_dir = is_file
68
69        def joinpath(self, other):
70            if not self._reader:
71                return CompatibilityFiles.OrphanPath(other)
72            return CompatibilityFiles.ChildPath(self._reader, other)
73
74        @property
75        def name(self):
76            return self._spec.name
77
78        def open(self, mode='r', *args, **kwargs):
79            return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
80
81    class ChildPath(abc.Traversable):
82        """
83        Path tied to a resource reader child.
84        Can be read but doesn't expose any meaningful children.
85        """
86
87        def __init__(self, reader, name):
88            self._reader = reader
89            self._name = name
90
91        def iterdir(self):
92            return iter(())
93
94        def is_file(self):
95            return self._reader.is_resource(self.name)
96
97        def is_dir(self):
98            return not self.is_file()
99
100        def joinpath(self, other):
101            return CompatibilityFiles.OrphanPath(self.name, other)
102
103        @property
104        def name(self):
105            return self._name
106
107        def open(self, mode='r', *args, **kwargs):
108            return _io_wrapper(
109                self._reader.open_resource(self.name), mode, *args, **kwargs
110            )
111
112    class OrphanPath(abc.Traversable):
113        """
114        Orphan path, not tied to a module spec or resource reader.
115        Can't be read and doesn't expose any meaningful children.
116        """
117
118        def __init__(self, *path_parts):
119            if len(path_parts) < 1:
120                raise ValueError('Need at least one path part to construct a path')
121            self._path = path_parts
122
123        def iterdir(self):
124            return iter(())
125
126        def is_file(self):
127            return False
128
129        is_dir = is_file
130
131        def joinpath(self, other):
132            return CompatibilityFiles.OrphanPath(*self._path, other)
133
134        @property
135        def name(self):
136            return self._path[-1]
137
138        def open(self, mode='r', *args, **kwargs):
139            raise FileNotFoundError("Can't open orphan path")
140
141    def __init__(self, spec):
142        self.spec = spec
143
144    @property
145    def _reader(self):
146        with suppress(AttributeError):
147            return self.spec.loader.get_resource_reader(self.spec.name)
148
149    def _native(self):
150        """
151        Return the native reader if it supports files().
152        """
153        reader = self._reader
154        return reader if hasattr(reader, 'files') else self
155
156    def __getattr__(self, attr):
157        return getattr(self._reader, attr)
158
159    def files(self):
160        return CompatibilityFiles.SpecPath(self.spec, self._reader)
161
162
163def wrap_spec(package):
164    """
165    Construct a package spec with traversable compatibility
166    on the spec/loader/reader.
167    """
168    return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)
169