• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import importlib.util
3import io
4import itertools
5import os
6import pathlib
7import posixpath
8import string
9import struct
10import subprocess
11import sys
12import time
13import unittest
14import unittest.mock as mock
15import zipfile
16import functools
17
18
19from tempfile import TemporaryFile
20from random import randint, random, randbytes
21
22from test.support import script_helper
23from test.support import (findfile, requires_zlib, requires_bz2,
24                          requires_lzma, captured_stdout)
25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd
26
27
28TESTFN2 = TESTFN + "2"
29TESTFNDIR = TESTFN + "d"
30FIXEDTEST_SIZE = 1000
31DATAFILES_DIR = 'zipfile_datafiles'
32
33SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
34                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
35                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
36                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
37
38def get_files(test):
39    yield TESTFN2
40    with TemporaryFile() as f:
41        yield f
42        test.assertFalse(f.closed)
43    with io.BytesIO() as f:
44        yield f
45        test.assertFalse(f.closed)
46
47class AbstractTestsWithSourceFile:
48    @classmethod
49    def setUpClass(cls):
50        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
51                              (i, random()), "ascii")
52                        for i in range(FIXEDTEST_SIZE)]
53        cls.data = b''.join(cls.line_gen)
54
55    def setUp(self):
56        # Make a source file with some lines
57        with open(TESTFN, "wb") as fp:
58            fp.write(self.data)
59
60    def make_test_archive(self, f, compression, compresslevel=None):
61        kwargs = {'compression': compression, 'compresslevel': compresslevel}
62        # Create the ZIP archive
63        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
64            zipfp.write(TESTFN, "another.name")
65            zipfp.write(TESTFN, TESTFN)
66            zipfp.writestr("strfile", self.data)
67            with zipfp.open('written-open-w', mode='w') as f:
68                for line in self.line_gen:
69                    f.write(line)
70
71    def zip_test(self, f, compression, compresslevel=None):
72        self.make_test_archive(f, compression, compresslevel)
73
74        # Read the ZIP archive
75        with zipfile.ZipFile(f, "r", compression) as zipfp:
76            self.assertEqual(zipfp.read(TESTFN), self.data)
77            self.assertEqual(zipfp.read("another.name"), self.data)
78            self.assertEqual(zipfp.read("strfile"), self.data)
79
80            # Print the ZIP directory
81            fp = io.StringIO()
82            zipfp.printdir(file=fp)
83            directory = fp.getvalue()
84            lines = directory.splitlines()
85            self.assertEqual(len(lines), 5) # Number of files + header
86
87            self.assertIn('File Name', lines[0])
88            self.assertIn('Modified', lines[0])
89            self.assertIn('Size', lines[0])
90
91            fn, date, time_, size = lines[1].split()
92            self.assertEqual(fn, 'another.name')
93            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
94            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
95            self.assertEqual(size, str(len(self.data)))
96
97            # Check the namelist
98            names = zipfp.namelist()
99            self.assertEqual(len(names), 4)
100            self.assertIn(TESTFN, names)
101            self.assertIn("another.name", names)
102            self.assertIn("strfile", names)
103            self.assertIn("written-open-w", names)
104
105            # Check infolist
106            infos = zipfp.infolist()
107            names = [i.filename for i in infos]
108            self.assertEqual(len(names), 4)
109            self.assertIn(TESTFN, names)
110            self.assertIn("another.name", names)
111            self.assertIn("strfile", names)
112            self.assertIn("written-open-w", names)
113            for i in infos:
114                self.assertEqual(i.file_size, len(self.data))
115
116            # check getinfo
117            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
118                info = zipfp.getinfo(nm)
119                self.assertEqual(info.filename, nm)
120                self.assertEqual(info.file_size, len(self.data))
121
122            # Check that testzip doesn't raise an exception
123            zipfp.testzip()
124
125    def test_basic(self):
126        for f in get_files(self):
127            self.zip_test(f, self.compression)
128
129    def zip_open_test(self, f, compression):
130        self.make_test_archive(f, compression)
131
132        # Read the ZIP archive
133        with zipfile.ZipFile(f, "r", compression) as zipfp:
134            zipdata1 = []
135            with zipfp.open(TESTFN) as zipopen1:
136                while True:
137                    read_data = zipopen1.read(256)
138                    if not read_data:
139                        break
140                    zipdata1.append(read_data)
141
142            zipdata2 = []
143            with zipfp.open("another.name") as zipopen2:
144                while True:
145                    read_data = zipopen2.read(256)
146                    if not read_data:
147                        break
148                    zipdata2.append(read_data)
149
150            self.assertEqual(b''.join(zipdata1), self.data)
151            self.assertEqual(b''.join(zipdata2), self.data)
152
153    def test_open(self):
154        for f in get_files(self):
155            self.zip_open_test(f, self.compression)
156
157    def test_open_with_pathlike(self):
158        path = pathlib.Path(TESTFN2)
159        self.zip_open_test(path, self.compression)
160        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
161            self.assertIsInstance(zipfp.filename, str)
162
163    def zip_random_open_test(self, f, compression):
164        self.make_test_archive(f, compression)
165
166        # Read the ZIP archive
167        with zipfile.ZipFile(f, "r", compression) as zipfp:
168            zipdata1 = []
169            with zipfp.open(TESTFN) as zipopen1:
170                while True:
171                    read_data = zipopen1.read(randint(1, 1024))
172                    if not read_data:
173                        break
174                    zipdata1.append(read_data)
175
176            self.assertEqual(b''.join(zipdata1), self.data)
177
178    def test_random_open(self):
179        for f in get_files(self):
180            self.zip_random_open_test(f, self.compression)
181
182    def zip_read1_test(self, f, compression):
183        self.make_test_archive(f, compression)
184
185        # Read the ZIP archive
186        with zipfile.ZipFile(f, "r") as zipfp, \
187             zipfp.open(TESTFN) as zipopen:
188            zipdata = []
189            while True:
190                read_data = zipopen.read1(-1)
191                if not read_data:
192                    break
193                zipdata.append(read_data)
194
195        self.assertEqual(b''.join(zipdata), self.data)
196
197    def test_read1(self):
198        for f in get_files(self):
199            self.zip_read1_test(f, self.compression)
200
201    def zip_read1_10_test(self, f, compression):
202        self.make_test_archive(f, compression)
203
204        # Read the ZIP archive
205        with zipfile.ZipFile(f, "r") as zipfp, \
206             zipfp.open(TESTFN) as zipopen:
207            zipdata = []
208            while True:
209                read_data = zipopen.read1(10)
210                self.assertLessEqual(len(read_data), 10)
211                if not read_data:
212                    break
213                zipdata.append(read_data)
214
215        self.assertEqual(b''.join(zipdata), self.data)
216
217    def test_read1_10(self):
218        for f in get_files(self):
219            self.zip_read1_10_test(f, self.compression)
220
221    def zip_readline_read_test(self, f, compression):
222        self.make_test_archive(f, compression)
223
224        # Read the ZIP archive
225        with zipfile.ZipFile(f, "r") as zipfp, \
226             zipfp.open(TESTFN) as zipopen:
227            data = b''
228            while True:
229                read = zipopen.readline()
230                if not read:
231                    break
232                data += read
233
234                read = zipopen.read(100)
235                if not read:
236                    break
237                data += read
238
239        self.assertEqual(data, self.data)
240
241    def test_readline_read(self):
242        # Issue #7610: calls to readline() interleaved with calls to read().
243        for f in get_files(self):
244            self.zip_readline_read_test(f, self.compression)
245
246    def zip_readline_test(self, f, compression):
247        self.make_test_archive(f, compression)
248
249        # Read the ZIP archive
250        with zipfile.ZipFile(f, "r") as zipfp:
251            with zipfp.open(TESTFN) as zipopen:
252                for line in self.line_gen:
253                    linedata = zipopen.readline()
254                    self.assertEqual(linedata, line)
255
256    def test_readline(self):
257        for f in get_files(self):
258            self.zip_readline_test(f, self.compression)
259
260    def zip_readlines_test(self, f, compression):
261        self.make_test_archive(f, compression)
262
263        # Read the ZIP archive
264        with zipfile.ZipFile(f, "r") as zipfp:
265            with zipfp.open(TESTFN) as zipopen:
266                ziplines = zipopen.readlines()
267            for line, zipline in zip(self.line_gen, ziplines):
268                self.assertEqual(zipline, line)
269
270    def test_readlines(self):
271        for f in get_files(self):
272            self.zip_readlines_test(f, self.compression)
273
274    def zip_iterlines_test(self, f, compression):
275        self.make_test_archive(f, compression)
276
277        # Read the ZIP archive
278        with zipfile.ZipFile(f, "r") as zipfp:
279            with zipfp.open(TESTFN) as zipopen:
280                for line, zipline in zip(self.line_gen, zipopen):
281                    self.assertEqual(zipline, line)
282
283    def test_iterlines(self):
284        for f in get_files(self):
285            self.zip_iterlines_test(f, self.compression)
286
287    def test_low_compression(self):
288        """Check for cases where compressed data is larger than original."""
289        # Create the ZIP archive
290        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
291            zipfp.writestr("strfile", '12')
292
293        # Get an open object for strfile
294        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
295            with zipfp.open("strfile") as openobj:
296                self.assertEqual(openobj.read(1), b'1')
297                self.assertEqual(openobj.read(1), b'2')
298
299    def test_writestr_compression(self):
300        zipfp = zipfile.ZipFile(TESTFN2, "w")
301        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
302        info = zipfp.getinfo('b.txt')
303        self.assertEqual(info.compress_type, self.compression)
304
305    def test_writestr_compresslevel(self):
306        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
307        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
308        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
309                       compresslevel=2)
310
311        # Compression level follows the constructor.
312        a_info = zipfp.getinfo('a.txt')
313        self.assertEqual(a_info.compress_type, self.compression)
314        self.assertEqual(a_info._compresslevel, 1)
315
316        # Compression level is overridden.
317        b_info = zipfp.getinfo('b.txt')
318        self.assertEqual(b_info.compress_type, self.compression)
319        self.assertEqual(b_info._compresslevel, 2)
320
321    def test_read_return_size(self):
322        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
323        # than requested.
324        for test_size in (1, 4095, 4096, 4097, 16384):
325            file_size = test_size + 1
326            junk = randbytes(file_size)
327            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
328                zipf.writestr('foo', junk)
329                with zipf.open('foo', 'r') as fp:
330                    buf = fp.read(test_size)
331                    self.assertEqual(len(buf), test_size)
332
333    def test_truncated_zipfile(self):
334        fp = io.BytesIO()
335        with zipfile.ZipFile(fp, mode='w') as zipf:
336            zipf.writestr('strfile', self.data, compress_type=self.compression)
337            end_offset = fp.tell()
338        zipfiledata = fp.getvalue()
339
340        fp = io.BytesIO(zipfiledata)
341        with zipfile.ZipFile(fp) as zipf:
342            with zipf.open('strfile') as zipopen:
343                fp.truncate(end_offset - 20)
344                with self.assertRaises(EOFError):
345                    zipopen.read()
346
347        fp = io.BytesIO(zipfiledata)
348        with zipfile.ZipFile(fp) as zipf:
349            with zipf.open('strfile') as zipopen:
350                fp.truncate(end_offset - 20)
351                with self.assertRaises(EOFError):
352                    while zipopen.read(100):
353                        pass
354
355        fp = io.BytesIO(zipfiledata)
356        with zipfile.ZipFile(fp) as zipf:
357            with zipf.open('strfile') as zipopen:
358                fp.truncate(end_offset - 20)
359                with self.assertRaises(EOFError):
360                    while zipopen.read1(100):
361                        pass
362
363    def test_repr(self):
364        fname = 'file.name'
365        for f in get_files(self):
366            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
367                zipfp.write(TESTFN, fname)
368                r = repr(zipfp)
369                self.assertIn("mode='w'", r)
370
371            with zipfile.ZipFile(f, 'r') as zipfp:
372                r = repr(zipfp)
373                if isinstance(f, str):
374                    self.assertIn('filename=%r' % f, r)
375                else:
376                    self.assertIn('file=%r' % f, r)
377                self.assertIn("mode='r'", r)
378                r = repr(zipfp.getinfo(fname))
379                self.assertIn('filename=%r' % fname, r)
380                self.assertIn('filemode=', r)
381                self.assertIn('file_size=', r)
382                if self.compression != zipfile.ZIP_STORED:
383                    self.assertIn('compress_type=', r)
384                    self.assertIn('compress_size=', r)
385                with zipfp.open(fname) as zipopen:
386                    r = repr(zipopen)
387                    self.assertIn('name=%r' % fname, r)
388                    self.assertIn("mode='r'", r)
389                    if self.compression != zipfile.ZIP_STORED:
390                        self.assertIn('compress_type=', r)
391                self.assertIn('[closed]', repr(zipopen))
392            self.assertIn('[closed]', repr(zipfp))
393
394    def test_compresslevel_basic(self):
395        for f in get_files(self):
396            self.zip_test(f, self.compression, compresslevel=9)
397
398    def test_per_file_compresslevel(self):
399        """Check that files within a Zip archive can have different
400        compression levels."""
401        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
402            zipfp.write(TESTFN, 'compress_1')
403            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
404            one_info = zipfp.getinfo('compress_1')
405            nine_info = zipfp.getinfo('compress_9')
406            self.assertEqual(one_info._compresslevel, 1)
407            self.assertEqual(nine_info._compresslevel, 9)
408
409    def test_writing_errors(self):
410        class BrokenFile(io.BytesIO):
411            def write(self, data):
412                nonlocal count
413                if count is not None:
414                    if count == stop:
415                        raise OSError
416                    count += 1
417                super().write(data)
418
419        stop = 0
420        while True:
421            testfile = BrokenFile()
422            count = None
423            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
424                with zipfp.open('file1', 'w') as f:
425                    f.write(b'data1')
426                count = 0
427                try:
428                    with zipfp.open('file2', 'w') as f:
429                        f.write(b'data2')
430                except OSError:
431                    stop += 1
432                else:
433                    break
434                finally:
435                    count = None
436            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
437                self.assertEqual(zipfp.namelist(), ['file1'])
438                self.assertEqual(zipfp.read('file1'), b'data1')
439
440        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
441            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
442            self.assertEqual(zipfp.read('file1'), b'data1')
443            self.assertEqual(zipfp.read('file2'), b'data2')
444
445
446    def tearDown(self):
447        unlink(TESTFN)
448        unlink(TESTFN2)
449
450
451class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
452                                unittest.TestCase):
453    compression = zipfile.ZIP_STORED
454    test_low_compression = None
455
456    def zip_test_writestr_permissions(self, f, compression):
457        # Make sure that writestr and open(... mode='w') create files with
458        # mode 0600, when they are passed a name rather than a ZipInfo
459        # instance.
460
461        self.make_test_archive(f, compression)
462        with zipfile.ZipFile(f, "r") as zipfp:
463            zinfo = zipfp.getinfo('strfile')
464            self.assertEqual(zinfo.external_attr, 0o600 << 16)
465
466            zinfo2 = zipfp.getinfo('written-open-w')
467            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
468
469    def test_writestr_permissions(self):
470        for f in get_files(self):
471            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
472
473    def test_absolute_arcnames(self):
474        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
475            zipfp.write(TESTFN, "/absolute")
476
477        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
478            self.assertEqual(zipfp.namelist(), ["absolute"])
479
480    def test_append_to_zip_file(self):
481        """Test appending to an existing zipfile."""
482        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
483            zipfp.write(TESTFN, TESTFN)
484
485        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
486            zipfp.writestr("strfile", self.data)
487            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
488
489    def test_append_to_non_zip_file(self):
490        """Test appending to an existing file that is not a zipfile."""
491        # NOTE: this test fails if len(d) < 22 because of the first
492        # line "fpin.seek(-22, 2)" in _EndRecData
493        data = b'I am not a ZipFile!'*10
494        with open(TESTFN2, 'wb') as f:
495            f.write(data)
496
497        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
498            zipfp.write(TESTFN, TESTFN)
499
500        with open(TESTFN2, 'rb') as f:
501            f.seek(len(data))
502            with zipfile.ZipFile(f, "r") as zipfp:
503                self.assertEqual(zipfp.namelist(), [TESTFN])
504                self.assertEqual(zipfp.read(TESTFN), self.data)
505        with open(TESTFN2, 'rb') as f:
506            self.assertEqual(f.read(len(data)), data)
507            zipfiledata = f.read()
508        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
509            self.assertEqual(zipfp.namelist(), [TESTFN])
510            self.assertEqual(zipfp.read(TESTFN), self.data)
511
512    def test_read_concatenated_zip_file(self):
513        with io.BytesIO() as bio:
514            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
515                zipfp.write(TESTFN, TESTFN)
516            zipfiledata = bio.getvalue()
517        data = b'I am not a ZipFile!'*10
518        with open(TESTFN2, 'wb') as f:
519            f.write(data)
520            f.write(zipfiledata)
521
522        with zipfile.ZipFile(TESTFN2) as zipfp:
523            self.assertEqual(zipfp.namelist(), [TESTFN])
524            self.assertEqual(zipfp.read(TESTFN), self.data)
525
526    def test_append_to_concatenated_zip_file(self):
527        with io.BytesIO() as bio:
528            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
529                zipfp.write(TESTFN, TESTFN)
530            zipfiledata = bio.getvalue()
531        data = b'I am not a ZipFile!'*1000000
532        with open(TESTFN2, 'wb') as f:
533            f.write(data)
534            f.write(zipfiledata)
535
536        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
537            self.assertEqual(zipfp.namelist(), [TESTFN])
538            zipfp.writestr('strfile', self.data)
539
540        with open(TESTFN2, 'rb') as f:
541            self.assertEqual(f.read(len(data)), data)
542            zipfiledata = f.read()
543        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
544            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
545            self.assertEqual(zipfp.read(TESTFN), self.data)
546            self.assertEqual(zipfp.read('strfile'), self.data)
547
548    def test_ignores_newline_at_end(self):
549        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
550            zipfp.write(TESTFN, TESTFN)
551        with open(TESTFN2, 'a', encoding='utf-8') as f:
552            f.write("\r\n\00\00\00")
553        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
554            self.assertIsInstance(zipfp, zipfile.ZipFile)
555
556    def test_ignores_stuff_appended_past_comments(self):
557        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
558            zipfp.comment = b"this is a comment"
559            zipfp.write(TESTFN, TESTFN)
560        with open(TESTFN2, 'a', encoding='utf-8') as f:
561            f.write("abcdef\r\n")
562        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
563            self.assertIsInstance(zipfp, zipfile.ZipFile)
564            self.assertEqual(zipfp.comment, b"this is a comment")
565
566    def test_write_default_name(self):
567        """Check that calling ZipFile.write without arcname specified
568        produces the expected result."""
569        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
570            zipfp.write(TESTFN)
571            with open(TESTFN, "rb") as f:
572                self.assertEqual(zipfp.read(TESTFN), f.read())
573
574    def test_io_on_closed_zipextfile(self):
575        fname = "somefile.txt"
576        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
577            zipfp.writestr(fname, "bogus")
578
579        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
580            with zipfp.open(fname) as fid:
581                fid.close()
582                self.assertRaises(ValueError, fid.read)
583                self.assertRaises(ValueError, fid.seek, 0)
584                self.assertRaises(ValueError, fid.tell)
585                self.assertRaises(ValueError, fid.readable)
586                self.assertRaises(ValueError, fid.seekable)
587
588    def test_write_to_readonly(self):
589        """Check that trying to call write() on a readonly ZipFile object
590        raises a ValueError."""
591        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
592            zipfp.writestr("somefile.txt", "bogus")
593
594        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
595            self.assertRaises(ValueError, zipfp.write, TESTFN)
596
597        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
598            with self.assertRaises(ValueError):
599                zipfp.open(TESTFN, mode='w')
600
601    def test_add_file_before_1980(self):
602        # Set atime and mtime to 1970-01-01
603        os.utime(TESTFN, (0, 0))
604        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
605            self.assertRaises(ValueError, zipfp.write, TESTFN)
606
607        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
608            zipfp.write(TESTFN)
609            zinfo = zipfp.getinfo(TESTFN)
610            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
611
612    def test_add_file_after_2107(self):
613        # Set atime and mtime to 2108-12-30
614        ts = 4386268800
615        try:
616            time.localtime(ts)
617        except OverflowError:
618            self.skipTest(f'time.localtime({ts}) raises OverflowError')
619        try:
620            os.utime(TESTFN, (ts, ts))
621        except OverflowError:
622            self.skipTest('Host fs cannot set timestamp to required value.')
623
624        mtime_ns = os.stat(TESTFN).st_mtime_ns
625        if mtime_ns != (4386268800 * 10**9):
626            # XFS filesystem is limited to 32-bit timestamp, but the syscall
627            # didn't fail. Moreover, there is a VFS bug which returns
628            # a cached timestamp which is different than the value on disk.
629            #
630            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
631            #
632            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
633            # https://bugs.python.org/issue39460#msg360952
634            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
635
636        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
637            self.assertRaises(struct.error, zipfp.write, TESTFN)
638
639        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
640            zipfp.write(TESTFN)
641            zinfo = zipfp.getinfo(TESTFN)
642            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
643
644
645@requires_zlib()
646class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
647                                 unittest.TestCase):
648    compression = zipfile.ZIP_DEFLATED
649
650    def test_per_file_compression(self):
651        """Check that files within a Zip archive can have different
652        compression options."""
653        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
654            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
655            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
656            sinfo = zipfp.getinfo('storeme')
657            dinfo = zipfp.getinfo('deflateme')
658            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
659            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
660
661@requires_bz2()
662class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
663                               unittest.TestCase):
664    compression = zipfile.ZIP_BZIP2
665
666@requires_lzma()
667class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
668                              unittest.TestCase):
669    compression = zipfile.ZIP_LZMA
670
671
672class AbstractTestZip64InSmallFiles:
673    # These tests test the ZIP64 functionality without using large files,
674    # see test_zipfile64 for proper tests.
675
676    @classmethod
677    def setUpClass(cls):
678        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
679                    for i in range(0, FIXEDTEST_SIZE))
680        cls.data = b'\n'.join(line_gen)
681
682    def setUp(self):
683        self._limit = zipfile.ZIP64_LIMIT
684        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
685        zipfile.ZIP64_LIMIT = 1000
686        zipfile.ZIP_FILECOUNT_LIMIT = 9
687
688        # Make a source file with some lines
689        with open(TESTFN, "wb") as fp:
690            fp.write(self.data)
691
692    def zip_test(self, f, compression):
693        # Create the ZIP archive
694        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
695            zipfp.write(TESTFN, "another.name")
696            zipfp.write(TESTFN, TESTFN)
697            zipfp.writestr("strfile", self.data)
698
699        # Read the ZIP archive
700        with zipfile.ZipFile(f, "r", compression) as zipfp:
701            self.assertEqual(zipfp.read(TESTFN), self.data)
702            self.assertEqual(zipfp.read("another.name"), self.data)
703            self.assertEqual(zipfp.read("strfile"), self.data)
704
705            # Print the ZIP directory
706            fp = io.StringIO()
707            zipfp.printdir(fp)
708
709            directory = fp.getvalue()
710            lines = directory.splitlines()
711            self.assertEqual(len(lines), 4) # Number of files + header
712
713            self.assertIn('File Name', lines[0])
714            self.assertIn('Modified', lines[0])
715            self.assertIn('Size', lines[0])
716
717            fn, date, time_, size = lines[1].split()
718            self.assertEqual(fn, 'another.name')
719            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
720            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
721            self.assertEqual(size, str(len(self.data)))
722
723            # Check the namelist
724            names = zipfp.namelist()
725            self.assertEqual(len(names), 3)
726            self.assertIn(TESTFN, names)
727            self.assertIn("another.name", names)
728            self.assertIn("strfile", names)
729
730            # Check infolist
731            infos = zipfp.infolist()
732            names = [i.filename for i in infos]
733            self.assertEqual(len(names), 3)
734            self.assertIn(TESTFN, names)
735            self.assertIn("another.name", names)
736            self.assertIn("strfile", names)
737            for i in infos:
738                self.assertEqual(i.file_size, len(self.data))
739
740            # check getinfo
741            for nm in (TESTFN, "another.name", "strfile"):
742                info = zipfp.getinfo(nm)
743                self.assertEqual(info.filename, nm)
744                self.assertEqual(info.file_size, len(self.data))
745
746            # Check that testzip doesn't raise an exception
747            zipfp.testzip()
748
749    def test_basic(self):
750        for f in get_files(self):
751            self.zip_test(f, self.compression)
752
753    def test_too_many_files(self):
754        # This test checks that more than 64k files can be added to an archive,
755        # and that the resulting archive can be read properly by ZipFile
756        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
757                               allowZip64=True)
758        zipf.debug = 100
759        numfiles = 15
760        for i in range(numfiles):
761            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
762        self.assertEqual(len(zipf.namelist()), numfiles)
763        zipf.close()
764
765        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
766        self.assertEqual(len(zipf2.namelist()), numfiles)
767        for i in range(numfiles):
768            content = zipf2.read("foo%08d" % i).decode('ascii')
769            self.assertEqual(content, "%d" % (i**3 % 57))
770        zipf2.close()
771
772    def test_too_many_files_append(self):
773        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
774                               allowZip64=False)
775        zipf.debug = 100
776        numfiles = 9
777        for i in range(numfiles):
778            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
779        self.assertEqual(len(zipf.namelist()), numfiles)
780        with self.assertRaises(zipfile.LargeZipFile):
781            zipf.writestr("foo%08d" % numfiles, b'')
782        self.assertEqual(len(zipf.namelist()), numfiles)
783        zipf.close()
784
785        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
786                               allowZip64=False)
787        zipf.debug = 100
788        self.assertEqual(len(zipf.namelist()), numfiles)
789        with self.assertRaises(zipfile.LargeZipFile):
790            zipf.writestr("foo%08d" % numfiles, b'')
791        self.assertEqual(len(zipf.namelist()), numfiles)
792        zipf.close()
793
794        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
795                               allowZip64=True)
796        zipf.debug = 100
797        self.assertEqual(len(zipf.namelist()), numfiles)
798        numfiles2 = 15
799        for i in range(numfiles, numfiles2):
800            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
801        self.assertEqual(len(zipf.namelist()), numfiles2)
802        zipf.close()
803
804        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
805        self.assertEqual(len(zipf2.namelist()), numfiles2)
806        for i in range(numfiles2):
807            content = zipf2.read("foo%08d" % i).decode('ascii')
808            self.assertEqual(content, "%d" % (i**3 % 57))
809        zipf2.close()
810
811    def tearDown(self):
812        zipfile.ZIP64_LIMIT = self._limit
813        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
814        unlink(TESTFN)
815        unlink(TESTFN2)
816
817
818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
819                                  unittest.TestCase):
820    compression = zipfile.ZIP_STORED
821
822    def large_file_exception_test(self, f, compression):
823        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
824            self.assertRaises(zipfile.LargeZipFile,
825                              zipfp.write, TESTFN, "another.name")
826
827    def large_file_exception_test2(self, f, compression):
828        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
829            self.assertRaises(zipfile.LargeZipFile,
830                              zipfp.writestr, "another.name", self.data)
831
832    def test_large_file_exception(self):
833        for f in get_files(self):
834            self.large_file_exception_test(f, zipfile.ZIP_STORED)
835            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
836
837    def test_absolute_arcnames(self):
838        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
839                             allowZip64=True) as zipfp:
840            zipfp.write(TESTFN, "/absolute")
841
842        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
843            self.assertEqual(zipfp.namelist(), ["absolute"])
844
845    def test_append(self):
846        # Test that appending to the Zip64 archive doesn't change
847        # extra fields of existing entries.
848        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
849            zipfp.writestr("strfile", self.data)
850        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
851            zinfo = zipfp.getinfo("strfile")
852            extra = zinfo.extra
853        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
854            zipfp.writestr("strfile2", self.data)
855        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
856            zinfo = zipfp.getinfo("strfile")
857            self.assertEqual(zinfo.extra, extra)
858
859    def make_zip64_file(
860        self, file_size_64_set=False, file_size_extra=False,
861        compress_size_64_set=False, compress_size_extra=False,
862        header_offset_64_set=False, header_offset_extra=False,
863    ):
864        """Generate bytes sequence for a zip with (incomplete) zip64 data.
865
866        The actual values (not the zip 64 0xffffffff values) stored in the file
867        are:
868        file_size: 8
869        compress_size: 8
870        header_offset: 0
871        """
872        actual_size = 8
873        actual_header_offset = 0
874        local_zip64_fields = []
875        central_zip64_fields = []
876
877        file_size = actual_size
878        if file_size_64_set:
879            file_size = 0xffffffff
880            if file_size_extra:
881                local_zip64_fields.append(actual_size)
882                central_zip64_fields.append(actual_size)
883        file_size = struct.pack("<L", file_size)
884
885        compress_size = actual_size
886        if compress_size_64_set:
887            compress_size = 0xffffffff
888            if compress_size_extra:
889                local_zip64_fields.append(actual_size)
890                central_zip64_fields.append(actual_size)
891        compress_size = struct.pack("<L", compress_size)
892
893        header_offset = actual_header_offset
894        if header_offset_64_set:
895            header_offset = 0xffffffff
896            if header_offset_extra:
897                central_zip64_fields.append(actual_header_offset)
898        header_offset = struct.pack("<L", header_offset)
899
900        local_extra = struct.pack(
901            '<HH' + 'Q'*len(local_zip64_fields),
902            0x0001,
903            8*len(local_zip64_fields),
904            *local_zip64_fields
905        )
906
907        central_extra = struct.pack(
908            '<HH' + 'Q'*len(central_zip64_fields),
909            0x0001,
910            8*len(central_zip64_fields),
911            *central_zip64_fields
912        )
913
914        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
915        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
916
917        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
918        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
919
920        filename = b"test.txt"
921        content = b"test1234"
922        filename_length = struct.pack("<H", len(filename))
923        zip64_contents = (
924            # Local file header
925            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
926            + compress_size
927            + file_size
928            + filename_length
929            + local_extra_length
930            + filename
931            + local_extra
932            + content
933            # Central directory:
934            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
935            + compress_size
936            + file_size
937            + filename_length
938            + central_extra_length
939            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
940            + header_offset
941            + filename
942            + central_extra
943            # Zip64 end of central directory
944            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
945            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
946            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
947            + central_dir_size
948            + offset_to_central_dir
949            # Zip64 end of central directory locator
950            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
951            + b"\x00\x00\x00"
952            # end of central directory
953            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
954            + b"\x00\x00\x00\x00"
955        )
956        return zip64_contents
957
958    def test_bad_zip64_extra(self):
959        """Missing zip64 extra records raises an exception.
960
961        There are 4 fields that the zip64 format handles (the disk number is
962        not used in this module and so is ignored here). According to the zip
963        spec:
964              The order of the fields in the zip64 extended
965              information record is fixed, but the fields MUST
966              only appear if the corresponding Local or Central
967              directory record field is set to 0xFFFF or 0xFFFFFFFF.
968
969        If the zip64 extra content doesn't contain enough entries for the
970        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
971        This test mismatches the length of the zip64 extra field and the number
972        of fields set to indicate the presence of zip64 data.
973        """
974        # zip64 file size present, no fields in extra, expecting one, equals
975        # missing file size.
976        missing_file_size_extra = self.make_zip64_file(
977            file_size_64_set=True,
978        )
979        with self.assertRaises(zipfile.BadZipFile) as e:
980            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
981        self.assertIn('file size', str(e.exception).lower())
982
983        # zip64 file size present, zip64 compress size present, one field in
984        # extra, expecting two, equals missing compress size.
985        missing_compress_size_extra = self.make_zip64_file(
986            file_size_64_set=True,
987            file_size_extra=True,
988            compress_size_64_set=True,
989        )
990        with self.assertRaises(zipfile.BadZipFile) as e:
991            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
992        self.assertIn('compress size', str(e.exception).lower())
993
994        # zip64 compress size present, no fields in extra, expecting one,
995        # equals missing compress size.
996        missing_compress_size_extra = self.make_zip64_file(
997            compress_size_64_set=True,
998        )
999        with self.assertRaises(zipfile.BadZipFile) as e:
1000            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1001        self.assertIn('compress size', str(e.exception).lower())
1002
1003        # zip64 file size present, zip64 compress size present, zip64 header
1004        # offset present, two fields in extra, expecting three, equals missing
1005        # header offset
1006        missing_header_offset_extra = self.make_zip64_file(
1007            file_size_64_set=True,
1008            file_size_extra=True,
1009            compress_size_64_set=True,
1010            compress_size_extra=True,
1011            header_offset_64_set=True,
1012        )
1013        with self.assertRaises(zipfile.BadZipFile) as e:
1014            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1015        self.assertIn('header offset', str(e.exception).lower())
1016
1017        # zip64 compress size present, zip64 header offset present, one field
1018        # in extra, expecting two, equals missing header offset
1019        missing_header_offset_extra = self.make_zip64_file(
1020            file_size_64_set=False,
1021            compress_size_64_set=True,
1022            compress_size_extra=True,
1023            header_offset_64_set=True,
1024        )
1025        with self.assertRaises(zipfile.BadZipFile) as e:
1026            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1027        self.assertIn('header offset', str(e.exception).lower())
1028
1029        # zip64 file size present, zip64 header offset present, one field in
1030        # extra, expecting two, equals missing header offset
1031        missing_header_offset_extra = self.make_zip64_file(
1032            file_size_64_set=True,
1033            file_size_extra=True,
1034            compress_size_64_set=False,
1035            header_offset_64_set=True,
1036        )
1037        with self.assertRaises(zipfile.BadZipFile) as e:
1038            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1039        self.assertIn('header offset', str(e.exception).lower())
1040
1041        # zip64 header offset present, no fields in extra, expecting one,
1042        # equals missing header offset
1043        missing_header_offset_extra = self.make_zip64_file(
1044            file_size_64_set=False,
1045            compress_size_64_set=False,
1046            header_offset_64_set=True,
1047        )
1048        with self.assertRaises(zipfile.BadZipFile) as e:
1049            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1050        self.assertIn('header offset', str(e.exception).lower())
1051
1052    def test_generated_valid_zip64_extra(self):
1053        # These values are what is set in the make_zip64_file method.
1054        expected_file_size = 8
1055        expected_compress_size = 8
1056        expected_header_offset = 0
1057        expected_content = b"test1234"
1058
1059        # Loop through the various valid combinations of zip64 masks
1060        # present and extra fields present.
1061        params = (
1062            {"file_size_64_set": True, "file_size_extra": True},
1063            {"compress_size_64_set": True, "compress_size_extra": True},
1064            {"header_offset_64_set": True, "header_offset_extra": True},
1065        )
1066
1067        for r in range(1, len(params) + 1):
1068            for combo in itertools.combinations(params, r):
1069                kwargs = {}
1070                for c in combo:
1071                    kwargs.update(c)
1072                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1073                    zinfo = zf.infolist()[0]
1074                    self.assertEqual(zinfo.file_size, expected_file_size)
1075                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1076                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1077                    self.assertEqual(zf.read(zinfo), expected_content)
1078
1079
1080@requires_zlib()
1081class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1082                                   unittest.TestCase):
1083    compression = zipfile.ZIP_DEFLATED
1084
1085@requires_bz2()
1086class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1087                                 unittest.TestCase):
1088    compression = zipfile.ZIP_BZIP2
1089
1090@requires_lzma()
1091class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1092                                unittest.TestCase):
1093    compression = zipfile.ZIP_LZMA
1094
1095
1096class AbstractWriterTests:
1097
1098    def tearDown(self):
1099        unlink(TESTFN2)
1100
1101    def test_close_after_close(self):
1102        data = b'content'
1103        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1104            w = zipf.open('test', 'w')
1105            w.write(data)
1106            w.close()
1107            self.assertTrue(w.closed)
1108            w.close()
1109            self.assertTrue(w.closed)
1110            self.assertEqual(zipf.read('test'), data)
1111
1112    def test_write_after_close(self):
1113        data = b'content'
1114        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1115            w = zipf.open('test', 'w')
1116            w.write(data)
1117            w.close()
1118            self.assertTrue(w.closed)
1119            self.assertRaises(ValueError, w.write, b'')
1120            self.assertEqual(zipf.read('test'), data)
1121
1122class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1123    compression = zipfile.ZIP_STORED
1124
1125@requires_zlib()
1126class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1127    compression = zipfile.ZIP_DEFLATED
1128
1129@requires_bz2()
1130class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1131    compression = zipfile.ZIP_BZIP2
1132
1133@requires_lzma()
1134class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1135    compression = zipfile.ZIP_LZMA
1136
1137
1138class PyZipFileTests(unittest.TestCase):
1139    def assertCompiledIn(self, name, namelist):
1140        if name + 'o' not in namelist:
1141            self.assertIn(name + 'c', namelist)
1142
1143    def requiresWriteAccess(self, path):
1144        # effective_ids unavailable on windows
1145        if not os.access(path, os.W_OK,
1146                         effective_ids=os.access in os.supports_effective_ids):
1147            self.skipTest('requires write access to the installed location')
1148        filename = os.path.join(path, 'test_zipfile.try')
1149        try:
1150            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1151            os.close(fd)
1152        except Exception:
1153            self.skipTest('requires write access to the installed location')
1154        unlink(filename)
1155
1156    def test_write_pyfile(self):
1157        self.requiresWriteAccess(os.path.dirname(__file__))
1158        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1159            fn = __file__
1160            if fn.endswith('.pyc'):
1161                path_split = fn.split(os.sep)
1162                if os.altsep is not None:
1163                    path_split.extend(fn.split(os.altsep))
1164                if '__pycache__' in path_split:
1165                    fn = importlib.util.source_from_cache(fn)
1166                else:
1167                    fn = fn[:-1]
1168
1169            zipfp.writepy(fn)
1170
1171            bn = os.path.basename(fn)
1172            self.assertNotIn(bn, zipfp.namelist())
1173            self.assertCompiledIn(bn, zipfp.namelist())
1174
1175        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1176            fn = __file__
1177            if fn.endswith('.pyc'):
1178                fn = fn[:-1]
1179
1180            zipfp.writepy(fn, "testpackage")
1181
1182            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1183            self.assertNotIn(bn, zipfp.namelist())
1184            self.assertCompiledIn(bn, zipfp.namelist())
1185
1186    def test_write_python_package(self):
1187        import email
1188        packagedir = os.path.dirname(email.__file__)
1189        self.requiresWriteAccess(packagedir)
1190
1191        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1192            zipfp.writepy(packagedir)
1193
1194            # Check for a couple of modules at different levels of the
1195            # hierarchy
1196            names = zipfp.namelist()
1197            self.assertCompiledIn('email/__init__.py', names)
1198            self.assertCompiledIn('email/mime/text.py', names)
1199
1200    def test_write_filtered_python_package(self):
1201        import test
1202        packagedir = os.path.dirname(test.__file__)
1203        self.requiresWriteAccess(packagedir)
1204
1205        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1206
1207            # first make sure that the test folder gives error messages
1208            # (on the badsyntax_... files)
1209            with captured_stdout() as reportSIO:
1210                zipfp.writepy(packagedir)
1211            reportStr = reportSIO.getvalue()
1212            self.assertTrue('SyntaxError' in reportStr)
1213
1214            # then check that the filter works on the whole package
1215            with captured_stdout() as reportSIO:
1216                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1217            reportStr = reportSIO.getvalue()
1218            self.assertTrue('SyntaxError' not in reportStr)
1219
1220            # then check that the filter works on individual files
1221            def filter(path):
1222                return not os.path.basename(path).startswith("bad")
1223            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1224                zipfp.writepy(packagedir, filterfunc=filter)
1225            reportStr = reportSIO.getvalue()
1226            if reportStr:
1227                print(reportStr)
1228            self.assertTrue('SyntaxError' not in reportStr)
1229
1230    def test_write_with_optimization(self):
1231        import email
1232        packagedir = os.path.dirname(email.__file__)
1233        self.requiresWriteAccess(packagedir)
1234        optlevel = 1 if __debug__ else 0
1235        ext = '.pyc'
1236
1237        with TemporaryFile() as t, \
1238             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1239            zipfp.writepy(packagedir)
1240
1241            names = zipfp.namelist()
1242            self.assertIn('email/__init__' + ext, names)
1243            self.assertIn('email/mime/text' + ext, names)
1244
1245    def test_write_python_directory(self):
1246        os.mkdir(TESTFN2)
1247        try:
1248            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1249                fp.write("print(42)\n")
1250
1251            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1252                fp.write("print(42 * 42)\n")
1253
1254            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1255                fp.write("bla bla bla\n")
1256
1257            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1258                zipfp.writepy(TESTFN2)
1259
1260                names = zipfp.namelist()
1261                self.assertCompiledIn('mod1.py', names)
1262                self.assertCompiledIn('mod2.py', names)
1263                self.assertNotIn('mod2.txt', names)
1264
1265        finally:
1266            rmtree(TESTFN2)
1267
1268    def test_write_python_directory_filtered(self):
1269        os.mkdir(TESTFN2)
1270        try:
1271            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1272                fp.write("print(42)\n")
1273
1274            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1275                fp.write("print(42 * 42)\n")
1276
1277            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1278                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1279                                                  not fn.endswith('mod2.py'))
1280
1281                names = zipfp.namelist()
1282                self.assertCompiledIn('mod1.py', names)
1283                self.assertNotIn('mod2.py', names)
1284
1285        finally:
1286            rmtree(TESTFN2)
1287
1288    def test_write_non_pyfile(self):
1289        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1290            with open(TESTFN, 'w', encoding='utf-8') as f:
1291                f.write('most definitely not a python file')
1292            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1293            unlink(TESTFN)
1294
1295    def test_write_pyfile_bad_syntax(self):
1296        os.mkdir(TESTFN2)
1297        try:
1298            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1299                fp.write("Bad syntax in python file\n")
1300
1301            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1302                # syntax errors are printed to stdout
1303                with captured_stdout() as s:
1304                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1305
1306                self.assertIn("SyntaxError", s.getvalue())
1307
1308                # as it will not have compiled the python file, it will
1309                # include the .py file not .pyc
1310                names = zipfp.namelist()
1311                self.assertIn('mod1.py', names)
1312                self.assertNotIn('mod1.pyc', names)
1313
1314        finally:
1315            rmtree(TESTFN2)
1316
1317    def test_write_pathlike(self):
1318        os.mkdir(TESTFN2)
1319        try:
1320            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1321                fp.write("print(42)\n")
1322
1323            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1324                zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py")
1325                names = zipfp.namelist()
1326                self.assertCompiledIn('mod1.py', names)
1327        finally:
1328            rmtree(TESTFN2)
1329
1330
1331class ExtractTests(unittest.TestCase):
1332
1333    def make_test_file(self):
1334        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1335            for fpath, fdata in SMALL_TEST_DATA:
1336                zipfp.writestr(fpath, fdata)
1337
1338    def test_extract(self):
1339        with temp_cwd():
1340            self.make_test_file()
1341            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1342                for fpath, fdata in SMALL_TEST_DATA:
1343                    writtenfile = zipfp.extract(fpath)
1344
1345                    # make sure it was written to the right place
1346                    correctfile = os.path.join(os.getcwd(), fpath)
1347                    correctfile = os.path.normpath(correctfile)
1348
1349                    self.assertEqual(writtenfile, correctfile)
1350
1351                    # make sure correct data is in correct file
1352                    with open(writtenfile, "rb") as f:
1353                        self.assertEqual(fdata.encode(), f.read())
1354
1355                    unlink(writtenfile)
1356
1357    def _test_extract_with_target(self, target):
1358        self.make_test_file()
1359        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1360            for fpath, fdata in SMALL_TEST_DATA:
1361                writtenfile = zipfp.extract(fpath, target)
1362
1363                # make sure it was written to the right place
1364                correctfile = os.path.join(target, fpath)
1365                correctfile = os.path.normpath(correctfile)
1366                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1367
1368                # make sure correct data is in correct file
1369                with open(writtenfile, "rb") as f:
1370                    self.assertEqual(fdata.encode(), f.read())
1371
1372                unlink(writtenfile)
1373
1374        unlink(TESTFN2)
1375
1376    def test_extract_with_target(self):
1377        with temp_dir() as extdir:
1378            self._test_extract_with_target(extdir)
1379
1380    def test_extract_with_target_pathlike(self):
1381        with temp_dir() as extdir:
1382            self._test_extract_with_target(pathlib.Path(extdir))
1383
1384    def test_extract_all(self):
1385        with temp_cwd():
1386            self.make_test_file()
1387            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1388                zipfp.extractall()
1389                for fpath, fdata in SMALL_TEST_DATA:
1390                    outfile = os.path.join(os.getcwd(), fpath)
1391
1392                    with open(outfile, "rb") as f:
1393                        self.assertEqual(fdata.encode(), f.read())
1394
1395                    unlink(outfile)
1396
1397    def _test_extract_all_with_target(self, target):
1398        self.make_test_file()
1399        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1400            zipfp.extractall(target)
1401            for fpath, fdata in SMALL_TEST_DATA:
1402                outfile = os.path.join(target, fpath)
1403
1404                with open(outfile, "rb") as f:
1405                    self.assertEqual(fdata.encode(), f.read())
1406
1407                unlink(outfile)
1408
1409        unlink(TESTFN2)
1410
1411    def test_extract_all_with_target(self):
1412        with temp_dir() as extdir:
1413            self._test_extract_all_with_target(extdir)
1414
1415    def test_extract_all_with_target_pathlike(self):
1416        with temp_dir() as extdir:
1417            self._test_extract_all_with_target(pathlib.Path(extdir))
1418
1419    def check_file(self, filename, content):
1420        self.assertTrue(os.path.isfile(filename))
1421        with open(filename, 'rb') as f:
1422            self.assertEqual(f.read(), content)
1423
1424    def test_sanitize_windows_name(self):
1425        san = zipfile.ZipFile._sanitize_windows_name
1426        # Passing pathsep in allows this test to work regardless of platform.
1427        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1428        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1429        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1430
1431    def test_extract_hackers_arcnames_common_cases(self):
1432        common_hacknames = [
1433            ('../foo/bar', 'foo/bar'),
1434            ('foo/../bar', 'foo/bar'),
1435            ('foo/../../bar', 'foo/bar'),
1436            ('foo/bar/..', 'foo/bar'),
1437            ('./../foo/bar', 'foo/bar'),
1438            ('/foo/bar', 'foo/bar'),
1439            ('/foo/../bar', 'foo/bar'),
1440            ('/foo/../../bar', 'foo/bar'),
1441        ]
1442        self._test_extract_hackers_arcnames(common_hacknames)
1443
1444    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1445    def test_extract_hackers_arcnames_windows_only(self):
1446        """Test combination of path fixing and windows name sanitization."""
1447        windows_hacknames = [
1448            (r'..\foo\bar', 'foo/bar'),
1449            (r'..\/foo\/bar', 'foo/bar'),
1450            (r'foo/\..\/bar', 'foo/bar'),
1451            (r'foo\/../\bar', 'foo/bar'),
1452            (r'C:foo/bar', 'foo/bar'),
1453            (r'C:/foo/bar', 'foo/bar'),
1454            (r'C://foo/bar', 'foo/bar'),
1455            (r'C:\foo\bar', 'foo/bar'),
1456            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1457            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1458            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1459            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1460            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
1461            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
1462            (r'//?/C:/foo/bar', 'foo/bar'),
1463            (r'\\?\C:\foo\bar', 'foo/bar'),
1464            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1465            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1466            ('../../foo../../ba..r', 'foo/ba..r'),
1467        ]
1468        self._test_extract_hackers_arcnames(windows_hacknames)
1469
1470    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1471    def test_extract_hackers_arcnames_posix_only(self):
1472        posix_hacknames = [
1473            ('//foo/bar', 'foo/bar'),
1474            ('../../foo../../ba..r', 'foo../ba..r'),
1475            (r'foo/..\bar', r'foo/..\bar'),
1476        ]
1477        self._test_extract_hackers_arcnames(posix_hacknames)
1478
1479    def _test_extract_hackers_arcnames(self, hacknames):
1480        for arcname, fixedname in hacknames:
1481            content = b'foobar' + arcname.encode()
1482            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1483                zinfo = zipfile.ZipInfo()
1484                # preserve backslashes
1485                zinfo.filename = arcname
1486                zinfo.external_attr = 0o600 << 16
1487                zipfp.writestr(zinfo, content)
1488
1489            arcname = arcname.replace(os.sep, "/")
1490            targetpath = os.path.join('target', 'subdir', 'subsub')
1491            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1492
1493            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1494                writtenfile = zipfp.extract(arcname, targetpath)
1495                self.assertEqual(writtenfile, correctfile,
1496                                 msg='extract %r: %r != %r' %
1497                                 (arcname, writtenfile, correctfile))
1498            self.check_file(correctfile, content)
1499            rmtree('target')
1500
1501            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1502                zipfp.extractall(targetpath)
1503            self.check_file(correctfile, content)
1504            rmtree('target')
1505
1506            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1507
1508            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1509                writtenfile = zipfp.extract(arcname)
1510                self.assertEqual(writtenfile, correctfile,
1511                                 msg="extract %r" % arcname)
1512            self.check_file(correctfile, content)
1513            rmtree(fixedname.split('/')[0])
1514
1515            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1516                zipfp.extractall()
1517            self.check_file(correctfile, content)
1518            rmtree(fixedname.split('/')[0])
1519
1520            unlink(TESTFN2)
1521
1522
1523class OtherTests(unittest.TestCase):
1524    def test_open_via_zip_info(self):
1525        # Create the ZIP archive
1526        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1527            zipfp.writestr("name", "foo")
1528            with self.assertWarns(UserWarning):
1529                zipfp.writestr("name", "bar")
1530            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1531
1532        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1533            infos = zipfp.infolist()
1534            data = b""
1535            for info in infos:
1536                with zipfp.open(info) as zipopen:
1537                    data += zipopen.read()
1538            self.assertIn(data, {b"foobar", b"barfoo"})
1539            data = b""
1540            for info in infos:
1541                data += zipfp.read(info)
1542            self.assertIn(data, {b"foobar", b"barfoo"})
1543
1544    def test_writestr_extended_local_header_issue1202(self):
1545        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1546            for data in 'abcdefghijklmnop':
1547                zinfo = zipfile.ZipInfo(data)
1548                zinfo.flag_bits |= 0x08  # Include an extended local header.
1549                orig_zip.writestr(zinfo, data)
1550
1551    def test_close(self):
1552        """Check that the zipfile is closed after the 'with' block."""
1553        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1554            for fpath, fdata in SMALL_TEST_DATA:
1555                zipfp.writestr(fpath, fdata)
1556                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1557        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1558
1559        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1560            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1561        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1562
1563    def test_close_on_exception(self):
1564        """Check that the zipfile is closed if an exception is raised in the
1565        'with' block."""
1566        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1567            for fpath, fdata in SMALL_TEST_DATA:
1568                zipfp.writestr(fpath, fdata)
1569
1570        try:
1571            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1572                raise zipfile.BadZipFile()
1573        except zipfile.BadZipFile:
1574            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1575
1576    def test_unsupported_version(self):
1577        # File has an extract_version of 120
1578        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1579                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1580                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1581                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1582                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1583
1584        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1585                          io.BytesIO(data), 'r')
1586
1587    @requires_zlib()
1588    def test_read_unicode_filenames(self):
1589        # bug #10801
1590        fname = findfile('zip_cp437_header.zip')
1591        with zipfile.ZipFile(fname) as zipfp:
1592            for name in zipfp.namelist():
1593                zipfp.open(name).close()
1594
1595    def test_write_unicode_filenames(self):
1596        with zipfile.ZipFile(TESTFN, "w") as zf:
1597            zf.writestr("foo.txt", "Test for unicode filename")
1598            zf.writestr("\xf6.txt", "Test for unicode filename")
1599            self.assertIsInstance(zf.infolist()[0].filename, str)
1600
1601        with zipfile.ZipFile(TESTFN, "r") as zf:
1602            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1603            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1604
1605    def test_read_after_write_unicode_filenames(self):
1606        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1607            zipfp.writestr('приклад', b'sample')
1608            self.assertEqual(zipfp.read('приклад'), b'sample')
1609
1610    def test_exclusive_create_zip_file(self):
1611        """Test exclusive creating a new zipfile."""
1612        unlink(TESTFN2)
1613        filename = 'testfile.txt'
1614        content = b'hello, world. this is some content.'
1615        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1616            zipfp.writestr(filename, content)
1617        with self.assertRaises(FileExistsError):
1618            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1619        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1620            self.assertEqual(zipfp.namelist(), [filename])
1621            self.assertEqual(zipfp.read(filename), content)
1622
1623    def test_create_non_existent_file_for_append(self):
1624        if os.path.exists(TESTFN):
1625            os.unlink(TESTFN)
1626
1627        filename = 'testfile.txt'
1628        content = b'hello, world. this is some content.'
1629
1630        try:
1631            with zipfile.ZipFile(TESTFN, 'a') as zf:
1632                zf.writestr(filename, content)
1633        except OSError:
1634            self.fail('Could not append data to a non-existent zip file.')
1635
1636        self.assertTrue(os.path.exists(TESTFN))
1637
1638        with zipfile.ZipFile(TESTFN, 'r') as zf:
1639            self.assertEqual(zf.read(filename), content)
1640
1641    def test_close_erroneous_file(self):
1642        # This test checks that the ZipFile constructor closes the file object
1643        # it opens if there's an error in the file.  If it doesn't, the
1644        # traceback holds a reference to the ZipFile object and, indirectly,
1645        # the file object.
1646        # On Windows, this causes the os.unlink() call to fail because the
1647        # underlying file is still open.  This is SF bug #412214.
1648        #
1649        with open(TESTFN, "w", encoding="utf-8") as fp:
1650            fp.write("this is not a legal zip file\n")
1651        try:
1652            zf = zipfile.ZipFile(TESTFN)
1653        except zipfile.BadZipFile:
1654            pass
1655
1656    def test_is_zip_erroneous_file(self):
1657        """Check that is_zipfile() correctly identifies non-zip files."""
1658        # - passing a filename
1659        with open(TESTFN, "w", encoding='utf-8') as fp:
1660            fp.write("this is not a legal zip file\n")
1661        self.assertFalse(zipfile.is_zipfile(TESTFN))
1662        # - passing a path-like object
1663        self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN)))
1664        # - passing a file object
1665        with open(TESTFN, "rb") as fp:
1666            self.assertFalse(zipfile.is_zipfile(fp))
1667        # - passing a file-like object
1668        fp = io.BytesIO()
1669        fp.write(b"this is not a legal zip file\n")
1670        self.assertFalse(zipfile.is_zipfile(fp))
1671        fp.seek(0, 0)
1672        self.assertFalse(zipfile.is_zipfile(fp))
1673
1674    def test_damaged_zipfile(self):
1675        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1676        # - Create a valid zip file
1677        fp = io.BytesIO()
1678        with zipfile.ZipFile(fp, mode="w") as zipf:
1679            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1680        zipfiledata = fp.getvalue()
1681
1682        # - Now create copies of it missing the last N bytes and make sure
1683        #   a BadZipFile exception is raised when we try to open it
1684        for N in range(len(zipfiledata)):
1685            fp = io.BytesIO(zipfiledata[:N])
1686            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1687
1688    def test_is_zip_valid_file(self):
1689        """Check that is_zipfile() correctly identifies zip files."""
1690        # - passing a filename
1691        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1692            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1693
1694        self.assertTrue(zipfile.is_zipfile(TESTFN))
1695        # - passing a file object
1696        with open(TESTFN, "rb") as fp:
1697            self.assertTrue(zipfile.is_zipfile(fp))
1698            fp.seek(0, 0)
1699            zip_contents = fp.read()
1700        # - passing a file-like object
1701        fp = io.BytesIO()
1702        fp.write(zip_contents)
1703        self.assertTrue(zipfile.is_zipfile(fp))
1704        fp.seek(0, 0)
1705        self.assertTrue(zipfile.is_zipfile(fp))
1706
1707    def test_non_existent_file_raises_OSError(self):
1708        # make sure we don't raise an AttributeError when a partially-constructed
1709        # ZipFile instance is finalized; this tests for regression on SF tracker
1710        # bug #403871.
1711
1712        # The bug we're testing for caused an AttributeError to be raised
1713        # when a ZipFile instance was created for a file that did not
1714        # exist; the .fp member was not initialized but was needed by the
1715        # __del__() method.  Since the AttributeError is in the __del__(),
1716        # it is ignored, but the user should be sufficiently annoyed by
1717        # the message on the output that regression will be noticed
1718        # quickly.
1719        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1720
1721    def test_empty_file_raises_BadZipFile(self):
1722        f = open(TESTFN, 'w', encoding='utf-8')
1723        f.close()
1724        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1725
1726        with open(TESTFN, 'w', encoding='utf-8') as fp:
1727            fp.write("short file")
1728        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1729
1730    def test_closed_zip_raises_ValueError(self):
1731        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1732        data = io.BytesIO()
1733        with zipfile.ZipFile(data, mode="w") as zipf:
1734            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1735
1736        # This is correct; calling .read on a closed ZipFile should raise
1737        # a ValueError, and so should calling .testzip.  An earlier
1738        # version of .testzip would swallow this exception (and any other)
1739        # and report that the first file in the archive was corrupt.
1740        self.assertRaises(ValueError, zipf.read, "foo.txt")
1741        self.assertRaises(ValueError, zipf.open, "foo.txt")
1742        self.assertRaises(ValueError, zipf.testzip)
1743        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1744        with open(TESTFN, 'w', encoding='utf-8') as f:
1745            f.write('zipfile test data')
1746        self.assertRaises(ValueError, zipf.write, TESTFN)
1747
1748    def test_bad_constructor_mode(self):
1749        """Check that bad modes passed to ZipFile constructor are caught."""
1750        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1751
1752    def test_bad_open_mode(self):
1753        """Check that bad modes passed to ZipFile.open are caught."""
1754        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1755            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1756
1757        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1758            # read the data to make sure the file is there
1759            zipf.read("foo.txt")
1760            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1761            # universal newlines support is removed
1762            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1763            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1764
1765    def test_read0(self):
1766        """Check that calling read(0) on a ZipExtFile object returns an empty
1767        string and doesn't advance file pointer."""
1768        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1769            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1770            # read the data to make sure the file is there
1771            with zipf.open("foo.txt") as f:
1772                for i in range(FIXEDTEST_SIZE):
1773                    self.assertEqual(f.read(0), b'')
1774
1775                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1776
1777    def test_open_non_existent_item(self):
1778        """Check that attempting to call open() for an item that doesn't
1779        exist in the archive raises a RuntimeError."""
1780        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1781            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1782
1783    def test_bad_compression_mode(self):
1784        """Check that bad compression methods passed to ZipFile.open are
1785        caught."""
1786        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1787
1788    def test_unsupported_compression(self):
1789        # data is declared as shrunk, but actually deflated
1790        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1791                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1792                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1793                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1794                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1795                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1796        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1797            self.assertRaises(NotImplementedError, zipf.open, 'x')
1798
1799    def test_null_byte_in_filename(self):
1800        """Check that a filename containing a null byte is properly
1801        terminated."""
1802        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1803            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1804            self.assertEqual(zipf.namelist(), ['foo.txt'])
1805
1806    def test_struct_sizes(self):
1807        """Check that ZIP internal structure sizes are calculated correctly."""
1808        self.assertEqual(zipfile.sizeEndCentDir, 22)
1809        self.assertEqual(zipfile.sizeCentralDir, 46)
1810        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1811        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1812
1813    def test_comments(self):
1814        """Check that comments on the archive are handled properly."""
1815
1816        # check default comment is empty
1817        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1818            self.assertEqual(zipf.comment, b'')
1819            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1820
1821        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1822            self.assertEqual(zipfr.comment, b'')
1823
1824        # check a simple short comment
1825        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1826        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1827            zipf.comment = comment
1828            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1829        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1830            self.assertEqual(zipf.comment, comment)
1831
1832        # check a comment of max length
1833        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1834        comment2 = comment2.encode("ascii")
1835        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1836            zipf.comment = comment2
1837            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1838
1839        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1840            self.assertEqual(zipfr.comment, comment2)
1841
1842        # check a comment that is too long is truncated
1843        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1844            with self.assertWarns(UserWarning):
1845                zipf.comment = comment2 + b'oops'
1846            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1847        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1848            self.assertEqual(zipfr.comment, comment2)
1849
1850        # check that comments are correctly modified in append mode
1851        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1852            zipf.comment = b"original comment"
1853            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1854        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1855            zipf.comment = b"an updated comment"
1856        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1857            self.assertEqual(zipf.comment, b"an updated comment")
1858
1859        # check that comments are correctly shortened in append mode
1860        # and the file is indeed truncated
1861        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1862            zipf.comment = b"original comment that's longer"
1863            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1864        original_zip_size = os.path.getsize(TESTFN)
1865        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1866            zipf.comment = b"shorter comment"
1867        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
1868        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1869            self.assertEqual(zipf.comment, b"shorter comment")
1870
1871    def test_unicode_comment(self):
1872        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1873            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1874            with self.assertRaises(TypeError):
1875                zipf.comment = "this is an error"
1876
1877    def test_change_comment_in_empty_archive(self):
1878        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1879            self.assertFalse(zipf.filelist)
1880            zipf.comment = b"this is a comment"
1881        with zipfile.ZipFile(TESTFN, "r") as zipf:
1882            self.assertEqual(zipf.comment, b"this is a comment")
1883
1884    def test_change_comment_in_nonempty_archive(self):
1885        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1886            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1887        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1888            self.assertTrue(zipf.filelist)
1889            zipf.comment = b"this is a comment"
1890        with zipfile.ZipFile(TESTFN, "r") as zipf:
1891            self.assertEqual(zipf.comment, b"this is a comment")
1892
1893    def test_empty_zipfile(self):
1894        # Check that creating a file in 'w' or 'a' mode and closing without
1895        # adding any files to the archives creates a valid empty ZIP file
1896        zipf = zipfile.ZipFile(TESTFN, mode="w")
1897        zipf.close()
1898        try:
1899            zipf = zipfile.ZipFile(TESTFN, mode="r")
1900        except zipfile.BadZipFile:
1901            self.fail("Unable to create empty ZIP file in 'w' mode")
1902
1903        zipf = zipfile.ZipFile(TESTFN, mode="a")
1904        zipf.close()
1905        try:
1906            zipf = zipfile.ZipFile(TESTFN, mode="r")
1907        except:
1908            self.fail("Unable to create empty ZIP file in 'a' mode")
1909
1910    def test_open_empty_file(self):
1911        # Issue 1710703: Check that opening a file with less than 22 bytes
1912        # raises a BadZipFile exception (rather than the previously unhelpful
1913        # OSError)
1914        f = open(TESTFN, 'w', encoding='utf-8')
1915        f.close()
1916        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1917
1918    def test_create_zipinfo_before_1980(self):
1919        self.assertRaises(ValueError,
1920                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1921
1922    def test_create_empty_zipinfo_repr(self):
1923        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
1924        zi = zipfile.ZipInfo(filename="empty")
1925        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
1926
1927    def test_create_empty_zipinfo_default_attributes(self):
1928        """Ensure all required attributes are set."""
1929        zi = zipfile.ZipInfo()
1930        self.assertEqual(zi.orig_filename, "NoName")
1931        self.assertEqual(zi.filename, "NoName")
1932        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
1933        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
1934        self.assertEqual(zi.comment, b"")
1935        self.assertEqual(zi.extra, b"")
1936        self.assertIn(zi.create_system, (0, 3))
1937        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
1938        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
1939        self.assertEqual(zi.reserved, 0)
1940        self.assertEqual(zi.flag_bits, 0)
1941        self.assertEqual(zi.volume, 0)
1942        self.assertEqual(zi.internal_attr, 0)
1943        self.assertEqual(zi.external_attr, 0)
1944
1945        # Before bpo-26185, both were missing
1946        self.assertEqual(zi.file_size, 0)
1947        self.assertEqual(zi.compress_size, 0)
1948
1949    def test_zipfile_with_short_extra_field(self):
1950        """If an extra field in the header is less than 4 bytes, skip it."""
1951        zipdata = (
1952            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1953            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1954            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1955            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1956            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1957            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1958            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1959        )
1960        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1961            # testzip returns the name of the first corrupt file, or None
1962            self.assertIsNone(zipf.testzip())
1963
1964    def test_open_conflicting_handles(self):
1965        # It's only possible to open one writable file handle at a time
1966        msg1 = b"It's fun to charter an accountant!"
1967        msg2 = b"And sail the wide accountant sea"
1968        msg3 = b"To find, explore the funds offshore"
1969        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1970            with zipf.open('foo', mode='w') as w2:
1971                w2.write(msg1)
1972            with zipf.open('bar', mode='w') as w1:
1973                with self.assertRaises(ValueError):
1974                    zipf.open('handle', mode='w')
1975                with self.assertRaises(ValueError):
1976                    zipf.open('foo', mode='r')
1977                with self.assertRaises(ValueError):
1978                    zipf.writestr('str', 'abcde')
1979                with self.assertRaises(ValueError):
1980                    zipf.write(__file__, 'file')
1981                with self.assertRaises(ValueError):
1982                    zipf.close()
1983                w1.write(msg2)
1984            with zipf.open('baz', mode='w') as w2:
1985                w2.write(msg3)
1986
1987        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1988            self.assertEqual(zipf.read('foo'), msg1)
1989            self.assertEqual(zipf.read('bar'), msg2)
1990            self.assertEqual(zipf.read('baz'), msg3)
1991            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1992
1993    def test_seek_tell(self):
1994        # Test seek functionality
1995        txt = b"Where's Bruce?"
1996        bloc = txt.find(b"Bruce")
1997        # Check seek on a file
1998        with zipfile.ZipFile(TESTFN, "w") as zipf:
1999            zipf.writestr("foo.txt", txt)
2000        with zipfile.ZipFile(TESTFN, "r") as zipf:
2001            with zipf.open("foo.txt", "r") as fp:
2002                fp.seek(bloc, os.SEEK_SET)
2003                self.assertEqual(fp.tell(), bloc)
2004                fp.seek(-bloc, os.SEEK_CUR)
2005                self.assertEqual(fp.tell(), 0)
2006                fp.seek(bloc, os.SEEK_CUR)
2007                self.assertEqual(fp.tell(), bloc)
2008                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2009                fp.seek(0, os.SEEK_END)
2010                self.assertEqual(fp.tell(), len(txt))
2011                fp.seek(0, os.SEEK_SET)
2012                self.assertEqual(fp.tell(), 0)
2013        # Check seek on memory file
2014        data = io.BytesIO()
2015        with zipfile.ZipFile(data, mode="w") as zipf:
2016            zipf.writestr("foo.txt", txt)
2017        with zipfile.ZipFile(data, mode="r") as zipf:
2018            with zipf.open("foo.txt", "r") as fp:
2019                fp.seek(bloc, os.SEEK_SET)
2020                self.assertEqual(fp.tell(), bloc)
2021                fp.seek(-bloc, os.SEEK_CUR)
2022                self.assertEqual(fp.tell(), 0)
2023                fp.seek(bloc, os.SEEK_CUR)
2024                self.assertEqual(fp.tell(), bloc)
2025                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2026                fp.seek(0, os.SEEK_END)
2027                self.assertEqual(fp.tell(), len(txt))
2028                fp.seek(0, os.SEEK_SET)
2029                self.assertEqual(fp.tell(), 0)
2030
2031    @requires_bz2()
2032    def test_decompress_without_3rd_party_library(self):
2033        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2034        zip_file = io.BytesIO(data)
2035        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2036            zf.writestr('a.txt', b'a')
2037        with mock.patch('zipfile.bz2', None):
2038            with zipfile.ZipFile(zip_file) as zf:
2039                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2040
2041    @requires_zlib()
2042    def test_full_overlap(self):
2043        data = (
2044            b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
2045            b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00a\xed'
2046            b'\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\d\x0b`P'
2047            b'K\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2'
2048            b'\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00'
2049            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00aPK'
2050            b'\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
2051            b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00\x00'
2052            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bPK\x05'
2053            b'\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00\x00/\x00\x00'
2054            b'\x00\x00\x00'
2055        )
2056        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
2057            self.assertEqual(zipf.namelist(), ['a', 'b'])
2058            zi = zipf.getinfo('a')
2059            self.assertEqual(zi.header_offset, 0)
2060            self.assertEqual(zi.compress_size, 16)
2061            self.assertEqual(zi.file_size, 1033)
2062            zi = zipf.getinfo('b')
2063            self.assertEqual(zi.header_offset, 0)
2064            self.assertEqual(zi.compress_size, 16)
2065            self.assertEqual(zi.file_size, 1033)
2066            self.assertEqual(len(zipf.read('a')), 1033)
2067            with self.assertRaisesRegex(zipfile.BadZipFile, 'File name.*differ'):
2068                zipf.read('b')
2069
2070    @requires_zlib()
2071    def test_quoted_overlap(self):
2072        data = (
2073            b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05Y\xfc'
2074            b'8\x044\x00\x00\x00(\x04\x00\x00\x01\x00\x00\x00a\x00'
2075            b'\x1f\x00\xe0\xffPK\x03\x04\x14\x00\x00\x00\x08\x00\xa0l'
2076            b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00'
2077            b'\x00\x00b\xed\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\'
2078            b'd\x0b`PK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0'
2079            b'lH\x05Y\xfc8\x044\x00\x00\x00(\x04\x00\x00\x01'
2080            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2081            b'\x00aPK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0l'
2082            b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00'
2083            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00$\x00\x00\x00'
2084            b'bPK\x05\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00'
2085            b'\x00S\x00\x00\x00\x00\x00'
2086        )
2087        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
2088            self.assertEqual(zipf.namelist(), ['a', 'b'])
2089            zi = zipf.getinfo('a')
2090            self.assertEqual(zi.header_offset, 0)
2091            self.assertEqual(zi.compress_size, 52)
2092            self.assertEqual(zi.file_size, 1064)
2093            zi = zipf.getinfo('b')
2094            self.assertEqual(zi.header_offset, 36)
2095            self.assertEqual(zi.compress_size, 16)
2096            self.assertEqual(zi.file_size, 1033)
2097            with self.assertRaisesRegex(zipfile.BadZipFile, 'Overlapped entries'):
2098                zipf.read('a')
2099            self.assertEqual(len(zipf.read('b')), 1033)
2100
2101    def tearDown(self):
2102        unlink(TESTFN)
2103        unlink(TESTFN2)
2104
2105
2106class AbstractBadCrcTests:
2107    def test_testzip_with_bad_crc(self):
2108        """Tests that files with bad CRCs return their name from testzip."""
2109        zipdata = self.zip_with_bad_crc
2110
2111        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2112            # testzip returns the name of the first corrupt file, or None
2113            self.assertEqual('afile', zipf.testzip())
2114
2115    def test_read_with_bad_crc(self):
2116        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
2117        zipdata = self.zip_with_bad_crc
2118
2119        # Using ZipFile.read()
2120        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2121            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
2122
2123        # Using ZipExtFile.read()
2124        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2125            with zipf.open('afile', 'r') as corrupt_file:
2126                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
2127
2128        # Same with small reads (in order to exercise the buffering logic)
2129        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2130            with zipf.open('afile', 'r') as corrupt_file:
2131                corrupt_file.MIN_READ_SIZE = 2
2132                with self.assertRaises(zipfile.BadZipFile):
2133                    while corrupt_file.read(2):
2134                        pass
2135
2136
2137class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2138    compression = zipfile.ZIP_STORED
2139    zip_with_bad_crc = (
2140        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
2141        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
2142        b'ilehello,AworldP'
2143        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
2144        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
2145        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
2146        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
2147        b'\0\0/\0\0\0\0\0')
2148
2149@requires_zlib()
2150class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2151    compression = zipfile.ZIP_DEFLATED
2152    zip_with_bad_crc = (
2153        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
2154        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2155        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
2156        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
2157        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
2158        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2159        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2160        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2161
2162@requires_bz2()
2163class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2164    compression = zipfile.ZIP_BZIP2
2165    zip_with_bad_crc = (
2166        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2167        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2168        b'ileBZh91AY&SY\xd4\xa8\xca'
2169        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2170        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2171        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2172        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2173        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2174        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2175        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2176        b'\x00\x00\x00\x00')
2177
2178@requires_lzma()
2179class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2180    compression = zipfile.ZIP_LZMA
2181    zip_with_bad_crc = (
2182        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2183        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2184        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2185        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2186        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2187        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2188        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2189        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2190        b'\x00>\x00\x00\x00\x00\x00')
2191
2192
2193class DecryptionTests(unittest.TestCase):
2194    """Check that ZIP decryption works. Since the library does not
2195    support encryption at the moment, we use a pre-generated encrypted
2196    ZIP file."""
2197
2198    data = (
2199        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2200        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2201        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2202        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2203        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2204        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2205        b'\x00\x00L\x00\x00\x00\x00\x00' )
2206    data2 = (
2207        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2208        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2209        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2210        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2211        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2212        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2213        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2214        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2215
2216    plain = b'zipfile.py encryption test'
2217    plain2 = b'\x00'*512
2218
2219    def setUp(self):
2220        with open(TESTFN, "wb") as fp:
2221            fp.write(self.data)
2222        self.zip = zipfile.ZipFile(TESTFN, "r")
2223        with open(TESTFN2, "wb") as fp:
2224            fp.write(self.data2)
2225        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2226
2227    def tearDown(self):
2228        self.zip.close()
2229        os.unlink(TESTFN)
2230        self.zip2.close()
2231        os.unlink(TESTFN2)
2232
2233    def test_no_password(self):
2234        # Reading the encrypted file without password
2235        # must generate a RunTime exception
2236        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2237        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2238
2239    def test_bad_password(self):
2240        self.zip.setpassword(b"perl")
2241        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2242        self.zip2.setpassword(b"perl")
2243        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2244
2245    @requires_zlib()
2246    def test_good_password(self):
2247        self.zip.setpassword(b"python")
2248        self.assertEqual(self.zip.read("test.txt"), self.plain)
2249        self.zip2.setpassword(b"12345")
2250        self.assertEqual(self.zip2.read("zero"), self.plain2)
2251
2252    def test_unicode_password(self):
2253        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
2254        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
2255        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
2256        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")
2257
2258    def test_seek_tell(self):
2259        self.zip.setpassword(b"python")
2260        txt = self.plain
2261        test_word = b'encryption'
2262        bloc = txt.find(test_word)
2263        bloc_len = len(test_word)
2264        with self.zip.open("test.txt", "r") as fp:
2265            fp.seek(bloc, os.SEEK_SET)
2266            self.assertEqual(fp.tell(), bloc)
2267            fp.seek(-bloc, os.SEEK_CUR)
2268            self.assertEqual(fp.tell(), 0)
2269            fp.seek(bloc, os.SEEK_CUR)
2270            self.assertEqual(fp.tell(), bloc)
2271            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2272
2273            # Make sure that the second read after seeking back beyond
2274            # _readbuffer returns the same content (ie. rewind to the start of
2275            # the file to read forward to the required position).
2276            old_read_size = fp.MIN_READ_SIZE
2277            fp.MIN_READ_SIZE = 1
2278            fp._readbuffer = b''
2279            fp._offset = 0
2280            fp.seek(0, os.SEEK_SET)
2281            self.assertEqual(fp.tell(), 0)
2282            fp.seek(bloc, os.SEEK_CUR)
2283            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2284            fp.MIN_READ_SIZE = old_read_size
2285
2286            fp.seek(0, os.SEEK_END)
2287            self.assertEqual(fp.tell(), len(txt))
2288            fp.seek(0, os.SEEK_SET)
2289            self.assertEqual(fp.tell(), 0)
2290
2291            # Read the file completely to definitely call any eof integrity
2292            # checks (crc) and make sure they still pass.
2293            fp.read()
2294
2295
2296class AbstractTestsWithRandomBinaryFiles:
2297    @classmethod
2298    def setUpClass(cls):
2299        datacount = randint(16, 64)*1024 + randint(1, 1024)
2300        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2301                            for i in range(datacount))
2302
2303    def setUp(self):
2304        # Make a source file with some lines
2305        with open(TESTFN, "wb") as fp:
2306            fp.write(self.data)
2307
2308    def tearDown(self):
2309        unlink(TESTFN)
2310        unlink(TESTFN2)
2311
2312    def make_test_archive(self, f, compression):
2313        # Create the ZIP archive
2314        with zipfile.ZipFile(f, "w", compression) as zipfp:
2315            zipfp.write(TESTFN, "another.name")
2316            zipfp.write(TESTFN, TESTFN)
2317
2318    def zip_test(self, f, compression):
2319        self.make_test_archive(f, compression)
2320
2321        # Read the ZIP archive
2322        with zipfile.ZipFile(f, "r", compression) as zipfp:
2323            testdata = zipfp.read(TESTFN)
2324            self.assertEqual(len(testdata), len(self.data))
2325            self.assertEqual(testdata, self.data)
2326            self.assertEqual(zipfp.read("another.name"), self.data)
2327
2328    def test_read(self):
2329        for f in get_files(self):
2330            self.zip_test(f, self.compression)
2331
2332    def zip_open_test(self, f, compression):
2333        self.make_test_archive(f, compression)
2334
2335        # Read the ZIP archive
2336        with zipfile.ZipFile(f, "r", compression) as zipfp:
2337            zipdata1 = []
2338            with zipfp.open(TESTFN) as zipopen1:
2339                while True:
2340                    read_data = zipopen1.read(256)
2341                    if not read_data:
2342                        break
2343                    zipdata1.append(read_data)
2344
2345            zipdata2 = []
2346            with zipfp.open("another.name") as zipopen2:
2347                while True:
2348                    read_data = zipopen2.read(256)
2349                    if not read_data:
2350                        break
2351                    zipdata2.append(read_data)
2352
2353            testdata1 = b''.join(zipdata1)
2354            self.assertEqual(len(testdata1), len(self.data))
2355            self.assertEqual(testdata1, self.data)
2356
2357            testdata2 = b''.join(zipdata2)
2358            self.assertEqual(len(testdata2), len(self.data))
2359            self.assertEqual(testdata2, self.data)
2360
2361    def test_open(self):
2362        for f in get_files(self):
2363            self.zip_open_test(f, self.compression)
2364
2365    def zip_random_open_test(self, f, compression):
2366        self.make_test_archive(f, compression)
2367
2368        # Read the ZIP archive
2369        with zipfile.ZipFile(f, "r", compression) as zipfp:
2370            zipdata1 = []
2371            with zipfp.open(TESTFN) as zipopen1:
2372                while True:
2373                    read_data = zipopen1.read(randint(1, 1024))
2374                    if not read_data:
2375                        break
2376                    zipdata1.append(read_data)
2377
2378            testdata = b''.join(zipdata1)
2379            self.assertEqual(len(testdata), len(self.data))
2380            self.assertEqual(testdata, self.data)
2381
2382    def test_random_open(self):
2383        for f in get_files(self):
2384            self.zip_random_open_test(f, self.compression)
2385
2386
2387class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2388                                       unittest.TestCase):
2389    compression = zipfile.ZIP_STORED
2390
2391@requires_zlib()
2392class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2393                                        unittest.TestCase):
2394    compression = zipfile.ZIP_DEFLATED
2395
2396@requires_bz2()
2397class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2398                                      unittest.TestCase):
2399    compression = zipfile.ZIP_BZIP2
2400
2401@requires_lzma()
2402class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2403                                     unittest.TestCase):
2404    compression = zipfile.ZIP_LZMA
2405
2406
2407# Provide the tell() method but not seek()
2408class Tellable:
2409    def __init__(self, fp):
2410        self.fp = fp
2411        self.offset = 0
2412
2413    def write(self, data):
2414        n = self.fp.write(data)
2415        self.offset += n
2416        return n
2417
2418    def tell(self):
2419        return self.offset
2420
2421    def flush(self):
2422        self.fp.flush()
2423
2424class Unseekable:
2425    def __init__(self, fp):
2426        self.fp = fp
2427
2428    def write(self, data):
2429        return self.fp.write(data)
2430
2431    def flush(self):
2432        self.fp.flush()
2433
2434class UnseekableTests(unittest.TestCase):
2435    def test_writestr(self):
2436        for wrapper in (lambda f: f), Tellable, Unseekable:
2437            with self.subTest(wrapper=wrapper):
2438                f = io.BytesIO()
2439                f.write(b'abc')
2440                bf = io.BufferedWriter(f)
2441                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2442                    zipfp.writestr('ones', b'111')
2443                    zipfp.writestr('twos', b'222')
2444                self.assertEqual(f.getvalue()[:5], b'abcPK')
2445                with zipfile.ZipFile(f, mode='r') as zipf:
2446                    with zipf.open('ones') as zopen:
2447                        self.assertEqual(zopen.read(), b'111')
2448                    with zipf.open('twos') as zopen:
2449                        self.assertEqual(zopen.read(), b'222')
2450
2451    def test_write(self):
2452        for wrapper in (lambda f: f), Tellable, Unseekable:
2453            with self.subTest(wrapper=wrapper):
2454                f = io.BytesIO()
2455                f.write(b'abc')
2456                bf = io.BufferedWriter(f)
2457                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2458                    self.addCleanup(unlink, TESTFN)
2459                    with open(TESTFN, 'wb') as f2:
2460                        f2.write(b'111')
2461                    zipfp.write(TESTFN, 'ones')
2462                    with open(TESTFN, 'wb') as f2:
2463                        f2.write(b'222')
2464                    zipfp.write(TESTFN, 'twos')
2465                self.assertEqual(f.getvalue()[:5], b'abcPK')
2466                with zipfile.ZipFile(f, mode='r') as zipf:
2467                    with zipf.open('ones') as zopen:
2468                        self.assertEqual(zopen.read(), b'111')
2469                    with zipf.open('twos') as zopen:
2470                        self.assertEqual(zopen.read(), b'222')
2471
2472    def test_open_write(self):
2473        for wrapper in (lambda f: f), Tellable, Unseekable:
2474            with self.subTest(wrapper=wrapper):
2475                f = io.BytesIO()
2476                f.write(b'abc')
2477                bf = io.BufferedWriter(f)
2478                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2479                    with zipf.open('ones', 'w') as zopen:
2480                        zopen.write(b'111')
2481                    with zipf.open('twos', 'w') as zopen:
2482                        zopen.write(b'222')
2483                self.assertEqual(f.getvalue()[:5], b'abcPK')
2484                with zipfile.ZipFile(f) as zipf:
2485                    self.assertEqual(zipf.read('ones'), b'111')
2486                    self.assertEqual(zipf.read('twos'), b'222')
2487
2488
2489@requires_zlib()
2490class TestsWithMultipleOpens(unittest.TestCase):
2491    @classmethod
2492    def setUpClass(cls):
2493        cls.data1 = b'111' + randbytes(10000)
2494        cls.data2 = b'222' + randbytes(10000)
2495
2496    def make_test_archive(self, f):
2497        # Create the ZIP archive
2498        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2499            zipfp.writestr('ones', self.data1)
2500            zipfp.writestr('twos', self.data2)
2501
2502    def test_same_file(self):
2503        # Verify that (when the ZipFile is in control of creating file objects)
2504        # multiple open() calls can be made without interfering with each other.
2505        for f in get_files(self):
2506            self.make_test_archive(f)
2507            with zipfile.ZipFile(f, mode="r") as zipf:
2508                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2509                    data1 = zopen1.read(500)
2510                    data2 = zopen2.read(500)
2511                    data1 += zopen1.read()
2512                    data2 += zopen2.read()
2513                self.assertEqual(data1, data2)
2514                self.assertEqual(data1, self.data1)
2515
2516    def test_different_file(self):
2517        # Verify that (when the ZipFile is in control of creating file objects)
2518        # multiple open() calls can be made without interfering with each other.
2519        for f in get_files(self):
2520            self.make_test_archive(f)
2521            with zipfile.ZipFile(f, mode="r") as zipf:
2522                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2523                    data1 = zopen1.read(500)
2524                    data2 = zopen2.read(500)
2525                    data1 += zopen1.read()
2526                    data2 += zopen2.read()
2527                self.assertEqual(data1, self.data1)
2528                self.assertEqual(data2, self.data2)
2529
2530    def test_interleaved(self):
2531        # Verify that (when the ZipFile is in control of creating file objects)
2532        # multiple open() calls can be made without interfering with each other.
2533        for f in get_files(self):
2534            self.make_test_archive(f)
2535            with zipfile.ZipFile(f, mode="r") as zipf:
2536                with zipf.open('ones') as zopen1:
2537                    data1 = zopen1.read(500)
2538                    with zipf.open('twos') as zopen2:
2539                        data2 = zopen2.read(500)
2540                        data1 += zopen1.read()
2541                        data2 += zopen2.read()
2542                self.assertEqual(data1, self.data1)
2543                self.assertEqual(data2, self.data2)
2544
2545    def test_read_after_close(self):
2546        for f in get_files(self):
2547            self.make_test_archive(f)
2548            with contextlib.ExitStack() as stack:
2549                with zipfile.ZipFile(f, 'r') as zipf:
2550                    zopen1 = stack.enter_context(zipf.open('ones'))
2551                    zopen2 = stack.enter_context(zipf.open('twos'))
2552                data1 = zopen1.read(500)
2553                data2 = zopen2.read(500)
2554                data1 += zopen1.read()
2555                data2 += zopen2.read()
2556            self.assertEqual(data1, self.data1)
2557            self.assertEqual(data2, self.data2)
2558
2559    def test_read_after_write(self):
2560        for f in get_files(self):
2561            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2562                zipf.writestr('ones', self.data1)
2563                zipf.writestr('twos', self.data2)
2564                with zipf.open('ones') as zopen1:
2565                    data1 = zopen1.read(500)
2566            self.assertEqual(data1, self.data1[:500])
2567            with zipfile.ZipFile(f, 'r') as zipf:
2568                data1 = zipf.read('ones')
2569                data2 = zipf.read('twos')
2570            self.assertEqual(data1, self.data1)
2571            self.assertEqual(data2, self.data2)
2572
2573    def test_write_after_read(self):
2574        for f in get_files(self):
2575            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2576                zipf.writestr('ones', self.data1)
2577                with zipf.open('ones') as zopen1:
2578                    zopen1.read(500)
2579                    zipf.writestr('twos', self.data2)
2580            with zipfile.ZipFile(f, 'r') as zipf:
2581                data1 = zipf.read('ones')
2582                data2 = zipf.read('twos')
2583            self.assertEqual(data1, self.data1)
2584            self.assertEqual(data2, self.data2)
2585
2586    def test_many_opens(self):
2587        # Verify that read() and open() promptly close the file descriptor,
2588        # and don't rely on the garbage collector to free resources.
2589        self.make_test_archive(TESTFN2)
2590        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2591            for x in range(100):
2592                zipf.read('ones')
2593                with zipf.open('ones') as zopen1:
2594                    pass
2595        with open(os.devnull, "rb") as f:
2596            self.assertLess(f.fileno(), 100)
2597
2598    def test_write_while_reading(self):
2599        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2600            zipf.writestr('ones', self.data1)
2601        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2602            with zipf.open('ones', 'r') as r1:
2603                data1 = r1.read(500)
2604                with zipf.open('twos', 'w') as w1:
2605                    w1.write(self.data2)
2606                data1 += r1.read()
2607        self.assertEqual(data1, self.data1)
2608        with zipfile.ZipFile(TESTFN2) as zipf:
2609            self.assertEqual(zipf.read('twos'), self.data2)
2610
2611    def tearDown(self):
2612        unlink(TESTFN2)
2613
2614
2615class TestWithDirectory(unittest.TestCase):
2616    def setUp(self):
2617        os.mkdir(TESTFN2)
2618
2619    def test_extract_dir(self):
2620        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
2621            zipf.extractall(TESTFN2)
2622        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2623        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2624        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2625
2626    def test_bug_6050(self):
2627        # Extraction should succeed if directories already exist
2628        os.mkdir(os.path.join(TESTFN2, "a"))
2629        self.test_extract_dir()
2630
2631    def test_write_dir(self):
2632        dirpath = os.path.join(TESTFN2, "x")
2633        os.mkdir(dirpath)
2634        mode = os.stat(dirpath).st_mode & 0xFFFF
2635        with zipfile.ZipFile(TESTFN, "w") as zipf:
2636            zipf.write(dirpath)
2637            zinfo = zipf.filelist[0]
2638            self.assertTrue(zinfo.filename.endswith("/x/"))
2639            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2640            zipf.write(dirpath, "y")
2641            zinfo = zipf.filelist[1]
2642            self.assertTrue(zinfo.filename, "y/")
2643            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2644        with zipfile.ZipFile(TESTFN, "r") as zipf:
2645            zinfo = zipf.filelist[0]
2646            self.assertTrue(zinfo.filename.endswith("/x/"))
2647            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2648            zinfo = zipf.filelist[1]
2649            self.assertTrue(zinfo.filename, "y/")
2650            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2651            target = os.path.join(TESTFN2, "target")
2652            os.mkdir(target)
2653            zipf.extractall(target)
2654            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2655            self.assertEqual(len(os.listdir(target)), 2)
2656
2657    def test_writestr_dir(self):
2658        os.mkdir(os.path.join(TESTFN2, "x"))
2659        with zipfile.ZipFile(TESTFN, "w") as zipf:
2660            zipf.writestr("x/", b'')
2661            zinfo = zipf.filelist[0]
2662            self.assertEqual(zinfo.filename, "x/")
2663            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2664        with zipfile.ZipFile(TESTFN, "r") as zipf:
2665            zinfo = zipf.filelist[0]
2666            self.assertTrue(zinfo.filename.endswith("x/"))
2667            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2668            target = os.path.join(TESTFN2, "target")
2669            os.mkdir(target)
2670            zipf.extractall(target)
2671            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
2672            self.assertEqual(os.listdir(target), ["x"])
2673
2674    def tearDown(self):
2675        rmtree(TESTFN2)
2676        if os.path.exists(TESTFN):
2677            unlink(TESTFN)
2678
2679
2680class ZipInfoTests(unittest.TestCase):
2681    def test_from_file(self):
2682        zi = zipfile.ZipInfo.from_file(__file__)
2683        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2684        self.assertFalse(zi.is_dir())
2685        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2686
2687    def test_from_file_pathlike(self):
2688        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
2689        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2690        self.assertFalse(zi.is_dir())
2691        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2692
2693    def test_from_file_bytes(self):
2694        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
2695        self.assertEqual(posixpath.basename(zi.filename), 'test')
2696        self.assertFalse(zi.is_dir())
2697        self.assertEqual(zi.file_size, os.path.getsize(__file__))
2698
2699    def test_from_file_fileno(self):
2700        with open(__file__, 'rb') as f:
2701            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
2702            self.assertEqual(posixpath.basename(zi.filename), 'test')
2703            self.assertFalse(zi.is_dir())
2704            self.assertEqual(zi.file_size, os.path.getsize(__file__))
2705
2706    def test_from_dir(self):
2707        dirpath = os.path.dirname(os.path.abspath(__file__))
2708        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2709        self.assertEqual(zi.filename, 'stdlib_tests/')
2710        self.assertTrue(zi.is_dir())
2711        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2712        self.assertEqual(zi.file_size, 0)
2713
2714
2715class CommandLineTest(unittest.TestCase):
2716
2717    def zipfilecmd(self, *args, **kwargs):
2718        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
2719                                                      **kwargs)
2720        return out.replace(os.linesep.encode(), b'\n')
2721
2722    def zipfilecmd_failure(self, *args):
2723        return script_helper.assert_python_failure('-m', 'zipfile', *args)
2724
2725    def test_bad_use(self):
2726        rc, out, err = self.zipfilecmd_failure()
2727        self.assertEqual(out, b'')
2728        self.assertIn(b'usage', err.lower())
2729        self.assertIn(b'error', err.lower())
2730        self.assertIn(b'required', err.lower())
2731        rc, out, err = self.zipfilecmd_failure('-l', '')
2732        self.assertEqual(out, b'')
2733        self.assertNotEqual(err.strip(), b'')
2734
2735    def test_test_command(self):
2736        zip_name = findfile('zipdir.zip')
2737        for opt in '-t', '--test':
2738            out = self.zipfilecmd(opt, zip_name)
2739            self.assertEqual(out.rstrip(), b'Done testing')
2740        zip_name = findfile('testtar.tar')
2741        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
2742        self.assertEqual(out, b'')
2743
2744    def test_list_command(self):
2745        zip_name = findfile('zipdir.zip')
2746        t = io.StringIO()
2747        with zipfile.ZipFile(zip_name, 'r') as tf:
2748            tf.printdir(t)
2749        expected = t.getvalue().encode('ascii', 'backslashreplace')
2750        for opt in '-l', '--list':
2751            out = self.zipfilecmd(opt, zip_name,
2752                                  PYTHONIOENCODING='ascii:backslashreplace')
2753            self.assertEqual(out, expected)
2754
2755    @requires_zlib()
2756    def test_create_command(self):
2757        self.addCleanup(unlink, TESTFN)
2758        with open(TESTFN, 'w', encoding='utf-8') as f:
2759            f.write('test 1')
2760        os.mkdir(TESTFNDIR)
2761        self.addCleanup(rmtree, TESTFNDIR)
2762        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f:
2763            f.write('test 2')
2764        files = [TESTFN, TESTFNDIR]
2765        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
2766        for opt in '-c', '--create':
2767            try:
2768                out = self.zipfilecmd(opt, TESTFN2, *files)
2769                self.assertEqual(out, b'')
2770                with zipfile.ZipFile(TESTFN2) as zf:
2771                    self.assertEqual(zf.namelist(), namelist)
2772                    self.assertEqual(zf.read(namelist[0]), b'test 1')
2773                    self.assertEqual(zf.read(namelist[2]), b'test 2')
2774            finally:
2775                unlink(TESTFN2)
2776
2777    def test_extract_command(self):
2778        zip_name = findfile('zipdir.zip')
2779        for opt in '-e', '--extract':
2780            with temp_dir() as extdir:
2781                out = self.zipfilecmd(opt, zip_name, extdir)
2782                self.assertEqual(out, b'')
2783                with zipfile.ZipFile(zip_name) as zf:
2784                    for zi in zf.infolist():
2785                        path = os.path.join(extdir,
2786                                    zi.filename.replace('/', os.sep))
2787                        if zi.is_dir():
2788                            self.assertTrue(os.path.isdir(path))
2789                        else:
2790                            self.assertTrue(os.path.isfile(path))
2791                            with open(path, 'rb') as f:
2792                                self.assertEqual(f.read(), zf.read(zi))
2793
2794
2795class TestExecutablePrependedZip(unittest.TestCase):
2796    """Test our ability to open zip files with an executable prepended."""
2797
2798    def setUp(self):
2799        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
2800        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')
2801
2802    def _test_zip_works(self, name):
2803        # bpo28494 sanity check: ensure is_zipfile works on these.
2804        self.assertTrue(zipfile.is_zipfile(name),
2805                        f'is_zipfile failed on {name}')
2806        # Ensure we can operate on these via ZipFile.
2807        with zipfile.ZipFile(name) as zipfp:
2808            for n in zipfp.namelist():
2809                data = zipfp.read(n)
2810                self.assertIn(b'FAVORITE_NUMBER', data)
2811
2812    def test_read_zip_with_exe_prepended(self):
2813        self._test_zip_works(self.exe_zip)
2814
2815    def test_read_zip64_with_exe_prepended(self):
2816        self._test_zip_works(self.exe_zip64)
2817
2818    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2819    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2820                         'Test relies on #!/bin/bash working.')
2821    def test_execute_zip2(self):
2822        output = subprocess.check_output([self.exe_zip, sys.executable])
2823        self.assertIn(b'number in executable: 5', output)
2824
2825    @unittest.skipUnless(sys.executable, 'sys.executable required.')
2826    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
2827                         'Test relies on #!/bin/bash working.')
2828    def test_execute_zip64(self):
2829        output = subprocess.check_output([self.exe_zip64, sys.executable])
2830        self.assertIn(b'number in executable: 5', output)
2831
2832
2833# Poor man's technique to consume a (smallish) iterable.
2834consume = tuple
2835
2836
2837# from jaraco.itertools 5.0
2838class jaraco:
2839    class itertools:
2840        class Counter:
2841            def __init__(self, i):
2842                self.count = 0
2843                self._orig_iter = iter(i)
2844
2845            def __iter__(self):
2846                return self
2847
2848            def __next__(self):
2849                result = next(self._orig_iter)
2850                self.count += 1
2851                return result
2852
2853
2854def add_dirs(zf):
2855    """
2856    Given a writable zip file zf, inject directory entries for
2857    any directories implied by the presence of children.
2858    """
2859    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
2860        zf.writestr(name, b"")
2861    return zf
2862
2863
2864def build_alpharep_fixture():
2865    """
2866    Create a zip file with this structure:
2867
2868    .
2869    ├── a.txt
2870    ├── b
2871    │   ├── c.txt
2872    │   ├── d
2873    │   │   └── e.txt
2874    │   └── f.txt
2875    └── g
2876        └── h
2877            └── i.txt
2878
2879    This fixture has the following key characteristics:
2880
2881    - a file at the root (a)
2882    - a file two levels deep (b/d/e)
2883    - multiple files in a directory (b/c, b/f)
2884    - a directory containing only a directory (g/h)
2885
2886    "alpha" because it uses alphabet
2887    "rep" because it's a representative example
2888    """
2889    data = io.BytesIO()
2890    zf = zipfile.ZipFile(data, "w")
2891    zf.writestr("a.txt", b"content of a")
2892    zf.writestr("b/c.txt", b"content of c")
2893    zf.writestr("b/d/e.txt", b"content of e")
2894    zf.writestr("b/f.txt", b"content of f")
2895    zf.writestr("g/h/i.txt", b"content of i")
2896    zf.filename = "alpharep.zip"
2897    return zf
2898
2899
2900def pass_alpharep(meth):
2901    """
2902    Given a method, wrap it in a for loop that invokes method
2903    with each subtest.
2904    """
2905
2906    @functools.wraps(meth)
2907    def wrapper(self):
2908        for alpharep in self.zipfile_alpharep():
2909            meth(self, alpharep=alpharep)
2910
2911    return wrapper
2912
2913
2914class TestPath(unittest.TestCase):
2915    def setUp(self):
2916        self.fixtures = contextlib.ExitStack()
2917        self.addCleanup(self.fixtures.close)
2918
2919    def zipfile_alpharep(self):
2920        with self.subTest():
2921            yield build_alpharep_fixture()
2922        with self.subTest():
2923            yield add_dirs(build_alpharep_fixture())
2924
2925    def zipfile_ondisk(self, alpharep):
2926        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
2927        buffer = alpharep.fp
2928        alpharep.close()
2929        path = tmpdir / alpharep.filename
2930        with path.open("wb") as strm:
2931            strm.write(buffer.getvalue())
2932        return path
2933
2934    @pass_alpharep
2935    def test_iterdir_and_types(self, alpharep):
2936        root = zipfile.Path(alpharep)
2937        assert root.is_dir()
2938        a, b, g = root.iterdir()
2939        assert a.is_file()
2940        assert b.is_dir()
2941        assert g.is_dir()
2942        c, f, d = b.iterdir()
2943        assert c.is_file() and f.is_file()
2944        (e,) = d.iterdir()
2945        assert e.is_file()
2946        (h,) = g.iterdir()
2947        (i,) = h.iterdir()
2948        assert i.is_file()
2949
2950    @pass_alpharep
2951    def test_is_file_missing(self, alpharep):
2952        root = zipfile.Path(alpharep)
2953        assert not root.joinpath('missing.txt').is_file()
2954
2955    @pass_alpharep
2956    def test_iterdir_on_file(self, alpharep):
2957        root = zipfile.Path(alpharep)
2958        a, b, g = root.iterdir()
2959        with self.assertRaises(ValueError):
2960            a.iterdir()
2961
2962    @pass_alpharep
2963    def test_subdir_is_dir(self, alpharep):
2964        root = zipfile.Path(alpharep)
2965        assert (root / 'b').is_dir()
2966        assert (root / 'b/').is_dir()
2967        assert (root / 'g').is_dir()
2968        assert (root / 'g/').is_dir()
2969
2970    @pass_alpharep
2971    def test_open(self, alpharep):
2972        root = zipfile.Path(alpharep)
2973        a, b, g = root.iterdir()
2974        with a.open(encoding="utf-8") as strm:
2975            data = strm.read()
2976        assert data == "content of a"
2977
2978    def test_open_write(self):
2979        """
2980        If the zipfile is open for write, it should be possible to
2981        write bytes or text to it.
2982        """
2983        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
2984        with zf.joinpath('file.bin').open('wb') as strm:
2985            strm.write(b'binary contents')
2986        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
2987            strm.write('text file')
2988
2989    def test_open_extant_directory(self):
2990        """
2991        Attempting to open a directory raises IsADirectoryError.
2992        """
2993        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
2994        with self.assertRaises(IsADirectoryError):
2995            zf.joinpath('b').open()
2996
2997    @pass_alpharep
2998    def test_open_binary_invalid_args(self, alpharep):
2999        root = zipfile.Path(alpharep)
3000        with self.assertRaises(ValueError):
3001            root.joinpath('a.txt').open('rb', encoding='utf-8')
3002        with self.assertRaises(ValueError):
3003            root.joinpath('a.txt').open('rb', 'utf-8')
3004
3005    def test_open_missing_directory(self):
3006        """
3007        Attempting to open a missing directory raises FileNotFoundError.
3008        """
3009        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
3010        with self.assertRaises(FileNotFoundError):
3011            zf.joinpath('z').open()
3012
3013    @pass_alpharep
3014    def test_read(self, alpharep):
3015        root = zipfile.Path(alpharep)
3016        a, b, g = root.iterdir()
3017        assert a.read_text(encoding="utf-8") == "content of a"
3018        assert a.read_bytes() == b"content of a"
3019
3020    @pass_alpharep
3021    def test_joinpath(self, alpharep):
3022        root = zipfile.Path(alpharep)
3023        a = root.joinpath("a.txt")
3024        assert a.is_file()
3025        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
3026        assert e.read_text(encoding="utf-8") == "content of e"
3027
3028    @pass_alpharep
3029    def test_joinpath_multiple(self, alpharep):
3030        root = zipfile.Path(alpharep)
3031        e = root.joinpath("b", "d", "e.txt")
3032        assert e.read_text(encoding="utf-8") == "content of e"
3033
3034    @pass_alpharep
3035    def test_traverse_truediv(self, alpharep):
3036        root = zipfile.Path(alpharep)
3037        a = root / "a.txt"
3038        assert a.is_file()
3039        e = root / "b" / "d" / "e.txt"
3040        assert e.read_text(encoding="utf-8") == "content of e"
3041
3042    @pass_alpharep
3043    def test_traverse_simplediv(self, alpharep):
3044        """
3045        Disable the __future__.division when testing traversal.
3046        """
3047        code = compile(
3048            source="zipfile.Path(alpharep) / 'a'",
3049            filename="(test)",
3050            mode="eval",
3051            dont_inherit=True,
3052        )
3053        eval(code)
3054
3055    @pass_alpharep
3056    def test_pathlike_construction(self, alpharep):
3057        """
3058        zipfile.Path should be constructable from a path-like object
3059        """
3060        zipfile_ondisk = self.zipfile_ondisk(alpharep)
3061        pathlike = pathlib.Path(str(zipfile_ondisk))
3062        zipfile.Path(pathlike)
3063
3064    @pass_alpharep
3065    def test_traverse_pathlike(self, alpharep):
3066        root = zipfile.Path(alpharep)
3067        root / pathlib.Path("a")
3068
3069    @pass_alpharep
3070    def test_parent(self, alpharep):
3071        root = zipfile.Path(alpharep)
3072        assert (root / 'a').parent.at == ''
3073        assert (root / 'a' / 'b').parent.at == 'a/'
3074
3075    @pass_alpharep
3076    def test_dir_parent(self, alpharep):
3077        root = zipfile.Path(alpharep)
3078        assert (root / 'b').parent.at == ''
3079        assert (root / 'b/').parent.at == ''
3080
3081    @pass_alpharep
3082    def test_missing_dir_parent(self, alpharep):
3083        root = zipfile.Path(alpharep)
3084        assert (root / 'missing dir/').parent.at == ''
3085
3086    @pass_alpharep
3087    def test_mutability(self, alpharep):
3088        """
3089        If the underlying zipfile is changed, the Path object should
3090        reflect that change.
3091        """
3092        root = zipfile.Path(alpharep)
3093        a, b, g = root.iterdir()
3094        alpharep.writestr('foo.txt', 'foo')
3095        alpharep.writestr('bar/baz.txt', 'baz')
3096        assert any(child.name == 'foo.txt' for child in root.iterdir())
3097        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
3098        (baz,) = (root / 'bar').iterdir()
3099        assert baz.read_text(encoding="utf-8") == 'baz'
3100
3101    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13
3102
3103    def huge_zipfile(self):
3104        """Create a read-only zipfile with a huge number of entries entries."""
3105        strm = io.BytesIO()
3106        zf = zipfile.ZipFile(strm, "w")
3107        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
3108            zf.writestr(entry, entry)
3109        zf.mode = 'r'
3110        return zf
3111
3112    def test_joinpath_constant_time(self):
3113        """
3114        Ensure joinpath on items in zipfile is linear time.
3115        """
3116        root = zipfile.Path(self.huge_zipfile())
3117        entries = jaraco.itertools.Counter(root.iterdir())
3118        for entry in entries:
3119            entry.joinpath('suffix')
3120        # Check the file iterated all items
3121        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
3122
3123    # @func_timeout.func_set_timeout(3)
3124    def test_implied_dirs_performance(self):
3125        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
3126        zipfile.CompleteDirs._implied_dirs(data)
3127
3128    @pass_alpharep
3129    def test_read_does_not_close(self, alpharep):
3130        alpharep = self.zipfile_ondisk(alpharep)
3131        with zipfile.ZipFile(alpharep) as file:
3132            for rep in range(2):
3133                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")
3134
3135    @pass_alpharep
3136    def test_subclass(self, alpharep):
3137        class Subclass(zipfile.Path):
3138            pass
3139
3140        root = Subclass(alpharep)
3141        assert isinstance(root / 'b', Subclass)
3142
3143    @pass_alpharep
3144    def test_filename(self, alpharep):
3145        root = zipfile.Path(alpharep)
3146        assert root.filename == pathlib.Path('alpharep.zip')
3147
3148    @pass_alpharep
3149    def test_root_name(self, alpharep):
3150        """
3151        The name of the root should be the name of the zipfile
3152        """
3153        root = zipfile.Path(alpharep)
3154        assert root.name == 'alpharep.zip' == root.filename.name
3155
3156    @pass_alpharep
3157    def test_root_parent(self, alpharep):
3158        root = zipfile.Path(alpharep)
3159        assert root.parent == pathlib.Path('.')
3160        root.root.filename = 'foo/bar.zip'
3161        assert root.parent == pathlib.Path('foo')
3162
3163    @pass_alpharep
3164    def test_root_unnamed(self, alpharep):
3165        """
3166        It is an error to attempt to get the name
3167        or parent of an unnamed zipfile.
3168        """
3169        alpharep.filename = None
3170        root = zipfile.Path(alpharep)
3171        with self.assertRaises(TypeError):
3172            root.name
3173        with self.assertRaises(TypeError):
3174            root.parent
3175
3176        # .name and .parent should still work on subs
3177        sub = root / "b"
3178        assert sub.name == "b"
3179        assert sub.parent
3180
3181    @pass_alpharep
3182    def test_inheritance(self, alpharep):
3183        cls = type('PathChild', (zipfile.Path,), {})
3184        for alpharep in self.zipfile_alpharep():
3185            file = cls(alpharep).joinpath('some dir').parent
3186            assert isinstance(file, cls)
3187
3188
3189if __name__ == "__main__":
3190    unittest.main()
3191