• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2A Path-like interface for zipfiles.
3
4This codebase is shared between zipfile.Path in the stdlib
5and zipp in PyPI. See
6https://github.com/python/importlib_metadata/wiki/Development-Methodology
7for more detail.
8"""
9
10import io
11import posixpath
12import zipfile
13import itertools
14import contextlib
15import pathlib
16import re
17import stat
18import sys
19
20from .glob import Translator
21
22
23__all__ = ['Path']
24
25
def _parents(path):
    """
    Return an iterator over the proper ancestors of ``path``,
    nearest first. Elements of ``path`` are separated by
    posixpath.sep and the path itself is not included.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    lineage = _ancestry(path)
    # The ancestry starts with the path itself; skip that first item.
    return itertools.islice(lineage, 1, None)
43
44
45def _ancestry(path):
46    """
47    Given a path with elements separated by
48    posixpath.sep, generate all elements of that path.
49
50    >>> list(_ancestry('b/d'))
51    ['b/d', 'b']
52    >>> list(_ancestry('/b/d/'))
53    ['/b/d', '/b']
54    >>> list(_ancestry('b/d/f/'))
55    ['b/d/f', 'b/d', 'b']
56    >>> list(_ancestry('b'))
57    ['b']
58    >>> list(_ancestry(''))
59    []
60
61    Multiple separators are treated like a single.
62
63    >>> list(_ancestry('//b//d///f//'))
64    ['//b//d///f', '//b//d', '//b']
65    """
66    path = path.rstrip(posixpath.sep)
67    while path.rstrip(posixpath.sep):
68        yield path
69        path, tail = posixpath.split(path)
70
71
# dict.fromkeys builds a dict keyed by the items of an iterable; dict keys
# are unique and keep insertion order, so iterating the result yields each
# distinct item once, in first-seen order. Note the result is a dict, not
# a list.
_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""
74
75
76def _difference(minuend, subtrahend):
77    """
78    Return items in minuend not in subtrahend, retaining order
79    with O(1) lookup.
80    """
81    return itertools.filterfalse(set(subtrahend).__contains__, minuend)
82
83
class InitializedState:
    """
    Mix-in to save the initialization state for pickling.

    The positional and keyword arguments given to ``__init__`` are
    recorded on the instance and used as its pickled state.
    Restoring an instance re-runs the next ``__init__`` in the MRO
    with those same arguments.
    """

    def __init__(self, *args, **kwargs):
        """
        Record ``args`` and ``kwargs``, then pass them on to the
        next ``__init__`` in the MRO.
        """
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        """
        Return the recorded ``(args, kwargs)`` pair as the state.
        """
        return self.__args, self.__kwargs

    def __setstate__(self, state):
        """
        Re-initialize the instance from an ``(args, kwargs)`` pair
        as produced by ``__getstate__``.
        """
        args, kwargs = state
        # super().__init__ below bypasses InitializedState.__init__, so
        # record the arguments here as well; otherwise a restored
        # instance has no saved state and cannot be pickled again.
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)
100
101
class CompleteDirs(InitializedState, zipfile.ZipFile):
    """
    A ZipFile subclass whose namelist also reports directories
    that are only implied by the entries stored beneath them.

    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
    ['foo/', 'foo/bar/']
    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
    ['foo/']
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directory names implied by ``names`` but absent
        from it, deduplicated and in first-seen order.
        """
        ancestors = itertools.chain.from_iterable(
            _parents(name) for name in names
        )
        candidates = (ancestor + posixpath.sep for ancestor in ancestors)
        missing = _difference(candidates, names)
        return _dedupe(missing)

    def namelist(self):
        """
        Return the recorded names followed by any implied directories.
        """
        recorded = super().namelist()
        return [*recorded, *self._implied_dirs(recorded)]

    def _name_set(self):
        """
        Return the full namelist as a set for membership tests.
        """
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        Return ``name`` with a trailing slash when it is known only
        as a directory; otherwise return ``name`` unchanged.
        """
        known = self._name_set()
        as_dir = name + '/'
        if name in known or as_dir not in known:
            return name
        return as_dir

    def getinfo(self, name):
        """
        Look up the ZipInfo for ``name``, synthesizing one for
        directories that are only implied.
        """
        try:
            return super().getinfo(name)
        except KeyError:
            if name.endswith('/') and name in self._name_set():
                return zipfile.ZipInfo(filename=name)
            raise

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass instance.

        An existing ZipFile is converted in place by reassigning
        its ``__class__``.
        """
        if isinstance(source, CompleteDirs):
            return source

        if isinstance(source, zipfile.ZipFile):
            # Only allow for FastLookup when supplied zipfile is read-only
            target = cls if 'r' in source.mode else CompleteDirs
            source.__class__ = target
            return source

        return cls(source)

    @classmethod
    def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
        """
        Given a writable zip file zf, write an empty entry for each
        directory implied by the presence of children, then return zf.
        """
        existing = zf.namelist()
        for implied in cls._implied_dirs(existing):
            zf.writestr(implied, b"")
        return zf
175
176
class FastLookup(CompleteDirs):
    """
    ZipFile subclass that ensures implicit dirs exist and
    resolves them rapidly by computing the namelist and the
    name set once and reusing them afterwards.
    """

    def namelist(self):
        """
        Return the namelist, computing it on first use only.
        """
        try:
            return self.__names
        except AttributeError:
            pass
        self.__names = super().namelist()
        return self.__names

    def _name_set(self):
        """
        Return the name set, computing it on first use only.
        """
        try:
            return self.__lookup
        except AttributeError:
            pass
        self.__lookup = super()._name_set()
        return self.__lookup
194
195
196def _extract_text_encoding(encoding=None, *args, **kwargs):
197    # compute stack level so that the caller of the caller sees any warning.
198    is_pypy = sys.implementation.name == 'pypy'
199    stack_level = 3 + is_pypy
200    return io.text_encoding(encoding, stack_level), args, kwargs
201
202
class Path:
    """
    A :class:`importlib.resources.abc.Traversable` interface for zip files.

    Implements many of the features users enjoy from
    :class:`pathlib.Path`.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> path = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = path.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text(encoding='utf-8')
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile.

    >>> str(path)
    'mem/abcde.zip/'
    >>> path.name
    'abcde.zip'
    >>> path.filename == pathlib.Path('mem/abcde.zip')
    True
    >>> str(path.parent)
    'mem'

    If the zipfile has no filename, such attributes are not
    valid and accessing them will raise an Exception.

    >>> zf.filename = None
    >>> path.name
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.filename
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.parent
    Traceback (most recent call last):
    ...
    TypeError: ...

    # workaround python/cpython#106763
    >>> pass
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        ``at`` is the location inside the archive; the empty string
        denotes the archive root.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def __eq__(self, other):
        """
        Two paths are equal when they are of the same class and
        share the same root and location.

        >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
        False
        """
        if self.__class__ is not other.__class__:
            return NotImplemented
        return (self.root, self.at) == (other.root, other.at)

    def __hash__(self):
        """
        Hash on the same (root, at) pair that ``__eq__`` compares.
        """
        return hash((self.root, self.at))

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().

        Raises IsADirectoryError when this path is a directory,
        FileNotFoundError when reading an entry that does not
        exist, and ValueError when encoding arguments are supplied
        for a binary mode.
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        # ZipFile.open takes only the leading 'r'/'w' of the mode.
        zip_mode = mode[0]
        if zip_mode == 'r' and not self.exists():
            raise FileNotFoundError(self)
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if 'b' in mode:
            if args or kwargs:
                raise ValueError("encoding args invalid for binary operation")
            return stream
        # Text mode:
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        return io.TextIOWrapper(stream, encoding, *args, **kwargs)

    def _base(self):
        """
        Return a PurePosixPath for ``at``, or for the archive's
        filename when this path is the root.
        """
        return pathlib.PurePosixPath(self.at or self.root.filename)

    @property
    def name(self):
        """The final component of this path."""
        return self._base().name

    @property
    def suffix(self):
        """The file extension of the final component, if any."""
        return self._base().suffix

    @property
    def suffixes(self):
        """A list of the final component's file extensions."""
        return self._base().suffixes

    @property
    def stem(self):
        """The final component without its suffix."""
        return self._base().stem

    @property
    def filename(self):
        """The archive's filename joined with ``at``, as a pathlib.Path."""
        return pathlib.Path(self.root.filename).joinpath(self.at)

    def read_text(self, *args, **kwargs):
        """
        Open this entry in text mode and return its full content.
        Arguments are those accepted by ``open`` for text mode.
        """
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        with self.open('r', encoding, *args, **kwargs) as strm:
            return strm.read()

    def read_bytes(self):
        """
        Open this entry in binary mode and return its full content.
        """
        with self.open('rb') as strm:
            return strm.read()

    def _is_child(self, path):
        """
        Return whether ``path`` is located directly inside this path.
        """
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        """
        Return a path of the same class over the same root at ``at``.
        """
        return self.__class__(self.root, at)

    def is_dir(self):
        """
        Return whether this path is the root or its location ends
        with a slash. The archive content is not consulted.
        """
        return not self.at or self.at.endswith("/")

    def is_file(self):
        """
        Return whether this path exists and is not a directory.
        """
        return self.exists() and not self.is_dir()

    def exists(self):
        """
        Return whether this path is the archive root or names an
        entry (explicit or implied) of the archive.
        """
        # The root's empty ``at`` never appears in the name set, yet the
        # root is always a directory that can be iterated, so treat it as
        # existing instead of reporting False for it.
        return not self.at or self.at in self.root._name_set()

    def iterdir(self):
        """
        Return an iterator over the immediate children of this
        directory. Raises ValueError when this path is not a directory.
        """
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def match(self, path_pattern):
        """
        Match the location inside the archive against ``path_pattern``
        using ``PurePosixPath.match``.
        """
        return pathlib.PurePosixPath(self.at).match(path_pattern)

    def is_symlink(self):
        """
        Return whether this path is a symlink.
        """
        info = self.root.getinfo(self.at)
        # The upper 16 bits of external_attr are read as the file mode.
        mode = info.external_attr >> 16
        return stat.S_ISLNK(mode)

    def glob(self, pattern):
        """
        Return an iterator of paths below this one whose names fully
        match ``pattern``. Raises ValueError for an empty pattern.
        """
        if not pattern:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")

        # Anchor the translated pattern at this path's own location.
        prefix = re.escape(self.at)
        tr = Translator(seps='/')
        matches = re.compile(prefix + tr.translate(pattern)).fullmatch
        return map(self._next, filter(matches, self.root.namelist()))

    def rglob(self, pattern):
        """
        Glob for ``pattern`` prefixed with ``**/``.
        """
        return self.glob(f'**/{pattern}')

    def relative_to(self, other, *extra):
        """
        Return, as a string, this path relative to ``other`` joined
        with ``extra``.
        """
        return posixpath.relpath(str(self), str(other.joinpath(*extra)))

    def __str__(self):
        """
        Return the archive's filename joined with ``at``.
        """
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        """
        Return ``ClassName(archive filename, at)``.
        """
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        """
        Return the path formed by joining ``other`` onto this one,
        with a trailing slash added when the result is known to the
        archive only as a directory.
        """
        joined = posixpath.join(self.at, *other)
        return self._next(self.root.resolve_dir(joined))

    __truediv__ = joinpath

    @property
    def parent(self):
        """
        The containing directory of this path. At the root this is
        the parent of the archive's filename (a pathlib.Path).
        """
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            # Directories are addressed with a trailing slash.
            parent_at += '/'
        return self._next(parent_at)
452