• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import io
2import itertools
3import contextlib
4import pathlib
5import pickle
6import stat
7import sys
8import time
9import unittest
10import zipfile
11import zipfile._path
12
13from test.support.os_helper import temp_dir, FakePath
14
15from ._functools import compose
16from ._itertools import Counter
17
18from ._test_params import parameterize, Invoked
19
20
# Namespace shim: lets the tests below spell ``jaraco.itertools.Counter``
# while actually using the ``Counter`` imported from ``._itertools``.
class jaraco:
    """Stand-in namespace exposing ``jaraco.itertools.Counter``."""

    # This nested class is only reachable as ``jaraco.itertools``; the
    # module-level ``itertools`` import is not affected by it.
    class itertools:
        Counter = Counter
24
25
def _make_link(info: zipfile.ZipInfo):  # type: ignore[name-defined]
    """
    Mark ``info`` as a symbolic link.

    The ``S_IFLNK`` file-type bits are OR-ed into the upper 16 bits of
    ``info.external_attr``; bits that are already set are kept.
    """
    symlink_bits = stat.S_IFLNK << 16
    info.external_attr = info.external_attr | symlink_bits
28
29
def build_alpharep_fixture():
    """
    Build an in-memory, writable zip file laid out as follows:

    .
    ├── a.txt
    ├── n.txt (-> a.txt)
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    ├── g
    │   └── h
    │       └── i.txt
    └── j
        ├── k.bin
        ├── l.baz
        └── m.bar

    What the layout is meant to cover:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)
    - a directory with files of different extensions (j/klm)
    - a symlink (n) pointing to (a)

    The name: "alpha" for the alphabetical member names, "rep" for
    representative example.

    Returns the still-open ``zipfile.ZipFile`` with its ``filename`` set
    to ``"alpharep.zip"``.
    """
    # (member name, payload), in the order the members are written.
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
        ("j/k.bin", b"content of k"),
        ("j/l.baz", b"content of l"),
        ("j/m.bar", b"content of m"),
        ("n.txt", b"a.txt"),
    )

    archive = zipfile.ZipFile(io.BytesIO(), "w")
    for member_name, payload in members:
        archive.writestr(member_name, payload)

    # The last member written is n.txt; its payload is the link target.
    last_written = archive.infolist()[-1]
    _make_link(last_written)

    archive.filename = "alpharep.zip"
    return archive
77
78
# Two ways of producing the fixture: the plain builder, and the builder
# composed with ``CompleteDirs.inject`` (presumably applied to the builder's
# result; see ``compose`` in ``._functools``).
alpharep_generators = [
    Invoked.wrap(builder)
    for builder in (
        build_alpharep_fixture,
        compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture),
    )
]

# Decorator that feeds each generated fixture to a test as ``alpharep``.
pass_alpharep = parameterize(['alpharep'], alpharep_generators)
85
86
class TestPath(unittest.TestCase):
    """
    Tests for ``zipfile.Path``.

    Tests decorated with ``pass_alpharep`` receive the fixture zip file
    as their ``alpharep`` argument.
    """

    def setUp(self):
        # Context managers entered through ``self.fixtures`` are unwound
        # when the test finishes.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_ondisk(self, alpharep):
        """
        Close ``alpharep`` and write its buffer to a file on disk.

        The file is created in a temporary directory registered with
        ``self.fixtures`` and is named after ``alpharep.filename``.
        Returns the ``pathlib.Path`` of the written file.
        """
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        # Capture the underlying stream before closing the archive.
        buffer = alpharep.fp
        alpharep.close()
        path = tmpdir / alpharep.filename
        with path.open("wb") as strm:
            strm.write(buffer.getvalue())
        return path

    @pass_alpharep
    def test_iterdir_and_types(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.is_dir()
        a, n, b, g, j = root.iterdir()
        assert a.is_file()
        assert b.is_dir()
        assert g.is_dir()
        c, f, d = b.iterdir()
        assert c.is_file() and f.is_file()
        (e,) = d.iterdir()
        assert e.is_file()
        (h,) = g.iterdir()
        (i,) = h.iterdir()
        assert i.is_file()

    @pass_alpharep
    def test_is_file_missing(self, alpharep):
        root = zipfile.Path(alpharep)
        assert not root.joinpath('missing.txt').is_file()

    @pass_alpharep
    def test_iterdir_on_file(self, alpharep):
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        with self.assertRaises(ValueError):
            a.iterdir()

    @pass_alpharep
    def test_subdir_is_dir(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'b').is_dir()
        assert (root / 'b/').is_dir()
        assert (root / 'g').is_dir()
        assert (root / 'g/').is_dir()

    @pass_alpharep
    def test_open(self, alpharep):
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        with a.open(encoding="utf-8") as strm:
            data = strm.read()
        self.assertEqual(data, "content of a")
        with a.open('r', "utf-8") as strm:  # not a kw, no gh-101144 TypeError
            data = strm.read()
        self.assertEqual(data, "content of a")

    def test_open_encoding_utf16(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
        zf.filename = "test_open_utf16.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        u16 = path.joinpath("16.txt")
        with u16.open('r', "utf-16") as strm:
            data = strm.read()
        assert data == "This was utf-16"
        with u16.open(encoding="utf-16") as strm:
            data = strm.read()
        assert data == "This was utf-16"

    def test_open_encoding_errors(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
        zf.filename = "test_read_text_encoding_errors.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        bad_utf8 = path.joinpath("bad-utf8.bin")

        # encoding= as a positional argument for gh-101144.
        data = bad_utf8.read_text("utf-8", errors="ignore")
        assert data == "invalid utf-8: ."
        with bad_utf8.open("r", "utf-8", errors="surrogateescape") as f:
            assert f.read() == "invalid utf-8: \udcff\udcff."

        # encoding= both positional and keyword is an error; gh-101144.
        with self.assertRaisesRegex(TypeError, "encoding"):
            bad_utf8.read_text("utf-8", encoding="utf-8")

        # both keyword arguments work.
        with bad_utf8.open("r", encoding="utf-8", errors="strict") as f:
            # error during decoding with wrong codec.
            with self.assertRaises(UnicodeDecodeError):
                f.read()

    @unittest.skipIf(
        not getattr(sys.flags, 'warn_default_encoding', 0),
        "Requires warn_default_encoding",
    )
    @pass_alpharep
    def test_encoding_warnings(self, alpharep):
        """EncodingWarning must blame the read_text and open calls."""
        assert sys.flags.warn_default_encoding
        root = zipfile.Path(alpharep)
        with self.assertWarns(EncodingWarning) as wc:
            root.joinpath("a.txt").read_text()
        assert __file__ == wc.filename
        with self.assertWarns(EncodingWarning) as wc:
            root.joinpath("a.txt").open("r").close()
        assert __file__ == wc.filename

    def test_open_write(self):
        """
        If the zipfile is open for write, it should be possible to
        write bytes or text to it.
        """
        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
        with zf.joinpath('file.bin').open('wb') as strm:
            strm.write(b'binary contents')
        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
            strm.write('text file')

    @pass_alpharep
    def test_open_extant_directory(self, alpharep):
        """
        Attempting to open a directory raises IsADirectoryError.
        """
        zf = zipfile.Path(alpharep)
        with self.assertRaises(IsADirectoryError):
            zf.joinpath('b').open()

    @pass_alpharep
    def test_open_binary_invalid_args(self, alpharep):
        # Binary mode rejects an encoding, keyword or positional.
        root = zipfile.Path(alpharep)
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', encoding='utf-8')
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', 'utf-8')

    @pass_alpharep
    def test_open_missing_directory(self, alpharep):
        """
        Attempting to open a missing directory raises FileNotFoundError.
        """
        zf = zipfile.Path(alpharep)
        with self.assertRaises(FileNotFoundError):
            zf.joinpath('z').open()

    @pass_alpharep
    def test_read(self, alpharep):
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        assert a.read_text(encoding="utf-8") == "content of a"
        # Also check positional encoding arg (gh-101144).
        assert a.read_text("utf-8") == "content of a"
        assert a.read_bytes() == b"content of a"

    @pass_alpharep
    def test_joinpath(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root.joinpath("a.txt")
        assert a.is_file()
        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_joinpath_multiple(self, alpharep):
        root = zipfile.Path(alpharep)
        e = root.joinpath("b", "d", "e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_truediv(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root / "a.txt"
        assert a.is_file()
        e = root / "b" / "d" / "e.txt"
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_pathlike_construction(self, alpharep):
        """
        zipfile.Path should be constructable from a path-like object
        """
        zipfile_ondisk = self.zipfile_ondisk(alpharep)
        pathlike = FakePath(str(zipfile_ondisk))
        zipfile.Path(pathlike)

    @pass_alpharep
    def test_traverse_pathlike(self, alpharep):
        root = zipfile.Path(alpharep)
        root / FakePath("a")

    @pass_alpharep
    def test_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'a').parent.at == ''
        assert (root / 'a' / 'b').parent.at == 'a/'

    @pass_alpharep
    def test_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'b').parent.at == ''
        assert (root / 'b/').parent.at == ''

    @pass_alpharep
    def test_missing_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'missing dir/').parent.at == ''

    @pass_alpharep
    def test_mutability(self, alpharep):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        alpharep.writestr('foo.txt', 'foo')
        alpharep.writestr('bar/baz.txt', 'baz')
        assert any(child.name == 'foo.txt' for child in root.iterdir())
        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
        (baz,) = (root / 'bar').iterdir()
        assert baz.read_text(encoding="utf-8") == 'baz'

    HUGE_ZIPFILE_NUM_ENTRIES = 2**13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode on the already-populated archive rather than
        # reopening it.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Call joinpath on every entry of a huge zipfile.

        Each joinpath call is meant to be constant time, making the
        whole traversal linear. No timing is asserted here; the test
        only checks that every entry was visited.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

    @pass_alpharep
    def test_read_does_not_close(self, alpharep):
        alpharep = self.zipfile_ondisk(alpharep)
        with zipfile.ZipFile(alpharep) as file:
            # A second read through the same ZipFile must still succeed.
            for _ in range(2):
                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")

    @pass_alpharep
    def test_subclass(self, alpharep):
        class Subclass(zipfile.Path):
            pass

        root = Subclass(alpharep)
        assert isinstance(root / 'b', Subclass)

    @pass_alpharep
    def test_filename(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.filename == pathlib.Path('alpharep.zip')

    @pass_alpharep
    def test_root_name(self, alpharep):
        """
        The name of the root should be the name of the zipfile
        """
        root = zipfile.Path(alpharep)
        assert root.name == 'alpharep.zip' == root.filename.name

    @pass_alpharep
    def test_suffix(self, alpharep):
        """
        The suffix of the root should be the suffix of the zipfile.
        The suffix of each nested file is the final component's last suffix, if any.
        Includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffix == '.zip' == root.filename.suffix

        b = root / "b.txt"
        assert b.suffix == ".txt"

        c = root / "c" / "filename.tar.gz"
        assert c.suffix == ".gz"

        d = root / "d"
        assert d.suffix == ""

    @pass_alpharep
    def test_suffixes(self, alpharep):
        """
        The suffixes of the root should be the suffixes of the zipfile.
        The suffixes of each nested file are all the suffixes of the final
        component, if any; a leading dot alone does not start a suffix.
        Each includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffixes == ['.zip'] == root.filename.suffixes

        b = root / 'b.txt'
        assert b.suffixes == ['.txt']

        c = root / 'c' / 'filename.tar.gz'
        assert c.suffixes == ['.tar', '.gz']

        d = root / 'd'
        assert d.suffixes == []

        e = root / '.hgrc'
        assert e.suffixes == []

    @pass_alpharep
    def test_suffix_no_filename(self, alpharep):
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        assert root.joinpath('example').suffix == ""
        assert root.joinpath('example').suffixes == []

    @pass_alpharep
    def test_stem(self, alpharep):
        """
        The final path component, without its suffix
        """
        root = zipfile.Path(alpharep)
        assert root.stem == 'alpharep' == root.filename.stem

        b = root / "b.txt"
        assert b.stem == "b"

        c = root / "c" / "filename.tar.gz"
        assert c.stem == "filename.tar"

        d = root / "d"
        assert d.stem == "d"

        assert (root / ".gitignore").stem == ".gitignore"

    @pass_alpharep
    def test_root_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.parent == pathlib.Path('.')
        root.root.filename = 'foo/bar.zip'
        assert root.parent == pathlib.Path('foo')

    @pass_alpharep
    def test_root_unnamed(self, alpharep):
        """
        It is an error to attempt to get the name
        or parent of an unnamed zipfile.
        """
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        with self.assertRaises(TypeError):
            root.name
        with self.assertRaises(TypeError):
            root.parent

        # .name and .parent should still work on subs
        sub = root / "b"
        assert sub.name == "b"
        assert sub.parent

    @pass_alpharep
    def test_match_and_glob(self, alpharep):
        root = zipfile.Path(alpharep)
        assert not root.match("*.txt")

        assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]
        assert list(root.glob("b/*.txt")) == [
            zipfile.Path(alpharep, "b/c.txt"),
            zipfile.Path(alpharep, "b/f.txt"),
        ]

    @pass_alpharep
    def test_glob_recursive(self, alpharep):
        root = zipfile.Path(alpharep)
        files = root.glob("**/*.txt")
        assert all(each.match("*.txt") for each in files)

        assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))

    @pass_alpharep
    def test_glob_dirs(self, alpharep):
        root = zipfile.Path(alpharep)
        assert list(root.glob('b')) == [zipfile.Path(alpharep, "b/")]
        assert list(root.glob('b*')) == [zipfile.Path(alpharep, "b/")]

    @pass_alpharep
    def test_glob_subdir(self, alpharep):
        root = zipfile.Path(alpharep)
        assert list(root.glob('g/h')) == [zipfile.Path(alpharep, "g/h/")]
        assert list(root.glob('g*/h*')) == [zipfile.Path(alpharep, "g/h/")]

    @pass_alpharep
    def test_glob_subdirs(self, alpharep):
        root = zipfile.Path(alpharep)

        assert list(root.glob("*/i.txt")) == []
        assert list(root.rglob("*/i.txt")) == [zipfile.Path(alpharep, "g/h/i.txt")]

    @pass_alpharep
    def test_glob_does_not_overmatch_dot(self, alpharep):
        root = zipfile.Path(alpharep)

        assert list(root.glob("*.xt")) == []

    @pass_alpharep
    def test_glob_single_char(self, alpharep):
        root = zipfile.Path(alpharep)

        assert list(root.glob("a?txt")) == [zipfile.Path(alpharep, "a.txt")]
        assert list(root.glob("a[.]txt")) == [zipfile.Path(alpharep, "a.txt")]
        assert list(root.glob("a[?]txt")) == []

    @pass_alpharep
    def test_glob_chars(self, alpharep):
        root = zipfile.Path(alpharep)

        assert list(root.glob("j/?.b[ai][nz]")) == [
            zipfile.Path(alpharep, "j/k.bin"),
            zipfile.Path(alpharep, "j/l.baz"),
        ]

    def test_glob_empty(self):
        root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
        with self.assertRaises(ValueError):
            root.glob('')

    @pass_alpharep
    def test_eq_hash(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root == zipfile.Path(alpharep)

        assert root != (root / "a.txt")
        assert (root / "a.txt") == (root / "a.txt")

        root = zipfile.Path(alpharep)
        assert root in {root}

    @pass_alpharep
    def test_is_symlink(self, alpharep):
        root = zipfile.Path(alpharep)
        assert not root.joinpath('a.txt').is_symlink()
        assert root.joinpath('n.txt').is_symlink()

    @pass_alpharep
    def test_relative_to(self, alpharep):
        root = zipfile.Path(alpharep)
        relative = root.joinpath("b", "c.txt").relative_to(root / "b")
        assert str(relative) == "c.txt"

        relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
        assert str(relative) == "d/e.txt"

    @pass_alpharep
    def test_inheritance(self, alpharep):
        cls = type('PathChild', (zipfile.Path,), {})
        file = cls(alpharep).joinpath('some dir').parent
        assert isinstance(file, cls)

    @parameterize(
        ['alpharep', 'path_type', 'subpath'],
        itertools.product(
            alpharep_generators,
            [str, FakePath],
            ['', 'b/'],
        ),
    )
    def test_pickle(self, alpharep, path_type, subpath):
        zipfile_ondisk = path_type(str(self.zipfile_ondisk(alpharep)))

        saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
        restored_1 = pickle.loads(saved_1)
        first, *rest = restored_1.iterdir()
        assert first.read_text(encoding='utf-8').startswith('content of ')

    @pass_alpharep
    def test_extract_orig_with_implied_dirs(self, alpharep):
        """
        A zip file wrapped in a Path should extract even with implied dirs.
        """
        source_path = self.zipfile_ondisk(alpharep)
        # Close the archive explicitly so the handle on the on-disk file is
        # released before the temporary directory is torn down.
        with zipfile.ZipFile(source_path) as zf:
            # wrap the zipfile for its side effect
            zipfile.Path(zf)
            zf.extractall(source_path.parent)

    @pass_alpharep
    def test_getinfo_missing(self, alpharep):
        """
        Validate behavior of getinfo on original zipfile after wrapping.
        """
        zipfile.Path(alpharep)
        with self.assertRaises(KeyError):
            alpharep.getinfo('does-not-exist')

    def test_malformed_paths(self):
        """
        Path should handle malformed paths gracefully.

        Paths with leading slashes are not visible.

        Paths with dots are treated like regular files.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        zf.writestr("/one-slash.txt", b"content")
        zf.writestr("//two-slash.txt", b"content")
        zf.writestr("../parent.txt", b"content")
        zf.filename = ''
        root = zipfile.Path(zf)
        assert list(map(str, root.iterdir())) == ['../']
        assert root.joinpath('..').joinpath('parent.txt').read_bytes() == b'content'

    def test_unsupported_names(self):
        """
        Path segments with special characters are readable.

        On some platforms or file systems, characters like
        ``:`` and ``?`` are not allowed, but they are valid
        in the zip file.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        zf.writestr("path?", b"content")
        zf.writestr("V: NMS.flac", b"fLaC...")
        zf.filename = ''
        root = zipfile.Path(zf)
        contents = root.iterdir()
        assert next(contents).name == 'path?'
        assert next(contents).name == 'V: NMS.flac'
        assert root.joinpath('V: NMS.flac').read_bytes() == b"fLaC..."

    def test_backslash_not_separator(self):
        """
        In a zip file, backslashes are not separators.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        # DirtyZipInfo keeps the backslash in the stored name verbatim.
        zf.writestr(DirtyZipInfo.for_name("foo\\bar", zf), b"content")
        zf.filename = ''
        root = zipfile.Path(zf)
        (first,) = root.iterdir()
        assert not first.is_dir()
        assert first.name == 'foo\\bar'

    @pass_alpharep
    def test_interface(self, alpharep):
        from importlib.resources.abc import Traversable

        zf = zipfile.Path(alpharep)
        assert isinstance(zf, Traversable)
650
651
class DirtyZipInfo(zipfile.ZipInfo):
    """
    A ``ZipInfo`` that keeps its filename exactly as given,
    bypassing name sanitization.
    """

    def __init__(self, filename, *args, **kwargs):
        super().__init__(filename, *args, **kwargs)
        # Put the raw name back over whatever the base initializer stored.
        self.filename = filename

    @classmethod
    def for_name(cls, name, archive):
        """
        Construct the same way that ZipFile.writestr does.

        The entry is stamped with the current local time and takes its
        compression settings from ``archive``.

        TODO: extract this functionality and re-use
        """
        timestamp = time.localtime(time.time())[:6]
        info = cls(filename=name, date_time=timestamp)
        info.compress_type = archive.compression
        info.compress_level = archive.compresslevel
        names_directory = info.filename.endswith('/')
        if names_directory:  # pragma: no cover
            # drwxrwxr-x, plus the MS-DOS directory flag (0x10)
            info.external_attr = (0o40775 << 16) | 0x10
        else:
            # ?rw-------
            info.external_attr = 0o600 << 16
        return info
677