import collections
import zipfile
import pathlib
from . import abc


def remove_duplicates(items):
    """Return an iterator over *items* with duplicates dropped.

    The first occurrence of each item is kept and the original order is
    preserved. Items must be hashable, since they are used as dict keys.
    """
    return iter(collections.OrderedDict.fromkeys(items))


class FileReader(abc.TraversableResources):
    """Resource reader rooted at the directory containing ``loader.path``."""

    def __init__(self, loader):
        # Resources are looked up relative to the parent directory of the
        # loader's path.
        self.path = pathlib.Path(loader.path).parent

    def resource_path(self, resource):
        """
        Return the file system path to prevent
        `resources.path()` from creating a temporary
        copy.
        """
        return str(self.path.joinpath(resource))

    def files(self):
        """Return the directory (a ``pathlib.Path``) this reader is rooted at."""
        return self.path


class ZipReader(abc.TraversableResources):
    """Resource reader for content stored inside a zip archive."""

    def __init__(self, loader, module):
        # Only the last dotted component of the module name is used.
        _, _, name = module.rpartition('.')
        # Normalize the loader prefix to forward slashes before appending
        # the module's own directory.
        self.prefix = loader.prefix.replace('\\', '/') + name + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """Open *resource* via the parent implementation.

        Raises:
            FileNotFoundError: if the parent implementation raises
                ``KeyError``; the original error is kept as the cause.
        """
        try:
            return super().open_resource(resource)
        except KeyError as exc:
            # Chain explicitly so the originating KeyError is reported as
            # the direct cause of the failure.
            raise FileNotFoundError(exc.args[0]) from exc

    def is_resource(self, path):
        """Return True if *path* names an existing file under ``files()``."""
        # workaround for `zipfile.Path.is_file` returning true
        # for non-existent paths.
        target = self.files().joinpath(path)
        return target.is_file() and target.exists()

    def files(self):
        """Return a ``zipfile.Path`` for ``self.prefix`` inside the archive."""
        return zipfile.Path(self.archive, self.prefix)


class MultiplexedPath(abc.Traversable):
    """
    Given a series of Traversable objects, implement a merged
    version of the interface across all objects. Useful for
    namespace packages which may be multihomed at a single
    name.
    """

    def __init__(self, *paths):
        """Store the de-duplicated *paths* as ``pathlib.Path`` objects.

        Raises:
            FileNotFoundError: if no paths are given.
            NotADirectoryError: if any path is not a directory.
        """
        self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if not all(path.is_dir() for path in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """Yield the entries of every underlying path.

        When several paths contain an entry with the same name, only the
        one from the earliest path is yielded.
        """
        # A set keeps the membership test constant-time; only names are
        # compared, so later same-named entries are skipped.
        seen = set()
        for path in self._paths:
            for file in path.iterdir():
                if file.name in seen:
                    continue
                seen.add(file.name)
                yield file

    def read_bytes(self):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        """Return True; the constructor only accepts directories."""
        return True

    def is_file(self):
        """Return False."""
        return False

    def joinpath(self, child):
        """Return the entry named *child*.

        If no underlying path has such an entry, return a path for
        *child* under the first underlying path.
        """
        # first try to find child in current paths
        for file in self.iterdir():
            if file.name == child:
                return file
        # if it does not exist, construct it with the first path
        return self._paths[0] / child

    __truediv__ = joinpath

    def open(self, *args, **kwargs):
        """Always raise ``FileNotFoundError``: this object is not a file."""
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """Name of the first underlying path."""
        return self._paths[0].name

    def __repr__(self):
        paths = ', '.join(f"'{path}'" for path in self._paths)
        return f'MultiplexedPath({paths})'


class NamespaceReader(abc.TraversableResources):
    """Resource reader backed by a ``MultiplexedPath`` over *namespace_path*."""

    def __init__(self, namespace_path):
        """Build the multiplexed path from the entries of *namespace_path*.

        Raises:
            ValueError: if ``str(namespace_path)`` does not contain
                ``'NamespacePath'``.
        """
        if 'NamespacePath' not in str(namespace_path):
            raise ValueError('Invalid path')
        # Unpack the iterable directly; no intermediate list is needed.
        self.path = MultiplexedPath(*namespace_path)

    def resource_path(self, resource):
        """
        Return the file system path to prevent
        `resources.path()` from creating a temporary
        copy.
        """
        return str(self.path.joinpath(resource))

    def files(self):
        """Return the ``MultiplexedPath`` built in the constructor."""
        return self.path