1from contextlib import suppress 2from io import TextIOWrapper 3 4from . import abc 5 6 7class SpecLoaderAdapter: 8 """ 9 Adapt a package spec to adapt the underlying loader. 10 """ 11 12 def __init__(self, spec, adapter=lambda spec: spec.loader): 13 self.spec = spec 14 self.loader = adapter(spec) 15 16 def __getattr__(self, name): 17 return getattr(self.spec, name) 18 19 20class TraversableResourcesLoader: 21 """ 22 Adapt a loader to provide TraversableResources. 23 """ 24 25 def __init__(self, spec): 26 self.spec = spec 27 28 def get_resource_reader(self, name): 29 return CompatibilityFiles(self.spec)._native() 30 31 32def _io_wrapper(file, mode='r', *args, **kwargs): 33 if mode == 'r': 34 return TextIOWrapper(file, *args, **kwargs) 35 elif mode == 'rb': 36 return file 37 raise ValueError(f"Invalid mode value '{mode}', only 'r' and 'rb' are supported") 38 39 40class CompatibilityFiles: 41 """ 42 Adapter for an existing or non-existent resource reader 43 to provide a compatibility .files(). 44 """ 45 46 class SpecPath(abc.Traversable): 47 """ 48 Path tied to a module spec. 49 Can be read and exposes the resource reader children. 50 """ 51 52 def __init__(self, spec, reader): 53 self._spec = spec 54 self._reader = reader 55 56 def iterdir(self): 57 if not self._reader: 58 return iter(()) 59 return iter( 60 CompatibilityFiles.ChildPath(self._reader, path) 61 for path in self._reader.contents() 62 ) 63 64 def is_file(self): 65 return False 66 67 is_dir = is_file 68 69 def joinpath(self, other): 70 if not self._reader: 71 return CompatibilityFiles.OrphanPath(other) 72 return CompatibilityFiles.ChildPath(self._reader, other) 73 74 @property 75 def name(self): 76 return self._spec.name 77 78 def open(self, mode='r', *args, **kwargs): 79 return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) 80 81 class ChildPath(abc.Traversable): 82 """ 83 Path tied to a resource reader child. 84 Can be read but doesn't expose any meaningful children. 85 """ 86 87 def __init__(self, reader, name): 88 self._reader = reader 89 self._name = name 90 91 def iterdir(self): 92 return iter(()) 93 94 def is_file(self): 95 return self._reader.is_resource(self.name) 96 97 def is_dir(self): 98 return not self.is_file() 99 100 def joinpath(self, other): 101 return CompatibilityFiles.OrphanPath(self.name, other) 102 103 @property 104 def name(self): 105 return self._name 106 107 def open(self, mode='r', *args, **kwargs): 108 return _io_wrapper( 109 self._reader.open_resource(self.name), mode, *args, **kwargs 110 ) 111 112 class OrphanPath(abc.Traversable): 113 """ 114 Orphan path, not tied to a module spec or resource reader. 115 Can't be read and doesn't expose any meaningful children. 116 """ 117 118 def __init__(self, *path_parts): 119 if len(path_parts) < 1: 120 raise ValueError('Need at least one path part to construct a path') 121 self._path = path_parts 122 123 def iterdir(self): 124 return iter(()) 125 126 def is_file(self): 127 return False 128 129 is_dir = is_file 130 131 def joinpath(self, other): 132 return CompatibilityFiles.OrphanPath(*self._path, other) 133 134 @property 135 def name(self): 136 return self._path[-1] 137 138 def open(self, mode='r', *args, **kwargs): 139 raise FileNotFoundError("Can't open orphan path") 140 141 def __init__(self, spec): 142 self.spec = spec 143 144 @property 145 def _reader(self): 146 with suppress(AttributeError): 147 return self.spec.loader.get_resource_reader(self.spec.name) 148 149 def _native(self): 150 """ 151 Return the native reader if it supports files(). 152 """ 153 reader = self._reader 154 return reader if hasattr(reader, 'files') else self 155 156 def __getattr__(self, attr): 157 return getattr(self._reader, attr) 158 159 def files(self): 160 return CompatibilityFiles.SpecPath(self.spec, self._reader) 161 162 163def wrap_spec(package): 164 """ 165 Construct a package spec with traversable compatibility 166 on the spec/loader/reader. 167 """ 168 return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader) 169