• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import array
2import contextlib
3import importlib.util
4import io
5import itertools
6import os
7import pathlib
8import posixpath
9import string
10import struct
11import subprocess
12import sys
13import time
14import unittest
15import unittest.mock as mock
16import zipfile
17import functools
18
19
20from tempfile import TemporaryFile
21from random import randint, random, randbytes
22
23from test.support import script_helper
24from test.support import (findfile, requires_zlib, requires_bz2,
25                          requires_lzma, captured_stdout)
26from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd
27
28
29TESTFN2 = TESTFN + "2"
30TESTFNDIR = TESTFN + "d"
31FIXEDTEST_SIZE = 1000
32DATAFILES_DIR = 'zipfile_datafiles'
33
34SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
35                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
36                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
37                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
38
39def get_files(test):
40    yield TESTFN2
41    with TemporaryFile() as f:
42        yield f
43        test.assertFalse(f.closed)
44    with io.BytesIO() as f:
45        yield f
46        test.assertFalse(f.closed)
47
48class AbstractTestsWithSourceFile:
49    @classmethod
50    def setUpClass(cls):
51        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
52                              (i, random()), "ascii")
53                        for i in range(FIXEDTEST_SIZE)]
54        cls.data = b''.join(cls.line_gen)
55
56    def setUp(self):
57        # Make a source file with some lines
58        with open(TESTFN, "wb") as fp:
59            fp.write(self.data)
60
61    def make_test_archive(self, f, compression, compresslevel=None):
62        kwargs = {'compression': compression, 'compresslevel': compresslevel}
63        # Create the ZIP archive
64        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
65            zipfp.write(TESTFN, "another.name")
66            zipfp.write(TESTFN, TESTFN)
67            zipfp.writestr("strfile", self.data)
68            with zipfp.open('written-open-w', mode='w') as f:
69                for line in self.line_gen:
70                    f.write(line)
71
72    def zip_test(self, f, compression, compresslevel=None):
73        self.make_test_archive(f, compression, compresslevel)
74
75        # Read the ZIP archive
76        with zipfile.ZipFile(f, "r", compression) as zipfp:
77            self.assertEqual(zipfp.read(TESTFN), self.data)
78            self.assertEqual(zipfp.read("another.name"), self.data)
79            self.assertEqual(zipfp.read("strfile"), self.data)
80
81            # Print the ZIP directory
82            fp = io.StringIO()
83            zipfp.printdir(file=fp)
84            directory = fp.getvalue()
85            lines = directory.splitlines()
86            self.assertEqual(len(lines), 5) # Number of files + header
87
88            self.assertIn('File Name', lines[0])
89            self.assertIn('Modified', lines[0])
90            self.assertIn('Size', lines[0])
91
92            fn, date, time_, size = lines[1].split()
93            self.assertEqual(fn, 'another.name')
94            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
95            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
96            self.assertEqual(size, str(len(self.data)))
97
98            # Check the namelist
99            names = zipfp.namelist()
100            self.assertEqual(len(names), 4)
101            self.assertIn(TESTFN, names)
102            self.assertIn("another.name", names)
103            self.assertIn("strfile", names)
104            self.assertIn("written-open-w", names)
105
106            # Check infolist
107            infos = zipfp.infolist()
108            names = [i.filename for i in infos]
109            self.assertEqual(len(names), 4)
110            self.assertIn(TESTFN, names)
111            self.assertIn("another.name", names)
112            self.assertIn("strfile", names)
113            self.assertIn("written-open-w", names)
114            for i in infos:
115                self.assertEqual(i.file_size, len(self.data))
116
117            # check getinfo
118            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
119                info = zipfp.getinfo(nm)
120                self.assertEqual(info.filename, nm)
121                self.assertEqual(info.file_size, len(self.data))
122
123            # Check that testzip doesn't raise an exception
124            zipfp.testzip()
125
126    def test_basic(self):
127        for f in get_files(self):
128            self.zip_test(f, self.compression)
129
130    def zip_open_test(self, f, compression):
131        self.make_test_archive(f, compression)
132
133        # Read the ZIP archive
134        with zipfile.ZipFile(f, "r", compression) as zipfp:
135            zipdata1 = []
136            with zipfp.open(TESTFN) as zipopen1:
137                while True:
138                    read_data = zipopen1.read(256)
139                    if not read_data:
140                        break
141                    zipdata1.append(read_data)
142
143            zipdata2 = []
144            with zipfp.open("another.name") as zipopen2:
145                while True:
146                    read_data = zipopen2.read(256)
147                    if not read_data:
148                        break
149                    zipdata2.append(read_data)
150
151            self.assertEqual(b''.join(zipdata1), self.data)
152            self.assertEqual(b''.join(zipdata2), self.data)
153
154    def test_open(self):
155        for f in get_files(self):
156            self.zip_open_test(f, self.compression)
157
158    def test_open_with_pathlike(self):
159        path = pathlib.Path(TESTFN2)
160        self.zip_open_test(path, self.compression)
161        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
162            self.assertIsInstance(zipfp.filename, str)
163
164    def zip_random_open_test(self, f, compression):
165        self.make_test_archive(f, compression)
166
167        # Read the ZIP archive
168        with zipfile.ZipFile(f, "r", compression) as zipfp:
169            zipdata1 = []
170            with zipfp.open(TESTFN) as zipopen1:
171                while True:
172                    read_data = zipopen1.read(randint(1, 1024))
173                    if not read_data:
174                        break
175                    zipdata1.append(read_data)
176
177            self.assertEqual(b''.join(zipdata1), self.data)
178
179    def test_random_open(self):
180        for f in get_files(self):
181            self.zip_random_open_test(f, self.compression)
182
183    def zip_read1_test(self, f, compression):
184        self.make_test_archive(f, compression)
185
186        # Read the ZIP archive
187        with zipfile.ZipFile(f, "r") as zipfp, \
188             zipfp.open(TESTFN) as zipopen:
189            zipdata = []
190            while True:
191                read_data = zipopen.read1(-1)
192                if not read_data:
193                    break
194                zipdata.append(read_data)
195
196        self.assertEqual(b''.join(zipdata), self.data)
197
198    def test_read1(self):
199        for f in get_files(self):
200            self.zip_read1_test(f, self.compression)
201
202    def zip_read1_10_test(self, f, compression):
203        self.make_test_archive(f, compression)
204
205        # Read the ZIP archive
206        with zipfile.ZipFile(f, "r") as zipfp, \
207             zipfp.open(TESTFN) as zipopen:
208            zipdata = []
209            while True:
210                read_data = zipopen.read1(10)
211                self.assertLessEqual(len(read_data), 10)
212                if not read_data:
213                    break
214                zipdata.append(read_data)
215
216        self.assertEqual(b''.join(zipdata), self.data)
217
218    def test_read1_10(self):
219        for f in get_files(self):
220            self.zip_read1_10_test(f, self.compression)
221
222    def zip_readline_read_test(self, f, compression):
223        self.make_test_archive(f, compression)
224
225        # Read the ZIP archive
226        with zipfile.ZipFile(f, "r") as zipfp, \
227             zipfp.open(TESTFN) as zipopen:
228            data = b''
229            while True:
230                read = zipopen.readline()
231                if not read:
232                    break
233                data += read
234
235                read = zipopen.read(100)
236                if not read:
237                    break
238                data += read
239
240        self.assertEqual(data, self.data)
241
242    def test_readline_read(self):
243        # Issue #7610: calls to readline() interleaved with calls to read().
244        for f in get_files(self):
245            self.zip_readline_read_test(f, self.compression)
246
247    def zip_readline_test(self, f, compression):
248        self.make_test_archive(f, compression)
249
250        # Read the ZIP archive
251        with zipfile.ZipFile(f, "r") as zipfp:
252            with zipfp.open(TESTFN) as zipopen:
253                for line in self.line_gen:
254                    linedata = zipopen.readline()
255                    self.assertEqual(linedata, line)
256
257    def test_readline(self):
258        for f in get_files(self):
259            self.zip_readline_test(f, self.compression)
260
261    def zip_readlines_test(self, f, compression):
262        self.make_test_archive(f, compression)
263
264        # Read the ZIP archive
265        with zipfile.ZipFile(f, "r") as zipfp:
266            with zipfp.open(TESTFN) as zipopen:
267                ziplines = zipopen.readlines()
268            for line, zipline in zip(self.line_gen, ziplines):
269                self.assertEqual(zipline, line)
270
271    def test_readlines(self):
272        for f in get_files(self):
273            self.zip_readlines_test(f, self.compression)
274
275    def zip_iterlines_test(self, f, compression):
276        self.make_test_archive(f, compression)
277
278        # Read the ZIP archive
279        with zipfile.ZipFile(f, "r") as zipfp:
280            with zipfp.open(TESTFN) as zipopen:
281                for line, zipline in zip(self.line_gen, zipopen):
282                    self.assertEqual(zipline, line)
283
284    def test_iterlines(self):
285        for f in get_files(self):
286            self.zip_iterlines_test(f, self.compression)
287
288    def test_low_compression(self):
289        """Check for cases where compressed data is larger than original."""
290        # Create the ZIP archive
291        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
292            zipfp.writestr("strfile", '12')
293
294        # Get an open object for strfile
295        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
296            with zipfp.open("strfile") as openobj:
297                self.assertEqual(openobj.read(1), b'1')
298                self.assertEqual(openobj.read(1), b'2')
299
300    def test_writestr_compression(self):
301        zipfp = zipfile.ZipFile(TESTFN2, "w")
302        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
303        info = zipfp.getinfo('b.txt')
304        self.assertEqual(info.compress_type, self.compression)
305
306    def test_writestr_compresslevel(self):
307        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
308        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
309        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
310                       compresslevel=2)
311
312        # Compression level follows the constructor.
313        a_info = zipfp.getinfo('a.txt')
314        self.assertEqual(a_info.compress_type, self.compression)
315        self.assertEqual(a_info._compresslevel, 1)
316
317        # Compression level is overridden.
318        b_info = zipfp.getinfo('b.txt')
319        self.assertEqual(b_info.compress_type, self.compression)
320        self.assertEqual(b_info._compresslevel, 2)
321
322    def test_read_return_size(self):
323        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
324        # than requested.
325        for test_size in (1, 4095, 4096, 4097, 16384):
326            file_size = test_size + 1
327            junk = randbytes(file_size)
328            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
329                zipf.writestr('foo', junk)
330                with zipf.open('foo', 'r') as fp:
331                    buf = fp.read(test_size)
332                    self.assertEqual(len(buf), test_size)
333
334    def test_truncated_zipfile(self):
335        fp = io.BytesIO()
336        with zipfile.ZipFile(fp, mode='w') as zipf:
337            zipf.writestr('strfile', self.data, compress_type=self.compression)
338            end_offset = fp.tell()
339        zipfiledata = fp.getvalue()
340
341        fp = io.BytesIO(zipfiledata)
342        with zipfile.ZipFile(fp) as zipf:
343            with zipf.open('strfile') as zipopen:
344                fp.truncate(end_offset - 20)
345                with self.assertRaises(EOFError):
346                    zipopen.read()
347
348        fp = io.BytesIO(zipfiledata)
349        with zipfile.ZipFile(fp) as zipf:
350            with zipf.open('strfile') as zipopen:
351                fp.truncate(end_offset - 20)
352                with self.assertRaises(EOFError):
353                    while zipopen.read(100):
354                        pass
355
356        fp = io.BytesIO(zipfiledata)
357        with zipfile.ZipFile(fp) as zipf:
358            with zipf.open('strfile') as zipopen:
359                fp.truncate(end_offset - 20)
360                with self.assertRaises(EOFError):
361                    while zipopen.read1(100):
362                        pass
363
364    def test_repr(self):
365        fname = 'file.name'
366        for f in get_files(self):
367            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
368                zipfp.write(TESTFN, fname)
369                r = repr(zipfp)
370                self.assertIn("mode='w'", r)
371
372            with zipfile.ZipFile(f, 'r') as zipfp:
373                r = repr(zipfp)
374                if isinstance(f, str):
375                    self.assertIn('filename=%r' % f, r)
376                else:
377                    self.assertIn('file=%r' % f, r)
378                self.assertIn("mode='r'", r)
379                r = repr(zipfp.getinfo(fname))
380                self.assertIn('filename=%r' % fname, r)
381                self.assertIn('filemode=', r)
382                self.assertIn('file_size=', r)
383                if self.compression != zipfile.ZIP_STORED:
384                    self.assertIn('compress_type=', r)
385                    self.assertIn('compress_size=', r)
386                with zipfp.open(fname) as zipopen:
387                    r = repr(zipopen)
388                    self.assertIn('name=%r' % fname, r)
389                    self.assertIn("mode='r'", r)
390                    if self.compression != zipfile.ZIP_STORED:
391                        self.assertIn('compress_type=', r)
392                self.assertIn('[closed]', repr(zipopen))
393            self.assertIn('[closed]', repr(zipfp))
394
395    def test_compresslevel_basic(self):
396        for f in get_files(self):
397            self.zip_test(f, self.compression, compresslevel=9)
398
399    def test_per_file_compresslevel(self):
400        """Check that files within a Zip archive can have different
401        compression levels."""
402        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
403            zipfp.write(TESTFN, 'compress_1')
404            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
405            one_info = zipfp.getinfo('compress_1')
406            nine_info = zipfp.getinfo('compress_9')
407            self.assertEqual(one_info._compresslevel, 1)
408            self.assertEqual(nine_info._compresslevel, 9)
409
410    def test_writing_errors(self):
411        class BrokenFile(io.BytesIO):
412            def write(self, data):
413                nonlocal count
414                if count is not None:
415                    if count == stop:
416                        raise OSError
417                    count += 1
418                super().write(data)
419
420        stop = 0
421        while True:
422            testfile = BrokenFile()
423            count = None
424            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
425                with zipfp.open('file1', 'w') as f:
426                    f.write(b'data1')
427                count = 0
428                try:
429                    with zipfp.open('file2', 'w') as f:
430                        f.write(b'data2')
431                except OSError:
432                    stop += 1
433                else:
434                    break
435                finally:
436                    count = None
437            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
438                self.assertEqual(zipfp.namelist(), ['file1'])
439                self.assertEqual(zipfp.read('file1'), b'data1')
440
441        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
442            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
443            self.assertEqual(zipfp.read('file1'), b'data1')
444            self.assertEqual(zipfp.read('file2'), b'data2')
445
446
447    def tearDown(self):
448        unlink(TESTFN)
449        unlink(TESTFN2)
450
451
452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
453                                unittest.TestCase):
454    compression = zipfile.ZIP_STORED
455    test_low_compression = None
456
457    def zip_test_writestr_permissions(self, f, compression):
458        # Make sure that writestr and open(... mode='w') create files with
459        # mode 0600, when they are passed a name rather than a ZipInfo
460        # instance.
461
462        self.make_test_archive(f, compression)
463        with zipfile.ZipFile(f, "r") as zipfp:
464            zinfo = zipfp.getinfo('strfile')
465            self.assertEqual(zinfo.external_attr, 0o600 << 16)
466
467            zinfo2 = zipfp.getinfo('written-open-w')
468            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
469
470    def test_writestr_permissions(self):
471        for f in get_files(self):
472            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
473
474    def test_absolute_arcnames(self):
475        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
476            zipfp.write(TESTFN, "/absolute")
477
478        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
479            self.assertEqual(zipfp.namelist(), ["absolute"])
480
481    def test_append_to_zip_file(self):
482        """Test appending to an existing zipfile."""
483        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
484            zipfp.write(TESTFN, TESTFN)
485
486        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
487            zipfp.writestr("strfile", self.data)
488            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
489
490    def test_append_to_non_zip_file(self):
491        """Test appending to an existing file that is not a zipfile."""
492        # NOTE: this test fails if len(d) < 22 because of the first
493        # line "fpin.seek(-22, 2)" in _EndRecData
494        data = b'I am not a ZipFile!'*10
495        with open(TESTFN2, 'wb') as f:
496            f.write(data)
497
498        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
499            zipfp.write(TESTFN, TESTFN)
500
501        with open(TESTFN2, 'rb') as f:
502            f.seek(len(data))
503            with zipfile.ZipFile(f, "r") as zipfp:
504                self.assertEqual(zipfp.namelist(), [TESTFN])
505                self.assertEqual(zipfp.read(TESTFN), self.data)
506        with open(TESTFN2, 'rb') as f:
507            self.assertEqual(f.read(len(data)), data)
508            zipfiledata = f.read()
509        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
510            self.assertEqual(zipfp.namelist(), [TESTFN])
511            self.assertEqual(zipfp.read(TESTFN), self.data)
512
513    def test_read_concatenated_zip_file(self):
514        with io.BytesIO() as bio:
515            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
516                zipfp.write(TESTFN, TESTFN)
517            zipfiledata = bio.getvalue()
518        data = b'I am not a ZipFile!'*10
519        with open(TESTFN2, 'wb') as f:
520            f.write(data)
521            f.write(zipfiledata)
522
523        with zipfile.ZipFile(TESTFN2) as zipfp:
524            self.assertEqual(zipfp.namelist(), [TESTFN])
525            self.assertEqual(zipfp.read(TESTFN), self.data)
526
527    def test_append_to_concatenated_zip_file(self):
528        with io.BytesIO() as bio:
529            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
530                zipfp.write(TESTFN, TESTFN)
531            zipfiledata = bio.getvalue()
532        data = b'I am not a ZipFile!'*1000000
533        with open(TESTFN2, 'wb') as f:
534            f.write(data)
535            f.write(zipfiledata)
536
537        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
538            self.assertEqual(zipfp.namelist(), [TESTFN])
539            zipfp.writestr('strfile', self.data)
540
541        with open(TESTFN2, 'rb') as f:
542            self.assertEqual(f.read(len(data)), data)
543            zipfiledata = f.read()
544        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
545            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
546            self.assertEqual(zipfp.read(TESTFN), self.data)
547            self.assertEqual(zipfp.read('strfile'), self.data)
548
549    def test_ignores_newline_at_end(self):
550        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
551            zipfp.write(TESTFN, TESTFN)
552        with open(TESTFN2, 'a', encoding='utf-8') as f:
553            f.write("\r\n\00\00\00")
554        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
555            self.assertIsInstance(zipfp, zipfile.ZipFile)
556
557    def test_ignores_stuff_appended_past_comments(self):
558        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
559            zipfp.comment = b"this is a comment"
560            zipfp.write(TESTFN, TESTFN)
561        with open(TESTFN2, 'a', encoding='utf-8') as f:
562            f.write("abcdef\r\n")
563        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
564            self.assertIsInstance(zipfp, zipfile.ZipFile)
565            self.assertEqual(zipfp.comment, b"this is a comment")
566
567    def test_write_default_name(self):
568        """Check that calling ZipFile.write without arcname specified
569        produces the expected result."""
570        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
571            zipfp.write(TESTFN)
572            with open(TESTFN, "rb") as f:
573                self.assertEqual(zipfp.read(TESTFN), f.read())
574
575    def test_io_on_closed_zipextfile(self):
576        fname = "somefile.txt"
577        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
578            zipfp.writestr(fname, "bogus")
579
580        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
581            with zipfp.open(fname) as fid:
582                fid.close()
583                self.assertRaises(ValueError, fid.read)
584                self.assertRaises(ValueError, fid.seek, 0)
585                self.assertRaises(ValueError, fid.tell)
586                self.assertRaises(ValueError, fid.readable)
587                self.assertRaises(ValueError, fid.seekable)
588
589    def test_write_to_readonly(self):
590        """Check that trying to call write() on a readonly ZipFile object
591        raises a ValueError."""
592        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
593            zipfp.writestr("somefile.txt", "bogus")
594
595        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
596            self.assertRaises(ValueError, zipfp.write, TESTFN)
597
598        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
599            with self.assertRaises(ValueError):
600                zipfp.open(TESTFN, mode='w')
601
602    def test_add_file_before_1980(self):
603        # Set atime and mtime to 1970-01-01
604        os.utime(TESTFN, (0, 0))
605        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
606            self.assertRaises(ValueError, zipfp.write, TESTFN)
607
608        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
609            zipfp.write(TESTFN)
610            zinfo = zipfp.getinfo(TESTFN)
611            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
612
613    def test_add_file_after_2107(self):
614        # Set atime and mtime to 2108-12-30
615        ts = 4386268800
616        try:
617            time.localtime(ts)
618        except OverflowError:
619            self.skipTest(f'time.localtime({ts}) raises OverflowError')
620        try:
621            os.utime(TESTFN, (ts, ts))
622        except OverflowError:
623            self.skipTest('Host fs cannot set timestamp to required value.')
624
625        mtime_ns = os.stat(TESTFN).st_mtime_ns
626        if mtime_ns != (4386268800 * 10**9):
627            # XFS filesystem is limited to 32-bit timestamp, but the syscall
628            # didn't fail. Moreover, there is a VFS bug which returns
629            # a cached timestamp which is different than the value on disk.
630            #
631            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
632            #
633            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
634            # https://bugs.python.org/issue39460#msg360952
635            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
636
637        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
638            self.assertRaises(struct.error, zipfp.write, TESTFN)
639
640        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
641            zipfp.write(TESTFN)
642            zinfo = zipfp.getinfo(TESTFN)
643            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
644
645
646@requires_zlib()
647class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
648                                 unittest.TestCase):
649    compression = zipfile.ZIP_DEFLATED
650
651    def test_per_file_compression(self):
652        """Check that files within a Zip archive can have different
653        compression options."""
654        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
655            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
656            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
657            sinfo = zipfp.getinfo('storeme')
658            dinfo = zipfp.getinfo('deflateme')
659            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
660            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
661
662@requires_bz2()
663class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
664                               unittest.TestCase):
665    compression = zipfile.ZIP_BZIP2
666
667@requires_lzma()
668class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
669                              unittest.TestCase):
670    compression = zipfile.ZIP_LZMA
671
672
673class AbstractTestZip64InSmallFiles:
674    # These tests test the ZIP64 functionality without using large files,
675    # see test_zipfile64 for proper tests.
676
677    @classmethod
678    def setUpClass(cls):
679        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
680                    for i in range(0, FIXEDTEST_SIZE))
681        cls.data = b'\n'.join(line_gen)
682
683    def setUp(self):
684        self._limit = zipfile.ZIP64_LIMIT
685        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
686        zipfile.ZIP64_LIMIT = 1000
687        zipfile.ZIP_FILECOUNT_LIMIT = 9
688
689        # Make a source file with some lines
690        with open(TESTFN, "wb") as fp:
691            fp.write(self.data)
692
693    def zip_test(self, f, compression):
694        # Create the ZIP archive
695        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
696            zipfp.write(TESTFN, "another.name")
697            zipfp.write(TESTFN, TESTFN)
698            zipfp.writestr("strfile", self.data)
699
700        # Read the ZIP archive
701        with zipfile.ZipFile(f, "r", compression) as zipfp:
702            self.assertEqual(zipfp.read(TESTFN), self.data)
703            self.assertEqual(zipfp.read("another.name"), self.data)
704            self.assertEqual(zipfp.read("strfile"), self.data)
705
706            # Print the ZIP directory
707            fp = io.StringIO()
708            zipfp.printdir(fp)
709
710            directory = fp.getvalue()
711            lines = directory.splitlines()
712            self.assertEqual(len(lines), 4) # Number of files + header
713
714            self.assertIn('File Name', lines[0])
715            self.assertIn('Modified', lines[0])
716            self.assertIn('Size', lines[0])
717
718            fn, date, time_, size = lines[1].split()
719            self.assertEqual(fn, 'another.name')
720            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
721            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
722            self.assertEqual(size, str(len(self.data)))
723
724            # Check the namelist
725            names = zipfp.namelist()
726            self.assertEqual(len(names), 3)
727            self.assertIn(TESTFN, names)
728            self.assertIn("another.name", names)
729            self.assertIn("strfile", names)
730
731            # Check infolist
732            infos = zipfp.infolist()
733            names = [i.filename for i in infos]
734            self.assertEqual(len(names), 3)
735            self.assertIn(TESTFN, names)
736            self.assertIn("another.name", names)
737            self.assertIn("strfile", names)
738            for i in infos:
739                self.assertEqual(i.file_size, len(self.data))
740
741            # check getinfo
742            for nm in (TESTFN, "another.name", "strfile"):
743                info = zipfp.getinfo(nm)
744                self.assertEqual(info.filename, nm)
745                self.assertEqual(info.file_size, len(self.data))
746
747            # Check that testzip doesn't raise an exception
748            zipfp.testzip()
749
750    def test_basic(self):
751        for f in get_files(self):
752            self.zip_test(f, self.compression)
753
754    def test_too_many_files(self):
755        # This test checks that more than 64k files can be added to an archive,
756        # and that the resulting archive can be read properly by ZipFile
757        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
758                               allowZip64=True)
759        zipf.debug = 100
760        numfiles = 15
761        for i in range(numfiles):
762            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
763        self.assertEqual(len(zipf.namelist()), numfiles)
764        zipf.close()
765
766        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
767        self.assertEqual(len(zipf2.namelist()), numfiles)
768        for i in range(numfiles):
769            content = zipf2.read("foo%08d" % i).decode('ascii')
770            self.assertEqual(content, "%d" % (i**3 % 57))
771        zipf2.close()
772
773    def test_too_many_files_append(self):
774        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
775                               allowZip64=False)
776        zipf.debug = 100
777        numfiles = 9
778        for i in range(numfiles):
779            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
780        self.assertEqual(len(zipf.namelist()), numfiles)
781        with self.assertRaises(zipfile.LargeZipFile):
782            zipf.writestr("foo%08d" % numfiles, b'')
783        self.assertEqual(len(zipf.namelist()), numfiles)
784        zipf.close()
785
786        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
787                               allowZip64=False)
788        zipf.debug = 100
789        self.assertEqual(len(zipf.namelist()), numfiles)
790        with self.assertRaises(zipfile.LargeZipFile):
791            zipf.writestr("foo%08d" % numfiles, b'')
792        self.assertEqual(len(zipf.namelist()), numfiles)
793        zipf.close()
794
795        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
796                               allowZip64=True)
797        zipf.debug = 100
798        self.assertEqual(len(zipf.namelist()), numfiles)
799        numfiles2 = 15
800        for i in range(numfiles, numfiles2):
801            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
802        self.assertEqual(len(zipf.namelist()), numfiles2)
803        zipf.close()
804
805        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
806        self.assertEqual(len(zipf2.namelist()), numfiles2)
807        for i in range(numfiles2):
808            content = zipf2.read("foo%08d" % i).decode('ascii')
809            self.assertEqual(content, "%d" % (i**3 % 57))
810        zipf2.close()
811
812    def tearDown(self):
813        zipfile.ZIP64_LIMIT = self._limit
814        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
815        unlink(TESTFN)
816        unlink(TESTFN2)
817
818
819class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
820                                  unittest.TestCase):
821    compression = zipfile.ZIP_STORED
822
823    def large_file_exception_test(self, f, compression):
824        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
825            self.assertRaises(zipfile.LargeZipFile,
826                              zipfp.write, TESTFN, "another.name")
827
828    def large_file_exception_test2(self, f, compression):
829        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
830            self.assertRaises(zipfile.LargeZipFile,
831                              zipfp.writestr, "another.name", self.data)
832
833    def test_large_file_exception(self):
834        for f in get_files(self):
835            self.large_file_exception_test(f, zipfile.ZIP_STORED)
836            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
837
838    def test_absolute_arcnames(self):
839        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
840                             allowZip64=True) as zipfp:
841            zipfp.write(TESTFN, "/absolute")
842
843        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
844            self.assertEqual(zipfp.namelist(), ["absolute"])
845
846    def test_append(self):
847        # Test that appending to the Zip64 archive doesn't change
848        # extra fields of existing entries.
849        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
850            zipfp.writestr("strfile", self.data)
851        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
852            zinfo = zipfp.getinfo("strfile")
853            extra = zinfo.extra
854        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
855            zipfp.writestr("strfile2", self.data)
856        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
857            zinfo = zipfp.getinfo("strfile")
858            self.assertEqual(zinfo.extra, extra)
859
860    def make_zip64_file(
861        self, file_size_64_set=False, file_size_extra=False,
862        compress_size_64_set=False, compress_size_extra=False,
863        header_offset_64_set=False, header_offset_extra=False,
864    ):
865        """Generate bytes sequence for a zip with (incomplete) zip64 data.
866
867        The actual values (not the zip 64 0xffffffff values) stored in the file
868        are:
869        file_size: 8
870        compress_size: 8
871        header_offset: 0
872        """
873        actual_size = 8
874        actual_header_offset = 0
875        local_zip64_fields = []
876        central_zip64_fields = []
877
878        file_size = actual_size
879        if file_size_64_set:
880            file_size = 0xffffffff
881            if file_size_extra:
882                local_zip64_fields.append(actual_size)
883                central_zip64_fields.append(actual_size)
884        file_size = struct.pack("<L", file_size)
885
886        compress_size = actual_size
887        if compress_size_64_set:
888            compress_size = 0xffffffff
889            if compress_size_extra:
890                local_zip64_fields.append(actual_size)
891                central_zip64_fields.append(actual_size)
892        compress_size = struct.pack("<L", compress_size)
893
894        header_offset = actual_header_offset
895        if header_offset_64_set:
896            header_offset = 0xffffffff
897            if header_offset_extra:
898                central_zip64_fields.append(actual_header_offset)
899        header_offset = struct.pack("<L", header_offset)
900
901        local_extra = struct.pack(
902            '<HH' + 'Q'*len(local_zip64_fields),
903            0x0001,
904            8*len(local_zip64_fields),
905            *local_zip64_fields
906        )
907
908        central_extra = struct.pack(
909            '<HH' + 'Q'*len(central_zip64_fields),
910            0x0001,
911            8*len(central_zip64_fields),
912            *central_zip64_fields
913        )
914
915        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
916        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
917
918        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
919        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
920
921        filename = b"test.txt"
922        content = b"test1234"
923        filename_length = struct.pack("<H", len(filename))
924        zip64_contents = (
925            # Local file header
926            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
927            + compress_size
928            + file_size
929            + filename_length
930            + local_extra_length
931            + filename
932            + local_extra
933            + content
934            # Central directory:
935            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
936            + compress_size
937            + file_size
938            + filename_length
939            + central_extra_length
940            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
941            + header_offset
942            + filename
943            + central_extra
944            # Zip64 end of central directory
945            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
946            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
947            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
948            + central_dir_size
949            + offset_to_central_dir
950            # Zip64 end of central directory locator
951            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
952            + b"\x00\x00\x00"
953            # end of central directory
954            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
955            + b"\x00\x00\x00\x00"
956        )
957        return zip64_contents
958
959    def test_bad_zip64_extra(self):
960        """Missing zip64 extra records raises an exception.
961
962        There are 4 fields that the zip64 format handles (the disk number is
963        not used in this module and so is ignored here). According to the zip
964        spec:
965              The order of the fields in the zip64 extended
966              information record is fixed, but the fields MUST
967              only appear if the corresponding Local or Central
968              directory record field is set to 0xFFFF or 0xFFFFFFFF.
969
970        If the zip64 extra content doesn't contain enough entries for the
971        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
972        This test mismatches the length of the zip64 extra field and the number
973        of fields set to indicate the presence of zip64 data.
974        """
975        # zip64 file size present, no fields in extra, expecting one, equals
976        # missing file size.
977        missing_file_size_extra = self.make_zip64_file(
978            file_size_64_set=True,
979        )
980        with self.assertRaises(zipfile.BadZipFile) as e:
981            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
982        self.assertIn('file size', str(e.exception).lower())
983
984        # zip64 file size present, zip64 compress size present, one field in
985        # extra, expecting two, equals missing compress size.
986        missing_compress_size_extra = self.make_zip64_file(
987            file_size_64_set=True,
988            file_size_extra=True,
989            compress_size_64_set=True,
990        )
991        with self.assertRaises(zipfile.BadZipFile) as e:
992            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
993        self.assertIn('compress size', str(e.exception).lower())
994
995        # zip64 compress size present, no fields in extra, expecting one,
996        # equals missing compress size.
997        missing_compress_size_extra = self.make_zip64_file(
998            compress_size_64_set=True,
999        )
1000        with self.assertRaises(zipfile.BadZipFile) as e:
1001            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1002        self.assertIn('compress size', str(e.exception).lower())
1003
1004        # zip64 file size present, zip64 compress size present, zip64 header
1005        # offset present, two fields in extra, expecting three, equals missing
1006        # header offset
1007        missing_header_offset_extra = self.make_zip64_file(
1008            file_size_64_set=True,
1009            file_size_extra=True,
1010            compress_size_64_set=True,
1011            compress_size_extra=True,
1012            header_offset_64_set=True,
1013        )
1014        with self.assertRaises(zipfile.BadZipFile) as e:
1015            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1016        self.assertIn('header offset', str(e.exception).lower())
1017
1018        # zip64 compress size present, zip64 header offset present, one field
1019        # in extra, expecting two, equals missing header offset
1020        missing_header_offset_extra = self.make_zip64_file(
1021            file_size_64_set=False,
1022            compress_size_64_set=True,
1023            compress_size_extra=True,
1024            header_offset_64_set=True,
1025        )
1026        with self.assertRaises(zipfile.BadZipFile) as e:
1027            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1028        self.assertIn('header offset', str(e.exception).lower())
1029
1030        # zip64 file size present, zip64 header offset present, one field in
1031        # extra, expecting two, equals missing header offset
1032        missing_header_offset_extra = self.make_zip64_file(
1033            file_size_64_set=True,
1034            file_size_extra=True,
1035            compress_size_64_set=False,
1036            header_offset_64_set=True,
1037        )
1038        with self.assertRaises(zipfile.BadZipFile) as e:
1039            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1040        self.assertIn('header offset', str(e.exception).lower())
1041
1042        # zip64 header offset present, no fields in extra, expecting one,
1043        # equals missing header offset
1044        missing_header_offset_extra = self.make_zip64_file(
1045            file_size_64_set=False,
1046            compress_size_64_set=False,
1047            header_offset_64_set=True,
1048        )
1049        with self.assertRaises(zipfile.BadZipFile) as e:
1050            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1051        self.assertIn('header offset', str(e.exception).lower())
1052
1053    def test_generated_valid_zip64_extra(self):
1054        # These values are what is set in the make_zip64_file method.
1055        expected_file_size = 8
1056        expected_compress_size = 8
1057        expected_header_offset = 0
1058        expected_content = b"test1234"
1059
1060        # Loop through the various valid combinations of zip64 masks
1061        # present and extra fields present.
1062        params = (
1063            {"file_size_64_set": True, "file_size_extra": True},
1064            {"compress_size_64_set": True, "compress_size_extra": True},
1065            {"header_offset_64_set": True, "header_offset_extra": True},
1066        )
1067
1068        for r in range(1, len(params) + 1):
1069            for combo in itertools.combinations(params, r):
1070                kwargs = {}
1071                for c in combo:
1072                    kwargs.update(c)
1073                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1074                    zinfo = zf.infolist()[0]
1075                    self.assertEqual(zinfo.file_size, expected_file_size)
1076                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1077                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1078                    self.assertEqual(zf.read(zinfo), expected_content)
1079
1080
1081@requires_zlib()
1082class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1083                                   unittest.TestCase):
1084    compression = zipfile.ZIP_DEFLATED
1085
1086@requires_bz2()
1087class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1088                                 unittest.TestCase):
1089    compression = zipfile.ZIP_BZIP2
1090
1091@requires_lzma()
1092class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1093                                unittest.TestCase):
1094    compression = zipfile.ZIP_LZMA
1095
1096
1097class AbstractWriterTests:
1098
1099    def tearDown(self):
1100        unlink(TESTFN2)
1101
1102    def test_close_after_close(self):
1103        data = b'content'
1104        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1105            w = zipf.open('test', 'w')
1106            w.write(data)
1107            w.close()
1108            self.assertTrue(w.closed)
1109            w.close()
1110            self.assertTrue(w.closed)
1111            self.assertEqual(zipf.read('test'), data)
1112
1113    def test_write_after_close(self):
1114        data = b'content'
1115        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1116            w = zipf.open('test', 'w')
1117            w.write(data)
1118            w.close()
1119            self.assertTrue(w.closed)
1120            self.assertRaises(ValueError, w.write, b'')
1121            self.assertEqual(zipf.read('test'), data)
1122
1123    def test_issue44439(self):
1124        q = array.array('Q', [1, 2, 3, 4, 5])
1125        LENGTH = len(q) * q.itemsize
1126        with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip:
1127            with zip.open('data', 'w') as data:
1128                self.assertEqual(data.write(q), LENGTH)
1129            self.assertEqual(zip.getinfo('data').file_size, LENGTH)
1130
1131class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1132    compression = zipfile.ZIP_STORED
1133
1134@requires_zlib()
1135class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1136    compression = zipfile.ZIP_DEFLATED
1137
1138@requires_bz2()
1139class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1140    compression = zipfile.ZIP_BZIP2
1141
1142@requires_lzma()
1143class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1144    compression = zipfile.ZIP_LZMA
1145
1146
1147class PyZipFileTests(unittest.TestCase):
1148    def assertCompiledIn(self, name, namelist):
1149        if name + 'o' not in namelist:
1150            self.assertIn(name + 'c', namelist)
1151
1152    def requiresWriteAccess(self, path):
1153        # effective_ids unavailable on windows
1154        if not os.access(path, os.W_OK,
1155                         effective_ids=os.access in os.supports_effective_ids):
1156            self.skipTest('requires write access to the installed location')
1157        filename = os.path.join(path, 'test_zipfile.try')
1158        try:
1159            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1160            os.close(fd)
1161        except Exception:
1162            self.skipTest('requires write access to the installed location')
1163        unlink(filename)
1164
1165    def test_write_pyfile(self):
1166        self.requiresWriteAccess(os.path.dirname(__file__))
1167        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1168            fn = __file__
1169            if fn.endswith('.pyc'):
1170                path_split = fn.split(os.sep)
1171                if os.altsep is not None:
1172                    path_split.extend(fn.split(os.altsep))
1173                if '__pycache__' in path_split:
1174                    fn = importlib.util.source_from_cache(fn)
1175                else:
1176                    fn = fn[:-1]
1177
1178            zipfp.writepy(fn)
1179
1180            bn = os.path.basename(fn)
1181            self.assertNotIn(bn, zipfp.namelist())
1182            self.assertCompiledIn(bn, zipfp.namelist())
1183
1184        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1185            fn = __file__
1186            if fn.endswith('.pyc'):
1187                fn = fn[:-1]
1188
1189            zipfp.writepy(fn, "testpackage")
1190
1191            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1192            self.assertNotIn(bn, zipfp.namelist())
1193            self.assertCompiledIn(bn, zipfp.namelist())
1194
1195    def test_write_python_package(self):
1196        import email
1197        packagedir = os.path.dirname(email.__file__)
1198        self.requiresWriteAccess(packagedir)
1199
1200        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1201            zipfp.writepy(packagedir)
1202
1203            # Check for a couple of modules at different levels of the
1204            # hierarchy
1205            names = zipfp.namelist()
1206            self.assertCompiledIn('email/__init__.py', names)
1207            self.assertCompiledIn('email/mime/text.py', names)
1208
1209    def test_write_filtered_python_package(self):
1210        import test
1211        packagedir = os.path.dirname(test.__file__)
1212        self.requiresWriteAccess(packagedir)
1213
1214        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1215
1216            # first make sure that the test folder gives error messages
1217            # (on the badsyntax_... files)
1218            with captured_stdout() as reportSIO:
1219                zipfp.writepy(packagedir)
1220            reportStr = reportSIO.getvalue()
1221            self.assertTrue('SyntaxError' in reportStr)
1222
1223            # then check that the filter works on the whole package
1224            with captured_stdout() as reportSIO:
1225                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1226            reportStr = reportSIO.getvalue()
1227            self.assertTrue('SyntaxError' not in reportStr)
1228
1229            # then check that the filter works on individual files
1230            def filter(path):
1231                return not os.path.basename(path).startswith("bad")
1232            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1233                zipfp.writepy(packagedir, filterfunc=filter)
1234            reportStr = reportSIO.getvalue()
1235            if reportStr:
1236                print(reportStr)
1237            self.assertTrue('SyntaxError' not in reportStr)
1238
1239    def test_write_with_optimization(self):
1240        import email
1241        packagedir = os.path.dirname(email.__file__)
1242        self.requiresWriteAccess(packagedir)
1243        optlevel = 1 if __debug__ else 0
1244        ext = '.pyc'
1245
1246        with TemporaryFile() as t, \
1247             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1248            zipfp.writepy(packagedir)
1249
1250            names = zipfp.namelist()
1251            self.assertIn('email/__init__' + ext, names)
1252            self.assertIn('email/mime/text' + ext, names)
1253
1254    def test_write_python_directory(self):
1255        os.mkdir(TESTFN2)
1256        try:
1257            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1258                fp.write("print(42)\n")
1259
1260            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1261                fp.write("print(42 * 42)\n")
1262
1263            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1264                fp.write("bla bla bla\n")
1265
1266            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1267                zipfp.writepy(TESTFN2)
1268
1269                names = zipfp.namelist()
1270                self.assertCompiledIn('mod1.py', names)
1271                self.assertCompiledIn('mod2.py', names)
1272                self.assertNotIn('mod2.txt', names)
1273
1274        finally:
1275            rmtree(TESTFN2)
1276
1277    def test_write_python_directory_filtered(self):
1278        os.mkdir(TESTFN2)
1279        try:
1280            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1281                fp.write("print(42)\n")
1282
1283            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1284                fp.write("print(42 * 42)\n")
1285
1286            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1287                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1288                                                  not fn.endswith('mod2.py'))
1289
1290                names = zipfp.namelist()
1291                self.assertCompiledIn('mod1.py', names)
1292                self.assertNotIn('mod2.py', names)
1293
1294        finally:
1295            rmtree(TESTFN2)
1296
1297    def test_write_non_pyfile(self):
1298        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1299            with open(TESTFN, 'w', encoding='utf-8') as f:
1300                f.write('most definitely not a python file')
1301            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1302            unlink(TESTFN)
1303
1304    def test_write_pyfile_bad_syntax(self):
1305        os.mkdir(TESTFN2)
1306        try:
1307            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1308                fp.write("Bad syntax in python file\n")
1309
1310            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1311                # syntax errors are printed to stdout
1312                with captured_stdout() as s:
1313                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1314
1315                self.assertIn("SyntaxError", s.getvalue())
1316
1317                # as it will not have compiled the python file, it will
1318                # include the .py file not .pyc
1319                names = zipfp.namelist()
1320                self.assertIn('mod1.py', names)
1321                self.assertNotIn('mod1.pyc', names)
1322
1323        finally:
1324            rmtree(TESTFN2)
1325
1326    def test_write_pathlike(self):
1327        os.mkdir(TESTFN2)
1328        try:
1329            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1330                fp.write("print(42)\n")
1331
1332            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1333                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1334                names = zipfp.namelist()
1335                self.assertCompiledIn('mod1.py', names)
1336        finally:
1337            rmtree(TESTFN2)
1338
1339
1340class ExtractTests(unittest.TestCase):
1341
1342    def make_test_file(self):
1343        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1344            for fpath, fdata in SMALL_TEST_DATA:
1345                zipfp.writestr(fpath, fdata)
1346
1347    def test_extract(self):
1348        with temp_cwd():
1349            self.make_test_file()
1350            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1351                for fpath, fdata in SMALL_TEST_DATA:
1352                    writtenfile = zipfp.extract(fpath)
1353
1354                    # make sure it was written to the right place
1355                    correctfile = os.path.join(os.getcwd(), fpath)
1356                    correctfile = os.path.normpath(correctfile)
1357
1358                    self.assertEqual(writtenfile, correctfile)
1359
1360                    # make sure correct data is in correct file
1361                    with open(writtenfile, "rb") as f:
1362                        self.assertEqual(fdata.encode(), f.read())
1363
1364                    unlink(writtenfile)
1365
1366    def _test_extract_with_target(self, target):
1367        self.make_test_file()
1368        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1369            for fpath, fdata in SMALL_TEST_DATA:
1370                writtenfile = zipfp.extract(fpath, target)
1371
1372                # make sure it was written to the right place
1373                correctfile = os.path.join(target, fpath)
1374                correctfile = os.path.normpath(correctfile)
1375                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1376
1377                # make sure correct data is in correct file
1378                with open(writtenfile, "rb") as f:
1379                    self.assertEqual(fdata.encode(), f.read())
1380
1381                unlink(writtenfile)
1382
1383        unlink(TESTFN2)
1384
1385    def test_extract_with_target(self):
1386        with temp_dir() as extdir:
1387            self._test_extract_with_target(extdir)
1388
1389    def test_extract_with_target_pathlike(self):
1390        with temp_dir() as extdir:
1391            self._test_extract_with_target(pathlib.Path(extdir))
1392
1393    def test_extract_all(self):
1394        with temp_cwd():
1395            self.make_test_file()
1396            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1397                zipfp.extractall()
1398                for fpath, fdata in SMALL_TEST_DATA:
1399                    outfile = os.path.join(os.getcwd(), fpath)
1400
1401                    with open(outfile, "rb") as f:
1402                        self.assertEqual(fdata.encode(), f.read())
1403
1404                    unlink(outfile)
1405
1406    def _test_extract_all_with_target(self, target):
1407        self.make_test_file()
1408        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1409            zipfp.extractall(target)
1410            for fpath, fdata in SMALL_TEST_DATA:
1411                outfile = os.path.join(target, fpath)
1412
1413                with open(outfile, "rb") as f:
1414                    self.assertEqual(fdata.encode(), f.read())
1415
1416                unlink(outfile)
1417
1418        unlink(TESTFN2)
1419
1420    def test_extract_all_with_target(self):
1421        with temp_dir() as extdir:
1422            self._test_extract_all_with_target(extdir)
1423
1424    def test_extract_all_with_target_pathlike(self):
1425        with temp_dir() as extdir:
1426            self._test_extract_all_with_target(pathlib.Path(extdir))
1427
1428    def check_file(self, filename, content):
1429        self.assertTrue(os.path.isfile(filename))
1430        with open(filename, 'rb') as f:
1431            self.assertEqual(f.read(), content)
1432
1433    def test_sanitize_windows_name(self):
1434        san = zipfile.ZipFile._sanitize_windows_name
1435        # Passing pathsep in allows this test to work regardless of platform.
1436        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1437        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1438        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1439
1440    def test_extract_hackers_arcnames_common_cases(self):
1441        common_hacknames = [
1442            ('../foo/bar', 'foo/bar'),
1443            ('foo/../bar', 'foo/bar'),
1444            ('foo/../../bar', 'foo/bar'),
1445            ('foo/bar/..', 'foo/bar'),
1446            ('./../foo/bar', 'foo/bar'),
1447            ('/foo/bar', 'foo/bar'),
1448            ('/foo/../bar', 'foo/bar'),
1449            ('/foo/../../bar', 'foo/bar'),
1450        ]
1451        self._test_extract_hackers_arcnames(common_hacknames)
1452
1453    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1454    def test_extract_hackers_arcnames_windows_only(self):
1455        """Test combination of path fixing and windows name sanitization."""
1456        windows_hacknames = [
1457            (r'..\foo\bar', 'foo/bar'),
1458            (r'..\/foo\/bar', 'foo/bar'),
1459            (r'foo/\..\/bar', 'foo/bar'),
1460            (r'foo\/../\bar', 'foo/bar'),
1461            (r'C:foo/bar', 'foo/bar'),
1462            (r'C:/foo/bar', 'foo/bar'),
1463            (r'C://foo/bar', 'foo/bar'),
1464            (r'C:\foo\bar', 'foo/bar'),
1465            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1466            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1467            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1468            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1469            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1470            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1471            (r'//?/C:/foo/bar', 'foo/bar'),
1472            (r'\\?\C:\foo\bar', 'foo/bar'),
1473            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1474            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1475            ('../../foo../../ba..r', 'foo/ba..r'),
1476        ]
1477        self._test_extract_hackers_arcnames(windows_hacknames)
1478
1479    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1480    def test_extract_hackers_arcnames_posix_only(self):
1481        posix_hacknames = [
1482            ('//foo/bar', 'foo/bar'),
1483            ('../../foo../../ba..r', 'foo../ba..r'),
1484            (r'foo/..\bar', r'foo/..\bar'),
1485        ]
1486        self._test_extract_hackers_arcnames(posix_hacknames)
1487
1488    def _test_extract_hackers_arcnames(self, hacknames):
1489        for arcname, fixedname in hacknames:
1490            content = b'foobar' + arcname.encode()
1491            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1492                zinfo = zipfile.ZipInfo()
1493                # preserve backslashes
1494                zinfo.filename = arcname
1495                zinfo.external_attr = 0o600 << 16
1496                zipfp.writestr(zinfo, content)
1497
1498            arcname = arcname.replace(os.sep, "/")
1499            targetpath = os.path.join('target', 'subdir', 'subsub')
1500            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1501
1502            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1503                writtenfile = zipfp.extract(arcname, targetpath)
1504                self.assertEqual(writtenfile, correctfile,
1505                                 msg='extract %r: %r != %r' %
1506                                 (arcname, writtenfile, correctfile))
1507            self.check_file(correctfile, content)
1508            rmtree('target')
1509
1510            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1511                zipfp.extractall(targetpath)
1512            self.check_file(correctfile, content)
1513            rmtree('target')
1514
1515            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1516
1517            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1518                writtenfile = zipfp.extract(arcname)
1519                self.assertEqual(writtenfile, correctfile,
1520                                 msg="extract %r" % arcname)
1521            self.check_file(correctfile, content)
1522            rmtree(fixedname.split('/')[0])
1523
1524            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1525                zipfp.extractall()
1526            self.check_file(correctfile, content)
1527            rmtree(fixedname.split('/')[0])
1528
1529            unlink(TESTFN2)
1530
1531
1532class OtherTests(unittest.TestCase):
1533    def test_open_via_zip_info(self):
1534        # Create the ZIP archive
1535        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1536            zipfp.writestr("name", "foo")
1537            with self.assertWarns(UserWarning):
1538                zipfp.writestr("name", "bar")
1539            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1540
1541        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1542            infos = zipfp.infolist()
1543            data = b""
1544            for info in infos:
1545                with zipfp.open(info) as zipopen:
1546                    data += zipopen.read()
1547            self.assertIn(data, {b"foobar", b"barfoo"})
1548            data = b""
1549            for info in infos:
1550                data += zipfp.read(info)
1551            self.assertIn(data, {b"foobar", b"barfoo"})
1552
1553    def test_writestr_extended_local_header_issue1202(self):
1554        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1555            for data in 'abcdefghijklmnop':
1556                zinfo = zipfile.ZipInfo(data)
1557                zinfo.flag_bits |= 0x08  # Include an extended local header.
1558                orig_zip.writestr(zinfo, data)
1559
1560    def test_close(self):
1561        """Check that the zipfile is closed after the 'with' block."""
1562        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1563            for fpath, fdata in SMALL_TEST_DATA:
1564                zipfp.writestr(fpath, fdata)
1565                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1566        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1567
1568        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1569            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1570        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1571
1572    def test_close_on_exception(self):
1573        """Check that the zipfile is closed if an exception is raised in the
1574        'with' block."""
1575        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1576            for fpath, fdata in SMALL_TEST_DATA:
1577                zipfp.writestr(fpath, fdata)
1578
1579        try:
1580            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1581                raise zipfile.BadZipFile()
1582        except zipfile.BadZipFile:
1583            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1584
1585    def test_unsupported_version(self):
1586        # File has an extract_version of 120
1587        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1588                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1589                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1590                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1591                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1592
1593        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1594                          io.BytesIO(data), 'r')
1595
1596    @requires_zlib()
1597    def test_read_unicode_filenames(self):
1598        # bug #10801
1599        fname = findfile('zip_cp437_header.zip')
1600        with zipfile.ZipFile(fname) as zipfp:
1601            for name in zipfp.namelist():
1602                zipfp.open(name).close()
1603
1604    def test_write_unicode_filenames(self):
1605        with zipfile.ZipFile(TESTFN, "w") as zf:
1606            zf.writestr("foo.txt", "Test for unicode filename")
1607            zf.writestr("\xf6.txt", "Test for unicode filename")
1608            self.assertIsInstance(zf.infolist()[0].filename, str)
1609
1610        with zipfile.ZipFile(TESTFN, "r") as zf:
1611            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1612            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1613
1614    def test_read_after_write_unicode_filenames(self):
1615        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1616            zipfp.writestr('приклад', b'sample')
1617            self.assertEqual(zipfp.read('приклад'), b'sample')
1618
1619    def test_exclusive_create_zip_file(self):
1620        """Test exclusive creating a new zipfile."""
1621        unlink(TESTFN2)
1622        filename = 'testfile.txt'
1623        content = b'hello, world. this is some content.'
1624        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1625            zipfp.writestr(filename, content)
1626        with self.assertRaises(FileExistsError):
1627            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1628        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1629            self.assertEqual(zipfp.namelist(), [filename])
1630            self.assertEqual(zipfp.read(filename), content)
1631
1632    def test_create_non_existent_file_for_append(self):
1633        if os.path.exists(TESTFN):
1634            os.unlink(TESTFN)
1635
1636        filename = 'testfile.txt'
1637        content = b'hello, world. this is some content.'
1638
1639        try:
1640            with zipfile.ZipFile(TESTFN, 'a') as zf:
1641                zf.writestr(filename, content)
1642        except OSError:
1643            self.fail('Could not append data to a non-existent zip file.')
1644
1645        self.assertTrue(os.path.exists(TESTFN))
1646
1647        with zipfile.ZipFile(TESTFN, 'r') as zf:
1648            self.assertEqual(zf.read(filename), content)
1649
1650    def test_close_erroneous_file(self):
1651        # This test checks that the ZipFile constructor closes the file object
1652        # it opens if there's an error in the file.  If it doesn't, the
1653        # traceback holds a reference to the ZipFile object and, indirectly,
1654        # the file object.
1655        # On Windows, this causes the os.unlink() call to fail because the
1656        # underlying file is still open.  This is SF bug #412214.
1657        #
1658        with open(TESTFN, "w", encoding="utf-8") as fp:
1659            fp.write("this is not a legal zip file\n")
1660        try:
1661            zf = zipfile.ZipFile(TESTFN)
1662        except zipfile.BadZipFile:
1663            pass
1664
1665    def test_is_zip_erroneous_file(self):
1666        """Check that is_zipfile() correctly identifies non-zip files."""
1667        # - passing a filename
1668        with open(TESTFN, "w", encoding='utf-8') as fp:
1669            fp.write("this is not a legal zip file\n")
1670        self.assertFalse(zipfile.is_zipfile(TESTFN))
1671        # - passing a path-like object
1672        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1673        # - passing a file object
1674        with open(TESTFN, "rb") as fp:
1675            self.assertFalse(zipfile.is_zipfile(fp))
1676        # - passing a file-like object
1677        fp = io.BytesIO()
1678        fp.write(b"this is not a legal zip file\n")
1679        self.assertFalse(zipfile.is_zipfile(fp))
1680        fp.seek(0, 0)
1681        self.assertFalse(zipfile.is_zipfile(fp))
1682
1683    def test_damaged_zipfile(self):
1684        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1685        # - Create a valid zip file
1686        fp = io.BytesIO()
1687        with zipfile.ZipFile(fp, mode="w") as zipf:
1688            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1689        zipfiledata = fp.getvalue()
1690
1691        # - Now create copies of it missing the last N bytes and make sure
1692        #   a BadZipFile exception is raised when we try to open it
1693        for N in range(len(zipfiledata)):
1694            fp = io.BytesIO(zipfiledata[:N])
1695            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1696
1697    def test_is_zip_valid_file(self):
1698        """Check that is_zipfile() correctly identifies zip files."""
1699        # - passing a filename
1700        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1701            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1702
1703        self.assertTrue(zipfile.is_zipfile(TESTFN))
1704        # - passing a file object
1705        with open(TESTFN, "rb") as fp:
1706            self.assertTrue(zipfile.is_zipfile(fp))
1707            fp.seek(0, 0)
1708            zip_contents = fp.read()
1709        # - passing a file-like object
1710        fp = io.BytesIO()
1711        fp.write(zip_contents)
1712        self.assertTrue(zipfile.is_zipfile(fp))
1713        fp.seek(0, 0)
1714        self.assertTrue(zipfile.is_zipfile(fp))
1715
1716    def test_non_existent_file_raises_OSError(self):
1717        # make sure we don't raise an AttributeError when a partially-constructed
1718        # ZipFile instance is finalized; this tests for regression on SF tracker
1719        # bug #403871.
1720
1721        # The bug we're testing for caused an AttributeError to be raised
1722        # when a ZipFile instance was created for a file that did not
1723        # exist; the .fp member was not initialized but was needed by the
1724        # __del__() method.  Since the AttributeError is in the __del__(),
1725        # it is ignored, but the user should be sufficiently annoyed by
1726        # the message on the output that regression will be noticed
1727        # quickly.
1728        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1729
1730    def test_empty_file_raises_BadZipFile(self):
1731        f = open(TESTFN, 'w', encoding='utf-8')
1732        f.close()
1733        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1734
1735        with open(TESTFN, 'w', encoding='utf-8') as fp:
1736            fp.write("short file")
1737        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1738
1739    def test_closed_zip_raises_ValueError(self):
1740        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1741        data = io.BytesIO()
1742        with zipfile.ZipFile(data, mode="w") as zipf:
1743            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1744
1745        # This is correct; calling .read on a closed ZipFile should raise
1746        # a ValueError, and so should calling .testzip.  An earlier
1747        # version of .testzip would swallow this exception (and any other)
1748        # and report that the first file in the archive was corrupt.
1749        self.assertRaises(ValueError, zipf.read, "foo.txt")
1750        self.assertRaises(ValueError, zipf.open, "foo.txt")
1751        self.assertRaises(ValueError, zipf.testzip)
1752        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1753        with open(TESTFN, 'w', encoding='utf-8') as f:
1754            f.write('zipfile test data')
1755        self.assertRaises(ValueError, zipf.write, TESTFN)
1756
1757    def test_bad_constructor_mode(self):
1758        """Check that bad modes passed to ZipFile constructor are caught."""
1759        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1760
1761    def test_bad_open_mode(self):
1762        """Check that bad modes passed to ZipFile.open are caught."""
1763        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1764            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1765
1766        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1767            # read the data to make sure the file is there
1768            zipf.read("foo.txt")
1769            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1770            # universal newlines support is removed
1771            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1772            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1773
1774    def test_read0(self):
1775        """Check that calling read(0) on a ZipExtFile object returns an empty
1776        string and doesn't advance file pointer."""
1777        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1778            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1779            # read the data to make sure the file is there
1780            with zipf.open("foo.txt") as f:
1781                for i in range(FIXEDTEST_SIZE):
1782                    self.assertEqual(f.read(0), b'')
1783
1784                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1785
1786    def test_open_non_existent_item(self):
1787        """Check that attempting to call open() for an item that doesn't
1788        exist in the archive raises a RuntimeError."""
1789        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1790            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1791
1792    def test_bad_compression_mode(self):
1793        """Check that bad compression methods passed to ZipFile.open are
1794        caught."""
1795        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1796
1797    def test_unsupported_compression(self):
1798        # data is declared as shrunk, but actually deflated
1799        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1800                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1801                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1802                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1803                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1804                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1805        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1806            self.assertRaises(NotImplementedError, zipf.open, 'x')
1807
1808    def test_null_byte_in_filename(self):
1809        """Check that a filename containing a null byte is properly
1810        terminated."""
1811        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1812            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1813            self.assertEqual(zipf.namelist(), ['foo.txt'])
1814
1815    def test_struct_sizes(self):
1816        """Check that ZIP internal structure sizes are calculated correctly."""
1817        self.assertEqual(zipfile.sizeEndCentDir, 22)
1818        self.assertEqual(zipfile.sizeCentralDir, 46)
1819        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1820        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1821
1822    def test_comments(self):
1823        """Check that comments on the archive are handled properly."""
1824
1825        # check default comment is empty
1826        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1827            self.assertEqual(zipf.comment, b'')
1828            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1829
1830        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1831            self.assertEqual(zipfr.comment, b'')
1832
1833        # check a simple short comment
1834        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1835        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1836            zipf.comment = comment
1837            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1838        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1839            self.assertEqual(zipf.comment, comment)
1840
1841        # check a comment of max length
1842        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1843        comment2 = comment2.encode("ascii")
1844        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1845            zipf.comment = comment2
1846            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1847
1848        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1849            self.assertEqual(zipfr.comment, comment2)
1850
1851        # check a comment that is too long is truncated
1852        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1853            with self.assertWarns(UserWarning):
1854                zipf.comment = comment2 + b'oops'
1855            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1856        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1857            self.assertEqual(zipfr.comment, comment2)
1858
1859        # check that comments are correctly modified in append mode
1860        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1861            zipf.comment = b"original comment"
1862            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1863        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1864            zipf.comment = b"an updated comment"
1865        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1866            self.assertEqual(zipf.comment, b"an updated comment")
1867
1868        # check that comments are correctly shortened in append mode
1869        # and the file is indeed truncated
1870        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1871            zipf.comment = b"original comment that's longer"
1872            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1873        original_zip_size = os.path.getsize(TESTFN)
1874        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1875            zipf.comment = b"shorter comment"
1876        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1877        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1878            self.assertEqual(zipf.comment, b"shorter comment")
1879
1880    def test_unicode_comment(self):
1881        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1882            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1883            with self.assertRaises(TypeError):
1884                zipf.comment = "this is an error"
1885
1886    def test_change_comment_in_empty_archive(self):
1887        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1888            self.assertFalse(zipf.filelist)
1889            zipf.comment = b"this is a comment"
1890        with zipfile.ZipFile(TESTFN, "r") as zipf:
1891            self.assertEqual(zipf.comment, b"this is a comment")
1892
1893    def test_change_comment_in_nonempty_archive(self):
1894        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1895            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1896        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1897            self.assertTrue(zipf.filelist)
1898            zipf.comment = b"this is a comment"
1899        with zipfile.ZipFile(TESTFN, "r") as zipf:
1900            self.assertEqual(zipf.comment, b"this is a comment")
1901
1902    def test_empty_zipfile(self):
1903        # Check that creating a file in 'w' or 'a' mode and closing without
1904        # adding any files to the archives creates a valid empty ZIP file
1905        zipf = zipfile.ZipFile(TESTFN, mode="w")
1906        zipf.close()
1907        try:
1908            zipf = zipfile.ZipFile(TESTFN, mode="r")
1909        except zipfile.BadZipFile:
1910            self.fail("Unable to create empty ZIP file in 'w' mode")
1911
1912        zipf = zipfile.ZipFile(TESTFN, mode="a")
1913        zipf.close()
1914        try:
1915            zipf = zipfile.ZipFile(TESTFN, mode="r")
1916        except:
1917            self.fail("Unable to create empty ZIP file in 'a' mode")
1918
1919    def test_open_empty_file(self):
1920        # Issue 1710703: Check that opening a file with less than 22 bytes
1921        # raises a BadZipFile exception (rather than the previously unhelpful
1922        # OSError)
1923        f = open(TESTFN, 'w', encoding='utf-8')
1924        f.close()
1925        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1926
1927    def test_create_zipinfo_before_1980(self):
1928        self.assertRaises(ValueError,
1929                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1930
1931    def test_create_empty_zipinfo_repr(self):
1932        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
1933        zi = zipfile.ZipInfo(filename="empty")
1934        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
1935
1936    def test_create_empty_zipinfo_default_attributes(self):
1937        """Ensure all required attributes are set."""
1938        zi = zipfile.ZipInfo()
1939        self.assertEqual(zi.orig_filename, "NoName")
1940        self.assertEqual(zi.filename, "NoName")
1941        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
1942        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
1943        self.assertEqual(zi.comment, b"")
1944        self.assertEqual(zi.extra, b"")
1945        self.assertIn(zi.create_system, (0, 3))
1946        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
1947        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
1948        self.assertEqual(zi.reserved, 0)
1949        self.assertEqual(zi.flag_bits, 0)
1950        self.assertEqual(zi.volume, 0)
1951        self.assertEqual(zi.internal_attr, 0)
1952        self.assertEqual(zi.external_attr, 0)
1953
1954        # Before bpo-26185, both were missing
1955        self.assertEqual(zi.file_size, 0)
1956        self.assertEqual(zi.compress_size, 0)
1957
1958    def test_zipfile_with_short_extra_field(self):
1959        """If an extra field in the header is less than 4 bytes, skip it."""
1960        zipdata = (
1961            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1962            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1963            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1964            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1965            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1966            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1967            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1968        )
1969        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1970            # testzip returns the name of the first corrupt file, or None
1971            self.assertIsNone(zipf.testzip())
1972
1973    def test_open_conflicting_handles(self):
1974        # It's only possible to open one writable file handle at a time
1975        msg1 = b"It's fun to charter an accountant!"
1976        msg2 = b"And sail the wide accountant sea"
1977        msg3 = b"To find, explore the funds offshore"
1978        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1979            with zipf.open('foo', mode='w') as w2:
1980                w2.write(msg1)
1981            with zipf.open('bar', mode='w') as w1:
1982                with self.assertRaises(ValueError):
1983                    zipf.open('handle', mode='w')
1984                with self.assertRaises(ValueError):
1985                    zipf.open('foo', mode='r')
1986                with self.assertRaises(ValueError):
1987                    zipf.writestr('str', 'abcde')
1988                with self.assertRaises(ValueError):
1989                    zipf.write(__file__, 'file')
1990                with self.assertRaises(ValueError):
1991                    zipf.close()
1992                w1.write(msg2)
1993            with zipf.open('baz', mode='w') as w2:
1994                w2.write(msg3)
1995
1996        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1997            self.assertEqual(zipf.read('foo'), msg1)
1998            self.assertEqual(zipf.read('bar'), msg2)
1999            self.assertEqual(zipf.read('baz'), msg3)
2000            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
2001
2002    def test_seek_tell(self):
2003        # Test seek functionality
2004        txt = b"Where's Bruce?"
2005        bloc = txt.find(b"Bruce")
2006        # Check seek on a file
2007        with zipfile.ZipFile(TESTFN, "w") as zipf:
2008            zipf.writestr("foo.txt", txt)
2009        with zipfile.ZipFile(TESTFN, "r") as zipf:
2010            with zipf.open("foo.txt", "r") as fp:
2011                fp.seek(bloc, os.SEEK_SET)
2012                self.assertEqual(fp.tell(), bloc)
2013                fp.seek(-bloc, os.SEEK_CUR)
2014                self.assertEqual(fp.tell(), 0)
2015                fp.seek(bloc, os.SEEK_CUR)
2016                self.assertEqual(fp.tell(), bloc)
2017                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2018                fp.seek(0, os.SEEK_END)
2019                self.assertEqual(fp.tell(), len(txt))
2020                fp.seek(0, os.SEEK_SET)
2021                self.assertEqual(fp.tell(), 0)
2022        # Check seek on memory file
2023        data = io.BytesIO()
2024        with zipfile.ZipFile(data, mode="w") as zipf:
2025            zipf.writestr("foo.txt", txt)
2026        with zipfile.ZipFile(data, mode="r") as zipf:
2027            with zipf.open("foo.txt", "r") as fp:
2028                fp.seek(bloc, os.SEEK_SET)
2029                self.assertEqual(fp.tell(), bloc)
2030                fp.seek(-bloc, os.SEEK_CUR)
2031                self.assertEqual(fp.tell(), 0)
2032                fp.seek(bloc, os.SEEK_CUR)
2033                self.assertEqual(fp.tell(), bloc)
2034                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2035                fp.seek(0, os.SEEK_END)
2036                self.assertEqual(fp.tell(), len(txt))
2037                fp.seek(0, os.SEEK_SET)
2038                self.assertEqual(fp.tell(), 0)
2039
2040    @requires_bz2()
2041    def test_decompress_without_3rd_party_library(self):
2042        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2043        zip_file = io.BytesIO(data)
2044        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2045            zf.writestr('a.txt', b'a')
2046        with mock.patch('zipfile.bz2', None):
2047            with zipfile.ZipFile(zip_file) as zf:
2048                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2049
2050    def tearDown(self):
2051        unlink(TESTFN)
2052        unlink(TESTFN2)
2053
2054
2055class AbstractBadCrcTests:
2056    def test_testzip_with_bad_crc(self):
2057        """Tests that files with bad CRCs return their name from testzip."""
2058        zipdata = self.zip_with_bad_crc
2059
2060        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2061            # testzip returns the name of the first corrupt file, or None
2062            self.assertEqual('afile', zipf.testzip())
2063
2064    def test_read_with_bad_crc(self):
2065        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
2066        zipdata = self.zip_with_bad_crc
2067
2068        # Using ZipFile.read()
2069        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2070            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
2071
2072        # Using ZipExtFile.read()
2073        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2074            with zipf.open('afile', 'r') as corrupt_file:
2075                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
2076
2077        # Same with small reads (in order to exercise the buffering logic)
2078        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2079            with zipf.open('afile', 'r') as corrupt_file:
2080                corrupt_file.MIN_READ_SIZE = 2
2081                with self.assertRaises(zipfile.BadZipFile):
2082                    while corrupt_file.read(2):
2083                        pass
2084
2085
2086class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2087    compression = zipfile.ZIP_STORED
2088    zip_with_bad_crc = (
2089        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
2090        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
2091        b'ilehello,AworldP'
2092        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
2093        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
2094        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
2095        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
2096        b'\0\0/\0\0\0\0\0')
2097
2098@requires_zlib()
2099class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2100    compression = zipfile.ZIP_DEFLATED
2101    zip_with_bad_crc = (
2102        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
2103        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2104        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
2105        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
2106        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
2107        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2108        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2109        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2110
2111@requires_bz2()
2112class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2113    compression = zipfile.ZIP_BZIP2
2114    zip_with_bad_crc = (
2115        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2116        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2117        b'ileBZh91AY&SY\xd4\xa8\xca'
2118        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2119        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2120        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2121        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2122        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2123        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2124        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2125        b'\x00\x00\x00\x00')
2126
2127@requires_lzma()
2128class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2129    compression = zipfile.ZIP_LZMA
2130    zip_with_bad_crc = (
2131        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2132        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2133        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2134        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2135        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2136        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2137        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2138        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2139        b'\x00>\x00\x00\x00\x00\x00')
2140
2141
2142class DecryptionTests(unittest.TestCase):
2143    """Check that ZIP decryption works. Since the library does not
2144    support encryption at the moment, we use a pre-generated encrypted
2145    ZIP file."""
2146
2147    data = (
2148        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2149        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2150        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2151        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2152        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2153        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2154        b'\x00\x00L\x00\x00\x00\x00\x00' )
2155    data2 = (
2156        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2157        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2158        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2159        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2160        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2161        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2162        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2163        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2164
2165    plain = b'zipfile.py encryption test'
2166    plain2 = b'\x00'*512
2167
2168    def setUp(self):
2169        with open(TESTFN, "wb") as fp:
2170            fp.write(self.data)
2171        self.zip = zipfile.ZipFile(TESTFN, "r")
2172        with open(TESTFN2, "wb") as fp:
2173            fp.write(self.data2)
2174        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2175
2176    def tearDown(self):
2177        self.zip.close()
2178        os.unlink(TESTFN)
2179        self.zip2.close()
2180        os.unlink(TESTFN2)
2181
2182    def test_no_password(self):
2183        # Reading the encrypted file without password
2184        # must generate a RunTime exception
2185        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2186        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2187
2188    def test_bad_password(self):
2189        self.zip.setpassword(b"perl")
2190        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2191        self.zip2.setpassword(b"perl")
2192        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2193
2194    @requires_zlib()
2195    def test_good_password(self):
2196        self.zip.setpassword(b"python")
2197        self.assertEqual(self.zip.read("test.txt"), self.plain)
2198        self.zip2.setpassword(b"12345")
2199        self.assertEqual(self.zip2.read("zero"), self.plain2)
2200
2201    def test_unicode_password(self):
2202        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2203        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2204        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2205        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2206
2207    def test_seek_tell(self):
2208        self.zip.setpassword(b"python")
2209        txt = self.plain
2210        test_word = b'encryption'
2211        bloc = txt.find(test_word)
2212        bloc_len = len(test_word)
2213        with self.zip.open("test.txt", "r") as fp:
2214            fp.seek(bloc, os.SEEK_SET)
2215            self.assertEqual(fp.tell(), bloc)
2216            fp.seek(-bloc, os.SEEK_CUR)
2217            self.assertEqual(fp.tell(), 0)
2218            fp.seek(bloc, os.SEEK_CUR)
2219            self.assertEqual(fp.tell(), bloc)
2220            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2221
2222            # Make sure that the second read after seeking back beyond
2223            # _readbuffer returns the same content (ie. rewind to the start of
2224            # the file to read forward to the required position).
2225            old_read_size = fp.MIN_READ_SIZE
2226            fp.MIN_READ_SIZE = 1
2227            fp._readbuffer = b''
2228            fp._offset = 0
2229            fp.seek(0, os.SEEK_SET)
2230            self.assertEqual(fp.tell(), 0)
2231            fp.seek(bloc, os.SEEK_CUR)
2232            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2233            fp.MIN_READ_SIZE = old_read_size
2234
2235            fp.seek(0, os.SEEK_END)
2236            self.assertEqual(fp.tell(), len(txt))
2237            fp.seek(0, os.SEEK_SET)
2238            self.assertEqual(fp.tell(), 0)
2239
2240            # Read the file completely to definitely call any eof integrity
2241            # checks (crc) and make sure they still pass.
2242            fp.read()
2243
2244
2245class AbstractTestsWithRandomBinaryFiles:
2246    @classmethod
2247    def setUpClass(cls):
2248        datacount = randint(16, 64)*1024 + randint(1, 1024)
2249        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2250                            for i in range(datacount))
2251
2252    def setUp(self):
2253        # Make a source file with some lines
2254        with open(TESTFN, "wb") as fp:
2255            fp.write(self.data)
2256
2257    def tearDown(self):
2258        unlink(TESTFN)
2259        unlink(TESTFN2)
2260
2261    def make_test_archive(self, f, compression):
2262        # Create the ZIP archive
2263        with zipfile.ZipFile(f, "w", compression) as zipfp:
2264            zipfp.write(TESTFN, "another.name")
2265            zipfp.write(TESTFN, TESTFN)
2266
2267    def zip_test(self, f, compression):
2268        self.make_test_archive(f, compression)
2269
2270        # Read the ZIP archive
2271        with zipfile.ZipFile(f, "r", compression) as zipfp:
2272            testdata = zipfp.read(TESTFN)
2273            self.assertEqual(len(testdata), len(self.data))
2274            self.assertEqual(testdata, self.data)
2275            self.assertEqual(zipfp.read("another.name"), self.data)
2276
2277    def test_read(self):
2278        for f in get_files(self):
2279            self.zip_test(f, self.compression)
2280
2281    def zip_open_test(self, f, compression):
2282        self.make_test_archive(f, compression)
2283
2284        # Read the ZIP archive
2285        with zipfile.ZipFile(f, "r", compression) as zipfp:
2286            zipdata1 = []
2287            with zipfp.open(TESTFN) as zipopen1:
2288                while True:
2289                    read_data = zipopen1.read(256)
2290                    if not read_data:
2291                        break
2292                    zipdata1.append(read_data)
2293
2294            zipdata2 = []
2295            with zipfp.open("another.name") as zipopen2:
2296                while True:
2297                    read_data = zipopen2.read(256)
2298                    if not read_data:
2299                        break
2300                    zipdata2.append(read_data)
2301
2302            testdata1 = b''.join(zipdata1)
2303            self.assertEqual(len(testdata1), len(self.data))
2304            self.assertEqual(testdata1, self.data)
2305
2306            testdata2 = b''.join(zipdata2)
2307            self.assertEqual(len(testdata2), len(self.data))
2308            self.assertEqual(testdata2, self.data)
2309
2310    def test_open(self):
2311        for f in get_files(self):
2312            self.zip_open_test(f, self.compression)
2313
2314    def zip_random_open_test(self, f, compression):
2315        self.make_test_archive(f, compression)
2316
2317        # Read the ZIP archive
2318        with zipfile.ZipFile(f, "r", compression) as zipfp:
2319            zipdata1 = []
2320            with zipfp.open(TESTFN) as zipopen1:
2321                while True:
2322                    read_data = zipopen1.read(randint(1, 1024))
2323                    if not read_data:
2324                        break
2325                    zipdata1.append(read_data)
2326
2327            testdata = b''.join(zipdata1)
2328            self.assertEqual(len(testdata), len(self.data))
2329            self.assertEqual(testdata, self.data)
2330
2331    def test_random_open(self):
2332        for f in get_files(self):
2333            self.zip_random_open_test(f, self.compression)
2334
2335
2336class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2337                                       unittest.TestCase):
2338    compression = zipfile.ZIP_STORED
2339
2340@requires_zlib()
2341class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2342                                        unittest.TestCase):
2343    compression = zipfile.ZIP_DEFLATED
2344
2345@requires_bz2()
2346class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2347                                      unittest.TestCase):
2348    compression = zipfile.ZIP_BZIP2
2349
2350@requires_lzma()
2351class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2352                                     unittest.TestCase):
2353    compression = zipfile.ZIP_LZMA
2354
2355
2356# Provide the tell() method but not seek()
2357class Tellable:
2358    def __init__(self, fp):
2359        self.fp = fp
2360        self.offset = 0
2361
2362    def write(self, data):
2363        n = self.fp.write(data)
2364        self.offset += n
2365        return n
2366
2367    def tell(self):
2368        return self.offset
2369
2370    def flush(self):
2371        self.fp.flush()
2372
2373class Unseekable:
2374    def __init__(self, fp):
2375        self.fp = fp
2376
2377    def write(self, data):
2378        return self.fp.write(data)
2379
2380    def flush(self):
2381        self.fp.flush()
2382
2383class UnseekableTests(unittest.TestCase):
2384    def test_writestr(self):
2385        for wrapper in (lambda f: f), Tellable, Unseekable:
2386            with self.subTest(wrapper=wrapper):
2387                f = io.BytesIO()
2388                f.write(b'abc')
2389                bf = io.BufferedWriter(f)
2390                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2391                    zipfp.writestr('ones', b'111')
2392                    zipfp.writestr('twos', b'222')
2393                self.assertEqual(f.getvalue()[:5], b'abcPK')
2394                with zipfile.ZipFile(f, mode='r') as zipf:
2395                    with zipf.open('ones') as zopen:
2396                        self.assertEqual(zopen.read(), b'111')
2397                    with zipf.open('twos') as zopen:
2398                        self.assertEqual(zopen.read(), b'222')
2399
2400    def test_write(self):
2401        for wrapper in (lambda f: f), Tellable, Unseekable:
2402            with self.subTest(wrapper=wrapper):
2403                f = io.BytesIO()
2404                f.write(b'abc')
2405                bf = io.BufferedWriter(f)
2406                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2407                    self.addCleanup(unlink, TESTFN)
2408                    with open(TESTFN, 'wb') as f2:
2409                        f2.write(b'111')
2410                    zipfp.write(TESTFN, 'ones')
2411                    with open(TESTFN, 'wb') as f2:
2412                        f2.write(b'222')
2413                    zipfp.write(TESTFN, 'twos')
2414                self.assertEqual(f.getvalue()[:5], b'abcPK')
2415                with zipfile.ZipFile(f, mode='r') as zipf:
2416                    with zipf.open('ones') as zopen:
2417                        self.assertEqual(zopen.read(), b'111')
2418                    with zipf.open('twos') as zopen:
2419                        self.assertEqual(zopen.read(), b'222')
2420
2421    def test_open_write(self):
2422        for wrapper in (lambda f: f), Tellable, Unseekable:
2423            with self.subTest(wrapper=wrapper):
2424                f = io.BytesIO()
2425                f.write(b'abc')
2426                bf = io.BufferedWriter(f)
2427                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2428                    with zipf.open('ones', 'w') as zopen:
2429                        zopen.write(b'111')
2430                    with zipf.open('twos', 'w') as zopen:
2431                        zopen.write(b'222')
2432                self.assertEqual(f.getvalue()[:5], b'abcPK')
2433                with zipfile.ZipFile(f) as zipf:
2434                    self.assertEqual(zipf.read('ones'), b'111')
2435                    self.assertEqual(zipf.read('twos'), b'222')
2436
2437
2438@requires_zlib()
2439class TestsWithMultipleOpens(unittest.TestCase):
2440    @classmethod
2441    def setUpClass(cls):
2442        cls.data1 = b'111' + randbytes(10000)
2443        cls.data2 = b'222' + randbytes(10000)
2444
2445    def make_test_archive(self, f):
2446        # Create the ZIP archive
2447        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2448            zipfp.writestr('ones', self.data1)
2449            zipfp.writestr('twos', self.data2)
2450
2451    def test_same_file(self):
2452        # Verify that (when the ZipFile is in control of creating file objects)
2453        # multiple open() calls can be made without interfering with each other.
2454        for f in get_files(self):
2455            self.make_test_archive(f)
2456            with zipfile.ZipFile(f, mode="r") as zipf:
2457                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2458                    data1 = zopen1.read(500)
2459                    data2 = zopen2.read(500)
2460                    data1 += zopen1.read()
2461                    data2 += zopen2.read()
2462                self.assertEqual(data1, data2)
2463                self.assertEqual(data1, self.data1)
2464
2465    def test_different_file(self):
2466        # Verify that (when the ZipFile is in control of creating file objects)
2467        # multiple open() calls can be made without interfering with each other.
2468        for f in get_files(self):
2469            self.make_test_archive(f)
2470            with zipfile.ZipFile(f, mode="r") as zipf:
2471                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2472                    data1 = zopen1.read(500)
2473                    data2 = zopen2.read(500)
2474                    data1 += zopen1.read()
2475                    data2 += zopen2.read()
2476                self.assertEqual(data1, self.data1)
2477                self.assertEqual(data2, self.data2)
2478
2479    def test_interleaved(self):
2480        # Verify that (when the ZipFile is in control of creating file objects)
2481        # multiple open() calls can be made without interfering with each other.
2482        for f in get_files(self):
2483            self.make_test_archive(f)
2484            with zipfile.ZipFile(f, mode="r") as zipf:
2485                with zipf.open('ones') as zopen1:
2486                    data1 = zopen1.read(500)
2487                    with zipf.open('twos') as zopen2:
2488                        data2 = zopen2.read(500)
2489                        data1 += zopen1.read()
2490                        data2 += zopen2.read()
2491                self.assertEqual(data1, self.data1)
2492                self.assertEqual(data2, self.data2)
2493
2494    def test_read_after_close(self):
2495        for f in get_files(self):
2496            self.make_test_archive(f)
2497            with contextlib.ExitStack() as stack:
2498                with zipfile.ZipFile(f, 'r') as zipf:
2499                    zopen1 = stack.enter_context(zipf.open('ones'))
2500                    zopen2 = stack.enter_context(zipf.open('twos'))
2501                data1 = zopen1.read(500)
2502                data2 = zopen2.read(500)
2503                data1 += zopen1.read()
2504                data2 += zopen2.read()
2505            self.assertEqual(data1, self.data1)
2506            self.assertEqual(data2, self.data2)
2507
2508    def test_read_after_write(self):
2509        for f in get_files(self):
2510            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2511                zipf.writestr('ones', self.data1)
2512                zipf.writestr('twos', self.data2)
2513                with zipf.open('ones') as zopen1:
2514                    data1 = zopen1.read(500)
2515            self.assertEqual(data1, self.data1[:500])
2516            with zipfile.ZipFile(f, 'r') as zipf:
2517                data1 = zipf.read('ones')
2518                data2 = zipf.read('twos')
2519            self.assertEqual(data1, self.data1)
2520            self.assertEqual(data2, self.data2)
2521
2522    def test_write_after_read(self):
2523        for f in get_files(self):
2524            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2525                zipf.writestr('ones', self.data1)
2526                with zipf.open('ones') as zopen1:
2527                    zopen1.read(500)
2528                    zipf.writestr('twos', self.data2)
2529            with zipfile.ZipFile(f, 'r') as zipf:
2530                data1 = zipf.read('ones')
2531                data2 = zipf.read('twos')
2532            self.assertEqual(data1, self.data1)
2533            self.assertEqual(data2, self.data2)
2534
2535    def test_many_opens(self):
2536        # Verify that read() and open() promptly close the file descriptor,
2537        # and don't rely on the garbage collector to free resources.
2538        self.make_test_archive(TESTFN2)
2539        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2540            for x in range(100):
2541                zipf.read('ones')
2542                with zipf.open('ones') as zopen1:
2543                    pass
2544        with open(os.devnull, "rb") as f:
2545            self.assertLess(f.fileno(), 100)
2546
2547    def test_write_while_reading(self):
2548        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2549            zipf.writestr('ones', self.data1)
2550        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2551            with zipf.open('ones', 'r') as r1:
2552                data1 = r1.read(500)
2553                with zipf.open('twos', 'w') as w1:
2554                    w1.write(self.data2)
2555                data1 += r1.read()
2556        self.assertEqual(data1, self.data1)
2557        with zipfile.ZipFile(TESTFN2) as zipf:
2558            self.assertEqual(zipf.read('twos'), self.data2)
2559
2560    def tearDown(self):
2561        unlink(TESTFN2)
2562
2563
2564class TestWithDirectory(unittest.TestCase):
2565    def setUp(self):
2566        os.mkdir(TESTFN2)
2567
2568    def test_extract_dir(self):
2569        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
2570            zipf.extractall(TESTFN2)
2571        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2572        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2573        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2574
2575    def test_bug_6050(self):
2576        # Extraction should succeed if directories already exist
2577        os.mkdir(os.path.join(TESTFN2, "a"))
2578        self.test_extract_dir()
2579
2580    def test_write_dir(self):
2581        dirpath = os.path.join(TESTFN2, "x")
2582        os.mkdir(dirpath)
2583        mode = os.stat(dirpath).st_mode & 0xFFFF
2584        with zipfile.ZipFile(TESTFN, "w") as zipf:
2585            zipf.write(dirpath)
2586            zinfo = zipf.filelist[0]
2587            self.assertTrue(zinfo.filename.endswith("/x/"))
2588            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2589            zipf.write(dirpath, "y")
2590            zinfo = zipf.filelist[1]
2591            self.assertTrue(zinfo.filename, "y/")
2592            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2593        with zipfile.ZipFile(TESTFN, "r") as zipf:
2594            zinfo = zipf.filelist[0]
2595            self.assertTrue(zinfo.filename.endswith("/x/"))
2596            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2597            zinfo = zipf.filelist[1]
2598            self.assertTrue(zinfo.filename, "y/")
2599            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2600            target = os.path.join(TESTFN2, "target")
2601            os.mkdir(target)
2602            zipf.extractall(target)
2603            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2604            self.assertEqual(len(os.listdir(target)), 2)
2605
2606    def test_writestr_dir(self):
2607        os.mkdir(os.path.join(TESTFN2, "x"))
2608        with zipfile.ZipFile(TESTFN, "w") as zipf:
2609            zipf.writestr("x/", b'')
2610            zinfo = zipf.filelist[0]
2611            self.assertEqual(zinfo.filename, "x/")
2612            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2613        with zipfile.ZipFile(TESTFN, "r") as zipf:
2614            zinfo = zipf.filelist[0]
2615            self.assertTrue(zinfo.filename.endswith("x/"))
2616            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2617            target = os.path.join(TESTFN2, "target")
2618            os.mkdir(target)
2619            zipf.extractall(target)
2620            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
2621            self.assertEqual(os.listdir(target), ["x"])
2622
2623    def tearDown(self):
2624        rmtree(TESTFN2)
2625        if os.path.exists(TESTFN):
2626            unlink(TESTFN)
2627
2628
2629class ZipInfoTests(unittest.TestCase):
2630    def test_from_file(self):
2631        zi = zipfile.ZipInfo.from_file(__file__)
2632        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2633        self.assertFalse(zi.is_dir())
2634        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2635
2636    def test_from_file_pathlike(self):
2637        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2638        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2639        self.assertFalse(zi.is_dir())
2640        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2641
2642    def test_from_file_bytes(self):
2643        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2644        self.assertEqual(posixpath.basename(zi.filename), 'test')
2645        self.assertFalse(zi.is_dir())
2646        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2647
2648    def test_from_file_fileno(self):
2649        with open(__file__, 'rb') as f:
2650            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2651            self.assertEqual(posixpath.basename(zi.filename), 'test')
2652            self.assertFalse(zi.is_dir())
2653            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2654
2655    def test_from_dir(self):
2656        dirpath = os.path.dirname(os.path.abspath(__file__))
2657        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2658        self.assertEqual(zi.filename, 'stdlib_tests/')
2659        self.assertTrue(zi.is_dir())
2660        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2661        self.assertEqual(zi.file_size, 0)
2662
2663
2664class CommandLineTest(unittest.TestCase):
2665
2666    def zipfilecmd(self, *args, **kwargs):
2667        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
2668                                                      **kwargs)
2669        return out.replace(os.linesep.encode(), b'\n')
2670
2671    def zipfilecmd_failure(self, *args):
2672        return script_helper.assert_python_failure('-m', 'zipfile', *args)
2673
2674    def test_bad_use(self):
2675        rc, out, err = self.zipfilecmd_failure()
2676        self.assertEqual(out, b'')
2677        self.assertIn(b'usage', err.lower())
2678        self.assertIn(b'error', err.lower())
2679        self.assertIn(b'required', err.lower())
2680        rc, out, err = self.zipfilecmd_failure('-l', '')
2681        self.assertEqual(out, b'')
2682        self.assertNotEqual(err.strip(), b'')
2683
2684    def test_test_command(self):
2685        zip_name = findfile('zipdir.zip')
2686        for opt in '-t', '--test':
2687            out = self.zipfilecmd(opt, zip_name)
2688            self.assertEqual(out.rstrip(), b'Done testing')
2689        zip_name = findfile('testtar.tar')
2690        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
2691        self.assertEqual(out, b'')
2692
2693    def test_list_command(self):
2694        zip_name = findfile('zipdir.zip')
2695        t = io.StringIO()
2696        with zipfile.ZipFile(zip_name, 'r') as tf:
2697            tf.printdir(t)
2698        expected = t.getvalue().encode('ascii', 'backslashreplace')
2699        for opt in '-l', '--list':
2700            out = self.zipfilecmd(opt, zip_name,
2701                                  PYTHONIOENCODING='ascii:backslashreplace')
2702            self.assertEqual(out, expected)
2703
2704    @requires_zlib()
2705    def test_create_command(self):
2706        self.addCleanup(unlink, TESTFN)
2707        with open(TESTFN, 'w', encoding='utf-8') as f:
2708            f.write('test 1')
2709        os.mkdir(TESTFNDIR)
2710        self.addCleanup(rmtree, TESTFNDIR)
2711        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f:
2712            f.write('test 2')
2713        files = [TESTFN, TESTFNDIR]
2714        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
2715        for opt in '-c', '--create':
2716            try:
2717                out = self.zipfilecmd(opt, TESTFN2, *files)
2718                self.assertEqual(out, b'')
2719                with zipfile.ZipFile(TESTFN2) as zf:
2720                    self.assertEqual(zf.namelist(), namelist)
2721                    self.assertEqual(zf.read(namelist[0]), b'test 1')
2722                    self.assertEqual(zf.read(namelist[2]), b'test 2')
2723            finally:
2724                unlink(TESTFN2)
2725
2726    def test_extract_command(self):
2727        zip_name = findfile('zipdir.zip')
2728        for opt in '-e', '--extract':
2729            with temp_dir() as extdir:
2730                out = self.zipfilecmd(opt, zip_name, extdir)
2731                self.assertEqual(out, b'')
2732                with zipfile.ZipFile(zip_name) as zf:
2733                    for zi in zf.infolist():
2734                        path = os.path.join(extdir,
2735                                    zi.filename.replace('/', os.sep))
2736                        if zi.is_dir():
2737                            self.assertTrue(os.path.isdir(path))
2738                        else:
2739                            self.assertTrue(os.path.isfile(path))
2740                            with open(path, 'rb') as f:
2741                                self.assertEqual(f.read(), zf.read(zi))
2742
2743
2744class TestExecutablePrependedZip(unittest.TestCase):
2745    """Test our ability to open zip files with an executable prepended."""
2746
2747    def setUp(self):
2748        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
2749        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')
2750
2751    def _test_zip_works(self, name):
2752        # bpo28494 sanity check: ensure is_zipfile works on these.
2753        self.assertTrue(zipfile.is_zipfile(name),
2754                        f'is_zipfile failed on {name}')
2755        # Ensure we can operate on these via ZipFile.
2756        with zipfile.ZipFile(name) as zipfp:
2757            for n in zipfp.namelist():
2758                data = zipfp.read(n)
2759                self.assertIn(b'FAVORITE_NUMBER', data)
2760
2761    def test_read_zip_with_exe_prepended(self):
2762        self._test_zip_works(self.exe_zip)
2763
2764    def test_read_zip64_with_exe_prepended(self):
2765        self._test_zip_works(self.exe_zip64)
2766
2767    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2768    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2769                         'Test relies on #!/bin/bash working.')
2770    def test_execute_zip2(self):
2771        output = subprocess.check_output([self.exe_zip, sys.executable])
2772        self.assertIn(b'number in executable: 5', output)
2773
2774    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2775    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2776                         'Test relies on #!/bin/bash working.')
2777    def test_execute_zip64(self):
2778        output = subprocess.check_output([self.exe_zip64, sys.executable])
2779        self.assertIn(b'number in executable: 5', output)
2780
2781
2782# Poor man's technique to consume a (smallish) iterable.
2783consume = tuple
2784
2785
2786# from jaraco.itertools 5.0
2787class jaraco:
2788    class itertools:
2789        class Counter:
2790            def __init__(self, i):
2791                self.count = 0
2792                self._orig_iter = iter(i)
2793
2794            def __iter__(self):
2795                return self
2796
2797            def __next__(self):
2798                result = next(self._orig_iter)
2799                self.count += 1
2800                return result
2801
2802
2803def add_dirs(zf):
2804    """
2805    Given a writable zip file zf, inject directory entries for
2806    any directories implied by the presence of children.
2807    """
2808    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
2809        zf.writestr(name, b"")
2810    return zf
2811
2812
2813def build_alpharep_fixture():
2814    """
2815    Create a zip file with this structure:
2816
2817    .
2818    ├── a.txt
2819    ├── b
2820    │   ├── c.txt
2821    │   ├── d
2822    │   │   └── e.txt
2823    │   └── f.txt
2824    └── g
2825        └── h
2826            └── i.txt
2827
2828    This fixture has the following key characteristics:
2829
2830    - a file at the root (a)
2831    - a file two levels deep (b/d/e)
2832    - multiple files in a directory (b/c, b/f)
2833    - a directory containing only a directory (g/h)
2834
2835    "alpha" because it uses alphabet
2836    "rep" because it's a representative example
2837    """
2838    data = io.BytesIO()
2839    zf = zipfile.ZipFile(data, "w")
2840    zf.writestr("a.txt", b"content of a")
2841    zf.writestr("b/c.txt", b"content of c")
2842    zf.writestr("b/d/e.txt", b"content of e")
2843    zf.writestr("b/f.txt", b"content of f")
2844    zf.writestr("g/h/i.txt", b"content of i")
2845    zf.filename = "alpharep.zip"
2846    return zf
2847
2848
2849def pass_alpharep(meth):
2850    """
2851    Given a method, wrap it in a for loop that invokes method
2852    with each subtest.
2853    """
2854
2855    @functools.wraps(meth)
2856    def wrapper(self):
2857        for alpharep in self.zipfile_alpharep():
2858            meth(self, alpharep=alpharep)
2859
2860    return wrapper
2861
2862
2863class TestPath(unittest.TestCase):
2864    def setUp(self):
2865        self.fixtures = contextlib.ExitStack()
2866        self.addCleanup(self.fixtures.close)
2867
2868    def zipfile_alpharep(self):
2869        with self.subTest():
2870            yield build_alpharep_fixture()
2871        with self.subTest():
2872            yield add_dirs(build_alpharep_fixture())
2873
2874    def zipfile_ondisk(self, alpharep):
2875        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2876        buffer = alpharep.fp
2877        alpharep.close()
2878        path = tmpdir / alpharep.filename
2879        with path.open("wb") as strm:
2880            strm.write(buffer.getvalue())
2881        return path
2882
2883    @pass_alpharep
2884    def test_iterdir_and_types(self, alpharep):
2885        root = zipfile.Path(alpharep)
2886        assert root.is_dir()
2887        a, b, g = root.iterdir()
2888        assert a.is_file()
2889        assert b.is_dir()
2890        assert g.is_dir()
2891        c, f, d = b.iterdir()
2892        assert c.is_file() and f.is_file()
2893        (e,) = d.iterdir()
2894        assert e.is_file()
2895        (h,) = g.iterdir()
2896        (i,) = h.iterdir()
2897        assert i.is_file()
2898
2899    @pass_alpharep
2900    def test_is_file_missing(self, alpharep):
2901        root = zipfile.Path(alpharep)
2902        assert not root.joinpath('missing.txt').is_file()
2903
2904    @pass_alpharep
2905    def test_iterdir_on_file(self, alpharep):
2906        root = zipfile.Path(alpharep)
2907        a, b, g = root.iterdir()
2908        with self.assertRaises(ValueError):
2909            a.iterdir()
2910
2911    @pass_alpharep
2912    def test_subdir_is_dir(self, alpharep):
2913        root = zipfile.Path(alpharep)
2914        assert (root / 'b').is_dir()
2915        assert (root / 'b/').is_dir()
2916        assert (root / 'g').is_dir()
2917        assert (root / 'g/').is_dir()
2918
2919    @pass_alpharep
2920    def test_open(self, alpharep):
2921        root = zipfile.Path(alpharep)
2922        a, b, g = root.iterdir()
2923        with a.open(encoding="utf-8") as strm:
2924            data = strm.read()
2925        assert data == "content of a"
2926
2927    def test_open_write(self):
2928        """
2929        If the zipfile is open for write, it should be possible to
2930        write bytes or text to it.
2931        """
2932        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
2933        with zf.joinpath('file.bin').open('wb') as strm:
2934            strm.write(b'binary contents')
2935        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
2936            strm.write('text file')
2937
2938    def test_open_extant_directory(self):
2939        """
2940        Attempting to open a directory raises IsADirectoryError.
2941        """
2942        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2943        with self.assertRaises(IsADirectoryError):
2944            zf.joinpath('b').open()
2945
2946    @pass_alpharep
2947    def test_open_binary_invalid_args(self, alpharep):
2948        root = zipfile.Path(alpharep)
2949        with self.assertRaises(ValueError):
2950            root.joinpath('a.txt').open('rb', encoding='utf-8')
2951        with self.assertRaises(ValueError):
2952            root.joinpath('a.txt').open('rb', 'utf-8')
2953
2954    def test_open_missing_directory(self):
2955        """
2956        Attempting to open a missing directory raises FileNotFoundError.
2957        """
2958        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2959        with self.assertRaises(FileNotFoundError):
2960            zf.joinpath('z').open()
2961
2962    @pass_alpharep
2963    def test_read(self, alpharep):
2964        root = zipfile.Path(alpharep)
2965        a, b, g = root.iterdir()
2966        assert a.read_text(encoding="utf-8") == "content of a"
2967        assert a.read_bytes() == b"content of a"
2968
2969    @pass_alpharep
2970    def test_joinpath(self, alpharep):
2971        root = zipfile.Path(alpharep)
2972        a = root.joinpath("a.txt")
2973        assert a.is_file()
2974        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
2975        assert e.read_text(encoding="utf-8") == "content of e"
2976
2977    @pass_alpharep
2978    def test_joinpath_multiple(self, alpharep):
2979        root = zipfile.Path(alpharep)
2980        e = root.joinpath("b", "d", "e.txt")
2981        assert e.read_text(encoding="utf-8") == "content of e"
2982
2983    @pass_alpharep
2984    def test_traverse_truediv(self, alpharep):
2985        root = zipfile.Path(alpharep)
2986        a = root / "a.txt"
2987        assert a.is_file()
2988        e = root / "b" / "d" / "e.txt"
2989        assert e.read_text(encoding="utf-8") == "content of e"
2990
2991    @pass_alpharep
2992    def test_traverse_simplediv(self, alpharep):
2993        """
2994        Disable the __future__.division when testing traversal.
2995        """
2996        code = compile(
2997            source="zipfile.Path(alpharep) / 'a'",
2998            filename="(test)",
2999            mode="eval",
3000            dont_inherit=True,
3001        )
3002        eval(code)
3003
3004    @pass_alpharep
3005    def test_pathlike_construction(self, alpharep):
3006        """
3007        zipfile.Path should be constructable from a path-like object
3008        """
3009        zipfile_ondisk = self.zipfile_ondisk(alpharep)
3010        pathlike = pathlib.Path(str(zipfile_ondisk))
3011        zipfile.Path(pathlike)
3012
3013    @pass_alpharep
3014    def test_traverse_pathlike(self, alpharep):
3015        root = zipfile.Path(alpharep)
3016        root / pathlib.Path("a")
3017
3018    @pass_alpharep
3019    def test_parent(self, alpharep):
3020        root = zipfile.Path(alpharep)
3021        assert (root / 'a').parent.at == ''
3022        assert (root / 'a' / 'b').parent.at == 'a/'
3023
3024    @pass_alpharep
3025    def test_dir_parent(self, alpharep):
3026        root = zipfile.Path(alpharep)
3027        assert (root / 'b').parent.at == ''
3028        assert (root / 'b/').parent.at == ''
3029
3030    @pass_alpharep
3031    def test_missing_dir_parent(self, alpharep):
3032        root = zipfile.Path(alpharep)
3033        assert (root / 'missing dir/').parent.at == ''
3034
3035    @pass_alpharep
3036    def test_mutability(self, alpharep):
3037        """
3038        If the underlying zipfile is changed, the Path object should
3039        reflect that change.
3040        """
3041        root = zipfile.Path(alpharep)
3042        a, b, g = root.iterdir()
3043        alpharep.writestr('foo.txt', 'foo')
3044        alpharep.writestr('bar/baz.txt', 'baz')
3045        assert any(child.name == 'foo.txt' for child in root.iterdir())
3046        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
3047        (baz,) = (root / 'bar').iterdir()
3048        assert baz.read_text(encoding="utf-8") == 'baz'
3049
3050    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
3051
3052    def huge_zipfile(self):
3053        """Create a read-only zipfile with a huge number of entries entries."""
3054        strm = io.BytesIO()
3055        zf = zipfile.ZipFile(strm, "w")
3056        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
3057            zf.writestr(entry, entry)
3058        zf.mode = 'r'
3059        return zf
3060
3061    def test_joinpath_constant_time(self):
3062        """
3063        Ensure joinpath on items in zipfile is linear time.
3064        """
3065        root = zipfile.Path(self.huge_zipfile())
3066        entries = jaraco.itertools.Counter(root.iterdir())
3067        for entry in entries:
3068            entry.joinpath('suffix')
3069        # Check the file iterated all items
3070        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
3071
3072    # @func_timeout.func_set_timeout(3)
3073    def test_implied_dirs_performance(self):
3074        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
3075        zipfile.CompleteDirs._implied_dirs(data)
3076
3077    @pass_alpharep
3078    def test_read_does_not_close(self, alpharep):
3079        alpharep = self.zipfile_ondisk(alpharep)
3080        with zipfile.ZipFile(alpharep) as file:
3081            for rep in range(2):
3082                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
3083
3084    @pass_alpharep
3085    def test_subclass(self, alpharep):
3086        class Subclass(zipfile.Path):
3087            pass
3088
3089        root = Subclass(alpharep)
3090        assert isinstance(root / 'b', Subclass)
3091
3092    @pass_alpharep
3093    def test_filename(self, alpharep):
3094        root = zipfile.Path(alpharep)
3095        assert root.filename == pathlib.Path('alpharep.zip')
3096
3097    @pass_alpharep
3098    def test_root_name(self, alpharep):
3099        """
3100        The name of the root should be the name of the zipfile
3101        """
3102        root = zipfile.Path(alpharep)
3103        assert root.name == 'alpharep.zip' == root.filename.name
3104
3105    @pass_alpharep
3106    def test_root_parent(self, alpharep):
3107        root = zipfile.Path(alpharep)
3108        assert root.parent == pathlib.Path('.')
3109        root.root.filename = 'foo/bar.zip'
3110        assert root.parent == pathlib.Path('foo')
3111
3112    @pass_alpharep
3113    def test_root_unnamed(self, alpharep):
3114        """
3115        It is an error to attempt to get the name
3116        or parent of an unnamed zipfile.
3117        """
3118        alpharep.filename = None
3119        root = zipfile.Path(alpharep)
3120        with self.assertRaises(TypeError):
3121            root.name
3122        with self.assertRaises(TypeError):
3123            root.parent
3124
3125        # .name and .parent should still work on subs
3126        sub = root / "b"
3127        assert sub.name == "b"
3128        assert sub.parent
3129
3130    @pass_alpharep
3131    def test_inheritance(self, alpharep):
3132        cls = type('PathChild', (zipfile.Path,), {})
3133        for alpharep in self.zipfile_alpharep():
3134            file = cls(alpharep).joinpath('some dir').parent
3135            assert isinstance(file, cls)
3136
3137
3138if __name__ == "__main__":
3139    unittest.main()
3140