• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16
17
18from tempfile import TemporaryFile
19from random import randint, random, getrandbits
20
21from test.support import script_helper
22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd,
23                          requires_zlib, requires_bz2, requires_lzma,
24                          captured_stdout)
25
# Second scratch path derived from TESTFN; the tests below use it as the
# on-disk archive file.
TESTFN2 = TESTFN + "2"
# Scratch directory name derived from TESTFN.
TESTFNDIR = TESTFN + "d"
# Number of lines generated for the shared source data.
FIXEDTEST_SIZE = 1000
# NOTE(review): presumably the directory holding data-file fixtures --
# confirm against its users.
DATAFILES_DIR = 'zipfile_datafiles'

# (archive member path, text content) pairs describing a small nested tree.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
35
def getrandbytes(size):
    """Return *size* random bytes drawn from a single getrandbits() call."""
    bit_count = 8 * size
    value = getrandbits(bit_count)
    return value.to_bytes(size, 'little')
38
def get_files(test):
    """Yield each kind of archive target the tests should run against.

    Yields, in order: the TESTFN2 path, an open temporary file, and an
    in-memory BytesIO.  Once the consumer resumes the generator after using
    a file object, assert through *test* that the object was left open.
    """
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as fileobj:
            yield fileobj
            test.assertFalse(fileobj.closed)
47
48class AbstractTestsWithSourceFile:
49    @classmethod
50    def setUpClass(cls):
51        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
52                              (i, random()), "ascii")
53                        for i in range(FIXEDTEST_SIZE)]
54        cls.data = b''.join(cls.line_gen)
55
56    def setUp(self):
57        # Make a source file with some lines
58        with open(TESTFN, "wb") as fp:
59            fp.write(self.data)
60
61    def make_test_archive(self, f, compression, compresslevel=None):
62        kwargs = {'compression': compression, 'compresslevel': compresslevel}
63        # Create the ZIP archive
64        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
65            zipfp.write(TESTFN, "another.name")
66            zipfp.write(TESTFN, TESTFN)
67            zipfp.writestr("strfile", self.data)
68            with zipfp.open('written-open-w', mode='w') as f:
69                for line in self.line_gen:
70                    f.write(line)
71
72    def zip_test(self, f, compression, compresslevel=None):
73        self.make_test_archive(f, compression, compresslevel)
74
75        # Read the ZIP archive
76        with zipfile.ZipFile(f, "r", compression) as zipfp:
77            self.assertEqual(zipfp.read(TESTFN), self.data)
78            self.assertEqual(zipfp.read("another.name"), self.data)
79            self.assertEqual(zipfp.read("strfile"), self.data)
80
81            # Print the ZIP directory
82            fp = io.StringIO()
83            zipfp.printdir(file=fp)
84            directory = fp.getvalue()
85            lines = directory.splitlines()
86            self.assertEqual(len(lines), 5) # Number of files + header
87
88            self.assertIn('File Name', lines[0])
89            self.assertIn('Modified', lines[0])
90            self.assertIn('Size', lines[0])
91
92            fn, date, time_, size = lines[1].split()
93            self.assertEqual(fn, 'another.name')
94            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
95            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
96            self.assertEqual(size, str(len(self.data)))
97
98            # Check the namelist
99            names = zipfp.namelist()
100            self.assertEqual(len(names), 4)
101            self.assertIn(TESTFN, names)
102            self.assertIn("another.name", names)
103            self.assertIn("strfile", names)
104            self.assertIn("written-open-w", names)
105
106            # Check infolist
107            infos = zipfp.infolist()
108            names = [i.filename for i in infos]
109            self.assertEqual(len(names), 4)
110            self.assertIn(TESTFN, names)
111            self.assertIn("another.name", names)
112            self.assertIn("strfile", names)
113            self.assertIn("written-open-w", names)
114            for i in infos:
115                self.assertEqual(i.file_size, len(self.data))
116
117            # check getinfo
118            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
119                info = zipfp.getinfo(nm)
120                self.assertEqual(info.filename, nm)
121                self.assertEqual(info.file_size, len(self.data))
122
123            # Check that testzip doesn't raise an exception
124            zipfp.testzip()
125
126    def test_basic(self):
127        for f in get_files(self):
128            self.zip_test(f, self.compression)
129
130    def zip_open_test(self, f, compression):
131        self.make_test_archive(f, compression)
132
133        # Read the ZIP archive
134        with zipfile.ZipFile(f, "r", compression) as zipfp:
135            zipdata1 = []
136            with zipfp.open(TESTFN) as zipopen1:
137                while True:
138                    read_data = zipopen1.read(256)
139                    if not read_data:
140                        break
141                    zipdata1.append(read_data)
142
143            zipdata2 = []
144            with zipfp.open("another.name") as zipopen2:
145                while True:
146                    read_data = zipopen2.read(256)
147                    if not read_data:
148                        break
149                    zipdata2.append(read_data)
150
151            self.assertEqual(b''.join(zipdata1), self.data)
152            self.assertEqual(b''.join(zipdata2), self.data)
153
154    def test_open(self):
155        for f in get_files(self):
156            self.zip_open_test(f, self.compression)
157
158    def test_open_with_pathlike(self):
159        path = pathlib.Path(TESTFN2)
160        self.zip_open_test(path, self.compression)
161        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
162            self.assertIsInstance(zipfp.filename, str)
163
164    def zip_random_open_test(self, f, compression):
165        self.make_test_archive(f, compression)
166
167        # Read the ZIP archive
168        with zipfile.ZipFile(f, "r", compression) as zipfp:
169            zipdata1 = []
170            with zipfp.open(TESTFN) as zipopen1:
171                while True:
172                    read_data = zipopen1.read(randint(1, 1024))
173                    if not read_data:
174                        break
175                    zipdata1.append(read_data)
176
177            self.assertEqual(b''.join(zipdata1), self.data)
178
179    def test_random_open(self):
180        for f in get_files(self):
181            self.zip_random_open_test(f, self.compression)
182
183    def zip_read1_test(self, f, compression):
184        self.make_test_archive(f, compression)
185
186        # Read the ZIP archive
187        with zipfile.ZipFile(f, "r") as zipfp, \
188             zipfp.open(TESTFN) as zipopen:
189            zipdata = []
190            while True:
191                read_data = zipopen.read1(-1)
192                if not read_data:
193                    break
194                zipdata.append(read_data)
195
196        self.assertEqual(b''.join(zipdata), self.data)
197
198    def test_read1(self):
199        for f in get_files(self):
200            self.zip_read1_test(f, self.compression)
201
202    def zip_read1_10_test(self, f, compression):
203        self.make_test_archive(f, compression)
204
205        # Read the ZIP archive
206        with zipfile.ZipFile(f, "r") as zipfp, \
207             zipfp.open(TESTFN) as zipopen:
208            zipdata = []
209            while True:
210                read_data = zipopen.read1(10)
211                self.assertLessEqual(len(read_data), 10)
212                if not read_data:
213                    break
214                zipdata.append(read_data)
215
216        self.assertEqual(b''.join(zipdata), self.data)
217
218    def test_read1_10(self):
219        for f in get_files(self):
220            self.zip_read1_10_test(f, self.compression)
221
222    def zip_readline_read_test(self, f, compression):
223        self.make_test_archive(f, compression)
224
225        # Read the ZIP archive
226        with zipfile.ZipFile(f, "r") as zipfp, \
227             zipfp.open(TESTFN) as zipopen:
228            data = b''
229            while True:
230                read = zipopen.readline()
231                if not read:
232                    break
233                data += read
234
235                read = zipopen.read(100)
236                if not read:
237                    break
238                data += read
239
240        self.assertEqual(data, self.data)
241
242    def test_readline_read(self):
243        # Issue #7610: calls to readline() interleaved with calls to read().
244        for f in get_files(self):
245            self.zip_readline_read_test(f, self.compression)
246
247    def zip_readline_test(self, f, compression):
248        self.make_test_archive(f, compression)
249
250        # Read the ZIP archive
251        with zipfile.ZipFile(f, "r") as zipfp:
252            with zipfp.open(TESTFN) as zipopen:
253                for line in self.line_gen:
254                    linedata = zipopen.readline()
255                    self.assertEqual(linedata, line)
256
257    def test_readline(self):
258        for f in get_files(self):
259            self.zip_readline_test(f, self.compression)
260
261    def zip_readlines_test(self, f, compression):
262        self.make_test_archive(f, compression)
263
264        # Read the ZIP archive
265        with zipfile.ZipFile(f, "r") as zipfp:
266            with zipfp.open(TESTFN) as zipopen:
267                ziplines = zipopen.readlines()
268            for line, zipline in zip(self.line_gen, ziplines):
269                self.assertEqual(zipline, line)
270
271    def test_readlines(self):
272        for f in get_files(self):
273            self.zip_readlines_test(f, self.compression)
274
275    def zip_iterlines_test(self, f, compression):
276        self.make_test_archive(f, compression)
277
278        # Read the ZIP archive
279        with zipfile.ZipFile(f, "r") as zipfp:
280            with zipfp.open(TESTFN) as zipopen:
281                for line, zipline in zip(self.line_gen, zipopen):
282                    self.assertEqual(zipline, line)
283
284    def test_iterlines(self):
285        for f in get_files(self):
286            self.zip_iterlines_test(f, self.compression)
287
288    def test_low_compression(self):
289        """Check for cases where compressed data is larger than original."""
290        # Create the ZIP archive
291        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
292            zipfp.writestr("strfile", '12')
293
294        # Get an open object for strfile
295        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
296            with zipfp.open("strfile") as openobj:
297                self.assertEqual(openobj.read(1), b'1')
298                self.assertEqual(openobj.read(1), b'2')
299
300    def test_writestr_compression(self):
301        zipfp = zipfile.ZipFile(TESTFN2, "w")
302        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
303        info = zipfp.getinfo('b.txt')
304        self.assertEqual(info.compress_type, self.compression)
305
306    def test_writestr_compresslevel(self):
307        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
308        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
309        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
310                       compresslevel=2)
311
312        # Compression level follows the constructor.
313        a_info = zipfp.getinfo('a.txt')
314        self.assertEqual(a_info.compress_type, self.compression)
315        self.assertEqual(a_info._compresslevel, 1)
316
317        # Compression level is overridden.
318        b_info = zipfp.getinfo('b.txt')
319        self.assertEqual(b_info.compress_type, self.compression)
320        self.assertEqual(b_info._compresslevel, 2)
321
322    def test_read_return_size(self):
323        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
324        # than requested.
325        for test_size in (1, 4095, 4096, 4097, 16384):
326            file_size = test_size + 1
327            junk = getrandbytes(file_size)
328            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
329                zipf.writestr('foo', junk)
330                with zipf.open('foo', 'r') as fp:
331                    buf = fp.read(test_size)
332                    self.assertEqual(len(buf), test_size)
333
334    def test_truncated_zipfile(self):
335        fp = io.BytesIO()
336        with zipfile.ZipFile(fp, mode='w') as zipf:
337            zipf.writestr('strfile', self.data, compress_type=self.compression)
338            end_offset = fp.tell()
339        zipfiledata = fp.getvalue()
340
341        fp = io.BytesIO(zipfiledata)
342        with zipfile.ZipFile(fp) as zipf:
343            with zipf.open('strfile') as zipopen:
344                fp.truncate(end_offset - 20)
345                with self.assertRaises(EOFError):
346                    zipopen.read()
347
348        fp = io.BytesIO(zipfiledata)
349        with zipfile.ZipFile(fp) as zipf:
350            with zipf.open('strfile') as zipopen:
351                fp.truncate(end_offset - 20)
352                with self.assertRaises(EOFError):
353                    while zipopen.read(100):
354                        pass
355
356        fp = io.BytesIO(zipfiledata)
357        with zipfile.ZipFile(fp) as zipf:
358            with zipf.open('strfile') as zipopen:
359                fp.truncate(end_offset - 20)
360                with self.assertRaises(EOFError):
361                    while zipopen.read1(100):
362                        pass
363
364    def test_repr(self):
365        fname = 'file.name'
366        for f in get_files(self):
367            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
368                zipfp.write(TESTFN, fname)
369                r = repr(zipfp)
370                self.assertIn("mode='w'", r)
371
372            with zipfile.ZipFile(f, 'r') as zipfp:
373                r = repr(zipfp)
374                if isinstance(f, str):
375                    self.assertIn('filename=%r' % f, r)
376                else:
377                    self.assertIn('file=%r' % f, r)
378                self.assertIn("mode='r'", r)
379                r = repr(zipfp.getinfo(fname))
380                self.assertIn('filename=%r' % fname, r)
381                self.assertIn('filemode=', r)
382                self.assertIn('file_size=', r)
383                if self.compression != zipfile.ZIP_STORED:
384                    self.assertIn('compress_type=', r)
385                    self.assertIn('compress_size=', r)
386                with zipfp.open(fname) as zipopen:
387                    r = repr(zipopen)
388                    self.assertIn('name=%r' % fname, r)
389                    self.assertIn("mode='r'", r)
390                    if self.compression != zipfile.ZIP_STORED:
391                        self.assertIn('compress_type=', r)
392                self.assertIn('[closed]', repr(zipopen))
393            self.assertIn('[closed]', repr(zipfp))
394
395    def test_compresslevel_basic(self):
396        for f in get_files(self):
397            self.zip_test(f, self.compression, compresslevel=9)
398
399    def test_per_file_compresslevel(self):
400        """Check that files within a Zip archive can have different
401        compression levels."""
402        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
403            zipfp.write(TESTFN, 'compress_1')
404            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
405            one_info = zipfp.getinfo('compress_1')
406            nine_info = zipfp.getinfo('compress_9')
407            self.assertEqual(one_info._compresslevel, 1)
408            self.assertEqual(nine_info._compresslevel, 9)
409
    def test_writing_errors(self):
        # Write 'file2' into an archive whose underlying file raises OSError
        # on the N-th write() call, for N = 0, 1, 2, ... until the write
        # finally succeeds.  After every failed attempt the archive must
        # still be readable and contain only 'file1'.
        class BrokenFile(io.BytesIO):
            def write(self, data):
                # `count` and `stop` live in the enclosing test function.
                # Counting is disabled while count is None; otherwise the
                # write numbered `stop` fails.
                nonlocal count
                if count is not None:
                    if count == stop:
                        raise OSError
                    count += 1
                super().write(data)

        stop = 0
        while True:
            testfile = BrokenFile()
            # Fault injection is off while 'file1' is written.
            count = None
            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
                with zipfp.open('file1', 'w') as f:
                    f.write(b'data1')
                # Arm the fault injection for the 'file2' attempt.
                count = 0
                try:
                    with zipfp.open('file2', 'w') as f:
                        f.write(b'data2')
                except OSError:
                    # Allow one more successful write on the next iteration.
                    stop += 1
                else:
                    # 'file2' was written without hitting the failure point.
                    break
                finally:
                    # Disable the fault injection before the archive closes.
                    count = None
            # Reached only after a failed attempt: 'file2' must be absent.
            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
                self.assertEqual(zipfp.namelist(), ['file1'])
                self.assertEqual(zipfp.read('file1'), b'data1')

        # Reached after the successful attempt: both members are present.
        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
            self.assertEqual(zipfp.read('file1'), b'data1')
            self.assertEqual(zipfp.read('file2'), b'data2')

445
446
447    def tearDown(self):
448        unlink(TESTFN)
449        unlink(TESTFN2)
450
451
452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
453                                unittest.TestCase):
454    compression = zipfile.ZIP_STORED
455    test_low_compression = None
456
457    def zip_test_writestr_permissions(self, f, compression):
458        # Make sure that writestr and open(... mode='w') create files with
459        # mode 0600, when they are passed a name rather than a ZipInfo
460        # instance.
461
462        self.make_test_archive(f, compression)
463        with zipfile.ZipFile(f, "r") as zipfp:
464            zinfo = zipfp.getinfo('strfile')
465            self.assertEqual(zinfo.external_attr, 0o600 << 16)
466
467            zinfo2 = zipfp.getinfo('written-open-w')
468            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
469
470    def test_writestr_permissions(self):
471        for f in get_files(self):
472            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
473
474    def test_absolute_arcnames(self):
475        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
476            zipfp.write(TESTFN, "/absolute")
477
478        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
479            self.assertEqual(zipfp.namelist(), ["absolute"])
480
481    def test_append_to_zip_file(self):
482        """Test appending to an existing zipfile."""
483        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
484            zipfp.write(TESTFN, TESTFN)
485
486        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
487            zipfp.writestr("strfile", self.data)
488            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
489
490    def test_append_to_non_zip_file(self):
491        """Test appending to an existing file that is not a zipfile."""
492        # NOTE: this test fails if len(d) < 22 because of the first
493        # line "fpin.seek(-22, 2)" in _EndRecData
494        data = b'I am not a ZipFile!'*10
495        with open(TESTFN2, 'wb') as f:
496            f.write(data)
497
498        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
499            zipfp.write(TESTFN, TESTFN)
500
501        with open(TESTFN2, 'rb') as f:
502            f.seek(len(data))
503            with zipfile.ZipFile(f, "r") as zipfp:
504                self.assertEqual(zipfp.namelist(), [TESTFN])
505                self.assertEqual(zipfp.read(TESTFN), self.data)
506        with open(TESTFN2, 'rb') as f:
507            self.assertEqual(f.read(len(data)), data)
508            zipfiledata = f.read()
509        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
510            self.assertEqual(zipfp.namelist(), [TESTFN])
511            self.assertEqual(zipfp.read(TESTFN), self.data)
512
513    def test_read_concatenated_zip_file(self):
514        with io.BytesIO() as bio:
515            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
516                zipfp.write(TESTFN, TESTFN)
517            zipfiledata = bio.getvalue()
518        data = b'I am not a ZipFile!'*10
519        with open(TESTFN2, 'wb') as f:
520            f.write(data)
521            f.write(zipfiledata)
522
523        with zipfile.ZipFile(TESTFN2) as zipfp:
524            self.assertEqual(zipfp.namelist(), [TESTFN])
525            self.assertEqual(zipfp.read(TESTFN), self.data)
526
527    def test_append_to_concatenated_zip_file(self):
528        with io.BytesIO() as bio:
529            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
530                zipfp.write(TESTFN, TESTFN)
531            zipfiledata = bio.getvalue()
532        data = b'I am not a ZipFile!'*1000000
533        with open(TESTFN2, 'wb') as f:
534            f.write(data)
535            f.write(zipfiledata)
536
537        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
538            self.assertEqual(zipfp.namelist(), [TESTFN])
539            zipfp.writestr('strfile', self.data)
540
541        with open(TESTFN2, 'rb') as f:
542            self.assertEqual(f.read(len(data)), data)
543            zipfiledata = f.read()
544        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
545            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
546            self.assertEqual(zipfp.read(TESTFN), self.data)
547            self.assertEqual(zipfp.read('strfile'), self.data)
548
549    def test_ignores_newline_at_end(self):
550        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
551            zipfp.write(TESTFN, TESTFN)
552        with open(TESTFN2, 'a') as f:
553            f.write("\r\n\00\00\00")
554        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
555            self.assertIsInstance(zipfp, zipfile.ZipFile)
556
557    def test_ignores_stuff_appended_past_comments(self):
558        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
559            zipfp.comment = b"this is a comment"
560            zipfp.write(TESTFN, TESTFN)
561        with open(TESTFN2, 'a') as f:
562            f.write("abcdef\r\n")
563        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
564            self.assertIsInstance(zipfp, zipfile.ZipFile)
565            self.assertEqual(zipfp.comment, b"this is a comment")
566
567    def test_write_default_name(self):
568        """Check that calling ZipFile.write without arcname specified
569        produces the expected result."""
570        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
571            zipfp.write(TESTFN)
572            with open(TESTFN, "rb") as f:
573                self.assertEqual(zipfp.read(TESTFN), f.read())
574
575    def test_write_to_readonly(self):
576        """Check that trying to call write() on a readonly ZipFile object
577        raises a ValueError."""
578        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
579            zipfp.writestr("somefile.txt", "bogus")
580
581        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
582            self.assertRaises(ValueError, zipfp.write, TESTFN)
583
584        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
585            with self.assertRaises(ValueError):
586                zipfp.open(TESTFN, mode='w')
587
588    def test_add_file_before_1980(self):
589        # Set atime and mtime to 1970-01-01
590        os.utime(TESTFN, (0, 0))
591        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
592            self.assertRaises(ValueError, zipfp.write, TESTFN)
593
594        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
595            zipfp.write(TESTFN)
596            zinfo = zipfp.getinfo(TESTFN)
597            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
598
599    def test_add_file_after_2107(self):
600        # Set atime and mtime to 2108-12-30
601        ts = 4386268800
602        try:
603            time.localtime(ts)
604        except OverflowError:
605            self.skipTest(f'time.localtime({ts}) raises OverflowError')
606        try:
607            os.utime(TESTFN, (ts, ts))
608        except OverflowError:
609            self.skipTest('Host fs cannot set timestamp to required value.')
610
611        mtime_ns = os.stat(TESTFN).st_mtime_ns
612        if mtime_ns != (4386268800 * 10**9):
613            # XFS filesystem is limited to 32-bit timestamp, but the syscall
614            # didn't fail. Moreover, there is a VFS bug which returns
615            # a cached timestamp which is different than the value on disk.
616            #
617            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
618            #
619            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
620            # https://bugs.python.org/issue39460#msg360952
621            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
622
623        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
624            self.assertRaises(struct.error, zipfp.write, TESTFN)
625
626        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
627            zipfp.write(TESTFN)
628            zinfo = zipfp.getinfo(TESTFN)
629            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
630
631
@requires_zlib
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    # Run the shared source-file tests with DEFLATE compression.
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        with zipfile.ZipFile(TESTFN2, "w") as archive:
            archive.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
            archive.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
            stored_type = archive.getinfo('storeme').compress_type
            deflated_type = archive.getinfo('deflateme').compress_type
            self.assertEqual(stored_type, zipfile.ZIP_STORED)
            self.assertEqual(deflated_type, zipfile.ZIP_DEFLATED)
647
@requires_bz2
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    # Run the shared source-file tests with BZIP2 compression.
    compression = zipfile.ZIP_BZIP2
652
@requires_lzma
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    # Run the shared source-file tests with LZMA compression.
    compression = zipfile.ZIP_LZMA
657
658
659class AbstractTestZip64InSmallFiles:
660    # These tests test the ZIP64 functionality without using large files,
661    # see test_zipfile64 for proper tests.
662
663    @classmethod
664    def setUpClass(cls):
665        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
666                    for i in range(0, FIXEDTEST_SIZE))
667        cls.data = b'\n'.join(line_gen)
668
669    def setUp(self):
670        self._limit = zipfile.ZIP64_LIMIT
671        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
672        zipfile.ZIP64_LIMIT = 1000
673        zipfile.ZIP_FILECOUNT_LIMIT = 9
674
675        # Make a source file with some lines
676        with open(TESTFN, "wb") as fp:
677            fp.write(self.data)
678
679    def zip_test(self, f, compression):
680        # Create the ZIP archive
681        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
682            zipfp.write(TESTFN, "another.name")
683            zipfp.write(TESTFN, TESTFN)
684            zipfp.writestr("strfile", self.data)
685
686        # Read the ZIP archive
687        with zipfile.ZipFile(f, "r", compression) as zipfp:
688            self.assertEqual(zipfp.read(TESTFN), self.data)
689            self.assertEqual(zipfp.read("another.name"), self.data)
690            self.assertEqual(zipfp.read("strfile"), self.data)
691
692            # Print the ZIP directory
693            fp = io.StringIO()
694            zipfp.printdir(fp)
695
696            directory = fp.getvalue()
697            lines = directory.splitlines()
698            self.assertEqual(len(lines), 4) # Number of files + header
699
700            self.assertIn('File Name', lines[0])
701            self.assertIn('Modified', lines[0])
702            self.assertIn('Size', lines[0])
703
704            fn, date, time_, size = lines[1].split()
705            self.assertEqual(fn, 'another.name')
706            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
707            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
708            self.assertEqual(size, str(len(self.data)))
709
710            # Check the namelist
711            names = zipfp.namelist()
712            self.assertEqual(len(names), 3)
713            self.assertIn(TESTFN, names)
714            self.assertIn("another.name", names)
715            self.assertIn("strfile", names)
716
717            # Check infolist
718            infos = zipfp.infolist()
719            names = [i.filename for i in infos]
720            self.assertEqual(len(names), 3)
721            self.assertIn(TESTFN, names)
722            self.assertIn("another.name", names)
723            self.assertIn("strfile", names)
724            for i in infos:
725                self.assertEqual(i.file_size, len(self.data))
726
727            # check getinfo
728            for nm in (TESTFN, "another.name", "strfile"):
729                info = zipfp.getinfo(nm)
730                self.assertEqual(info.filename, nm)
731                self.assertEqual(info.file_size, len(self.data))
732
733            # Check that testzip doesn't raise an exception
734            zipfp.testzip()
735
736    def test_basic(self):
737        for f in get_files(self):
738            self.zip_test(f, self.compression)
739
740    def test_too_many_files(self):
741        # This test checks that more than 64k files can be added to an archive,
742        # and that the resulting archive can be read properly by ZipFile
743        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
744                               allowZip64=True)
745        zipf.debug = 100
746        numfiles = 15
747        for i in range(numfiles):
748            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
749        self.assertEqual(len(zipf.namelist()), numfiles)
750        zipf.close()
751
752        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
753        self.assertEqual(len(zipf2.namelist()), numfiles)
754        for i in range(numfiles):
755            content = zipf2.read("foo%08d" % i).decode('ascii')
756            self.assertEqual(content, "%d" % (i**3 % 57))
757        zipf2.close()
758
759    def test_too_many_files_append(self):
760        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
761                               allowZip64=False)
762        zipf.debug = 100
763        numfiles = 9
764        for i in range(numfiles):
765            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
766        self.assertEqual(len(zipf.namelist()), numfiles)
767        with self.assertRaises(zipfile.LargeZipFile):
768            zipf.writestr("foo%08d" % numfiles, b'')
769        self.assertEqual(len(zipf.namelist()), numfiles)
770        zipf.close()
771
772        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
773                               allowZip64=False)
774        zipf.debug = 100
775        self.assertEqual(len(zipf.namelist()), numfiles)
776        with self.assertRaises(zipfile.LargeZipFile):
777            zipf.writestr("foo%08d" % numfiles, b'')
778        self.assertEqual(len(zipf.namelist()), numfiles)
779        zipf.close()
780
781        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
782                               allowZip64=True)
783        zipf.debug = 100
784        self.assertEqual(len(zipf.namelist()), numfiles)
785        numfiles2 = 15
786        for i in range(numfiles, numfiles2):
787            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
788        self.assertEqual(len(zipf.namelist()), numfiles2)
789        zipf.close()
790
791        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
792        self.assertEqual(len(zipf2.namelist()), numfiles2)
793        for i in range(numfiles2):
794            content = zipf2.read("foo%08d" % i).decode('ascii')
795            self.assertEqual(content, "%d" % (i**3 % 57))
796        zipf2.close()
797
798    def tearDown(self):
799        zipfile.ZIP64_LIMIT = self._limit
800        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
801        unlink(TESTFN)
802        unlink(TESTFN2)
803
804
805class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
806                                  unittest.TestCase):
807    compression = zipfile.ZIP_STORED
808
809    def large_file_exception_test(self, f, compression):
810        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
811            self.assertRaises(zipfile.LargeZipFile,
812                              zipfp.write, TESTFN, "another.name")
813
814    def large_file_exception_test2(self, f, compression):
815        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
816            self.assertRaises(zipfile.LargeZipFile,
817                              zipfp.writestr, "another.name", self.data)
818
819    def test_large_file_exception(self):
820        for f in get_files(self):
821            self.large_file_exception_test(f, zipfile.ZIP_STORED)
822            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
823
824    def test_absolute_arcnames(self):
825        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
826                             allowZip64=True) as zipfp:
827            zipfp.write(TESTFN, "/absolute")
828
829        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
830            self.assertEqual(zipfp.namelist(), ["absolute"])
831
832    def test_append(self):
833        # Test that appending to the Zip64 archive doesn't change
834        # extra fields of existing entries.
835        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
836            zipfp.writestr("strfile", self.data)
837        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
838            zinfo = zipfp.getinfo("strfile")
839            extra = zinfo.extra
840        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
841            zipfp.writestr("strfile2", self.data)
842        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
843            zinfo = zipfp.getinfo("strfile")
844            self.assertEqual(zinfo.extra, extra)
845
846    def make_zip64_file(
847        self, file_size_64_set=False, file_size_extra=False,
848        compress_size_64_set=False, compress_size_extra=False,
849        header_offset_64_set=False, header_offset_extra=False,
850    ):
851        """Generate bytes sequence for a zip with (incomplete) zip64 data.
852
853        The actual values (not the zip 64 0xffffffff values) stored in the file
854        are:
855        file_size: 8
856        compress_size: 8
857        header_offset: 0
858        """
859        actual_size = 8
860        actual_header_offset = 0
861        local_zip64_fields = []
862        central_zip64_fields = []
863
864        file_size = actual_size
865        if file_size_64_set:
866            file_size = 0xffffffff
867            if file_size_extra:
868                local_zip64_fields.append(actual_size)
869                central_zip64_fields.append(actual_size)
870        file_size = struct.pack("<L", file_size)
871
872        compress_size = actual_size
873        if compress_size_64_set:
874            compress_size = 0xffffffff
875            if compress_size_extra:
876                local_zip64_fields.append(actual_size)
877                central_zip64_fields.append(actual_size)
878        compress_size = struct.pack("<L", compress_size)
879
880        header_offset = actual_header_offset
881        if header_offset_64_set:
882            header_offset = 0xffffffff
883            if header_offset_extra:
884                central_zip64_fields.append(actual_header_offset)
885        header_offset = struct.pack("<L", header_offset)
886
887        local_extra = struct.pack(
888            '<HH' + 'Q'*len(local_zip64_fields),
889            0x0001,
890            8*len(local_zip64_fields),
891            *local_zip64_fields
892        )
893
894        central_extra = struct.pack(
895            '<HH' + 'Q'*len(central_zip64_fields),
896            0x0001,
897            8*len(central_zip64_fields),
898            *central_zip64_fields
899        )
900
901        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
902        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
903
904        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
905        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
906
907        filename = b"test.txt"
908        content = b"test1234"
909        filename_length = struct.pack("<H", len(filename))
910        zip64_contents = (
911            # Local file header
912            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
913            + compress_size
914            + file_size
915            + filename_length
916            + local_extra_length
917            + filename
918            + local_extra
919            + content
920            # Central directory:
921            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
922            + compress_size
923            + file_size
924            + filename_length
925            + central_extra_length
926            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
927            + header_offset
928            + filename
929            + central_extra
930            # Zip64 end of central directory
931            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
932            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
933            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
934            + central_dir_size
935            + offset_to_central_dir
936            # Zip64 end of central directory locator
937            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
938            + b"\x00\x00\x00"
939            # end of central directory
940            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
941            + b"\x00\x00\x00\x00"
942        )
943        return zip64_contents
944
945    def test_bad_zip64_extra(self):
946        """Missing zip64 extra records raises an exception.
947
948        There are 4 fields that the zip64 format handles (the disk number is
949        not used in this module and so is ignored here). According to the zip
950        spec:
951              The order of the fields in the zip64 extended
952              information record is fixed, but the fields MUST
953              only appear if the corresponding Local or Central
954              directory record field is set to 0xFFFF or 0xFFFFFFFF.
955
956        If the zip64 extra content doesn't contain enough entries for the
957        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
958        This test mismatches the length of the zip64 extra field and the number
959        of fields set to indicate the presence of zip64 data.
960        """
961        # zip64 file size present, no fields in extra, expecting one, equals
962        # missing file size.
963        missing_file_size_extra = self.make_zip64_file(
964            file_size_64_set=True,
965        )
966        with self.assertRaises(zipfile.BadZipFile) as e:
967            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
968        self.assertIn('file size', str(e.exception).lower())
969
970        # zip64 file size present, zip64 compress size present, one field in
971        # extra, expecting two, equals missing compress size.
972        missing_compress_size_extra = self.make_zip64_file(
973            file_size_64_set=True,
974            file_size_extra=True,
975            compress_size_64_set=True,
976        )
977        with self.assertRaises(zipfile.BadZipFile) as e:
978            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
979        self.assertIn('compress size', str(e.exception).lower())
980
981        # zip64 compress size present, no fields in extra, expecting one,
982        # equals missing compress size.
983        missing_compress_size_extra = self.make_zip64_file(
984            compress_size_64_set=True,
985        )
986        with self.assertRaises(zipfile.BadZipFile) as e:
987            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
988        self.assertIn('compress size', str(e.exception).lower())
989
990        # zip64 file size present, zip64 compress size present, zip64 header
991        # offset present, two fields in extra, expecting three, equals missing
992        # header offset
993        missing_header_offset_extra = self.make_zip64_file(
994            file_size_64_set=True,
995            file_size_extra=True,
996            compress_size_64_set=True,
997            compress_size_extra=True,
998            header_offset_64_set=True,
999        )
1000        with self.assertRaises(zipfile.BadZipFile) as e:
1001            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1002        self.assertIn('header offset', str(e.exception).lower())
1003
1004        # zip64 compress size present, zip64 header offset present, one field
1005        # in extra, expecting two, equals missing header offset
1006        missing_header_offset_extra = self.make_zip64_file(
1007            file_size_64_set=False,
1008            compress_size_64_set=True,
1009            compress_size_extra=True,
1010            header_offset_64_set=True,
1011        )
1012        with self.assertRaises(zipfile.BadZipFile) as e:
1013            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1014        self.assertIn('header offset', str(e.exception).lower())
1015
1016        # zip64 file size present, zip64 header offset present, one field in
1017        # extra, expecting two, equals missing header offset
1018        missing_header_offset_extra = self.make_zip64_file(
1019            file_size_64_set=True,
1020            file_size_extra=True,
1021            compress_size_64_set=False,
1022            header_offset_64_set=True,
1023        )
1024        with self.assertRaises(zipfile.BadZipFile) as e:
1025            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1026        self.assertIn('header offset', str(e.exception).lower())
1027
1028        # zip64 header offset present, no fields in extra, expecting one,
1029        # equals missing header offset
1030        missing_header_offset_extra = self.make_zip64_file(
1031            file_size_64_set=False,
1032            compress_size_64_set=False,
1033            header_offset_64_set=True,
1034        )
1035        with self.assertRaises(zipfile.BadZipFile) as e:
1036            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1037        self.assertIn('header offset', str(e.exception).lower())
1038
1039    def test_generated_valid_zip64_extra(self):
1040        # These values are what is set in the make_zip64_file method.
1041        expected_file_size = 8
1042        expected_compress_size = 8
1043        expected_header_offset = 0
1044        expected_content = b"test1234"
1045
1046        # Loop through the various valid combinations of zip64 masks
1047        # present and extra fields present.
1048        params = (
1049            {"file_size_64_set": True, "file_size_extra": True},
1050            {"compress_size_64_set": True, "compress_size_extra": True},
1051            {"header_offset_64_set": True, "header_offset_extra": True},
1052        )
1053
1054        for r in range(1, len(params) + 1):
1055            for combo in itertools.combinations(params, r):
1056                kwargs = {}
1057                for c in combo:
1058                    kwargs.update(c)
1059                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1060                    zinfo = zf.infolist()[0]
1061                    self.assertEqual(zinfo.file_size, expected_file_size)
1062                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1063                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1064                    self.assertEqual(zf.read(zinfo), expected_content)
1065
1066
@requires_zlib
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Binds the tests inherited from AbstractTestZip64InSmallFiles to
    # ZIP_DEFLATED; gated by the requires_zlib decorator.
    compression = zipfile.ZIP_DEFLATED
1071
@requires_bz2
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Binds the tests inherited from AbstractTestZip64InSmallFiles to
    # ZIP_BZIP2; gated by the requires_bz2 decorator.
    compression = zipfile.ZIP_BZIP2
1076
@requires_lzma
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Binds the tests inherited from AbstractTestZip64InSmallFiles to
    # ZIP_LZMA; gated by the requires_lzma decorator.
    compression = zipfile.ZIP_LZMA
1081
1082
class AbstractWriterTests:
    # Mixin with checks for the writable member handle returned by
    # ZipFile.open(name, 'w').  Subclasses provide ``compression``.

    def tearDown(self):
        unlink(TESTFN2)

    def test_close_after_close(self):
        # Closing the member handle twice is accepted, and the handle
        # reports itself closed after each call.
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as archive:
            member = archive.open('test', 'w')
            member.write(payload)
            for _ in range(2):
                member.close()
                self.assertTrue(member.closed)
            self.assertEqual(archive.read('test'), payload)

    def test_write_after_close(self):
        # Writing to a closed member handle raises ValueError and leaves
        # the stored data untouched.
        payload = b'content'
        with zipfile.ZipFile(TESTFN2, "w", self.compression) as archive:
            member = archive.open('test', 'w')
            member.write(payload)
            member.close()
            self.assertTrue(member.closed)
            with self.assertRaises(ValueError):
                member.write(b'')
            self.assertEqual(archive.read('test'), payload)
1108
class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
    # Binds the AbstractWriterTests checks to ZIP_STORED.
    compression = zipfile.ZIP_STORED
1111
@requires_zlib
class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
    # Binds the AbstractWriterTests checks to ZIP_DEFLATED; gated by the
    # requires_zlib decorator.
    compression = zipfile.ZIP_DEFLATED
1115
@requires_bz2
class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
    # Binds the AbstractWriterTests checks to ZIP_BZIP2; gated by the
    # requires_bz2 decorator.
    compression = zipfile.ZIP_BZIP2
1119
@requires_lzma
class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
    # Binds the AbstractWriterTests checks to ZIP_LZMA; gated by the
    # requires_lzma decorator.
    compression = zipfile.ZIP_LZMA
1123
1124
1125class PyZipFileTests(unittest.TestCase):
1126    def assertCompiledIn(self, name, namelist):
1127        if name + 'o' not in namelist:
1128            self.assertIn(name + 'c', namelist)
1129
1130    def requiresWriteAccess(self, path):
1131        # effective_ids unavailable on windows
1132        if not os.access(path, os.W_OK,
1133                         effective_ids=os.access in os.supports_effective_ids):
1134            self.skipTest('requires write access to the installed location')
1135        filename = os.path.join(path, 'test_zipfile.try')
1136        try:
1137            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1138            os.close(fd)
1139        except Exception:
1140            self.skipTest('requires write access to the installed location')
1141        unlink(filename)
1142
1143    def test_write_pyfile(self):
1144        self.requiresWriteAccess(os.path.dirname(__file__))
1145        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1146            fn = __file__
1147            if fn.endswith('.pyc'):
1148                path_split = fn.split(os.sep)
1149                if os.altsep is not None:
1150                    path_split.extend(fn.split(os.altsep))
1151                if '__pycache__' in path_split:
1152                    fn = importlib.util.source_from_cache(fn)
1153                else:
1154                    fn = fn[:-1]
1155
1156            zipfp.writepy(fn)
1157
1158            bn = os.path.basename(fn)
1159            self.assertNotIn(bn, zipfp.namelist())
1160            self.assertCompiledIn(bn, zipfp.namelist())
1161
1162        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1163            fn = __file__
1164            if fn.endswith('.pyc'):
1165                fn = fn[:-1]
1166
1167            zipfp.writepy(fn, "testpackage")
1168
1169            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1170            self.assertNotIn(bn, zipfp.namelist())
1171            self.assertCompiledIn(bn, zipfp.namelist())
1172
1173    def test_write_python_package(self):
1174        import email
1175        packagedir = os.path.dirname(email.__file__)
1176        self.requiresWriteAccess(packagedir)
1177
1178        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1179            zipfp.writepy(packagedir)
1180
1181            # Check for a couple of modules at different levels of the
1182            # hierarchy
1183            names = zipfp.namelist()
1184            self.assertCompiledIn('email/__init__.py', names)
1185            self.assertCompiledIn('email/mime/text.py', names)
1186
1187    def test_write_filtered_python_package(self):
1188        import test
1189        packagedir = os.path.dirname(test.__file__)
1190        self.requiresWriteAccess(packagedir)
1191
1192        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1193
1194            # first make sure that the test folder gives error messages
1195            # (on the badsyntax_... files)
1196            with captured_stdout() as reportSIO:
1197                zipfp.writepy(packagedir)
1198            reportStr = reportSIO.getvalue()
1199            self.assertTrue('SyntaxError' in reportStr)
1200
1201            # then check that the filter works on the whole package
1202            with captured_stdout() as reportSIO:
1203                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1204            reportStr = reportSIO.getvalue()
1205            self.assertTrue('SyntaxError' not in reportStr)
1206
1207            # then check that the filter works on individual files
1208            def filter(path):
1209                return not os.path.basename(path).startswith("bad")
1210            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1211                zipfp.writepy(packagedir, filterfunc=filter)
1212            reportStr = reportSIO.getvalue()
1213            if reportStr:
1214                print(reportStr)
1215            self.assertTrue('SyntaxError' not in reportStr)
1216
1217    def test_write_with_optimization(self):
1218        import email
1219        packagedir = os.path.dirname(email.__file__)
1220        self.requiresWriteAccess(packagedir)
1221        optlevel = 1 if __debug__ else 0
1222        ext = '.pyc'
1223
1224        with TemporaryFile() as t, \
1225             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1226            zipfp.writepy(packagedir)
1227
1228            names = zipfp.namelist()
1229            self.assertIn('email/__init__' + ext, names)
1230            self.assertIn('email/mime/text' + ext, names)
1231
1232    def test_write_python_directory(self):
1233        os.mkdir(TESTFN2)
1234        try:
1235            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1236                fp.write("print(42)\n")
1237
1238            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1239                fp.write("print(42 * 42)\n")
1240
1241            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
1242                fp.write("bla bla bla\n")
1243
1244            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1245                zipfp.writepy(TESTFN2)
1246
1247                names = zipfp.namelist()
1248                self.assertCompiledIn('mod1.py', names)
1249                self.assertCompiledIn('mod2.py', names)
1250                self.assertNotIn('mod2.txt', names)
1251
1252        finally:
1253            rmtree(TESTFN2)
1254
1255    def test_write_python_directory_filtered(self):
1256        os.mkdir(TESTFN2)
1257        try:
1258            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1259                fp.write("print(42)\n")
1260
1261            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
1262                fp.write("print(42 * 42)\n")
1263
1264            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1265                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1266                                                  not fn.endswith('mod2.py'))
1267
1268                names = zipfp.namelist()
1269                self.assertCompiledIn('mod1.py', names)
1270                self.assertNotIn('mod2.py', names)
1271
1272        finally:
1273            rmtree(TESTFN2)
1274
1275    def test_write_non_pyfile(self):
1276        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1277            with open(TESTFN, 'w') as f:
1278                f.write('most definitely not a python file')
1279            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1280            unlink(TESTFN)
1281
1282    def test_write_pyfile_bad_syntax(self):
1283        os.mkdir(TESTFN2)
1284        try:
1285            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1286                fp.write("Bad syntax in python file\n")
1287
1288            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1289                # syntax errors are printed to stdout
1290                with captured_stdout() as s:
1291                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1292
1293                self.assertIn("SyntaxError", s.getvalue())
1294
1295                # as it will not have compiled the python file, it will
1296                # include the .py file not .pyc
1297                names = zipfp.namelist()
1298                self.assertIn('mod1.py', names)
1299                self.assertNotIn('mod1.pyc', names)
1300
1301        finally:
1302            rmtree(TESTFN2)
1303
1304    def test_write_pathlike(self):
1305        os.mkdir(TESTFN2)
1306        try:
1307            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
1308                fp.write("print(42)\n")
1309
1310            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1311                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1312                names = zipfp.namelist()
1313                self.assertCompiledIn('mod1.py', names)
1314        finally:
1315            rmtree(TESTFN2)
1316
1317
1318class ExtractTests(unittest.TestCase):
1319
1320    def make_test_file(self):
1321        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1322            for fpath, fdata in SMALL_TEST_DATA:
1323                zipfp.writestr(fpath, fdata)
1324
1325    def test_extract(self):
1326        with temp_cwd():
1327            self.make_test_file()
1328            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1329                for fpath, fdata in SMALL_TEST_DATA:
1330                    writtenfile = zipfp.extract(fpath)
1331
1332                    # make sure it was written to the right place
1333                    correctfile = os.path.join(os.getcwd(), fpath)
1334                    correctfile = os.path.normpath(correctfile)
1335
1336                    self.assertEqual(writtenfile, correctfile)
1337
1338                    # make sure correct data is in correct file
1339                    with open(writtenfile, "rb") as f:
1340                        self.assertEqual(fdata.encode(), f.read())
1341
1342                    unlink(writtenfile)
1343
1344    def _test_extract_with_target(self, target):
1345        self.make_test_file()
1346        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1347            for fpath, fdata in SMALL_TEST_DATA:
1348                writtenfile = zipfp.extract(fpath, target)
1349
1350                # make sure it was written to the right place
1351                correctfile = os.path.join(target, fpath)
1352                correctfile = os.path.normpath(correctfile)
1353                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1354
1355                # make sure correct data is in correct file
1356                with open(writtenfile, "rb") as f:
1357                    self.assertEqual(fdata.encode(), f.read())
1358
1359                unlink(writtenfile)
1360
1361        unlink(TESTFN2)
1362
1363    def test_extract_with_target(self):
1364        with temp_dir() as extdir:
1365            self._test_extract_with_target(extdir)
1366
1367    def test_extract_with_target_pathlike(self):
1368        with temp_dir() as extdir:
1369            self._test_extract_with_target(pathlib.Path(extdir))
1370
1371    def test_extract_all(self):
1372        with temp_cwd():
1373            self.make_test_file()
1374            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1375                zipfp.extractall()
1376                for fpath, fdata in SMALL_TEST_DATA:
1377                    outfile = os.path.join(os.getcwd(), fpath)
1378
1379                    with open(outfile, "rb") as f:
1380                        self.assertEqual(fdata.encode(), f.read())
1381
1382                    unlink(outfile)
1383
1384    def _test_extract_all_with_target(self, target):
1385        self.make_test_file()
1386        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1387            zipfp.extractall(target)
1388            for fpath, fdata in SMALL_TEST_DATA:
1389                outfile = os.path.join(target, fpath)
1390
1391                with open(outfile, "rb") as f:
1392                    self.assertEqual(fdata.encode(), f.read())
1393
1394                unlink(outfile)
1395
1396        unlink(TESTFN2)
1397
1398    def test_extract_all_with_target(self):
1399        with temp_dir() as extdir:
1400            self._test_extract_all_with_target(extdir)
1401
1402    def test_extract_all_with_target_pathlike(self):
1403        with temp_dir() as extdir:
1404            self._test_extract_all_with_target(pathlib.Path(extdir))
1405
1406    def check_file(self, filename, content):
1407        self.assertTrue(os.path.isfile(filename))
1408        with open(filename, 'rb') as f:
1409            self.assertEqual(f.read(), content)
1410
1411    def test_sanitize_windows_name(self):
1412        san = zipfile.ZipFile._sanitize_windows_name
1413        # Passing pathsep in allows this test to work regardless of platform.
1414        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1415        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1416        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1417
1418    def test_extract_hackers_arcnames_common_cases(self):
1419        common_hacknames = [
1420            ('../foo/bar', 'foo/bar'),
1421            ('foo/../bar', 'foo/bar'),
1422            ('foo/../../bar', 'foo/bar'),
1423            ('foo/bar/..', 'foo/bar'),
1424            ('./../foo/bar', 'foo/bar'),
1425            ('/foo/bar', 'foo/bar'),
1426            ('/foo/../bar', 'foo/bar'),
1427            ('/foo/../../bar', 'foo/bar'),
1428        ]
1429        self._test_extract_hackers_arcnames(common_hacknames)
1430
    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
    def test_extract_hackers_arcnames_windows_only(self):
        """Test combination of path fixing and windows name sanitization."""
        # Each pair is (name stored in the archive, '/'-separated path the
        # member is expected at, relative to the extraction directory).
        windows_hacknames = [
            # '..' components written with backslashes or mixed separators
            (r'..\foo\bar', 'foo/bar'),
            (r'..\/foo\/bar', 'foo/bar'),
            (r'foo/\..\/bar', 'foo/bar'),
            (r'foo\/../\bar', 'foo/bar'),
            # drive-letter prefixes
            (r'C:foo/bar', 'foo/bar'),
            (r'C:/foo/bar', 'foo/bar'),
            (r'C://foo/bar', 'foo/bar'),
            (r'C:\foo\bar', 'foo/bar'),
            # UNC-style prefixes: the first two components are dropped
            # only in the exact two-leading-separator form
            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
            # '//?/' and '\\?\' prefixed drive paths
            (r'//?/C:/foo/bar', 'foo/bar'),
            (r'\\?\C:\foo\bar', 'foo/bar'),
            # characters replaced by '_' inside components
            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
            # trailing dots of a component are removed
            ('../../foo../../ba..r', 'foo/ba..r'),
        ]
        self._test_extract_hackers_arcnames(windows_hacknames)
1456
    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
    def test_extract_hackers_arcnames_posix_only(self):
        # Each pair is (name stored in the archive, '/'-separated path the
        # member is expected at, relative to the extraction directory).
        posix_hacknames = [
            ('//foo/bar', 'foo/bar'),
            # trailing dots of a component are kept here
            ('../../foo../../ba..r', 'foo../ba..r'),
            # a backslash is kept as part of the file name
            (r'foo/..\bar', r'foo/..\bar'),
        ]
        self._test_extract_hackers_arcnames(posix_hacknames)
1465
    def _test_extract_hackers_arcnames(self, hacknames):
        # For every (arcname, fixedname) pair: store one member under
        # ``arcname`` in a fresh archive at TESTFN2, then check that
        # extract() and extractall() -- first into an explicit target
        # directory, then into the current directory -- place it at
        # ``fixedname`` ('/'-separated, relative to the destination).
        for arcname, fixedname in hacknames:
            # Content that differs per member name.
            content = b'foobar' + arcname.encode()
            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
                zinfo = zipfile.ZipInfo()
                # preserve backslashes
                zinfo.filename = arcname
                zinfo.external_attr = 0o600 << 16
                zipfp.writestr(zinfo, content)

            # Name used for the lookups below: os.sep replaced by '/'.
            arcname = arcname.replace(os.sep, "/")
            targetpath = os.path.join('target', 'subdir', 'subsub')
            correctfile = os.path.join(targetpath, *fixedname.split('/'))

            # 1. extract() into an explicit target directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname, targetpath)
                self.assertEqual(writtenfile, correctfile,
                                 msg='extract %r: %r != %r' %
                                 (arcname, writtenfile, correctfile))
            self.check_file(correctfile, content)
            # Start the next step from an empty tree.
            rmtree('target')

            # 2. extractall() into the same target directory
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall(targetpath)
            self.check_file(correctfile, content)
            rmtree('target')

            # From here on the destination is the current directory.
            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

            # 3. extract() without a target
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                writtenfile = zipfp.extract(arcname)
                self.assertEqual(writtenfile, correctfile,
                                 msg="extract %r" % arcname)
            self.check_file(correctfile, content)
            # Remove the top-level directory of the extracted path.
            rmtree(fixedname.split('/')[0])

            # 4. extractall() without a target
            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
                zipfp.extractall()
            self.check_file(correctfile, content)
            rmtree(fixedname.split('/')[0])

            unlink(TESTFN2)
1508
1509
1510class OtherTests(unittest.TestCase):
1511    def test_open_via_zip_info(self):
1512        # Create the ZIP archive
1513        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1514            zipfp.writestr("name", "foo")
1515            with self.assertWarns(UserWarning):
1516                zipfp.writestr("name", "bar")
1517            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1518
1519        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1520            infos = zipfp.infolist()
1521            data = b""
1522            for info in infos:
1523                with zipfp.open(info) as zipopen:
1524                    data += zipopen.read()
1525            self.assertIn(data, {b"foobar", b"barfoo"})
1526            data = b""
1527            for info in infos:
1528                data += zipfp.read(info)
1529            self.assertIn(data, {b"foobar", b"barfoo"})
1530
1531    def test_writestr_extended_local_header_issue1202(self):
1532        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1533            for data in 'abcdefghijklmnop':
1534                zinfo = zipfile.ZipInfo(data)
1535                zinfo.flag_bits |= 0x08  # Include an extended local header.
1536                orig_zip.writestr(zinfo, data)
1537
1538    def test_close(self):
1539        """Check that the zipfile is closed after the 'with' block."""
1540        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1541            for fpath, fdata in SMALL_TEST_DATA:
1542                zipfp.writestr(fpath, fdata)
1543                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1544        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1545
1546        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1547            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1548        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1549
1550    def test_close_on_exception(self):
1551        """Check that the zipfile is closed if an exception is raised in the
1552        'with' block."""
1553        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1554            for fpath, fdata in SMALL_TEST_DATA:
1555                zipfp.writestr(fpath, fdata)
1556
1557        try:
1558            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1559                raise zipfile.BadZipFile()
1560        except zipfile.BadZipFile:
1561            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1562
1563    def test_unsupported_version(self):
1564        # File has an extract_version of 120
1565        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1566                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1567                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1568                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1569                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1570
1571        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1572                          io.BytesIO(data), 'r')
1573
1574    @requires_zlib
1575    def test_read_unicode_filenames(self):
1576        # bug #10801
1577        fname = findfile('zip_cp437_header.zip')
1578        with zipfile.ZipFile(fname) as zipfp:
1579            for name in zipfp.namelist():
1580                zipfp.open(name).close()
1581
1582    def test_write_unicode_filenames(self):
1583        with zipfile.ZipFile(TESTFN, "w") as zf:
1584            zf.writestr("foo.txt", "Test for unicode filename")
1585            zf.writestr("\xf6.txt", "Test for unicode filename")
1586            self.assertIsInstance(zf.infolist()[0].filename, str)
1587
1588        with zipfile.ZipFile(TESTFN, "r") as zf:
1589            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1590            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1591
1592    def test_read_after_write_unicode_filenames(self):
1593        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1594            zipfp.writestr('приклад', b'sample')
1595            self.assertEqual(zipfp.read('приклад'), b'sample')
1596
1597    def test_exclusive_create_zip_file(self):
1598        """Test exclusive creating a new zipfile."""
1599        unlink(TESTFN2)
1600        filename = 'testfile.txt'
1601        content = b'hello, world. this is some content.'
1602        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1603            zipfp.writestr(filename, content)
1604        with self.assertRaises(FileExistsError):
1605            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1606        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1607            self.assertEqual(zipfp.namelist(), [filename])
1608            self.assertEqual(zipfp.read(filename), content)
1609
1610    def test_create_non_existent_file_for_append(self):
1611        if os.path.exists(TESTFN):
1612            os.unlink(TESTFN)
1613
1614        filename = 'testfile.txt'
1615        content = b'hello, world. this is some content.'
1616
1617        try:
1618            with zipfile.ZipFile(TESTFN, 'a') as zf:
1619                zf.writestr(filename, content)
1620        except OSError:
1621            self.fail('Could not append data to a non-existent zip file.')
1622
1623        self.assertTrue(os.path.exists(TESTFN))
1624
1625        with zipfile.ZipFile(TESTFN, 'r') as zf:
1626            self.assertEqual(zf.read(filename), content)
1627
    def test_close_erroneous_file(self):
        """Open a file that is not a ZIP archive and tolerate BadZipFile."""
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        # Nothing is asserted here directly: the method only writes a bogus
        # file and attempts to open it, ignoring BadZipFile.
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            pass
1642
1643    def test_is_zip_erroneous_file(self):
1644        """Check that is_zipfile() correctly identifies non-zip files."""
1645        # - passing a filename
1646        with open(TESTFN, "w") as fp:
1647            fp.write("this is not a legal zip file\n")
1648        self.assertFalse(zipfile.is_zipfile(TESTFN))
1649        # - passing a path-like object
1650        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1651        # - passing a file object
1652        with open(TESTFN, "rb") as fp:
1653            self.assertFalse(zipfile.is_zipfile(fp))
1654        # - passing a file-like object
1655        fp = io.BytesIO()
1656        fp.write(b"this is not a legal zip file\n")
1657        self.assertFalse(zipfile.is_zipfile(fp))
1658        fp.seek(0, 0)
1659        self.assertFalse(zipfile.is_zipfile(fp))
1660
1661    def test_damaged_zipfile(self):
1662        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1663        # - Create a valid zip file
1664        fp = io.BytesIO()
1665        with zipfile.ZipFile(fp, mode="w") as zipf:
1666            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1667        zipfiledata = fp.getvalue()
1668
1669        # - Now create copies of it missing the last N bytes and make sure
1670        #   a BadZipFile exception is raised when we try to open it
1671        for N in range(len(zipfiledata)):
1672            fp = io.BytesIO(zipfiledata[:N])
1673            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1674
1675    def test_is_zip_valid_file(self):
1676        """Check that is_zipfile() correctly identifies zip files."""
1677        # - passing a filename
1678        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1679            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1680
1681        self.assertTrue(zipfile.is_zipfile(TESTFN))
1682        # - passing a file object
1683        with open(TESTFN, "rb") as fp:
1684            self.assertTrue(zipfile.is_zipfile(fp))
1685            fp.seek(0, 0)
1686            zip_contents = fp.read()
1687        # - passing a file-like object
1688        fp = io.BytesIO()
1689        fp.write(zip_contents)
1690        self.assertTrue(zipfile.is_zipfile(fp))
1691        fp.seek(0, 0)
1692        self.assertTrue(zipfile.is_zipfile(fp))
1693
1694    def test_non_existent_file_raises_OSError(self):
1695        # make sure we don't raise an AttributeError when a partially-constructed
1696        # ZipFile instance is finalized; this tests for regression on SF tracker
1697        # bug #403871.
1698
1699        # The bug we're testing for caused an AttributeError to be raised
1700        # when a ZipFile instance was created for a file that did not
1701        # exist; the .fp member was not initialized but was needed by the
1702        # __del__() method.  Since the AttributeError is in the __del__(),
1703        # it is ignored, but the user should be sufficiently annoyed by
1704        # the message on the output that regression will be noticed
1705        # quickly.
1706        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1707
1708    def test_empty_file_raises_BadZipFile(self):
1709        f = open(TESTFN, 'w')
1710        f.close()
1711        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1712
1713        with open(TESTFN, 'w') as fp:
1714            fp.write("short file")
1715        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1716
1717    def test_closed_zip_raises_ValueError(self):
1718        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1719        data = io.BytesIO()
1720        with zipfile.ZipFile(data, mode="w") as zipf:
1721            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1722
1723        # This is correct; calling .read on a closed ZipFile should raise
1724        # a ValueError, and so should calling .testzip.  An earlier
1725        # version of .testzip would swallow this exception (and any other)
1726        # and report that the first file in the archive was corrupt.
1727        self.assertRaises(ValueError, zipf.read, "foo.txt")
1728        self.assertRaises(ValueError, zipf.open, "foo.txt")
1729        self.assertRaises(ValueError, zipf.testzip)
1730        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1731        with open(TESTFN, 'w') as f:
1732            f.write('zipfile test data')
1733        self.assertRaises(ValueError, zipf.write, TESTFN)
1734
1735    def test_bad_constructor_mode(self):
1736        """Check that bad modes passed to ZipFile constructor are caught."""
1737        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1738
1739    def test_bad_open_mode(self):
1740        """Check that bad modes passed to ZipFile.open are caught."""
1741        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1742            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1743
1744        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1745            # read the data to make sure the file is there
1746            zipf.read("foo.txt")
1747            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1748            # universal newlines support is removed
1749            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1750            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1751
1752    def test_read0(self):
1753        """Check that calling read(0) on a ZipExtFile object returns an empty
1754        string and doesn't advance file pointer."""
1755        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1756            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1757            # read the data to make sure the file is there
1758            with zipf.open("foo.txt") as f:
1759                for i in range(FIXEDTEST_SIZE):
1760                    self.assertEqual(f.read(0), b'')
1761
1762                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1763
1764    def test_open_non_existent_item(self):
1765        """Check that attempting to call open() for an item that doesn't
1766        exist in the archive raises a RuntimeError."""
1767        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1768            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1769
1770    def test_bad_compression_mode(self):
1771        """Check that bad compression methods passed to ZipFile.open are
1772        caught."""
1773        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1774
1775    def test_unsupported_compression(self):
1776        # data is declared as shrunk, but actually deflated
1777        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1778                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1779                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1780                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1781                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1782                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1783        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1784            self.assertRaises(NotImplementedError, zipf.open, 'x')
1785
1786    def test_null_byte_in_filename(self):
1787        """Check that a filename containing a null byte is properly
1788        terminated."""
1789        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1790            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1791            self.assertEqual(zipf.namelist(), ['foo.txt'])
1792
1793    def test_struct_sizes(self):
1794        """Check that ZIP internal structure sizes are calculated correctly."""
1795        self.assertEqual(zipfile.sizeEndCentDir, 22)
1796        self.assertEqual(zipfile.sizeCentralDir, 46)
1797        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1798        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1799
1800    def test_comments(self):
1801        """Check that comments on the archive are handled properly."""
1802
1803        # check default comment is empty
1804        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1805            self.assertEqual(zipf.comment, b'')
1806            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1807
1808        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1809            self.assertEqual(zipfr.comment, b'')
1810
1811        # check a simple short comment
1812        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1813        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1814            zipf.comment = comment
1815            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1816        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1817            self.assertEqual(zipf.comment, comment)
1818
1819        # check a comment of max length
1820        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1821        comment2 = comment2.encode("ascii")
1822        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1823            zipf.comment = comment2
1824            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1825
1826        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1827            self.assertEqual(zipfr.comment, comment2)
1828
1829        # check a comment that is too long is truncated
1830        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1831            with self.assertWarns(UserWarning):
1832                zipf.comment = comment2 + b'oops'
1833            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1834        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1835            self.assertEqual(zipfr.comment, comment2)
1836
1837        # check that comments are correctly modified in append mode
1838        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1839            zipf.comment = b"original comment"
1840            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1841        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1842            zipf.comment = b"an updated comment"
1843        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1844            self.assertEqual(zipf.comment, b"an updated comment")
1845
1846        # check that comments are correctly shortened in append mode
1847        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1848            zipf.comment = b"original comment that's longer"
1849            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1850        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1851            zipf.comment = b"shorter comment"
1852        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1853            self.assertEqual(zipf.comment, b"shorter comment")
1854
1855    def test_unicode_comment(self):
1856        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1857            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1858            with self.assertRaises(TypeError):
1859                zipf.comment = "this is an error"
1860
1861    def test_change_comment_in_empty_archive(self):
1862        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1863            self.assertFalse(zipf.filelist)
1864            zipf.comment = b"this is a comment"
1865        with zipfile.ZipFile(TESTFN, "r") as zipf:
1866            self.assertEqual(zipf.comment, b"this is a comment")
1867
1868    def test_change_comment_in_nonempty_archive(self):
1869        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1870            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1871        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1872            self.assertTrue(zipf.filelist)
1873            zipf.comment = b"this is a comment"
1874        with zipfile.ZipFile(TESTFN, "r") as zipf:
1875            self.assertEqual(zipf.comment, b"this is a comment")
1876
1877    def test_empty_zipfile(self):
1878        # Check that creating a file in 'w' or 'a' mode and closing without
1879        # adding any files to the archives creates a valid empty ZIP file
1880        zipf = zipfile.ZipFile(TESTFN, mode="w")
1881        zipf.close()
1882        try:
1883            zipf = zipfile.ZipFile(TESTFN, mode="r")
1884        except zipfile.BadZipFile:
1885            self.fail("Unable to create empty ZIP file in 'w' mode")
1886
1887        zipf = zipfile.ZipFile(TESTFN, mode="a")
1888        zipf.close()
1889        try:
1890            zipf = zipfile.ZipFile(TESTFN, mode="r")
1891        except:
1892            self.fail("Unable to create empty ZIP file in 'a' mode")
1893
1894    def test_open_empty_file(self):
1895        # Issue 1710703: Check that opening a file with less than 22 bytes
1896        # raises a BadZipFile exception (rather than the previously unhelpful
1897        # OSError)
1898        f = open(TESTFN, 'w')
1899        f.close()
1900        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1901
1902    def test_create_zipinfo_before_1980(self):
1903        self.assertRaises(ValueError,
1904                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1905
1906    def test_zipfile_with_short_extra_field(self):
1907        """If an extra field in the header is less than 4 bytes, skip it."""
1908        zipdata = (
1909            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1910            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1911            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1912            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1913            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1914            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1915            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1916        )
1917        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1918            # testzip returns the name of the first corrupt file, or None
1919            self.assertIsNone(zipf.testzip())
1920
1921    def test_open_conflicting_handles(self):
1922        # It's only possible to open one writable file handle at a time
1923        msg1 = b"It's fun to charter an accountant!"
1924        msg2 = b"And sail the wide accountant sea"
1925        msg3 = b"To find, explore the funds offshore"
1926        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1927            with zipf.open('foo', mode='w') as w2:
1928                w2.write(msg1)
1929            with zipf.open('bar', mode='w') as w1:
1930                with self.assertRaises(ValueError):
1931                    zipf.open('handle', mode='w')
1932                with self.assertRaises(ValueError):
1933                    zipf.open('foo', mode='r')
1934                with self.assertRaises(ValueError):
1935                    zipf.writestr('str', 'abcde')
1936                with self.assertRaises(ValueError):
1937                    zipf.write(__file__, 'file')
1938                with self.assertRaises(ValueError):
1939                    zipf.close()
1940                w1.write(msg2)
1941            with zipf.open('baz', mode='w') as w2:
1942                w2.write(msg3)
1943
1944        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1945            self.assertEqual(zipf.read('foo'), msg1)
1946            self.assertEqual(zipf.read('bar'), msg2)
1947            self.assertEqual(zipf.read('baz'), msg3)
1948            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1949
1950    def test_seek_tell(self):
1951        # Test seek functionality
1952        txt = b"Where's Bruce?"
1953        bloc = txt.find(b"Bruce")
1954        # Check seek on a file
1955        with zipfile.ZipFile(TESTFN, "w") as zipf:
1956            zipf.writestr("foo.txt", txt)
1957        with zipfile.ZipFile(TESTFN, "r") as zipf:
1958            with zipf.open("foo.txt", "r") as fp:
1959                fp.seek(bloc, os.SEEK_SET)
1960                self.assertEqual(fp.tell(), bloc)
1961                fp.seek(-bloc, os.SEEK_CUR)
1962                self.assertEqual(fp.tell(), 0)
1963                fp.seek(bloc, os.SEEK_CUR)
1964                self.assertEqual(fp.tell(), bloc)
1965                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1966                fp.seek(0, os.SEEK_END)
1967                self.assertEqual(fp.tell(), len(txt))
1968                fp.seek(0, os.SEEK_SET)
1969                self.assertEqual(fp.tell(), 0)
1970        # Check seek on memory file
1971        data = io.BytesIO()
1972        with zipfile.ZipFile(data, mode="w") as zipf:
1973            zipf.writestr("foo.txt", txt)
1974        with zipfile.ZipFile(data, mode="r") as zipf:
1975            with zipf.open("foo.txt", "r") as fp:
1976                fp.seek(bloc, os.SEEK_SET)
1977                self.assertEqual(fp.tell(), bloc)
1978                fp.seek(-bloc, os.SEEK_CUR)
1979                self.assertEqual(fp.tell(), 0)
1980                fp.seek(bloc, os.SEEK_CUR)
1981                self.assertEqual(fp.tell(), bloc)
1982                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
1983                fp.seek(0, os.SEEK_END)
1984                self.assertEqual(fp.tell(), len(txt))
1985                fp.seek(0, os.SEEK_SET)
1986                self.assertEqual(fp.tell(), 0)
1987
1988    @requires_bz2
1989    def test_decompress_without_3rd_party_library(self):
1990        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1991        zip_file = io.BytesIO(data)
1992        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
1993            zf.writestr('a.txt', b'a')
1994        with mock.patch('zipfile.bz2', None):
1995            with zipfile.ZipFile(zip_file) as zf:
1996                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
1997
1998    def tearDown(self):
1999        unlink(TESTFN)
2000        unlink(TESTFN2)
2001
2002
class AbstractBadCrcTests:
    """Checks that CRC mismatches are detected when reading a member.

    Meant to be mixed into a TestCase; the methods read the archive bytes
    from ``self.zip_with_bad_crc`` and expect the damaged member to be
    called 'afile'.
    """

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        archive = io.BytesIO(self.zip_with_bad_crc)

        with zipfile.ZipFile(archive, mode="r") as zipf:
            # testzip returns the name of the first corrupt file, or None
            first_corrupt = zipf.testzip()
            self.assertEqual('afile', first_corrupt)

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        raw = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as zipf:
            with self.assertRaises(zipfile.BadZipFile):
                zipf.read('afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as zipf, \
                zipf.open('afile', 'r') as corrupt_file:
            with self.assertRaises(zipfile.BadZipFile):
                corrupt_file.read()

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as zipf, \
                zipf.open('afile', 'r') as corrupt_file:
            corrupt_file.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                while corrupt_file.read(2):
                    pass
2032
2033
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the checks inherited from AbstractBadCrcTests against an archive
    # for the ZIP_STORED method.
    compression = zipfile.ZIP_STORED
    # Raw bytes of a pre-built archive with a single member named 'afile';
    # as the attribute name says, its recorded CRC is meant not to match
    # the member data.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
2045
@requires_zlib
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the checks inherited from AbstractBadCrcTests against an archive
    # for the ZIP_DEFLATED method.
    compression = zipfile.ZIP_DEFLATED
    # Raw bytes of a pre-built archive with a single member named 'afile'.
    # The bytes b'FAKE' appear where the headers carry the CRC, so the
    # recorded checksum is not expected to match the member data.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2058
@requires_bz2
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the checks inherited from AbstractBadCrcTests against an archive
    # for the ZIP_BZIP2 method.
    compression = zipfile.ZIP_BZIP2
    # Raw bytes of a pre-built archive with a single member named 'afile'.
    # The bytes b'FAKE' appear where the headers carry the CRC, so the
    # recorded checksum is not expected to match the member data.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
2074
@requires_lzma
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the checks inherited from AbstractBadCrcTests against an archive
    # for the ZIP_LZMA method.
    compression = zipfile.ZIP_LZMA
    # Raw bytes of a pre-built archive with a single member named 'afile'.
    # The bytes b'FAKE' appear where the headers carry the CRC, so the
    # recorded checksum is not expected to match the member data.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
2088
2089
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # Encrypted archive holding 'test.txt'; decrypts to ``plain`` with the
    # password b"python" (see test_good_password).
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    # Encrypted archive holding 'zero'; decrypts to ``plain2`` with the
    # password b"12345" (see test_good_password).
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def setUp(self):
        """Write both encrypted archives to disk and open them for reading."""
        with open(TESTFN, "wb") as sink:
            sink.write(self.data)
        self.zip = zipfile.ZipFile(TESTFN, "r")

        with open(TESTFN2, "wb") as sink:
            sink.write(self.data2)
        self.zip2 = zipfile.ZipFile(TESTFN2, "r")

    def tearDown(self):
        """Close each archive and delete its file, first one first."""
        self.zip.close()
        os.unlink(TESTFN)
        self.zip2.close()
        os.unlink(TESTFN2)

    def test_no_password(self):
        """Reading an encrypted member without a password fails."""
        # Reading the encrypted file without password
        # must generate a RuntimeError
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    def test_bad_password(self):
        """Reading an encrypted member with a wrong password fails."""
        self.zip.setpassword(b"perl")
        with self.assertRaises(RuntimeError):
            self.zip.read("test.txt")

        self.zip2.setpassword(b"perl")
        with self.assertRaises(RuntimeError):
            self.zip2.read("zero")

    @requires_zlib
    def test_good_password(self):
        """The correct password yields the expected plaintext."""
        self.zip.setpassword(b"python")
        decrypted = self.zip.read("test.txt")
        self.assertEqual(decrypted, self.plain)

        self.zip2.setpassword(b"12345")
        decrypted2 = self.zip2.read("zero")
        self.assertEqual(decrypted2, self.plain2)

    def test_unicode_password(self):
        """Passwords given as str instead of bytes raise TypeError."""
        with self.assertRaises(TypeError):
            self.zip.setpassword("unicode")
        with self.assertRaises(TypeError):
            self.zip.read("test.txt", "python")
        with self.assertRaises(TypeError):
            self.zip.open("test.txt", pwd="python")
        with self.assertRaises(TypeError):
            self.zip.extract("test.txt", pwd="python")

    def test_seek_tell(self):
        """seek() and tell() behave correctly on an encrypted member."""
        self.zip.setpassword(b"python")
        txt = self.plain
        needle = b'encryption'
        start = txt.find(needle)
        width = len(needle)
        expected_slice = txt[start:start+width]

        with self.zip.open("test.txt", "r") as fp:
            fp.seek(start, os.SEEK_SET)
            self.assertEqual(fp.tell(), start)
            fp.seek(-start, os.SEEK_CUR)
            self.assertEqual(fp.tell(), 0)
            fp.seek(start, os.SEEK_CUR)
            self.assertEqual(fp.tell(), start)
            self.assertEqual(fp.read(width), expected_slice)

            # Make sure that the second read after seeking back beyond
            # _readbuffer returns the same content (ie. rewind to the start of
            # the file to read forward to the required position).
            saved_min_read = fp.MIN_READ_SIZE
            fp.MIN_READ_SIZE = 1
            fp._readbuffer = b''
            fp._offset = 0
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)
            fp.seek(start, os.SEEK_CUR)
            self.assertEqual(fp.read(width), expected_slice)
            fp.MIN_READ_SIZE = saved_min_read

            fp.seek(0, os.SEEK_END)
            self.assertEqual(fp.tell(), len(txt))
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)

            # Read the file completely to definitely call any eof integrity
            # checks (crc) and make sure they still pass.
            fp.read()
2191
2192
class AbstractTestsWithRandomBinaryFiles:
    """Mixin that round-trips pseudo-random binary data through an archive.

    Concrete subclasses combine this with unittest.TestCase and provide a
    ``compression`` class attribute naming the compression method to use.
    """

    @classmethod
    def setUpClass(cls):
        # 16*1024+1 .. 65*1024 little-endian floats, packed back to back.
        count = randint(16, 64)*1024 + randint(1, 1024)
        packed = [struct.pack('<f', random()*randint(-1000, 1000))
                  for _ in range(count)]
        cls.data = b''.join(packed)

    def setUp(self):
        # Write the binary payload to the source file that gets archived.
        with open(TESTFN, "wb") as src:
            src.write(self.data)

    def tearDown(self):
        for path in (TESTFN, TESTFN2):
            unlink(path)

    def make_test_archive(self, f, compression):
        """Create archive *f* holding TESTFN under two different member names."""
        with zipfile.ZipFile(f, "w", compression) as archive:
            for arcname in ("another.name", TESTFN):
                archive.write(TESTFN, arcname)

    def zip_test(self, f, compression):
        """Round-trip through ZipFile.read() and compare with the source data."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            payload = archive.read(TESTFN)
            self.assertEqual(len(payload), len(self.data))
            self.assertEqual(payload, self.data)
            self.assertEqual(archive.read("another.name"), self.data)

    def test_read(self):
        for target in get_files(self):
            self.zip_test(target, self.compression)

    def zip_open_test(self, f, compression):
        """Round-trip through ZipFile.open(), reading fixed 256-byte chunks."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            # iter(callable, sentinel) keeps reading until read() returns b''.
            with archive.open(TESTFN) as member:
                chunks1 = list(iter(lambda: member.read(256), b''))

            with archive.open("another.name") as member:
                chunks2 = list(iter(lambda: member.read(256), b''))

            for chunks in (chunks1, chunks2):
                joined = b''.join(chunks)
                self.assertEqual(len(joined), len(self.data))
                self.assertEqual(joined, self.data)

    def test_open(self):
        for target in get_files(self):
            self.zip_open_test(target, self.compression)

    def zip_random_open_test(self, f, compression):
        """Round-trip through ZipFile.open(), reading randomly sized chunks."""
        self.make_test_archive(f, compression)

        with zipfile.ZipFile(f, "r", compression) as archive:
            with archive.open(TESTFN) as member:
                chunks = list(iter(lambda: member.read(randint(1, 1024)), b''))

            joined = b''.join(chunks)
            self.assertEqual(len(joined), len(self.data))
            self.assertEqual(joined, self.data)

    def test_random_open(self):
        for target in get_files(self):
            self.zip_random_open_test(target, self.compression)
2282
2283
class StoredTestsWithRandomBinaryFiles(
        AbstractTestsWithRandomBinaryFiles, unittest.TestCase):
    """Run the random-binary-file tests with ZIP_STORED."""

    compression = zipfile.ZIP_STORED
2287
@requires_zlib
class DeflateTestsWithRandomBinaryFiles(
        AbstractTestsWithRandomBinaryFiles, unittest.TestCase):
    """Run the random-binary-file tests with ZIP_DEFLATED."""

    compression = zipfile.ZIP_DEFLATED
2292
@requires_bz2
class Bzip2TestsWithRandomBinaryFiles(
        AbstractTestsWithRandomBinaryFiles, unittest.TestCase):
    """Run the random-binary-file tests with ZIP_BZIP2."""

    compression = zipfile.ZIP_BZIP2
2297
@requires_lzma
class LzmaTestsWithRandomBinaryFiles(
        AbstractTestsWithRandomBinaryFiles, unittest.TestCase):
    """Run the random-binary-file tests with ZIP_LZMA."""

    compression = zipfile.ZIP_LZMA
2302
2303
class Tellable:
    """Write-only wrapper that provides the tell() method but not seek().

    The position reported by tell() is the sum of the counts returned by
    the wrapped object's write() calls made through this wrapper.
    """

    def __init__(self, fp):
        self.fp = fp
        self.offset = 0

    def write(self, data):
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        return self.offset

    def flush(self):
        self.fp.flush()
2320
class Unseekable:
    """Write-only wrapper exposing just write() and flush().

    Neither seek() nor tell() is defined on this wrapper.
    """

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        written = self.fp.write(data)
        return written

    def flush(self):
        self.fp.flush()
2330
class UnseekableTests(unittest.TestCase):
    """Write archives to a buffered stream directly and through wrappers.

    Each test runs three times: with the BufferedWriter itself, wrapped in
    Tellable (tell() but no seek()) and wrapped in Unseekable (neither).
    The stream already holds b'abc', so the archive must start after it.
    """

    def test_writestr(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipfp:
                    for name, payload in members:
                        zipfp.writestr(name, payload)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for name, payload in members:
                        with zipf.open(name) as zopen:
                            self.assertEqual(zopen.read(), payload)

    def test_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    for name, payload in members:
                        # Reuse TESTFN as the on-disk source for each member.
                        with open(TESTFN, 'wb') as src:
                            src.write(payload)
                        zipfp.write(TESTFN, name)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for name, payload in members:
                        with zipf.open(name) as zopen:
                            self.assertEqual(zopen.read(), payload)

    def test_open_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipf:
                    for name, payload in members:
                        with zipf.open(name, 'w') as zopen:
                            zopen.write(payload)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw) as zipf:
                    for name, payload in members:
                        self.assertEqual(zipf.read(name), payload)
2384
2385
@requires_zlib
class TestsWithMultipleOpens(unittest.TestCase):
    """Several member handles on one ZipFile must not disturb each other."""

    @classmethod
    def setUpClass(cls):
        # A recognisable prefix followed by random bytes for each member.
        cls.data1 = b'111' + getrandbytes(10000)
        cls.data2 = b'222' + getrandbytes(10000)

    def tearDown(self):
        unlink(TESTFN2)

    def make_test_archive(self, f):
        """Create a deflated archive in *f* with members 'ones' and 'twos'."""
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            for name, payload in (('ones', self.data1), ('twos', self.data2)):
                archive.writestr(name, payload)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first, \
                     archive.open('ones') as second:
                    head1 = first.read(500)
                    head2 = second.read(500)
                    data1 = head1 + first.read()
                    data2 = head2 + second.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first, \
                     archive.open('twos') as second:
                    head1 = first.read(500)
                    head2 = second.read(500)
                    data1 = head1 + first.read()
                    data2 = head2 + second.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first:
                    head1 = first.read(500)
                    # Open the second member while the first is half read.
                    with archive.open('twos') as second:
                        head2 = second.read(500)
                        data1 = head1 + first.read()
                        data2 = head2 + second.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        for target in get_files(self):
            self.make_test_archive(target)
            with contextlib.ExitStack() as stack:
                with zipfile.ZipFile(target, 'r') as archive:
                    first = stack.enter_context(archive.open('ones'))
                    second = stack.enter_context(archive.open('twos'))
                # The ZipFile context has exited; the member handles are
                # still held open by the ExitStack and are read from here.
                head1 = first.read(500)
                head2 = second.read(500)
                data1 = head1 + first.read()
                data2 = head2 + second.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        for target in get_files(self):
            with zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                archive.writestr('twos', self.data2)
                # Read a member back while the archive is open for writing.
                with archive.open('ones') as first:
                    head = first.read(500)
            self.assertEqual(head, self.data1[:500])
            with zipfile.ZipFile(target, 'r') as archive:
                data1 = archive.read('ones')
                data2 = archive.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        for target in get_files(self):
            with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                with archive.open('ones') as first:
                    first.read(500)
                    # Append another member while a reader is still open.
                    archive.writestr('twos', self.data2)
            with zipfile.ZipFile(target, 'r') as archive:
                data1 = archive.read('ones')
                data2 = archive.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        # A freshly opened file should still get a low descriptor number.
        with open(os.devnull) as probe:
            self.assertLess(probe.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as archive:
            with archive.open('ones', 'r') as reader:
                data1 = reader.read(500)
                # Write a new member in between two reads of the old one.
                with archive.open('twos', 'w') as writer:
                    writer.write(self.data2)
                data1 += reader.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as archive:
            self.assertEqual(archive.read('twos'), self.data2)
2510
2511
class TestWithDirectory(unittest.TestCase):
    """Tests for directory entries: extracting them and writing them.

    TESTFN2 is used as a scratch directory and TESTFN as the archive file;
    both are removed again in tearDown().
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        """Extracting zipdir.zip creates the a/, a/b/ and a/b/c entries."""
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        """ZipFile.write() of a directory stores a '/'-terminated entry."""
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        # Expected attributes: the stat mode in the high 16 bits plus 0x10,
        # the MS-DOS directory flag.
        expected_attr = (mode << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue(value, msg): the latter would
            # accept any non-empty filename.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        """ZipFile.writestr() with a name ending in '/' stores a directory."""
        os.mkdir(os.path.join(TESTFN2, "x"))
        # Expected attributes: mode 0o40775 in the high 16 bits plus 0x10,
        # the MS-DOS directory flag.
        expected_attr = (0o40775 << 16) | 0x10
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, expected_attr)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, expected_attr)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2575
2576
2577class ZipInfoTests(unittest.TestCase):
2578    def test_from_file(self):
2579        zi = zipfile.ZipInfo.from_file(__file__)
2580        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2581        self.assertFalse(zi.is_dir())
2582        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2583
2584    def test_from_file_pathlike(self):
2585        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2586        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2587        self.assertFalse(zi.is_dir())
2588        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2589
2590    def test_from_file_bytes(self):
2591        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2592        self.assertEqual(posixpath.basename(zi.filename), 'test')
2593        self.assertFalse(zi.is_dir())
2594        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2595
2596    def test_from_file_fileno(self):
2597        with open(__file__, 'rb') as f:
2598            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2599            self.assertEqual(posixpath.basename(zi.filename), 'test')
2600            self.assertFalse(zi.is_dir())
2601            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2602
2603    def test_from_dir(self):
2604        dirpath = os.path.dirname(os.path.abspath(__file__))
2605        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2606        self.assertEqual(zi.filename, 'stdlib_tests/')
2607        self.assertTrue(zi.is_dir())
2608        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2609        self.assertEqual(zi.file_size, 0)
2610
2611
class CommandLineTest(unittest.TestCase):
    """Exercise the ``python -m zipfile`` command-line interface."""

    def zipfilecmd(self, *args, **kwargs):
        """Run ``-m zipfile`` expecting success; return stdout with the
        platform line separator replaced by b'\\n'."""
        result = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                **kwargs)
        _rc, stdout, _stderr = result
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        """Run ``-m zipfile`` expecting failure; return the helper's result."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: nothing on stdout, a usage error on stderr.
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)
        # An empty archive name: nothing on stdout, something on stderr.
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        good_zip = findfile('zipdir.zip')
        for opt in '-t', '--test':
            out = self.zipfilecmd(opt, good_zip)
            self.assertEqual(out.rstrip(), b'Done testing')
        not_a_zip = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(out, b'')

    def test_list_command(self):
        zip_name = findfile('zipdir.zip')
        # Build the expected listing with ZipFile.printdir().
        listing = io.StringIO()
        with zipfile.ZipFile(zip_name, 'r') as tf:
            tf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for opt in '-l', '--list':
            out = self.zipfilecmd(opt, zip_name,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as src:
            src.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested, 'w') as src:
            src.write('test 2')
        files = [TESTFN, TESTFNDIR]
        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in '-c', '--create':
            try:
                out = self.zipfilecmd(opt, TESTFN2, *files)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), namelist)
                    self.assertEqual(zf.read(namelist[0]), b'test 1')
                    self.assertEqual(zf.read(namelist[2]), b'test 2')
            finally:
                # Remove the archive so the next option starts clean.
                unlink(TESTFN2)

    def test_extract_command(self):
        zip_name = findfile('zipdir.zip')
        for opt in '-e', '--extract':
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, zip_name, extdir)
                self.assertEqual(out, b'')
                # Compare every archive member with what landed on disk.
                with zipfile.ZipFile(zip_name) as zf:
                    for zi in zf.infolist():
                        relative = zi.filename.replace('/', os.sep)
                        path = os.path.join(extdir, relative)
                        if zi.is_dir():
                            self.assertTrue(os.path.isdir(path))
                        else:
                            self.assertTrue(os.path.isfile(path))
                            with open(path, 'rb') as extracted:
                                self.assertEqual(extracted.read(), zf.read(zi))
2690
2691
class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        self.exe_zip, self.exe_zip64 = (
            findfile(name, subdir='ziptestdata')
            for name in ('exe_with_zip', 'exe_with_z64'))

    def _test_zip_works(self, name):
        """Check *name* is recognised as a zip file and every member reads."""
        # bpo28494 sanity check: ensure is_zipfile works on these.
        recognised = zipfile.is_zipfile(name)
        self.assertTrue(recognised, f'is_zipfile failed on {name}')
        # Ensure we can operate on these via ZipFile.
        with zipfile.ZipFile(name) as archive:
            for member in archive.namelist():
                self.assertIn(b'FAVORITE_NUMBER', archive.read(member))

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip2(self):
        # Run the fixture itself, passing it the interpreter path.
        command = [self.exe_zip, sys.executable]
        output = subprocess.check_output(command)
        self.assertIn(b'number in executable: 5', output)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    def test_execute_zip64(self):
        # Run the fixture itself, passing it the interpreter path.
        command = [self.exe_zip64, sys.executable]
        output = subprocess.check_output(command)
        self.assertIn(b'number in executable: 5', output)
2728
2729
# Poor man's technique to consume a (smallish) iterable: tuple() pulls every
# item out of its argument, at the cost of holding them all in memory at once.
consume = tuple
2732
2733
# from jaraco.itertools 5.0
class jaraco:
    """Namespace mirroring the ``jaraco.itertools`` import path."""

    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has handed out.

            ``count`` starts at 0 and grows by one for each item
            successfully returned by ``__next__``.
            """

            def __init__(self, i):
                self.count = 0
                self._orig_iter = iter(i)

            def __iter__(self):
                return self

            def __next__(self):
                # If the wrapped iterator is exhausted, StopIteration
                # propagates from here and the count is left untouched.
                item = next(self._orig_iter)
                self.count = self.count + 1
                return item
2749
2750
def add_dirs(zf):
    """
    Add an empty entry to the writable zip file zf for every directory
    that is implied by its current members but not listed itself.

    Returns zf so the call can be chained.
    """
    existing = zf.namelist()
    implied = zipfile.CompleteDirs._implied_dirs(existing)
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf
2759
2760
def build_alpharep_fixture():
    """
    Build an in-memory zip file, still open for writing, laid out as:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    Key characteristics of this fixture:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    Only the files are written; no directory entries are added here.

    "alpha" because it uses alphabet
    "rep" because it's a representative example
    """
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, "w")
    for arcname, payload in members:
        archive.writestr(arcname, payload)
    # Name the archive even though it lives in a BytesIO buffer.
    archive.filename = "alpharep.zip"
    return archive
2795
2796
class TestPath(unittest.TestCase):
    """Tests for zipfile.Path on in-memory and on-disk archives.

    Checks use unittest assertion methods rather than bare ``assert``
    statements, so they are still executed under ``python -O`` and report
    the compared values on failure.
    """

    def setUp(self):
        # Holds per-test resources (such as temporary directories) and
        # releases them when the test finishes.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """Yield the alpharep archive twice: as built, then with explicit
        directory entries added."""
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self):
        """Yield the path of each alpharep variant written to a temp dir."""
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        for alpharep in self.zipfile_alpharep():
            # Grab the buffer before close(); closing finalises the archive.
            buffer = alpharep.fp
            alpharep.close()
            path = tmpdir / alpharep.filename
            with path.open("wb") as strm:
                strm.write(buffer.getvalue())
            yield path

    def test_iterdir_and_types(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertTrue(root.is_dir())
            a, b, g = root.iterdir()
            self.assertTrue(a.is_file())
            self.assertTrue(b.is_dir())
            self.assertTrue(g.is_dir())
            c, f, d = b.iterdir()
            self.assertTrue(c.is_file())
            self.assertTrue(f.is_file())
            e, = d.iterdir()
            self.assertTrue(e.is_file())
            h, = g.iterdir()
            i, = h.iterdir()
            self.assertTrue(i.is_file())

    def test_subdir_is_dir(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            for name in ('b', 'b/', 'g', 'g/'):
                self.assertTrue((root / name).is_dir())

    def test_open(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a, b, g = root.iterdir()
            with a.open() as strm:
                data = strm.read()
            self.assertEqual(data, b"content of a")

    def test_read(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a, b, g = root.iterdir()
            self.assertEqual(a.read_text(), "content of a")
            self.assertEqual(a.read_bytes(), b"content of a")

    def test_joinpath(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a = root.joinpath("a")
            self.assertTrue(a.is_file())
            e = root.joinpath("b").joinpath("d").joinpath("e.txt")
            self.assertEqual(e.read_text(), "content of e")

    def test_traverse_truediv(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a = root / "a"
            self.assertTrue(a.is_file())
            e = root / "b" / "d" / "e.txt"
            self.assertEqual(e.read_text(), "content of e")

    def test_pathlike_construction(self):
        """
        zipfile.Path should be constructable from a path-like object
        """
        for zipfile_ondisk in self.zipfile_ondisk():
            pathlike = pathlib.Path(str(zipfile_ondisk))
            zipfile.Path(pathlike)

    def test_traverse_pathlike(self):
        """The / operator should accept a path-like right-hand operand."""
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            root / pathlib.Path("a")

    def test_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'a').parent.at, '')
            self.assertEqual((root / 'a' / 'b').parent.at, 'a/')

    def test_dir_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'b').parent.at, '')
            self.assertEqual((root / 'b/').parent.at, '')

    def test_missing_dir_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'missing dir/').parent.at, '')

    def test_mutability(self):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a, b, g = root.iterdir()
            alpharep.writestr('foo.txt', 'foo')
            alpharep.writestr('bar/baz.txt', 'baz')
            self.assertTrue(any(
                child.name == 'foo.txt'
                for child in root.iterdir()))
            self.assertEqual((root / 'foo.txt').read_text(), 'foo')
            baz, = (root / 'bar').iterdir()
            self.assertEqual(baz.read_text(), 'baz')

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute so the archive presents itself as read-only.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Ensure joinpath on each item is cheap enough that visiting every
        entry of a huge zipfile stays linear overall.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        self.assertEqual(entries.count, self.HUGE_ZIPFILE_NUM_ENTRIES)

    # Disabled timeout guard, kept for reference:
    # @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        """Compute implied dirs for 10000 deeply nested names (no assertion;
        the test only has to complete)."""
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        zipfile.CompleteDirs._implied_dirs(data)
2946
2947
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
2950