• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import collections
2import zipfile
3import pathlib
4from . import abc
5
6
def remove_duplicates(items):
    """
    Return an iterator over the distinct elements of *items*,
    keeping each element at the position of its first occurrence.

    The input is consumed completely before the iterator is
    returned, and every element must be hashable.
    """
    unique = collections.OrderedDict()
    for item in items:
        # setdefault leaves an already-recorded key (and its position) alone
        unique.setdefault(item, None)
    return iter(unique)
9
10
class FileReader(abc.TraversableResources):
    """
    Resource reader rooted at the directory that holds the file
    named by ``loader.path``.
    """

    def __init__(self, loader):
        # resources live next to the file the loader points at
        module_file = pathlib.Path(loader.path)
        self.path = module_file.parent

    def resource_path(self, resource):
        """
        Give back the real file system location of *resource* as a
        string, which keeps `resources.path()` from having to make
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the directory this reader serves resources from."""
        return self.path
25
26
class ZipReader(abc.TraversableResources):
    """
    Resource reader for a package stored inside a zip archive.

    ``prefix`` is the loader's prefix (with backslashes turned into
    forward slashes) followed by the last dotted component of
    *module* and a trailing slash; ``archive`` is taken from the
    loader unchanged.
    """

    def __init__(self, loader, module):
        package_name = module.rpartition('.')[2]
        # entry names inside a zip archive use forward slashes
        normalized = loader.prefix.replace('\\', '/')
        self.prefix = normalized + package_name + '/'
        self.archive = loader.archive

    def open_resource(self, resource):
        """
        Open *resource* via the inherited implementation, reporting
        a missing entry (KeyError) as FileNotFoundError.
        """
        try:
            stream = super().open_resource(resource)
        except KeyError as missing:
            raise FileNotFoundError(missing.args[0])
        return stream

    def is_resource(self, path):
        """Return whether *path* names an existing file under the prefix."""
        candidate = self.files().joinpath(path)
        if not candidate.is_file():
            return False
        # `zipfile.Path.is_file` can report true for paths that do
        # not exist, so existence is confirmed separately.
        return candidate.exists()

    def files(self):
        """Return a `zipfile.Path` for the package directory in the archive."""
        return zipfile.Path(self.archive, self.prefix)
47
48
class MultiplexedPath(abc.Traversable):
    """
    Given a series of Traversable objects, implement a merged
    version of the interface across all objects. Useful for
    namespace packages which may be multihomed at a single
    name.
    """

    def __init__(self, *paths):
        """
        Store the distinct *paths* (first occurrence wins) as
        `pathlib.Path` objects.

        Raises FileNotFoundError when no path is given and
        NotADirectoryError when any path is not a directory.
        """
        self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
        if not self._paths:
            message = 'MultiplexedPath must contain at least one path'
            raise FileNotFoundError(message)
        if not all(path.is_dir() for path in self._paths):
            raise NotADirectoryError('MultiplexedPath only supports directories')

    def iterdir(self):
        """
        Yield the entries of every underlying directory, in order.
        When a name occurs in more than one directory, only the
        entry from the earliest directory is yielded.
        """
        # a set keeps the duplicate check constant-time per entry
        seen = set()
        for path in self._paths:
            for file in path.iterdir():
                if file.name in seen:
                    continue
                seen.add(file.name)
                yield file

    def read_bytes(self):
        """Always raise FileNotFoundError: this object is a directory."""
        raise FileNotFoundError(f'{self} is not a file')

    def read_text(self, *args, **kwargs):
        """Always raise FileNotFoundError: this object is a directory."""
        raise FileNotFoundError(f'{self} is not a file')

    def is_dir(self):
        return True

    def is_file(self):
        return False

    def joinpath(self, child):
        """
        Return the path for *child* below the merged directories.

        *child* may be a plain entry name, a multi-segment relative
        path such as ``'sub/file.txt'``, or a path-like object. The
        result comes from the earliest directory that contains it;
        when no directory does, the path is built on the first one
        (and will not exist).
        """
        # first try to find child among the merged top-level entries
        for file in self.iterdir():
            if file.name == child:
                return file
        # a name comparison cannot match multi-segment or path-like
        # children, so look for them in each directory in turn
        for path in self._paths:
            candidate = path / child
            if candidate.exists():
                return candidate
        # if it does not exist, construct it with the first path
        return self._paths[0] / child

    __truediv__ = joinpath

    def open(self, *args, **kwargs):
        """Always raise FileNotFoundError: this object is a directory."""
        raise FileNotFoundError(f'{self} is not a file')

    @property
    def name(self):
        """The name of the first underlying directory."""
        return self._paths[0].name

    def __repr__(self):
        paths = ', '.join(f"'{path}'" for path in self._paths)
        return f'MultiplexedPath({paths})'
106
107
class NamespaceReader(abc.TraversableResources):
    """
    Resource reader that exposes every location of a namespace
    path through a single `MultiplexedPath`.
    """

    def __init__(self, namespace_path):
        # only objects whose string form mentions 'NamespacePath' are accepted
        looks_like_namespace = 'NamespacePath' in str(namespace_path)
        if not looks_like_namespace:
            raise ValueError('Invalid path')
        self.path = MultiplexedPath(*namespace_path)

    def resource_path(self, resource):
        """
        Give back the real file system location of *resource* as a
        string, which keeps `resources.path()` from having to make
        a temporary copy.
        """
        location = self.path.joinpath(resource)
        return str(location)

    def files(self):
        """Return the merged path covering all namespace locations."""
        return self.path
124