• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import array
2import contextlib
3import importlib.util
4import io
5import itertools
6import os
7import posixpath
8import struct
9import subprocess
10import sys
11import time
12import unittest
13import unittest.mock as mock
14import zipfile
15
16
17from tempfile import TemporaryFile
18from random import randint, random, randbytes
19
20from test import archiver_tests
21from test.support import script_helper
22from test.support import (
23    findfile, requires_zlib, requires_bz2, requires_lzma,
24    captured_stdout, captured_stderr, requires_subprocess
25)
26from test.support.os_helper import (
27    TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count, FakePath
28)
29
30
31TESTFN2 = TESTFN + "2"
32TESTFNDIR = TESTFN + "d"
33FIXEDTEST_SIZE = 1000
34DATAFILES_DIR = 'zipfile_datafiles'
35
36SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
37                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
38                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
39                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
40
41def get_files(test):
42    yield TESTFN2
43    with TemporaryFile() as f:
44        yield f
45        test.assertFalse(f.closed)
46    with io.BytesIO() as f:
47        yield f
48        test.assertFalse(f.closed)
49
50class AbstractTestsWithSourceFile:
51    @classmethod
52    def setUpClass(cls):
53        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
54                              (i, random()), "ascii")
55                        for i in range(FIXEDTEST_SIZE)]
56        cls.data = b''.join(cls.line_gen)
57
58    def setUp(self):
59        # Make a source file with some lines
60        with open(TESTFN, "wb") as fp:
61            fp.write(self.data)
62
63    def make_test_archive(self, f, compression, compresslevel=None):
64        kwargs = {'compression': compression, 'compresslevel': compresslevel}
65        # Create the ZIP archive
66        with zipfile.ZipFile(f, "w", **kwargs) as zipfp:
67            zipfp.write(TESTFN, "another.name")
68            zipfp.write(TESTFN, TESTFN)
69            zipfp.writestr("strfile", self.data)
70            with zipfp.open('written-open-w', mode='w') as f:
71                for line in self.line_gen:
72                    f.write(line)
73
74    def zip_test(self, f, compression, compresslevel=None):
75        self.make_test_archive(f, compression, compresslevel)
76
77        # Read the ZIP archive
78        with zipfile.ZipFile(f, "r", compression) as zipfp:
79            self.assertEqual(zipfp.read(TESTFN), self.data)
80            self.assertEqual(zipfp.read("another.name"), self.data)
81            self.assertEqual(zipfp.read("strfile"), self.data)
82
83            # Print the ZIP directory
84            fp = io.StringIO()
85            zipfp.printdir(file=fp)
86            directory = fp.getvalue()
87            lines = directory.splitlines()
88            self.assertEqual(len(lines), 5) # Number of files + header
89
90            self.assertIn('File Name', lines[0])
91            self.assertIn('Modified', lines[0])
92            self.assertIn('Size', lines[0])
93
94            fn, date, time_, size = lines[1].split()
95            self.assertEqual(fn, 'another.name')
96            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
97            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
98            self.assertEqual(size, str(len(self.data)))
99
100            # Check the namelist
101            names = zipfp.namelist()
102            self.assertEqual(len(names), 4)
103            self.assertIn(TESTFN, names)
104            self.assertIn("another.name", names)
105            self.assertIn("strfile", names)
106            self.assertIn("written-open-w", names)
107
108            # Check infolist
109            infos = zipfp.infolist()
110            names = [i.filename for i in infos]
111            self.assertEqual(len(names), 4)
112            self.assertIn(TESTFN, names)
113            self.assertIn("another.name", names)
114            self.assertIn("strfile", names)
115            self.assertIn("written-open-w", names)
116            for i in infos:
117                self.assertEqual(i.file_size, len(self.data))
118
119            # check getinfo
120            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
121                info = zipfp.getinfo(nm)
122                self.assertEqual(info.filename, nm)
123                self.assertEqual(info.file_size, len(self.data))
124
125            # Check that testzip thinks the archive is ok
126            # (it returns None if all contents could be read properly)
127            self.assertIsNone(zipfp.testzip())
128
129    def test_basic(self):
130        for f in get_files(self):
131            self.zip_test(f, self.compression)
132
133    def zip_open_test(self, f, compression):
134        self.make_test_archive(f, compression)
135
136        # Read the ZIP archive
137        with zipfile.ZipFile(f, "r", compression) as zipfp:
138            zipdata1 = []
139            with zipfp.open(TESTFN) as zipopen1:
140                while True:
141                    read_data = zipopen1.read(256)
142                    if not read_data:
143                        break
144                    zipdata1.append(read_data)
145
146            zipdata2 = []
147            with zipfp.open("another.name") as zipopen2:
148                while True:
149                    read_data = zipopen2.read(256)
150                    if not read_data:
151                        break
152                    zipdata2.append(read_data)
153
154            self.assertEqual(b''.join(zipdata1), self.data)
155            self.assertEqual(b''.join(zipdata2), self.data)
156
157    def test_open(self):
158        for f in get_files(self):
159            self.zip_open_test(f, self.compression)
160
161    def test_open_with_pathlike(self):
162        path = FakePath(TESTFN2)
163        self.zip_open_test(path, self.compression)
164        with zipfile.ZipFile(path, "r", self.compression) as zipfp:
165            self.assertIsInstance(zipfp.filename, str)
166
167    def zip_random_open_test(self, f, compression):
168        self.make_test_archive(f, compression)
169
170        # Read the ZIP archive
171        with zipfile.ZipFile(f, "r", compression) as zipfp:
172            zipdata1 = []
173            with zipfp.open(TESTFN) as zipopen1:
174                while True:
175                    read_data = zipopen1.read(randint(1, 1024))
176                    if not read_data:
177                        break
178                    zipdata1.append(read_data)
179
180            self.assertEqual(b''.join(zipdata1), self.data)
181
182    def test_random_open(self):
183        for f in get_files(self):
184            self.zip_random_open_test(f, self.compression)
185
186    def zip_read1_test(self, f, compression):
187        self.make_test_archive(f, compression)
188
189        # Read the ZIP archive
190        with zipfile.ZipFile(f, "r") as zipfp, \
191             zipfp.open(TESTFN) as zipopen:
192            zipdata = []
193            while True:
194                read_data = zipopen.read1(-1)
195                if not read_data:
196                    break
197                zipdata.append(read_data)
198
199        self.assertEqual(b''.join(zipdata), self.data)
200
201    def test_read1(self):
202        for f in get_files(self):
203            self.zip_read1_test(f, self.compression)
204
205    def zip_read1_10_test(self, f, compression):
206        self.make_test_archive(f, compression)
207
208        # Read the ZIP archive
209        with zipfile.ZipFile(f, "r") as zipfp, \
210             zipfp.open(TESTFN) as zipopen:
211            zipdata = []
212            while True:
213                read_data = zipopen.read1(10)
214                self.assertLessEqual(len(read_data), 10)
215                if not read_data:
216                    break
217                zipdata.append(read_data)
218
219        self.assertEqual(b''.join(zipdata), self.data)
220
221    def test_read1_10(self):
222        for f in get_files(self):
223            self.zip_read1_10_test(f, self.compression)
224
225    def zip_readline_read_test(self, f, compression):
226        self.make_test_archive(f, compression)
227
228        # Read the ZIP archive
229        with zipfile.ZipFile(f, "r") as zipfp, \
230             zipfp.open(TESTFN) as zipopen:
231            data = b''
232            while True:
233                read = zipopen.readline()
234                if not read:
235                    break
236                data += read
237
238                read = zipopen.read(100)
239                if not read:
240                    break
241                data += read
242
243        self.assertEqual(data, self.data)
244
245    def test_readline_read(self):
246        # Issue #7610: calls to readline() interleaved with calls to read().
247        for f in get_files(self):
248            self.zip_readline_read_test(f, self.compression)
249
250    def zip_readline_test(self, f, compression):
251        self.make_test_archive(f, compression)
252
253        # Read the ZIP archive
254        with zipfile.ZipFile(f, "r") as zipfp:
255            with zipfp.open(TESTFN) as zipopen:
256                for line in self.line_gen:
257                    linedata = zipopen.readline()
258                    self.assertEqual(linedata, line)
259
260    def test_readline(self):
261        for f in get_files(self):
262            self.zip_readline_test(f, self.compression)
263
264    def zip_readlines_test(self, f, compression):
265        self.make_test_archive(f, compression)
266
267        # Read the ZIP archive
268        with zipfile.ZipFile(f, "r") as zipfp:
269            with zipfp.open(TESTFN) as zipopen:
270                ziplines = zipopen.readlines()
271            for line, zipline in zip(self.line_gen, ziplines):
272                self.assertEqual(zipline, line)
273
274    def test_readlines(self):
275        for f in get_files(self):
276            self.zip_readlines_test(f, self.compression)
277
278    def zip_iterlines_test(self, f, compression):
279        self.make_test_archive(f, compression)
280
281        # Read the ZIP archive
282        with zipfile.ZipFile(f, "r") as zipfp:
283            with zipfp.open(TESTFN) as zipopen:
284                for line, zipline in zip(self.line_gen, zipopen):
285                    self.assertEqual(zipline, line)
286
287    def test_iterlines(self):
288        for f in get_files(self):
289            self.zip_iterlines_test(f, self.compression)
290
291    def test_low_compression(self):
292        """Check for cases where compressed data is larger than original."""
293        # Create the ZIP archive
294        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
295            zipfp.writestr("strfile", '12')
296
297        # Get an open object for strfile
298        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
299            with zipfp.open("strfile") as openobj:
300                self.assertEqual(openobj.read(1), b'1')
301                self.assertEqual(openobj.read(1), b'2')
302
303    def test_writestr_compression(self):
304        zipfp = zipfile.ZipFile(TESTFN2, "w")
305        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
306        info = zipfp.getinfo('b.txt')
307        self.assertEqual(info.compress_type, self.compression)
308
309    def test_writestr_compresslevel(self):
310        zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1)
311        zipfp.writestr("a.txt", "hello world", compress_type=self.compression)
312        zipfp.writestr("b.txt", "hello world", compress_type=self.compression,
313                       compresslevel=2)
314
315        # Compression level follows the constructor.
316        a_info = zipfp.getinfo('a.txt')
317        self.assertEqual(a_info.compress_type, self.compression)
318        self.assertEqual(a_info.compress_level, 1)
319
320        # Compression level is overridden.
321        b_info = zipfp.getinfo('b.txt')
322        self.assertEqual(b_info.compress_type, self.compression)
323        self.assertEqual(b_info._compresslevel, 2)
324
325    def test_read_return_size(self):
326        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
327        # than requested.
328        for test_size in (1, 4095, 4096, 4097, 16384):
329            file_size = test_size + 1
330            junk = randbytes(file_size)
331            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
332                zipf.writestr('foo', junk)
333                with zipf.open('foo', 'r') as fp:
334                    buf = fp.read(test_size)
335                    self.assertEqual(len(buf), test_size)
336
337    def test_truncated_zipfile(self):
338        fp = io.BytesIO()
339        with zipfile.ZipFile(fp, mode='w') as zipf:
340            zipf.writestr('strfile', self.data, compress_type=self.compression)
341            end_offset = fp.tell()
342        zipfiledata = fp.getvalue()
343
344        fp = io.BytesIO(zipfiledata)
345        with zipfile.ZipFile(fp) as zipf:
346            with zipf.open('strfile') as zipopen:
347                fp.truncate(end_offset - 20)
348                with self.assertRaises(EOFError):
349                    zipopen.read()
350
351        fp = io.BytesIO(zipfiledata)
352        with zipfile.ZipFile(fp) as zipf:
353            with zipf.open('strfile') as zipopen:
354                fp.truncate(end_offset - 20)
355                with self.assertRaises(EOFError):
356                    while zipopen.read(100):
357                        pass
358
359        fp = io.BytesIO(zipfiledata)
360        with zipfile.ZipFile(fp) as zipf:
361            with zipf.open('strfile') as zipopen:
362                fp.truncate(end_offset - 20)
363                with self.assertRaises(EOFError):
364                    while zipopen.read1(100):
365                        pass
366
367    def test_repr(self):
368        fname = 'file.name'
369        for f in get_files(self):
370            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
371                zipfp.write(TESTFN, fname)
372                r = repr(zipfp)
373                self.assertIn("mode='w'", r)
374
375            with zipfile.ZipFile(f, 'r') as zipfp:
376                r = repr(zipfp)
377                if isinstance(f, str):
378                    self.assertIn('filename=%r' % f, r)
379                else:
380                    self.assertIn('file=%r' % f, r)
381                self.assertIn("mode='r'", r)
382                r = repr(zipfp.getinfo(fname))
383                self.assertIn('filename=%r' % fname, r)
384                self.assertIn('filemode=', r)
385                self.assertIn('file_size=', r)
386                if self.compression != zipfile.ZIP_STORED:
387                    self.assertIn('compress_type=', r)
388                    self.assertIn('compress_size=', r)
389                with zipfp.open(fname) as zipopen:
390                    r = repr(zipopen)
391                    self.assertIn('name=%r' % fname, r)
392                    if self.compression != zipfile.ZIP_STORED:
393                        self.assertIn('compress_type=', r)
394                self.assertIn('[closed]', repr(zipopen))
395            self.assertIn('[closed]', repr(zipfp))
396
397    def test_compresslevel_basic(self):
398        for f in get_files(self):
399            self.zip_test(f, self.compression, compresslevel=9)
400
401    def test_per_file_compresslevel(self):
402        """Check that files within a Zip archive can have different
403        compression levels."""
404        with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp:
405            zipfp.write(TESTFN, 'compress_1')
406            zipfp.write(TESTFN, 'compress_9', compresslevel=9)
407            one_info = zipfp.getinfo('compress_1')
408            nine_info = zipfp.getinfo('compress_9')
409            self.assertEqual(one_info._compresslevel, 1)
410            self.assertEqual(nine_info.compress_level, 9)
411
412    def test_writing_errors(self):
413        class BrokenFile(io.BytesIO):
414            def write(self, data):
415                nonlocal count
416                if count is not None:
417                    if count == stop:
418                        raise OSError
419                    count += 1
420                super().write(data)
421
422        stop = 0
423        while True:
424            testfile = BrokenFile()
425            count = None
426            with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp:
427                with zipfp.open('file1', 'w') as f:
428                    f.write(b'data1')
429                count = 0
430                try:
431                    with zipfp.open('file2', 'w') as f:
432                        f.write(b'data2')
433                except OSError:
434                    stop += 1
435                else:
436                    break
437                finally:
438                    count = None
439            with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
440                self.assertEqual(zipfp.namelist(), ['file1'])
441                self.assertEqual(zipfp.read('file1'), b'data1')
442
443        with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp:
444            self.assertEqual(zipfp.namelist(), ['file1', 'file2'])
445            self.assertEqual(zipfp.read('file1'), b'data1')
446            self.assertEqual(zipfp.read('file2'), b'data2')
447
448    def test_zipextfile_attrs(self):
449        fname = "somefile.txt"
450        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
451            zipfp.writestr(fname, "bogus")
452
453        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
454            with zipfp.open(fname) as fid:
455                self.assertEqual(fid.name, fname)
456                self.assertRaises(io.UnsupportedOperation, fid.fileno)
457                self.assertEqual(fid.mode, 'rb')
458                self.assertIs(fid.readable(), True)
459                self.assertIs(fid.writable(), False)
460                self.assertIs(fid.seekable(), True)
461                self.assertIs(fid.closed, False)
462            self.assertIs(fid.closed, True)
463            self.assertEqual(fid.name, fname)
464            self.assertEqual(fid.mode, 'rb')
465            self.assertRaises(io.UnsupportedOperation, fid.fileno)
466            self.assertRaises(ValueError, fid.readable)
467            self.assertIs(fid.writable(), False)
468            self.assertRaises(ValueError, fid.seekable)
469
470    def tearDown(self):
471        unlink(TESTFN)
472        unlink(TESTFN2)
473
474
475class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
476                                unittest.TestCase):
477    compression = zipfile.ZIP_STORED
478    test_low_compression = None
479
480    def zip_test_writestr_permissions(self, f, compression):
481        # Make sure that writestr and open(... mode='w') create files with
482        # mode 0600, when they are passed a name rather than a ZipInfo
483        # instance.
484
485        self.make_test_archive(f, compression)
486        with zipfile.ZipFile(f, "r") as zipfp:
487            zinfo = zipfp.getinfo('strfile')
488            self.assertEqual(zinfo.external_attr, 0o600 << 16)
489
490            zinfo2 = zipfp.getinfo('written-open-w')
491            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
492
493    def test_writestr_permissions(self):
494        for f in get_files(self):
495            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
496
497    def test_absolute_arcnames(self):
498        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
499            zipfp.write(TESTFN, "/absolute")
500
501        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
502            self.assertEqual(zipfp.namelist(), ["absolute"])
503
504    def test_append_to_zip_file(self):
505        """Test appending to an existing zipfile."""
506        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
507            zipfp.write(TESTFN, TESTFN)
508
509        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
510            zipfp.writestr("strfile", self.data)
511            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
512
513    def test_append_to_non_zip_file(self):
514        """Test appending to an existing file that is not a zipfile."""
515        # NOTE: this test fails if len(d) < 22 because of the first
516        # line "fpin.seek(-22, 2)" in _EndRecData
517        data = b'I am not a ZipFile!'*10
518        with open(TESTFN2, 'wb') as f:
519            f.write(data)
520
521        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
522            zipfp.write(TESTFN, TESTFN)
523
524        with open(TESTFN2, 'rb') as f:
525            f.seek(len(data))
526            with zipfile.ZipFile(f, "r") as zipfp:
527                self.assertEqual(zipfp.namelist(), [TESTFN])
528                self.assertEqual(zipfp.read(TESTFN), self.data)
529        with open(TESTFN2, 'rb') as f:
530            self.assertEqual(f.read(len(data)), data)
531            zipfiledata = f.read()
532        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
533            self.assertEqual(zipfp.namelist(), [TESTFN])
534            self.assertEqual(zipfp.read(TESTFN), self.data)
535
536    def test_read_concatenated_zip_file(self):
537        with io.BytesIO() as bio:
538            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
539                zipfp.write(TESTFN, TESTFN)
540            zipfiledata = bio.getvalue()
541        data = b'I am not a ZipFile!'*10
542        with open(TESTFN2, 'wb') as f:
543            f.write(data)
544            f.write(zipfiledata)
545
546        with zipfile.ZipFile(TESTFN2) as zipfp:
547            self.assertEqual(zipfp.namelist(), [TESTFN])
548            self.assertEqual(zipfp.read(TESTFN), self.data)
549
550    def test_append_to_concatenated_zip_file(self):
551        with io.BytesIO() as bio:
552            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
553                zipfp.write(TESTFN, TESTFN)
554            zipfiledata = bio.getvalue()
555        data = b'I am not a ZipFile!'*1000000
556        with open(TESTFN2, 'wb') as f:
557            f.write(data)
558            f.write(zipfiledata)
559
560        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
561            self.assertEqual(zipfp.namelist(), [TESTFN])
562            zipfp.writestr('strfile', self.data)
563
564        with open(TESTFN2, 'rb') as f:
565            self.assertEqual(f.read(len(data)), data)
566            zipfiledata = f.read()
567        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
568            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
569            self.assertEqual(zipfp.read(TESTFN), self.data)
570            self.assertEqual(zipfp.read('strfile'), self.data)
571
572    def test_ignores_newline_at_end(self):
573        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
574            zipfp.write(TESTFN, TESTFN)
575        with open(TESTFN2, 'a', encoding='utf-8') as f:
576            f.write("\r\n\00\00\00")
577        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
578            self.assertIsInstance(zipfp, zipfile.ZipFile)
579
580    def test_ignores_stuff_appended_past_comments(self):
581        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
582            zipfp.comment = b"this is a comment"
583            zipfp.write(TESTFN, TESTFN)
584        with open(TESTFN2, 'a', encoding='utf-8') as f:
585            f.write("abcdef\r\n")
586        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
587            self.assertIsInstance(zipfp, zipfile.ZipFile)
588            self.assertEqual(zipfp.comment, b"this is a comment")
589
590    def test_write_default_name(self):
591        """Check that calling ZipFile.write without arcname specified
592        produces the expected result."""
593        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
594            zipfp.write(TESTFN)
595            with open(TESTFN, "rb") as f:
596                self.assertEqual(zipfp.read(TESTFN), f.read())
597
598    def test_io_on_closed_zipextfile(self):
599        fname = "somefile.txt"
600        with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp:
601            zipfp.writestr(fname, "bogus")
602
603        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
604            with zipfp.open(fname) as fid:
605                fid.close()
606                self.assertIs(fid.closed, True)
607                self.assertRaises(ValueError, fid.read)
608                self.assertRaises(ValueError, fid.seek, 0)
609                self.assertRaises(ValueError, fid.tell)
610
611    def test_write_to_readonly(self):
612        """Check that trying to call write() on a readonly ZipFile object
613        raises a ValueError."""
614        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
615            zipfp.writestr("somefile.txt", "bogus")
616
617        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
618            self.assertRaises(ValueError, zipfp.write, TESTFN)
619
620        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
621            with self.assertRaises(ValueError):
622                zipfp.open(TESTFN, mode='w')
623
624    def test_add_file_before_1980(self):
625        # Set atime and mtime to 1970-01-01
626        os.utime(TESTFN, (0, 0))
627        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
628            self.assertRaises(ValueError, zipfp.write, TESTFN)
629
630        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
631            zipfp.write(TESTFN)
632            zinfo = zipfp.getinfo(TESTFN)
633            self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0))
634
635    def test_add_file_after_2107(self):
636        # Set atime and mtime to 2108-12-30
637        ts = 4386268800
638        try:
639            time.localtime(ts)
640        except OverflowError:
641            self.skipTest(f'time.localtime({ts}) raises OverflowError')
642        try:
643            os.utime(TESTFN, (ts, ts))
644        except OverflowError:
645            self.skipTest('Host fs cannot set timestamp to required value.')
646
647        mtime_ns = os.stat(TESTFN).st_mtime_ns
648        if mtime_ns != (4386268800 * 10**9):
649            # XFS filesystem is limited to 32-bit timestamp, but the syscall
650            # didn't fail. Moreover, there is a VFS bug which returns
651            # a cached timestamp which is different than the value on disk.
652            #
653            # Test st_mtime_ns rather than st_mtime to avoid rounding issues.
654            #
655            # https://bugzilla.redhat.com/show_bug.cgi?id=1795576
656            # https://bugs.python.org/issue39460#msg360952
657            self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}")
658
659        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
660            self.assertRaises(struct.error, zipfp.write, TESTFN)
661
662        with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp:
663            zipfp.write(TESTFN)
664            zinfo = zipfp.getinfo(TESTFN)
665            self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59))
666
667
668@requires_zlib()
669class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
670                                 unittest.TestCase):
671    compression = zipfile.ZIP_DEFLATED
672
673    def test_per_file_compression(self):
674        """Check that files within a Zip archive can have different
675        compression options."""
676        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
677            zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
678            zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
679            sinfo = zipfp.getinfo('storeme')
680            dinfo = zipfp.getinfo('deflateme')
681            self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
682            self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
683
684@requires_bz2()
685class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
686                               unittest.TestCase):
687    compression = zipfile.ZIP_BZIP2
688
689@requires_lzma()
690class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
691                              unittest.TestCase):
692    compression = zipfile.ZIP_LZMA
693
694
695class AbstractTestZip64InSmallFiles:
696    # These tests test the ZIP64 functionality without using large files,
697    # see test_zipfile64 for proper tests.
698
699    @classmethod
700    def setUpClass(cls):
701        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
702                    for i in range(0, FIXEDTEST_SIZE))
703        cls.data = b'\n'.join(line_gen)
704
705    def setUp(self):
706        self._limit = zipfile.ZIP64_LIMIT
707        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
708        zipfile.ZIP64_LIMIT = 1000
709        zipfile.ZIP_FILECOUNT_LIMIT = 9
710
711        # Make a source file with some lines
712        with open(TESTFN, "wb") as fp:
713            fp.write(self.data)
714
715    def zip_test(self, f, compression):
716        # Create the ZIP archive
717        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
718            zipfp.write(TESTFN, "another.name")
719            zipfp.write(TESTFN, TESTFN)
720            zipfp.writestr("strfile", self.data)
721
722        # Read the ZIP archive
723        with zipfile.ZipFile(f, "r", compression) as zipfp:
724            self.assertEqual(zipfp.read(TESTFN), self.data)
725            self.assertEqual(zipfp.read("another.name"), self.data)
726            self.assertEqual(zipfp.read("strfile"), self.data)
727
728            # Print the ZIP directory
729            fp = io.StringIO()
730            zipfp.printdir(fp)
731
732            directory = fp.getvalue()
733            lines = directory.splitlines()
734            self.assertEqual(len(lines), 4) # Number of files + header
735
736            self.assertIn('File Name', lines[0])
737            self.assertIn('Modified', lines[0])
738            self.assertIn('Size', lines[0])
739
740            fn, date, time_, size = lines[1].split()
741            self.assertEqual(fn, 'another.name')
742            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
743            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
744            self.assertEqual(size, str(len(self.data)))
745
746            # Check the namelist
747            names = zipfp.namelist()
748            self.assertEqual(len(names), 3)
749            self.assertIn(TESTFN, names)
750            self.assertIn("another.name", names)
751            self.assertIn("strfile", names)
752
753            # Check infolist
754            infos = zipfp.infolist()
755            names = [i.filename for i in infos]
756            self.assertEqual(len(names), 3)
757            self.assertIn(TESTFN, names)
758            self.assertIn("another.name", names)
759            self.assertIn("strfile", names)
760            for i in infos:
761                self.assertEqual(i.file_size, len(self.data))
762
763            # check getinfo
764            for nm in (TESTFN, "another.name", "strfile"):
765                info = zipfp.getinfo(nm)
766                self.assertEqual(info.filename, nm)
767                self.assertEqual(info.file_size, len(self.data))
768
769            # Check that testzip thinks the archive is valid
770            self.assertIsNone(zipfp.testzip())
771
772    def test_basic(self):
773        for f in get_files(self):
774            self.zip_test(f, self.compression)
775
776    def test_too_many_files(self):
777        # This test checks that more than 64k files can be added to an archive,
778        # and that the resulting archive can be read properly by ZipFile
779        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
780                               allowZip64=True)
781        zipf.debug = 100
782        numfiles = 15
783        for i in range(numfiles):
784            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
785        self.assertEqual(len(zipf.namelist()), numfiles)
786        zipf.close()
787
788        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
789        self.assertEqual(len(zipf2.namelist()), numfiles)
790        for i in range(numfiles):
791            content = zipf2.read("foo%08d" % i).decode('ascii')
792            self.assertEqual(content, "%d" % (i**3 % 57))
793        zipf2.close()
794
795    def test_too_many_files_append(self):
796        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
797                               allowZip64=False)
798        zipf.debug = 100
799        numfiles = 9
800        for i in range(numfiles):
801            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
802        self.assertEqual(len(zipf.namelist()), numfiles)
803        with self.assertRaises(zipfile.LargeZipFile):
804            zipf.writestr("foo%08d" % numfiles, b'')
805        self.assertEqual(len(zipf.namelist()), numfiles)
806        zipf.close()
807
808        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
809                               allowZip64=False)
810        zipf.debug = 100
811        self.assertEqual(len(zipf.namelist()), numfiles)
812        with self.assertRaises(zipfile.LargeZipFile):
813            zipf.writestr("foo%08d" % numfiles, b'')
814        self.assertEqual(len(zipf.namelist()), numfiles)
815        zipf.close()
816
817        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
818                               allowZip64=True)
819        zipf.debug = 100
820        self.assertEqual(len(zipf.namelist()), numfiles)
821        numfiles2 = 15
822        for i in range(numfiles, numfiles2):
823            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
824        self.assertEqual(len(zipf.namelist()), numfiles2)
825        zipf.close()
826
827        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
828        self.assertEqual(len(zipf2.namelist()), numfiles2)
829        for i in range(numfiles2):
830            content = zipf2.read("foo%08d" % i).decode('ascii')
831            self.assertEqual(content, "%d" % (i**3 % 57))
832        zipf2.close()
833
834    def tearDown(self):
835        zipfile.ZIP64_LIMIT = self._limit
836        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
837        unlink(TESTFN)
838        unlink(TESTFN2)
839
840
841class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
842                                  unittest.TestCase):
843    compression = zipfile.ZIP_STORED
844
845    def large_file_exception_test(self, f, compression):
846        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
847            self.assertRaises(zipfile.LargeZipFile,
848                              zipfp.write, TESTFN, "another.name")
849
850    def large_file_exception_test2(self, f, compression):
851        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
852            self.assertRaises(zipfile.LargeZipFile,
853                              zipfp.writestr, "another.name", self.data)
854
855    def test_large_file_exception(self):
856        for f in get_files(self):
857            self.large_file_exception_test(f, zipfile.ZIP_STORED)
858            self.large_file_exception_test2(f, zipfile.ZIP_STORED)
859
860    def test_absolute_arcnames(self):
861        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
862                             allowZip64=True) as zipfp:
863            zipfp.write(TESTFN, "/absolute")
864
865        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
866            self.assertEqual(zipfp.namelist(), ["absolute"])
867
868    def test_append(self):
869        # Test that appending to the Zip64 archive doesn't change
870        # extra fields of existing entries.
871        with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp:
872            zipfp.writestr("strfile", self.data)
873        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
874            zinfo = zipfp.getinfo("strfile")
875            extra = zinfo.extra
876        with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp:
877            zipfp.writestr("strfile2", self.data)
878        with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp:
879            zinfo = zipfp.getinfo("strfile")
880            self.assertEqual(zinfo.extra, extra)
881
882    def make_zip64_file(
883        self, file_size_64_set=False, file_size_extra=False,
884        compress_size_64_set=False, compress_size_extra=False,
885        header_offset_64_set=False, header_offset_extra=False,
886    ):
887        """Generate bytes sequence for a zip with (incomplete) zip64 data.
888
889        The actual values (not the zip 64 0xffffffff values) stored in the file
890        are:
891        file_size: 8
892        compress_size: 8
893        header_offset: 0
894        """
895        actual_size = 8
896        actual_header_offset = 0
897        local_zip64_fields = []
898        central_zip64_fields = []
899
900        file_size = actual_size
901        if file_size_64_set:
902            file_size = 0xffffffff
903            if file_size_extra:
904                local_zip64_fields.append(actual_size)
905                central_zip64_fields.append(actual_size)
906        file_size = struct.pack("<L", file_size)
907
908        compress_size = actual_size
909        if compress_size_64_set:
910            compress_size = 0xffffffff
911            if compress_size_extra:
912                local_zip64_fields.append(actual_size)
913                central_zip64_fields.append(actual_size)
914        compress_size = struct.pack("<L", compress_size)
915
916        header_offset = actual_header_offset
917        if header_offset_64_set:
918            header_offset = 0xffffffff
919            if header_offset_extra:
920                central_zip64_fields.append(actual_header_offset)
921        header_offset = struct.pack("<L", header_offset)
922
923        local_extra = struct.pack(
924            '<HH' + 'Q'*len(local_zip64_fields),
925            0x0001,
926            8*len(local_zip64_fields),
927            *local_zip64_fields
928        )
929
930        central_extra = struct.pack(
931            '<HH' + 'Q'*len(central_zip64_fields),
932            0x0001,
933            8*len(central_zip64_fields),
934            *central_zip64_fields
935        )
936
937        central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
938        offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
939
940        local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
941        central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
942
943        filename = b"test.txt"
944        content = b"test1234"
945        filename_length = struct.pack("<H", len(filename))
946        zip64_contents = (
947            # Local file header
948            b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
949            + compress_size
950            + file_size
951            + filename_length
952            + local_extra_length
953            + filename
954            + local_extra
955            + content
956            # Central directory:
957            + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf"
958            + compress_size
959            + file_size
960            + filename_length
961            + central_extra_length
962            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01"
963            + header_offset
964            + filename
965            + central_extra
966            # Zip64 end of central directory
967            + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
968            + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
969            + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
970            + central_dir_size
971            + offset_to_central_dir
972            # Zip64 end of central directory locator
973            + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
974            + b"\x00\x00\x00"
975            # end of central directory
976            + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
977            + b"\x00\x00\x00\x00"
978        )
979        return zip64_contents
980
981    def test_bad_zip64_extra(self):
982        """Missing zip64 extra records raises an exception.
983
984        There are 4 fields that the zip64 format handles (the disk number is
985        not used in this module and so is ignored here). According to the zip
986        spec:
987              The order of the fields in the zip64 extended
988              information record is fixed, but the fields MUST
989              only appear if the corresponding Local or Central
990              directory record field is set to 0xFFFF or 0xFFFFFFFF.
991
992        If the zip64 extra content doesn't contain enough entries for the
993        number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
994        This test mismatches the length of the zip64 extra field and the number
995        of fields set to indicate the presence of zip64 data.
996        """
997        # zip64 file size present, no fields in extra, expecting one, equals
998        # missing file size.
999        missing_file_size_extra = self.make_zip64_file(
1000            file_size_64_set=True,
1001        )
1002        with self.assertRaises(zipfile.BadZipFile) as e:
1003            zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
1004        self.assertIn('file size', str(e.exception).lower())
1005
1006        # zip64 file size present, zip64 compress size present, one field in
1007        # extra, expecting two, equals missing compress size.
1008        missing_compress_size_extra = self.make_zip64_file(
1009            file_size_64_set=True,
1010            file_size_extra=True,
1011            compress_size_64_set=True,
1012        )
1013        with self.assertRaises(zipfile.BadZipFile) as e:
1014            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1015        self.assertIn('compress size', str(e.exception).lower())
1016
1017        # zip64 compress size present, no fields in extra, expecting one,
1018        # equals missing compress size.
1019        missing_compress_size_extra = self.make_zip64_file(
1020            compress_size_64_set=True,
1021        )
1022        with self.assertRaises(zipfile.BadZipFile) as e:
1023            zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
1024        self.assertIn('compress size', str(e.exception).lower())
1025
1026        # zip64 file size present, zip64 compress size present, zip64 header
1027        # offset present, two fields in extra, expecting three, equals missing
1028        # header offset
1029        missing_header_offset_extra = self.make_zip64_file(
1030            file_size_64_set=True,
1031            file_size_extra=True,
1032            compress_size_64_set=True,
1033            compress_size_extra=True,
1034            header_offset_64_set=True,
1035        )
1036        with self.assertRaises(zipfile.BadZipFile) as e:
1037            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1038        self.assertIn('header offset', str(e.exception).lower())
1039
1040        # zip64 compress size present, zip64 header offset present, one field
1041        # in extra, expecting two, equals missing header offset
1042        missing_header_offset_extra = self.make_zip64_file(
1043            file_size_64_set=False,
1044            compress_size_64_set=True,
1045            compress_size_extra=True,
1046            header_offset_64_set=True,
1047        )
1048        with self.assertRaises(zipfile.BadZipFile) as e:
1049            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1050        self.assertIn('header offset', str(e.exception).lower())
1051
1052        # zip64 file size present, zip64 header offset present, one field in
1053        # extra, expecting two, equals missing header offset
1054        missing_header_offset_extra = self.make_zip64_file(
1055            file_size_64_set=True,
1056            file_size_extra=True,
1057            compress_size_64_set=False,
1058            header_offset_64_set=True,
1059        )
1060        with self.assertRaises(zipfile.BadZipFile) as e:
1061            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1062        self.assertIn('header offset', str(e.exception).lower())
1063
1064        # zip64 header offset present, no fields in extra, expecting one,
1065        # equals missing header offset
1066        missing_header_offset_extra = self.make_zip64_file(
1067            file_size_64_set=False,
1068            compress_size_64_set=False,
1069            header_offset_64_set=True,
1070        )
1071        with self.assertRaises(zipfile.BadZipFile) as e:
1072            zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
1073        self.assertIn('header offset', str(e.exception).lower())
1074
1075    def test_generated_valid_zip64_extra(self):
1076        # These values are what is set in the make_zip64_file method.
1077        expected_file_size = 8
1078        expected_compress_size = 8
1079        expected_header_offset = 0
1080        expected_content = b"test1234"
1081
1082        # Loop through the various valid combinations of zip64 masks
1083        # present and extra fields present.
1084        params = (
1085            {"file_size_64_set": True, "file_size_extra": True},
1086            {"compress_size_64_set": True, "compress_size_extra": True},
1087            {"header_offset_64_set": True, "header_offset_extra": True},
1088        )
1089
1090        for r in range(1, len(params) + 1):
1091            for combo in itertools.combinations(params, r):
1092                kwargs = {}
1093                for c in combo:
1094                    kwargs.update(c)
1095                with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf:
1096                    zinfo = zf.infolist()[0]
1097                    self.assertEqual(zinfo.file_size, expected_file_size)
1098                    self.assertEqual(zinfo.compress_size, expected_compress_size)
1099                    self.assertEqual(zinfo.header_offset, expected_header_offset)
1100                    self.assertEqual(zf.read(zinfo), expected_content)
1101
1102    def test_force_zip64(self):
1103        """Test that forcing zip64 extensions correctly notes this in the zip file"""
1104
1105        # GH-103861 describes an issue where forcing a small file to use zip64
1106        # extensions would add a zip64 extra record, but not change the data
1107        # sizes to 0xFFFFFFFF to indicate to the extractor that the zip64
1108        # record should be read. Additionally, it would not set the required
1109        # version to indicate that zip64 extensions are required to extract it.
1110        # This test replicates the situation and reads the raw data to specifically ensure:
1111        #  - The required extract version is always >= ZIP64_VERSION
1112        #  - The compressed and uncompressed size in the file headers are both
1113        #     0xFFFFFFFF (ie. point to zip64 record)
1114        #  - The zip64 record is provided and has the correct sizes in it
1115        # Other aspects of the zip are checked as well, but verifying the above is the main goal.
1116        # Because this is hard to verify by parsing the data as a zip, the raw
1117        # bytes are checked to ensure that they line up with the zip spec.
1118        # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
1119        # The relevant sections for this test are:
1120        #  - 4.3.7 for local file header
1121        #  - 4.5.3 for zip64 extra field
1122
1123        data = io.BytesIO()
1124        with zipfile.ZipFile(data, mode="w", allowZip64=True) as zf:
1125            with zf.open("text.txt", mode="w", force_zip64=True) as zi:
1126                zi.write(b"_")
1127
1128        zipdata = data.getvalue()
1129
1130        # pull out and check zip information
1131        (
1132            header, vers, os, flags, comp, csize, usize, fn_len,
1133            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, cd_sig
1134        ) = struct.unpack("<4sBBHH8xIIHH8shhQQx4s", zipdata[:63])
1135
1136        self.assertEqual(header, b"PK\x03\x04")  # local file header
1137        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1138        self.assertEqual(os, 0)  # compatible with MS-DOS
1139        self.assertEqual(flags, 0)  # no flags
1140        self.assertEqual(comp, 0)  # compression method = stored
1141        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
1142        self.assertEqual(usize, 0xFFFFFFFF)
1143        self.assertEqual(fn_len, 8)  # filename len
1144        self.assertEqual(ex_total_len, 20)  # size of extra records
1145        self.assertEqual(ex_id, 1)  # Zip64 extra record
1146        self.assertEqual(ex_len, 16)  # 16 bytes of data
1147        self.assertEqual(ex_usize, 1)  # uncompressed size
1148        self.assertEqual(ex_csize, 1)  # compressed size
1149        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next
1150
1151        z = zipfile.ZipFile(io.BytesIO(zipdata))
1152        zinfos = z.infolist()
1153        self.assertEqual(len(zinfos), 1)
1154        self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1155
1156    def test_unseekable_zip_unknown_filesize(self):
1157        """Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used"""
1158
1159        def make_zip(fp):
1160            with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
1161                with zf.open("text.txt", mode="w", force_zip64=False) as zi:
1162                    zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1))
1163
1164        self.assertRaises(RuntimeError, make_zip, io.BytesIO())
1165        self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO()))
1166
1167    def test_zip64_required_not_allowed_fail(self):
1168        """Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add"""
1169        def make_zip(fp):
1170            with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf:
1171                # pretend zipfile.ZipInfo.from_file was used to get the name and filesize
1172                info = zipfile.ZipInfo("text.txt")
1173                info.file_size = zipfile.ZIP64_LIMIT + 1
1174                zf.open(info, mode="w")
1175
1176        self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO())
1177        self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO()))
1178
1179    def test_unseekable_zip_known_filesize(self):
1180        """Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front"""
1181
1182        # This test ensures that the zip will use a zip64 data descriptor (same
1183        # as a regular data descriptor except the sizes are 8 bytes instead of
1184        # 4) record to communicate the size of a file if the zip is being
1185        # written to an unseekable stream.
1186        # Because this sort of thing is hard to verify by parsing the data back
1187        # in as a zip, this test looks at the raw bytes created to ensure that
1188        # the correct data has been generated.
1189        # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT
1190        # The relevant sections for this test are:
1191        #  - 4.3.7 for local file header
1192        #  - 4.3.9 for the data descriptor
1193        #  - 4.5.3 for zip64 extra field
1194
1195        file_size = zipfile.ZIP64_LIMIT + 1
1196
1197        def make_zip(fp):
1198            with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf:
1199                # pretend zipfile.ZipInfo.from_file was used to get the name and filesize
1200                info = zipfile.ZipInfo("text.txt")
1201                info.file_size = file_size
1202                with zf.open(info, mode="w", force_zip64=False) as zi:
1203                    zi.write(b"_" * file_size)
1204            return fp
1205
1206        # check seekable file information
1207        seekable_data = make_zip(io.BytesIO()).getvalue()
1208        (
1209            header, vers, os, flags, comp, csize, usize, fn_len,
1210            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
1211            cd_sig
1212        ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size])
1213
1214        self.assertEqual(header, b"PK\x03\x04")  # local file header
1215        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1216        self.assertEqual(os, 0)  # compatible with MS-DOS
1217        self.assertEqual(flags, 0)  # no flags set
1218        self.assertEqual(comp, 0)  # compression method = stored
1219        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
1220        self.assertEqual(usize, 0xFFFFFFFF)
1221        self.assertEqual(fn_len, 8)  # filename len
1222        self.assertEqual(ex_total_len, 20)  # size of extra records
1223        self.assertEqual(ex_id, 1)  # Zip64 extra record
1224        self.assertEqual(ex_len, 16)  # 16 bytes of data
1225        self.assertEqual(ex_usize, file_size)  # uncompressed size
1226        self.assertEqual(ex_csize, file_size)  # compressed size
1227        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next
1228
1229        # check unseekable file information
1230        unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue()
1231        (
1232            header, vers, os, flags, comp, csize, usize, fn_len,
1233            ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize,
1234            dd_header, dd_usize, dd_csize, cd_sig
1235        ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size])
1236
1237        self.assertEqual(header, b"PK\x03\x04")  # local file header
1238        self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION)  # requires zip64 to extract
1239        self.assertEqual(os, 0)  # compatible with MS-DOS
1240        self.assertEqual("{:b}".format(flags), "1000")  # streaming flag set
1241        self.assertEqual(comp, 0)  # compression method = stored
1242        self.assertEqual(csize, 0xFFFFFFFF)  # sizes are in zip64 extra
1243        self.assertEqual(usize, 0xFFFFFFFF)
1244        self.assertEqual(fn_len, 8)  # filename len
1245        self.assertEqual(ex_total_len, 20)  # size of extra records
1246        self.assertEqual(ex_id, 1)  # Zip64 extra record
1247        self.assertEqual(ex_len, 16)  # 16 bytes of data
1248        self.assertEqual(ex_usize, 0)  # uncompressed size - 0 to defer to data descriptor
1249        self.assertEqual(ex_csize, 0)  # compressed size - 0 to defer to data descriptor
1250        self.assertEqual(dd_header, b"PK\07\x08")  # data descriptor
1251        self.assertEqual(dd_usize, file_size)  # file size (8 bytes because zip64)
1252        self.assertEqual(dd_csize, file_size)  # compressed size (8 bytes because zip64)
1253        self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next
1254
1255
1256@requires_zlib()
1257class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1258                                   unittest.TestCase):
1259    compression = zipfile.ZIP_DEFLATED
1260
1261@requires_bz2()
1262class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1263                                 unittest.TestCase):
1264    compression = zipfile.ZIP_BZIP2
1265
1266@requires_lzma()
1267class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
1268                                unittest.TestCase):
1269    compression = zipfile.ZIP_LZMA
1270
1271
1272class AbstractWriterTests:
1273
1274    def tearDown(self):
1275        unlink(TESTFN2)
1276
1277    def test_close_after_close(self):
1278        data = b'content'
1279        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1280            w = zipf.open('test', 'w')
1281            w.write(data)
1282            w.close()
1283            self.assertTrue(w.closed)
1284            w.close()
1285            self.assertTrue(w.closed)
1286            self.assertEqual(zipf.read('test'), data)
1287
1288    def test_write_after_close(self):
1289        data = b'content'
1290        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf:
1291            w = zipf.open('test', 'w')
1292            w.write(data)
1293            w.close()
1294            self.assertTrue(w.closed)
1295            self.assertRaises(ValueError, w.write, b'')
1296            self.assertEqual(zipf.read('test'), data)
1297
1298    def test_issue44439(self):
1299        q = array.array('Q', [1, 2, 3, 4, 5])
1300        LENGTH = len(q) * q.itemsize
1301        with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip:
1302            with zip.open('data', 'w') as data:
1303                self.assertEqual(data.write(q), LENGTH)
1304            self.assertEqual(zip.getinfo('data').file_size, LENGTH)
1305
1306    def test_zipwritefile_attrs(self):
1307        fname = "somefile.txt"
1308        with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp:
1309            with zipfp.open(fname, 'w') as fid:
1310                self.assertEqual(fid.name, fname)
1311                self.assertRaises(io.UnsupportedOperation, fid.fileno)
1312                self.assertEqual(fid.mode, 'wb')
1313                self.assertIs(fid.readable(), False)
1314                self.assertIs(fid.writable(), True)
1315                self.assertIs(fid.seekable(), False)
1316                self.assertIs(fid.closed, False)
1317            self.assertIs(fid.closed, True)
1318            self.assertEqual(fid.name, fname)
1319            self.assertEqual(fid.mode, 'wb')
1320            self.assertRaises(io.UnsupportedOperation, fid.fileno)
1321            self.assertIs(fid.readable(), False)
1322            self.assertIs(fid.writable(), True)
1323            self.assertIs(fid.seekable(), False)
1324
1325class StoredWriterTests(AbstractWriterTests, unittest.TestCase):
1326    compression = zipfile.ZIP_STORED
1327
1328@requires_zlib()
1329class DeflateWriterTests(AbstractWriterTests, unittest.TestCase):
1330    compression = zipfile.ZIP_DEFLATED
1331
1332@requires_bz2()
1333class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase):
1334    compression = zipfile.ZIP_BZIP2
1335
1336@requires_lzma()
1337class LzmaWriterTests(AbstractWriterTests, unittest.TestCase):
1338    compression = zipfile.ZIP_LZMA
1339
1340
1341class PyZipFileTests(unittest.TestCase):
1342    def assertCompiledIn(self, name, namelist):
1343        if name + 'o' not in namelist:
1344            self.assertIn(name + 'c', namelist)
1345
1346    def requiresWriteAccess(self, path):
1347        # effective_ids unavailable on windows
1348        if not os.access(path, os.W_OK,
1349                         effective_ids=os.access in os.supports_effective_ids):
1350            self.skipTest('requires write access to the installed location')
1351        filename = os.path.join(path, 'test_zipfile.try')
1352        try:
1353            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
1354            os.close(fd)
1355        except Exception:
1356            self.skipTest('requires write access to the installed location')
1357        unlink(filename)
1358
1359    def test_write_pyfile(self):
1360        self.requiresWriteAccess(os.path.dirname(__file__))
1361        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1362            fn = __file__
1363            if fn.endswith('.pyc'):
1364                path_split = fn.split(os.sep)
1365                if os.altsep is not None:
1366                    path_split.extend(fn.split(os.altsep))
1367                if '__pycache__' in path_split:
1368                    fn = importlib.util.source_from_cache(fn)
1369                else:
1370                    fn = fn[:-1]
1371
1372            zipfp.writepy(fn)
1373
1374            bn = os.path.basename(fn)
1375            self.assertNotIn(bn, zipfp.namelist())
1376            self.assertCompiledIn(bn, zipfp.namelist())
1377
1378        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1379            fn = __file__
1380            if fn.endswith('.pyc'):
1381                fn = fn[:-1]
1382
1383            zipfp.writepy(fn, "testpackage")
1384
1385            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
1386            self.assertNotIn(bn, zipfp.namelist())
1387            self.assertCompiledIn(bn, zipfp.namelist())
1388
1389    def test_write_python_package(self):
1390        import email
1391        packagedir = os.path.dirname(email.__file__)
1392        self.requiresWriteAccess(packagedir)
1393
1394        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1395            zipfp.writepy(packagedir)
1396
1397            # Check for a couple of modules at different levels of the
1398            # hierarchy
1399            names = zipfp.namelist()
1400            self.assertCompiledIn('email/__init__.py', names)
1401            self.assertCompiledIn('email/mime/text.py', names)
1402
1403    def test_write_filtered_python_package(self):
1404        import test
1405        packagedir = os.path.dirname(test.__file__)
1406        self.requiresWriteAccess(packagedir)
1407
1408        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1409
1410            # first make sure that the test folder gives error messages
1411            # (on the badsyntax_... files)
1412            with captured_stdout() as reportSIO:
1413                zipfp.writepy(packagedir)
1414            reportStr = reportSIO.getvalue()
1415            self.assertTrue('SyntaxError' in reportStr)
1416
1417            # then check that the filter works on the whole package
1418            with captured_stdout() as reportSIO:
1419                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
1420            reportStr = reportSIO.getvalue()
1421            self.assertTrue('SyntaxError' not in reportStr)
1422
1423            # then check that the filter works on individual files
1424            def filter(path):
1425                return not os.path.basename(path).startswith("bad")
1426            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
1427                zipfp.writepy(packagedir, filterfunc=filter)
1428            reportStr = reportSIO.getvalue()
1429            if reportStr:
1430                print(reportStr)
1431            self.assertTrue('SyntaxError' not in reportStr)
1432
1433    def test_write_with_optimization(self):
1434        import email
1435        packagedir = os.path.dirname(email.__file__)
1436        self.requiresWriteAccess(packagedir)
1437        optlevel = 1 if __debug__ else 0
1438        ext = '.pyc'
1439
1440        with TemporaryFile() as t, \
1441             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
1442            zipfp.writepy(packagedir)
1443
1444            names = zipfp.namelist()
1445            self.assertIn('email/__init__' + ext, names)
1446            self.assertIn('email/mime/text' + ext, names)
1447
1448    def test_write_python_directory(self):
1449        os.mkdir(TESTFN2)
1450        try:
1451            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1452                fp.write("print(42)\n")
1453
1454            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1455                fp.write("print(42 * 42)\n")
1456
1457            with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp:
1458                fp.write("bla bla bla\n")
1459
1460            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1461                zipfp.writepy(TESTFN2)
1462
1463                names = zipfp.namelist()
1464                self.assertCompiledIn('mod1.py', names)
1465                self.assertCompiledIn('mod2.py', names)
1466                self.assertNotIn('mod2.txt', names)
1467
1468        finally:
1469            rmtree(TESTFN2)
1470
1471    def test_write_python_directory_filtered(self):
1472        os.mkdir(TESTFN2)
1473        try:
1474            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1475                fp.write("print(42)\n")
1476
1477            with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp:
1478                fp.write("print(42 * 42)\n")
1479
1480            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1481                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
1482                                                  not fn.endswith('mod2.py'))
1483
1484                names = zipfp.namelist()
1485                self.assertCompiledIn('mod1.py', names)
1486                self.assertNotIn('mod2.py', names)
1487
1488        finally:
1489            rmtree(TESTFN2)
1490
1491    def test_write_non_pyfile(self):
1492        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1493            with open(TESTFN, 'w', encoding='utf-8') as f:
1494                f.write('most definitely not a python file')
1495            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
1496            unlink(TESTFN)
1497
1498    def test_write_pyfile_bad_syntax(self):
1499        os.mkdir(TESTFN2)
1500        try:
1501            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1502                fp.write("Bad syntax in python file\n")
1503
1504            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1505                # syntax errors are printed to stdout
1506                with captured_stdout() as s:
1507                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
1508
1509                self.assertIn("SyntaxError", s.getvalue())
1510
1511                # as it will not have compiled the python file, it will
1512                # include the .py file not .pyc
1513                names = zipfp.namelist()
1514                self.assertIn('mod1.py', names)
1515                self.assertNotIn('mod1.pyc', names)
1516
1517        finally:
1518            rmtree(TESTFN2)
1519
1520    def test_write_pathlike(self):
1521        os.mkdir(TESTFN2)
1522        try:
1523            with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp:
1524                fp.write("print(42)\n")
1525
1526            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
1527                zipfp.writepy(FakePath(os.path.join(TESTFN2, "mod1.py")))
1528                names = zipfp.namelist()
1529                self.assertCompiledIn('mod1.py', names)
1530        finally:
1531            rmtree(TESTFN2)
1532
1533
1534class ExtractTests(unittest.TestCase):
1535
1536    def make_test_file(self):
1537        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1538            for fpath, fdata in SMALL_TEST_DATA:
1539                zipfp.writestr(fpath, fdata)
1540
1541    def test_extract(self):
1542        with temp_cwd():
1543            self.make_test_file()
1544            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1545                for fpath, fdata in SMALL_TEST_DATA:
1546                    writtenfile = zipfp.extract(fpath)
1547
1548                    # make sure it was written to the right place
1549                    correctfile = os.path.join(os.getcwd(), fpath)
1550                    correctfile = os.path.normpath(correctfile)
1551
1552                    self.assertEqual(writtenfile, correctfile)
1553
1554                    # make sure correct data is in correct file
1555                    with open(writtenfile, "rb") as f:
1556                        self.assertEqual(fdata.encode(), f.read())
1557
1558                    unlink(writtenfile)
1559
1560    def _test_extract_with_target(self, target):
1561        self.make_test_file()
1562        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1563            for fpath, fdata in SMALL_TEST_DATA:
1564                writtenfile = zipfp.extract(fpath, target)
1565
1566                # make sure it was written to the right place
1567                correctfile = os.path.join(target, fpath)
1568                correctfile = os.path.normpath(correctfile)
1569                self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target))
1570
1571                # make sure correct data is in correct file
1572                with open(writtenfile, "rb") as f:
1573                    self.assertEqual(fdata.encode(), f.read())
1574
1575                unlink(writtenfile)
1576
1577        unlink(TESTFN2)
1578
1579    def test_extract_with_target(self):
1580        with temp_dir() as extdir:
1581            self._test_extract_with_target(extdir)
1582
1583    def test_extract_with_target_pathlike(self):
1584        with temp_dir() as extdir:
1585            self._test_extract_with_target(FakePath(extdir))
1586
1587    def test_extract_all(self):
1588        with temp_cwd():
1589            self.make_test_file()
1590            with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1591                zipfp.extractall()
1592                for fpath, fdata in SMALL_TEST_DATA:
1593                    outfile = os.path.join(os.getcwd(), fpath)
1594
1595                    with open(outfile, "rb") as f:
1596                        self.assertEqual(fdata.encode(), f.read())
1597
1598                    unlink(outfile)
1599
1600    def _test_extract_all_with_target(self, target):
1601        self.make_test_file()
1602        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1603            zipfp.extractall(target)
1604            for fpath, fdata in SMALL_TEST_DATA:
1605                outfile = os.path.join(target, fpath)
1606
1607                with open(outfile, "rb") as f:
1608                    self.assertEqual(fdata.encode(), f.read())
1609
1610                unlink(outfile)
1611
1612        unlink(TESTFN2)
1613
1614    def test_extract_all_with_target(self):
1615        with temp_dir() as extdir:
1616            self._test_extract_all_with_target(extdir)
1617
1618    def test_extract_all_with_target_pathlike(self):
1619        with temp_dir() as extdir:
1620            self._test_extract_all_with_target(FakePath(extdir))
1621
1622    def check_file(self, filename, content):
1623        self.assertTrue(os.path.isfile(filename))
1624        with open(filename, 'rb') as f:
1625            self.assertEqual(f.read(), content)
1626
1627    def test_sanitize_windows_name(self):
1628        san = zipfile.ZipFile._sanitize_windows_name
1629        # Passing pathsep in allows this test to work regardless of platform.
1630        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
1631        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
1632        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
1633        self.assertEqual(san('  /  /foo  /  /ba  r', '/'), r'foo/ba  r')
1634        self.assertEqual(san(' . /. /foo ./ . /. ./ba .r', '/'), r'foo/ba .r')
1635
1636    def test_extract_hackers_arcnames_common_cases(self):
1637        common_hacknames = [
1638            ('../foo/bar', 'foo/bar'),
1639            ('foo/../bar', 'foo/bar'),
1640            ('foo/../../bar', 'foo/bar'),
1641            ('foo/bar/..', 'foo/bar'),
1642            ('./../foo/bar', 'foo/bar'),
1643            ('/foo/bar', 'foo/bar'),
1644            ('/foo/../bar', 'foo/bar'),
1645            ('/foo/../../bar', 'foo/bar'),
1646        ]
1647        self._test_extract_hackers_arcnames(common_hacknames)
1648
1649    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
1650    def test_extract_hackers_arcnames_windows_only(self):
1651        """Test combination of path fixing and windows name sanitization."""
1652        windows_hacknames = [
1653            (r'..\foo\bar', 'foo/bar'),
1654            (r'..\/foo\/bar', 'foo/bar'),
1655            (r'foo/\..\/bar', 'foo/bar'),
1656            (r'foo\/../\bar', 'foo/bar'),
1657            (r'C:foo/bar', 'foo/bar'),
1658            (r'C:/foo/bar', 'foo/bar'),
1659            (r'C://foo/bar', 'foo/bar'),
1660            (r'C:\foo\bar', 'foo/bar'),
1661            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
1662            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
1663            (r'///conky/mountpoint/foo/bar', 'mountpoint/foo/bar'),
1664            (r'\\\conky\mountpoint\foo\bar', 'mountpoint/foo/bar'),
1665            (r'//conky//mountpoint/foo/bar', 'mountpoint/foo/bar'),
1666            (r'\\conky\\mountpoint\foo\bar', 'mountpoint/foo/bar'),
1667            (r'//?/C:/foo/bar', 'foo/bar'),
1668            (r'\\?\C:\foo\bar', 'foo/bar'),
1669            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
1670            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1671            ('../../foo../../ba..r', 'foo/ba..r'),
1672        ]
1673        self._test_extract_hackers_arcnames(windows_hacknames)
1674
1675    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1676    def test_extract_hackers_arcnames_posix_only(self):
1677        posix_hacknames = [
1678            ('//foo/bar', 'foo/bar'),
1679            ('../../foo../../ba..r', 'foo../ba..r'),
1680            (r'foo/..\bar', r'foo/..\bar'),
1681        ]
1682        self._test_extract_hackers_arcnames(posix_hacknames)
1683
1684    def _test_extract_hackers_arcnames(self, hacknames):
1685        for arcname, fixedname in hacknames:
1686            content = b'foobar' + arcname.encode()
1687            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1688                zinfo = zipfile.ZipInfo()
1689                # preserve backslashes
1690                zinfo.filename = arcname
1691                zinfo.external_attr = 0o600 << 16
1692                zipfp.writestr(zinfo, content)
1693
1694            arcname = arcname.replace(os.sep, "/")
1695            targetpath = os.path.join('target', 'subdir', 'subsub')
1696            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1697
1698            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1699                writtenfile = zipfp.extract(arcname, targetpath)
1700                self.assertEqual(writtenfile, correctfile,
1701                                 msg='extract %r: %r != %r' %
1702                                 (arcname, writtenfile, correctfile))
1703            self.check_file(correctfile, content)
1704            rmtree('target')
1705
1706            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1707                zipfp.extractall(targetpath)
1708            self.check_file(correctfile, content)
1709            rmtree('target')
1710
1711            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1712
1713            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1714                writtenfile = zipfp.extract(arcname)
1715                self.assertEqual(writtenfile, correctfile,
1716                                 msg="extract %r" % arcname)
1717            self.check_file(correctfile, content)
1718            rmtree(fixedname.split('/')[0])
1719
1720            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1721                zipfp.extractall()
1722            self.check_file(correctfile, content)
1723            rmtree(fixedname.split('/')[0])
1724
1725            unlink(TESTFN2)
1726
1727
1728class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
1729    testdir = TESTFN
1730
1731    @classmethod
1732    def setUpClass(cls):
1733        p = cls.ar_with_file = TESTFN + '-with-file.zip'
1734        cls.addClassCleanup(unlink, p)
1735        with zipfile.ZipFile(p, 'w') as zipfp:
1736            zipfp.writestr('test', b'newcontent')
1737
1738        p = cls.ar_with_dir = TESTFN + '-with-dir.zip'
1739        cls.addClassCleanup(unlink, p)
1740        with zipfile.ZipFile(p, 'w') as zipfp:
1741            zipfp.mkdir('test')
1742
1743        p = cls.ar_with_implicit_dir = TESTFN + '-with-implicit-dir.zip'
1744        cls.addClassCleanup(unlink, p)
1745        with zipfile.ZipFile(p, 'w') as zipfp:
1746            zipfp.writestr('test/file', b'newcontent')
1747
1748    def open(self, path):
1749        return zipfile.ZipFile(path, 'r')
1750
1751    def extractall(self, ar):
1752        ar.extractall(self.testdir)
1753
1754
1755class OtherTests(unittest.TestCase):
1756    def test_open_via_zip_info(self):
1757        # Create the ZIP archive
1758        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1759            zipfp.writestr("name", "foo")
1760            with self.assertWarns(UserWarning):
1761                zipfp.writestr("name", "bar")
1762            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1763
1764        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1765            infos = zipfp.infolist()
1766            data = b""
1767            for info in infos:
1768                with zipfp.open(info) as zipopen:
1769                    data += zipopen.read()
1770            self.assertIn(data, {b"foobar", b"barfoo"})
1771            data = b""
1772            for info in infos:
1773                data += zipfp.read(info)
1774            self.assertIn(data, {b"foobar", b"barfoo"})
1775
1776    def test_writestr_extended_local_header_issue1202(self):
1777        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1778            for data in 'abcdefghijklmnop':
1779                zinfo = zipfile.ZipInfo(data)
1780                zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR  # Include an extended local header.
1781                orig_zip.writestr(zinfo, data)
1782
1783    def test_close(self):
1784        """Check that the zipfile is closed after the 'with' block."""
1785        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1786            for fpath, fdata in SMALL_TEST_DATA:
1787                zipfp.writestr(fpath, fdata)
1788                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1789        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1790
1791        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1792            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1793        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1794
1795    def test_close_on_exception(self):
1796        """Check that the zipfile is closed if an exception is raised in the
1797        'with' block."""
1798        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1799            for fpath, fdata in SMALL_TEST_DATA:
1800                zipfp.writestr(fpath, fdata)
1801
1802        try:
1803            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1804                raise zipfile.BadZipFile()
1805        except zipfile.BadZipFile:
1806            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1807
1808    def test_unsupported_version(self):
1809        # File has an extract_version of 120
1810        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1811                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1812                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1813                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1814                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1815
1816        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1817                          io.BytesIO(data), 'r')
1818
1819    @requires_zlib()
1820    def test_read_unicode_filenames(self):
1821        # bug #10801
1822        fname = findfile('zip_cp437_header.zip', subdir='archivetestdata')
1823        with zipfile.ZipFile(fname) as zipfp:
1824            for name in zipfp.namelist():
1825                zipfp.open(name).close()
1826
1827    def test_write_unicode_filenames(self):
1828        with zipfile.ZipFile(TESTFN, "w") as zf:
1829            zf.writestr("foo.txt", "Test for unicode filename")
1830            zf.writestr("\xf6.txt", "Test for unicode filename")
1831            self.assertIsInstance(zf.infolist()[0].filename, str)
1832
1833        with zipfile.ZipFile(TESTFN, "r") as zf:
1834            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1835            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1836
1837    def create_zipfile_with_extra_data(self, filename, extra_data_name):
1838        with zipfile.ZipFile(TESTFN, mode='w') as zf:
1839            filename_encoded = filename.encode("utf-8")
1840            # create a ZipInfo object with Unicode path extra field
1841            zip_info = zipfile.ZipInfo(filename)
1842
1843            tag_for_unicode_path = b'\x75\x70'
1844            version_of_unicode_path = b'\x01'
1845
1846            import zlib
1847            filename_crc = struct.pack('<L', zlib.crc32(filename_encoded))
1848
1849            extra_data = version_of_unicode_path + filename_crc + extra_data_name
1850            tsize = len(extra_data).to_bytes(2, 'little')
1851
1852            zip_info.extra = tag_for_unicode_path + tsize + extra_data
1853
1854            # add the file to the ZIP archive
1855            zf.writestr(zip_info, b'Hello World!')
1856
1857    @requires_zlib()
1858    def test_read_zipfile_containing_unicode_path_extra_field(self):
1859        self.create_zipfile_with_extra_data("이름.txt", "이름.txt".encode("utf-8"))
1860        with zipfile.ZipFile(TESTFN, "r") as zf:
1861            self.assertEqual(zf.filelist[0].filename, "이름.txt")
1862
1863    @requires_zlib()
1864    def test_read_zipfile_warning(self):
1865        self.create_zipfile_with_extra_data("이름.txt", b"")
1866        with self.assertWarns(UserWarning):
1867            zipfile.ZipFile(TESTFN, "r").close()
1868
1869    @requires_zlib()
1870    def test_read_zipfile_error(self):
1871        self.create_zipfile_with_extra_data("이름.txt", b"\xff")
1872        with self.assertRaises(zipfile.BadZipfile):
1873            zipfile.ZipFile(TESTFN, "r").close()
1874
1875    def test_read_after_write_unicode_filenames(self):
1876        with zipfile.ZipFile(TESTFN2, 'w') as zipfp:
1877            zipfp.writestr('приклад', b'sample')
1878            self.assertEqual(zipfp.read('приклад'), b'sample')
1879
1880    def test_exclusive_create_zip_file(self):
1881        """Test exclusive creating a new zipfile."""
1882        unlink(TESTFN2)
1883        filename = 'testfile.txt'
1884        content = b'hello, world. this is some content.'
1885        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1886            zipfp.writestr(filename, content)
1887        with self.assertRaises(FileExistsError):
1888            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1889        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1890            self.assertEqual(zipfp.namelist(), [filename])
1891            self.assertEqual(zipfp.read(filename), content)
1892
1893    def test_create_non_existent_file_for_append(self):
1894        if os.path.exists(TESTFN):
1895            os.unlink(TESTFN)
1896
1897        filename = 'testfile.txt'
1898        content = b'hello, world. this is some content.'
1899
1900        try:
1901            with zipfile.ZipFile(TESTFN, 'a') as zf:
1902                zf.writestr(filename, content)
1903        except OSError:
1904            self.fail('Could not append data to a non-existent zip file.')
1905
1906        self.assertTrue(os.path.exists(TESTFN))
1907
1908        with zipfile.ZipFile(TESTFN, 'r') as zf:
1909            self.assertEqual(zf.read(filename), content)
1910
1911    def test_close_erroneous_file(self):
1912        # This test checks that the ZipFile constructor closes the file object
1913        # it opens if there's an error in the file.  If it doesn't, the
1914        # traceback holds a reference to the ZipFile object and, indirectly,
1915        # the file object.
1916        # On Windows, this causes the os.unlink() call to fail because the
1917        # underlying file is still open.  This is SF bug #412214.
1918        #
1919        with open(TESTFN, "w", encoding="utf-8") as fp:
1920            fp.write("this is not a legal zip file\n")
1921        try:
1922            zf = zipfile.ZipFile(TESTFN)
1923        except zipfile.BadZipFile:
1924            pass
1925
1926    def test_is_zip_erroneous_file(self):
1927        """Check that is_zipfile() correctly identifies non-zip files."""
1928        # - passing a filename
1929        with open(TESTFN, "w", encoding='utf-8') as fp:
1930            fp.write("this is not a legal zip file\n")
1931        self.assertFalse(zipfile.is_zipfile(TESTFN))
1932        # - passing a path-like object
1933        self.assertFalse(zipfile.is_zipfile(FakePath(TESTFN)))
1934        # - passing a file object
1935        with open(TESTFN, "rb") as fp:
1936            self.assertFalse(zipfile.is_zipfile(fp))
1937        # - passing a file-like object
1938        fp = io.BytesIO()
1939        fp.write(b"this is not a legal zip file\n")
1940        self.assertFalse(zipfile.is_zipfile(fp))
1941        fp.seek(0, 0)
1942        self.assertFalse(zipfile.is_zipfile(fp))
1943
1944    def test_damaged_zipfile(self):
1945        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1946        # - Create a valid zip file
1947        fp = io.BytesIO()
1948        with zipfile.ZipFile(fp, mode="w") as zipf:
1949            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1950        zipfiledata = fp.getvalue()
1951
1952        # - Now create copies of it missing the last N bytes and make sure
1953        #   a BadZipFile exception is raised when we try to open it
1954        for N in range(len(zipfiledata)):
1955            fp = io.BytesIO(zipfiledata[:N])
1956            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1957
1958    def test_is_zip_valid_file(self):
1959        """Check that is_zipfile() correctly identifies zip files."""
1960        # - passing a filename
1961        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1962            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1963
1964        self.assertTrue(zipfile.is_zipfile(TESTFN))
1965        # - passing a file object
1966        with open(TESTFN, "rb") as fp:
1967            self.assertTrue(zipfile.is_zipfile(fp))
1968            fp.seek(0, 0)
1969            zip_contents = fp.read()
1970        # - passing a file-like object
1971        fp = io.BytesIO()
1972        fp.write(zip_contents)
1973        self.assertTrue(zipfile.is_zipfile(fp))
1974        fp.seek(0, 0)
1975        self.assertTrue(zipfile.is_zipfile(fp))
1976
1977    def test_non_existent_file_raises_OSError(self):
1978        # make sure we don't raise an AttributeError when a partially-constructed
1979        # ZipFile instance is finalized; this tests for regression on SF tracker
1980        # bug #403871.
1981
1982        # The bug we're testing for caused an AttributeError to be raised
1983        # when a ZipFile instance was created for a file that did not
1984        # exist; the .fp member was not initialized but was needed by the
1985        # __del__() method.  Since the AttributeError is in the __del__(),
1986        # it is ignored, but the user should be sufficiently annoyed by
1987        # the message on the output that regression will be noticed
1988        # quickly.
1989        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1990
1991    def test_empty_file_raises_BadZipFile(self):
1992        f = open(TESTFN, 'w', encoding='utf-8')
1993        f.close()
1994        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1995
1996        with open(TESTFN, 'w', encoding='utf-8') as fp:
1997            fp.write("short file")
1998        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1999
2000    def test_negative_central_directory_offset_raises_BadZipFile(self):
2001        # Zip file containing an empty EOCD record
2002        buffer = bytearray(b'PK\x05\x06' + b'\0'*18)
2003
2004        # Set the size of the central directory bytes to become 1,
2005        # causing the central directory offset to become negative
2006        for dirsize in 1, 2**32-1:
2007            buffer[12:16] = struct.pack('<L', dirsize)
2008            f = io.BytesIO(buffer)
2009            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, f)
2010
2011    def test_closed_zip_raises_ValueError(self):
2012        """Verify that testzip() doesn't swallow inappropriate exceptions."""
2013        data = io.BytesIO()
2014        with zipfile.ZipFile(data, mode="w") as zipf:
2015            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2016
2017        # This is correct; calling .read on a closed ZipFile should raise
2018        # a ValueError, and so should calling .testzip.  An earlier
2019        # version of .testzip would swallow this exception (and any other)
2020        # and report that the first file in the archive was corrupt.
2021        self.assertRaises(ValueError, zipf.read, "foo.txt")
2022        self.assertRaises(ValueError, zipf.open, "foo.txt")
2023        self.assertRaises(ValueError, zipf.testzip)
2024        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
2025        with open(TESTFN, 'w', encoding='utf-8') as f:
2026            f.write('zipfile test data')
2027        self.assertRaises(ValueError, zipf.write, TESTFN)
2028
2029    def test_bad_constructor_mode(self):
2030        """Check that bad modes passed to ZipFile constructor are caught."""
2031        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
2032
2033    def test_bad_open_mode(self):
2034        """Check that bad modes passed to ZipFile.open are caught."""
2035        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2036            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2037
2038        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
2039            # read the data to make sure the file is there
2040            zipf.read("foo.txt")
2041            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
2042            # universal newlines support is removed
2043            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
2044            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
2045
2046    def test_read0(self):
2047        """Check that calling read(0) on a ZipExtFile object returns an empty
2048        string and doesn't advance file pointer."""
2049        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2050            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2051            # read the data to make sure the file is there
2052            with zipf.open("foo.txt") as f:
2053                for i in range(FIXEDTEST_SIZE):
2054                    self.assertEqual(f.read(0), b'')
2055
2056                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
2057
2058    def test_open_non_existent_item(self):
2059        """Check that attempting to call open() for an item that doesn't
2060        exist in the archive raises a RuntimeError."""
2061        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2062            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
2063
2064    def test_bad_compression_mode(self):
2065        """Check that bad compression methods passed to ZipFile.open are
2066        caught."""
2067        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
2068
2069    def test_unsupported_compression(self):
2070        # data is declared as shrunk, but actually deflated
2071        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
2072                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
2073                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
2074                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2075                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
2076                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
2077        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
2078            self.assertRaises(NotImplementedError, zipf.open, 'x')
2079
2080    def test_null_byte_in_filename(self):
2081        """Check that a filename containing a null byte is properly
2082        terminated."""
2083        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2084            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
2085            self.assertEqual(zipf.namelist(), ['foo.txt'])
2086
2087    def test_struct_sizes(self):
2088        """Check that ZIP internal structure sizes are calculated correctly."""
2089        self.assertEqual(zipfile.sizeEndCentDir, 22)
2090        self.assertEqual(zipfile.sizeCentralDir, 46)
2091        self.assertEqual(zipfile.sizeEndCentDir64, 56)
2092        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
2093
2094    def test_comments(self):
2095        """Check that comments on the archive are handled properly."""
2096
2097        # check default comment is empty
2098        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2099            self.assertEqual(zipf.comment, b'')
2100            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2101
2102        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2103            self.assertEqual(zipfr.comment, b'')
2104
2105        # check a simple short comment
2106        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
2107        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2108            zipf.comment = comment
2109            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2110        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2111            self.assertEqual(zipf.comment, comment)
2112
2113        # check a comment of max length
2114        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
2115        comment2 = comment2.encode("ascii")
2116        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2117            zipf.comment = comment2
2118            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2119
2120        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2121            self.assertEqual(zipfr.comment, comment2)
2122
2123        # check a comment that is too long is truncated
2124        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
2125            with self.assertWarns(UserWarning):
2126                zipf.comment = comment2 + b'oops'
2127            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2128        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
2129            self.assertEqual(zipfr.comment, comment2)
2130
2131        # check that comments are correctly modified in append mode
2132        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
2133            zipf.comment = b"original comment"
2134            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2135        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
2136            zipf.comment = b"an updated comment"
2137        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
2138            self.assertEqual(zipf.comment, b"an updated comment")
2139
2140        # check that comments are correctly shortened in append mode
2141        # and the file is indeed truncated
2142        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
2143            zipf.comment = b"original comment that's longer"
2144            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2145        original_zip_size = os.path.getsize(TESTFN)
2146        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
2147            zipf.comment = b"shorter comment"
2148        self.assertTrue(original_zip_size > os.path.getsize(TESTFN))
2149        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
2150            self.assertEqual(zipf.comment, b"shorter comment")
2151
2152    def test_unicode_comment(self):
2153        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
2154            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2155            with self.assertRaises(TypeError):
2156                zipf.comment = "this is an error"
2157
2158    def test_change_comment_in_empty_archive(self):
2159        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
2160            self.assertFalse(zipf.filelist)
2161            zipf.comment = b"this is a comment"
2162        with zipfile.ZipFile(TESTFN, "r") as zipf:
2163            self.assertEqual(zipf.comment, b"this is a comment")
2164
2165    def test_change_comment_in_nonempty_archive(self):
2166        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
2167            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
2168        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
2169            self.assertTrue(zipf.filelist)
2170            zipf.comment = b"this is a comment"
2171        with zipfile.ZipFile(TESTFN, "r") as zipf:
2172            self.assertEqual(zipf.comment, b"this is a comment")
2173
2174    def test_empty_zipfile(self):
2175        # Check that creating a file in 'w' or 'a' mode and closing without
2176        # adding any files to the archives creates a valid empty ZIP file
2177        zipf = zipfile.ZipFile(TESTFN, mode="w")
2178        zipf.close()
2179        try:
2180            zipf = zipfile.ZipFile(TESTFN, mode="r")
2181        except zipfile.BadZipFile:
2182            self.fail("Unable to create empty ZIP file in 'w' mode")
2183
2184        zipf = zipfile.ZipFile(TESTFN, mode="a")
2185        zipf.close()
2186        try:
2187            zipf = zipfile.ZipFile(TESTFN, mode="r")
2188        except:
2189            self.fail("Unable to create empty ZIP file in 'a' mode")
2190
2191    def test_open_empty_file(self):
2192        # Issue 1710703: Check that opening a file with less than 22 bytes
2193        # raises a BadZipFile exception (rather than the previously unhelpful
2194        # OSError)
2195        f = open(TESTFN, 'w', encoding='utf-8')
2196        f.close()
2197        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
2198
2199    def test_create_zipinfo_before_1980(self):
2200        self.assertRaises(ValueError,
2201                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
2202
2203    def test_create_empty_zipinfo_repr(self):
2204        """Before bpo-26185, repr() on empty ZipInfo object was failing."""
2205        zi = zipfile.ZipInfo(filename="empty")
2206        self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>")
2207
2208    def test_create_empty_zipinfo_default_attributes(self):
2209        """Ensure all required attributes are set."""
2210        zi = zipfile.ZipInfo()
2211        self.assertEqual(zi.orig_filename, "NoName")
2212        self.assertEqual(zi.filename, "NoName")
2213        self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0))
2214        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2215        self.assertEqual(zi.comment, b"")
2216        self.assertEqual(zi.extra, b"")
2217        self.assertIn(zi.create_system, (0, 3))
2218        self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION)
2219        self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION)
2220        self.assertEqual(zi.reserved, 0)
2221        self.assertEqual(zi.flag_bits, 0)
2222        self.assertEqual(zi.volume, 0)
2223        self.assertEqual(zi.internal_attr, 0)
2224        self.assertEqual(zi.external_attr, 0)
2225
2226        # Before bpo-26185, both were missing
2227        self.assertEqual(zi.file_size, 0)
2228        self.assertEqual(zi.compress_size, 0)
2229
2230    def test_zipfile_with_short_extra_field(self):
2231        """If an extra field in the header is less than 4 bytes, skip it."""
2232        zipdata = (
2233            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
2234            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
2235            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
2236            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
2237            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
2238            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
2239            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
2240        )
2241        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
2242            # testzip returns the name of the first corrupt file, or None
2243            self.assertIsNone(zipf.testzip())
2244
2245    def test_open_conflicting_handles(self):
2246        # It's only possible to open one writable file handle at a time
2247        msg1 = b"It's fun to charter an accountant!"
2248        msg2 = b"And sail the wide accountant sea"
2249        msg3 = b"To find, explore the funds offshore"
2250        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
2251            with zipf.open('foo', mode='w') as w2:
2252                w2.write(msg1)
2253            with zipf.open('bar', mode='w') as w1:
2254                with self.assertRaises(ValueError):
2255                    zipf.open('handle', mode='w')
2256                with self.assertRaises(ValueError):
2257                    zipf.open('foo', mode='r')
2258                with self.assertRaises(ValueError):
2259                    zipf.writestr('str', 'abcde')
2260                with self.assertRaises(ValueError):
2261                    zipf.write(__file__, 'file')
2262                with self.assertRaises(ValueError):
2263                    zipf.close()
2264                w1.write(msg2)
2265            with zipf.open('baz', mode='w') as w2:
2266                w2.write(msg3)
2267
2268        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
2269            self.assertEqual(zipf.read('foo'), msg1)
2270            self.assertEqual(zipf.read('bar'), msg2)
2271            self.assertEqual(zipf.read('baz'), msg3)
2272            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
2273
2274    def test_seek_tell(self):
2275        # Test seek functionality
2276        txt = b"Where's Bruce?"
2277        bloc = txt.find(b"Bruce")
2278        # Check seek on a file
2279        with zipfile.ZipFile(TESTFN, "w") as zipf:
2280            zipf.writestr("foo.txt", txt)
2281        with zipfile.ZipFile(TESTFN, "r") as zipf:
2282            with zipf.open("foo.txt", "r") as fp:
2283                fp.seek(bloc, os.SEEK_SET)
2284                self.assertEqual(fp.tell(), bloc)
2285                fp.seek(-bloc, os.SEEK_CUR)
2286                self.assertEqual(fp.tell(), 0)
2287                fp.seek(bloc, os.SEEK_CUR)
2288                self.assertEqual(fp.tell(), bloc)
2289                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2290                self.assertEqual(fp.tell(), bloc + 5)
2291                fp.seek(0, os.SEEK_END)
2292                self.assertEqual(fp.tell(), len(txt))
2293                fp.seek(0, os.SEEK_SET)
2294                self.assertEqual(fp.tell(), 0)
2295        # Check seek on memory file
2296        data = io.BytesIO()
2297        with zipfile.ZipFile(data, mode="w") as zipf:
2298            zipf.writestr("foo.txt", txt)
2299        with zipfile.ZipFile(data, mode="r") as zipf:
2300            with zipf.open("foo.txt", "r") as fp:
2301                fp.seek(bloc, os.SEEK_SET)
2302                self.assertEqual(fp.tell(), bloc)
2303                fp.seek(-bloc, os.SEEK_CUR)
2304                self.assertEqual(fp.tell(), 0)
2305                fp.seek(bloc, os.SEEK_CUR)
2306                self.assertEqual(fp.tell(), bloc)
2307                self.assertEqual(fp.read(5), txt[bloc:bloc+5])
2308                self.assertEqual(fp.tell(), bloc + 5)
2309                fp.seek(0, os.SEEK_END)
2310                self.assertEqual(fp.tell(), len(txt))
2311                fp.seek(0, os.SEEK_SET)
2312                self.assertEqual(fp.tell(), 0)
2313
2314    def test_read_after_seek(self):
2315        # Issue 102956: Make sure seek(x, os.SEEK_CUR) doesn't break read()
2316        txt = b"Charge men!"
2317        bloc = txt.find(b"men")
2318        with zipfile.ZipFile(TESTFN, "w") as zipf:
2319            zipf.writestr("foo.txt", txt)
2320        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
2321            with zipf.open("foo.txt", "r") as fp:
2322                fp.seek(bloc, os.SEEK_CUR)
2323                self.assertEqual(fp.read(-1), b'men!')
2324        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
2325            with zipf.open("foo.txt", "r") as fp:
2326                fp.read(6)
2327                fp.seek(1, os.SEEK_CUR)
2328                self.assertEqual(fp.read(-1), b'men!')
2329
2330    @requires_bz2()
2331    def test_decompress_without_3rd_party_library(self):
2332        data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2333        zip_file = io.BytesIO(data)
2334        with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf:
2335            zf.writestr('a.txt', b'a')
2336        with mock.patch('zipfile.bz2', None):
2337            with zipfile.ZipFile(zip_file) as zf:
2338                self.assertRaises(RuntimeError, zf.extract, 'a.txt')
2339
2340    @requires_zlib()
2341    def test_full_overlap(self):
2342        data = (
2343            b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
2344            b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00a\xed'
2345            b'\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\d\x0b`P'
2346            b'K\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2'
2347            b'\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00'
2348            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00aPK'
2349            b'\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
2350            b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00\x00'
2351            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bPK\x05'
2352            b'\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00\x00/\x00\x00'
2353            b'\x00\x00\x00'
2354        )
2355        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
2356            self.assertEqual(zipf.namelist(), ['a', 'b'])
2357            zi = zipf.getinfo('a')
2358            self.assertEqual(zi.header_offset, 0)
2359            self.assertEqual(zi.compress_size, 16)
2360            self.assertEqual(zi.file_size, 1033)
2361            zi = zipf.getinfo('b')
2362            self.assertEqual(zi.header_offset, 0)
2363            self.assertEqual(zi.compress_size, 16)
2364            self.assertEqual(zi.file_size, 1033)
2365            self.assertEqual(len(zipf.read('a')), 1033)
2366            with self.assertRaisesRegex(zipfile.BadZipFile, 'File name.*differ'):
2367                zipf.read('b')
2368
2369    @requires_zlib()
2370    def test_quoted_overlap(self):
2371        data = (
2372            b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05Y\xfc'
2373            b'8\x044\x00\x00\x00(\x04\x00\x00\x01\x00\x00\x00a\x00'
2374            b'\x1f\x00\xe0\xffPK\x03\x04\x14\x00\x00\x00\x08\x00\xa0l'
2375            b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00'
2376            b'\x00\x00b\xed\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\'
2377            b'd\x0b`PK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0'
2378            b'lH\x05Y\xfc8\x044\x00\x00\x00(\x04\x00\x00\x01'
2379            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
2380            b'\x00aPK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0l'
2381            b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00'
2382            b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00$\x00\x00\x00'
2383            b'bPK\x05\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00'
2384            b'\x00S\x00\x00\x00\x00\x00'
2385        )
2386        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
2387            self.assertEqual(zipf.namelist(), ['a', 'b'])
2388            zi = zipf.getinfo('a')
2389            self.assertEqual(zi.header_offset, 0)
2390            self.assertEqual(zi.compress_size, 52)
2391            self.assertEqual(zi.file_size, 1064)
2392            zi = zipf.getinfo('b')
2393            self.assertEqual(zi.header_offset, 36)
2394            self.assertEqual(zi.compress_size, 16)
2395            self.assertEqual(zi.file_size, 1033)
2396            with self.assertRaisesRegex(zipfile.BadZipFile, 'Overlapped entries'):
2397                zipf.read('a')
2398            self.assertEqual(len(zipf.read('b')), 1033)
2399
2400    def tearDown(self):
2401        unlink(TESTFN)
2402        unlink(TESTFN2)
2403
2404
2405class AbstractBadCrcTests:
2406    def test_testzip_with_bad_crc(self):
2407        """Tests that files with bad CRCs return their name from testzip."""
2408        zipdata = self.zip_with_bad_crc
2409
2410        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2411            # testzip returns the name of the first corrupt file, or None
2412            self.assertEqual('afile', zipf.testzip())
2413
2414    def test_read_with_bad_crc(self):
2415        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
2416        zipdata = self.zip_with_bad_crc
2417
2418        # Using ZipFile.read()
2419        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2420            self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile')
2421
2422        # Using ZipExtFile.read()
2423        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2424            with zipf.open('afile', 'r') as corrupt_file:
2425                self.assertRaises(zipfile.BadZipFile, corrupt_file.read)
2426
2427        # Same with small reads (in order to exercise the buffering logic)
2428        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
2429            with zipf.open('afile', 'r') as corrupt_file:
2430                corrupt_file.MIN_READ_SIZE = 2
2431                with self.assertRaises(zipfile.BadZipFile):
2432                    while corrupt_file.read(2):
2433                        pass
2434
2435
2436class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2437    compression = zipfile.ZIP_STORED
2438    zip_with_bad_crc = (
2439        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
2440        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
2441        b'ilehello,AworldP'
2442        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
2443        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
2444        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
2445        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
2446        b'\0\0/\0\0\0\0\0')
2447
2448@requires_zlib()
2449class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2450    compression = zipfile.ZIP_DEFLATED
2451    zip_with_bad_crc = (
2452        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
2453        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2454        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
2455        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
2456        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
2457        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
2458        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
2459        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
2460
2461@requires_bz2()
2462class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2463    compression = zipfile.ZIP_BZIP2
2464    zip_with_bad_crc = (
2465        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
2466        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2467        b'ileBZh91AY&SY\xd4\xa8\xca'
2468        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
2469        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
2470        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
2471        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
2472        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
2473        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
2474        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
2475        b'\x00\x00\x00\x00')
2476
2477@requires_lzma()
2478class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
2479    compression = zipfile.ZIP_LZMA
2480    zip_with_bad_crc = (
2481        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2482        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
2483        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
2484        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
2485        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
2486        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
2487        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
2488        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
2489        b'\x00>\x00\x00\x00\x00\x00')
2490
2491
2492class DecryptionTests(unittest.TestCase):
2493    """Check that ZIP decryption works. Since the library does not
2494    support encryption at the moment, we use a pre-generated encrypted
2495    ZIP file."""
2496
2497    data = (
2498        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
2499        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
2500        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
2501        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
2502        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
2503        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
2504        b'\x00\x00L\x00\x00\x00\x00\x00' )
2505    data2 = (
2506        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
2507        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
2508        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
2509        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
2510        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
2511        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
2512        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
2513        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )
2514
2515    plain = b'zipfile.py encryption test'
2516    plain2 = b'\x00'*512
2517
2518    def setUp(self):
2519        with open(TESTFN, "wb") as fp:
2520            fp.write(self.data)
2521        self.zip = zipfile.ZipFile(TESTFN, "r")
2522        with open(TESTFN2, "wb") as fp:
2523            fp.write(self.data2)
2524        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
2525
2526    def tearDown(self):
2527        self.zip.close()
2528        os.unlink(TESTFN)
2529        self.zip2.close()
2530        os.unlink(TESTFN2)
2531
2532    def test_no_password(self):
2533        # Reading the encrypted file without password
2534        # must generate a RunTime exception
2535        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2536        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2537
2538    def test_bad_password(self):
2539        self.zip.setpassword(b"perl")
2540        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
2541        self.zip2.setpassword(b"perl")
2542        self.assertRaises(RuntimeError, self.zip2.read, "zero")
2543
2544    @requires_zlib()
2545    def test_good_password(self):
2546        self.zip.setpassword(b"python")
2547        self.assertEqual(self.zip.read("test.txt"), self.plain)
2548        self.zip2.setpassword(b"12345")
2549        self.assertEqual(self.zip2.read("zero"), self.plain2)
2550
2551    def test_unicode_password(self):
2552        expected_msg = "pwd: expected bytes, got str"
2553
2554        with self.assertRaisesRegex(TypeError, expected_msg):
2555            self.zip.setpassword("unicode")
2556
2557        with self.assertRaisesRegex(TypeError, expected_msg):
2558            self.zip.read("test.txt", "python")
2559
2560        with self.assertRaisesRegex(TypeError, expected_msg):
2561            self.zip.open("test.txt", pwd="python")
2562
2563        with self.assertRaisesRegex(TypeError, expected_msg):
2564            self.zip.extract("test.txt", pwd="python")
2565
2566        with self.assertRaisesRegex(TypeError, expected_msg):
2567            self.zip.pwd = "python"
2568            self.zip.open("test.txt")
2569
2570    def test_seek_tell(self):
2571        self.zip.setpassword(b"python")
2572        txt = self.plain
2573        test_word = b'encryption'
2574        bloc = txt.find(test_word)
2575        bloc_len = len(test_word)
2576        with self.zip.open("test.txt", "r") as fp:
2577            fp.seek(bloc, os.SEEK_SET)
2578            self.assertEqual(fp.tell(), bloc)
2579            fp.seek(-bloc, os.SEEK_CUR)
2580            self.assertEqual(fp.tell(), 0)
2581            fp.seek(bloc, os.SEEK_CUR)
2582            self.assertEqual(fp.tell(), bloc)
2583            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2584
2585            # Make sure that the second read after seeking back beyond
2586            # _readbuffer returns the same content (ie. rewind to the start of
2587            # the file to read forward to the required position).
2588            old_read_size = fp.MIN_READ_SIZE
2589            fp.MIN_READ_SIZE = 1
2590            fp._readbuffer = b''
2591            fp._offset = 0
2592            fp.seek(0, os.SEEK_SET)
2593            self.assertEqual(fp.tell(), 0)
2594            fp.seek(bloc, os.SEEK_CUR)
2595            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
2596            fp.MIN_READ_SIZE = old_read_size
2597
2598            fp.seek(0, os.SEEK_END)
2599            self.assertEqual(fp.tell(), len(txt))
2600            fp.seek(0, os.SEEK_SET)
2601            self.assertEqual(fp.tell(), 0)
2602
2603            # Read the file completely to definitely call any eof integrity
2604            # checks (crc) and make sure they still pass.
2605            fp.read()
2606
2607
2608class AbstractTestsWithRandomBinaryFiles:
2609    @classmethod
2610    def setUpClass(cls):
2611        datacount = randint(16, 64)*1024 + randint(1, 1024)
2612        cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000))
2613                            for i in range(datacount))
2614
2615    def setUp(self):
2616        # Make a source file with some lines
2617        with open(TESTFN, "wb") as fp:
2618            fp.write(self.data)
2619
2620    def tearDown(self):
2621        unlink(TESTFN)
2622        unlink(TESTFN2)
2623
2624    def make_test_archive(self, f, compression):
2625        # Create the ZIP archive
2626        with zipfile.ZipFile(f, "w", compression) as zipfp:
2627            zipfp.write(TESTFN, "another.name")
2628            zipfp.write(TESTFN, TESTFN)
2629
2630    def zip_test(self, f, compression):
2631        self.make_test_archive(f, compression)
2632
2633        # Read the ZIP archive
2634        with zipfile.ZipFile(f, "r", compression) as zipfp:
2635            testdata = zipfp.read(TESTFN)
2636            self.assertEqual(len(testdata), len(self.data))
2637            self.assertEqual(testdata, self.data)
2638            self.assertEqual(zipfp.read("another.name"), self.data)
2639
2640    def test_read(self):
2641        for f in get_files(self):
2642            self.zip_test(f, self.compression)
2643
2644    def zip_open_test(self, f, compression):
2645        self.make_test_archive(f, compression)
2646
2647        # Read the ZIP archive
2648        with zipfile.ZipFile(f, "r", compression) as zipfp:
2649            zipdata1 = []
2650            with zipfp.open(TESTFN) as zipopen1:
2651                while True:
2652                    read_data = zipopen1.read(256)
2653                    if not read_data:
2654                        break
2655                    zipdata1.append(read_data)
2656
2657            zipdata2 = []
2658            with zipfp.open("another.name") as zipopen2:
2659                while True:
2660                    read_data = zipopen2.read(256)
2661                    if not read_data:
2662                        break
2663                    zipdata2.append(read_data)
2664
2665            testdata1 = b''.join(zipdata1)
2666            self.assertEqual(len(testdata1), len(self.data))
2667            self.assertEqual(testdata1, self.data)
2668
2669            testdata2 = b''.join(zipdata2)
2670            self.assertEqual(len(testdata2), len(self.data))
2671            self.assertEqual(testdata2, self.data)
2672
2673    def test_open(self):
2674        for f in get_files(self):
2675            self.zip_open_test(f, self.compression)
2676
2677    def zip_random_open_test(self, f, compression):
2678        self.make_test_archive(f, compression)
2679
2680        # Read the ZIP archive
2681        with zipfile.ZipFile(f, "r", compression) as zipfp:
2682            zipdata1 = []
2683            with zipfp.open(TESTFN) as zipopen1:
2684                while True:
2685                    read_data = zipopen1.read(randint(1, 1024))
2686                    if not read_data:
2687                        break
2688                    zipdata1.append(read_data)
2689
2690            testdata = b''.join(zipdata1)
2691            self.assertEqual(len(testdata), len(self.data))
2692            self.assertEqual(testdata, self.data)
2693
2694    def test_random_open(self):
2695        for f in get_files(self):
2696            self.zip_random_open_test(f, self.compression)
2697
2698
2699class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2700                                       unittest.TestCase):
2701    compression = zipfile.ZIP_STORED
2702
2703@requires_zlib()
2704class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2705                                        unittest.TestCase):
2706    compression = zipfile.ZIP_DEFLATED
2707
2708@requires_bz2()
2709class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2710                                      unittest.TestCase):
2711    compression = zipfile.ZIP_BZIP2
2712
2713@requires_lzma()
2714class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
2715                                     unittest.TestCase):
2716    compression = zipfile.ZIP_LZMA
2717
2718
2719# Provide the tell() method but not seek()
2720class Tellable:
2721    def __init__(self, fp):
2722        self.fp = fp
2723        self.offset = 0
2724
2725    def write(self, data):
2726        n = self.fp.write(data)
2727        self.offset += n
2728        return n
2729
2730    def tell(self):
2731        return self.offset
2732
2733    def flush(self):
2734        self.fp.flush()
2735
2736class Unseekable:
2737    def __init__(self, fp):
2738        self.fp = fp
2739
2740    def write(self, data):
2741        return self.fp.write(data)
2742
2743    def flush(self):
2744        self.fp.flush()
2745
2746class UnseekableTests(unittest.TestCase):
2747    def test_writestr(self):
2748        for wrapper in (lambda f: f), Tellable, Unseekable:
2749            with self.subTest(wrapper=wrapper):
2750                f = io.BytesIO()
2751                f.write(b'abc')
2752                bf = io.BufferedWriter(f)
2753                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2754                    zipfp.writestr('ones', b'111')
2755                    zipfp.writestr('twos', b'222')
2756                self.assertEqual(f.getvalue()[:5], b'abcPK')
2757                with zipfile.ZipFile(f, mode='r') as zipf:
2758                    with zipf.open('ones') as zopen:
2759                        self.assertEqual(zopen.read(), b'111')
2760                    with zipf.open('twos') as zopen:
2761                        self.assertEqual(zopen.read(), b'222')
2762
2763    def test_write(self):
2764        for wrapper in (lambda f: f), Tellable, Unseekable:
2765            with self.subTest(wrapper=wrapper):
2766                f = io.BytesIO()
2767                f.write(b'abc')
2768                bf = io.BufferedWriter(f)
2769                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp:
2770                    self.addCleanup(unlink, TESTFN)
2771                    with open(TESTFN, 'wb') as f2:
2772                        f2.write(b'111')
2773                    zipfp.write(TESTFN, 'ones')
2774                    with open(TESTFN, 'wb') as f2:
2775                        f2.write(b'222')
2776                    zipfp.write(TESTFN, 'twos')
2777                self.assertEqual(f.getvalue()[:5], b'abcPK')
2778                with zipfile.ZipFile(f, mode='r') as zipf:
2779                    with zipf.open('ones') as zopen:
2780                        self.assertEqual(zopen.read(), b'111')
2781                    with zipf.open('twos') as zopen:
2782                        self.assertEqual(zopen.read(), b'222')
2783
2784    def test_open_write(self):
2785        for wrapper in (lambda f: f), Tellable, Unseekable:
2786            with self.subTest(wrapper=wrapper):
2787                f = io.BytesIO()
2788                f.write(b'abc')
2789                bf = io.BufferedWriter(f)
2790                with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf:
2791                    with zipf.open('ones', 'w') as zopen:
2792                        zopen.write(b'111')
2793                    with zipf.open('twos', 'w') as zopen:
2794                        zopen.write(b'222')
2795                self.assertEqual(f.getvalue()[:5], b'abcPK')
2796                with zipfile.ZipFile(f) as zipf:
2797                    self.assertEqual(zipf.read('ones'), b'111')
2798                    self.assertEqual(zipf.read('twos'), b'222')
2799
2800
2801@requires_zlib()
2802class TestsWithMultipleOpens(unittest.TestCase):
2803    @classmethod
2804    def setUpClass(cls):
2805        cls.data1 = b'111' + randbytes(10000)
2806        cls.data2 = b'222' + randbytes(10000)
2807
2808    def make_test_archive(self, f):
2809        # Create the ZIP archive
2810        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
2811            zipfp.writestr('ones', self.data1)
2812            zipfp.writestr('twos', self.data2)
2813
2814    def test_same_file(self):
2815        # Verify that (when the ZipFile is in control of creating file objects)
2816        # multiple open() calls can be made without interfering with each other.
2817        for f in get_files(self):
2818            self.make_test_archive(f)
2819            with zipfile.ZipFile(f, mode="r") as zipf:
2820                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
2821                    data1 = zopen1.read(500)
2822                    data2 = zopen2.read(500)
2823                    data1 += zopen1.read()
2824                    data2 += zopen2.read()
2825                self.assertEqual(data1, data2)
2826                self.assertEqual(data1, self.data1)
2827
2828    def test_different_file(self):
2829        # Verify that (when the ZipFile is in control of creating file objects)
2830        # multiple open() calls can be made without interfering with each other.
2831        for f in get_files(self):
2832            self.make_test_archive(f)
2833            with zipfile.ZipFile(f, mode="r") as zipf:
2834                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
2835                    data1 = zopen1.read(500)
2836                    data2 = zopen2.read(500)
2837                    data1 += zopen1.read()
2838                    data2 += zopen2.read()
2839                self.assertEqual(data1, self.data1)
2840                self.assertEqual(data2, self.data2)
2841
2842    def test_interleaved(self):
2843        # Verify that (when the ZipFile is in control of creating file objects)
2844        # multiple open() calls can be made without interfering with each other.
2845        for f in get_files(self):
2846            self.make_test_archive(f)
2847            with zipfile.ZipFile(f, mode="r") as zipf:
2848                with zipf.open('ones') as zopen1:
2849                    data1 = zopen1.read(500)
2850                    with zipf.open('twos') as zopen2:
2851                        data2 = zopen2.read(500)
2852                        data1 += zopen1.read()
2853                        data2 += zopen2.read()
2854                self.assertEqual(data1, self.data1)
2855                self.assertEqual(data2, self.data2)
2856
2857    def test_read_after_close(self):
2858        for f in get_files(self):
2859            self.make_test_archive(f)
2860            with contextlib.ExitStack() as stack:
2861                with zipfile.ZipFile(f, 'r') as zipf:
2862                    zopen1 = stack.enter_context(zipf.open('ones'))
2863                    zopen2 = stack.enter_context(zipf.open('twos'))
2864                data1 = zopen1.read(500)
2865                data2 = zopen2.read(500)
2866                data1 += zopen1.read()
2867                data2 += zopen2.read()
2868            self.assertEqual(data1, self.data1)
2869            self.assertEqual(data2, self.data2)
2870
2871    def test_read_after_write(self):
2872        for f in get_files(self):
2873            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
2874                zipf.writestr('ones', self.data1)
2875                zipf.writestr('twos', self.data2)
2876                with zipf.open('ones') as zopen1:
2877                    data1 = zopen1.read(500)
2878            self.assertEqual(data1, self.data1[:500])
2879            with zipfile.ZipFile(f, 'r') as zipf:
2880                data1 = zipf.read('ones')
2881                data2 = zipf.read('twos')
2882            self.assertEqual(data1, self.data1)
2883            self.assertEqual(data2, self.data2)
2884
2885    def test_write_after_read(self):
2886        for f in get_files(self):
2887            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
2888                zipf.writestr('ones', self.data1)
2889                with zipf.open('ones') as zopen1:
2890                    zopen1.read(500)
2891                    zipf.writestr('twos', self.data2)
2892            with zipfile.ZipFile(f, 'r') as zipf:
2893                data1 = zipf.read('ones')
2894                data2 = zipf.read('twos')
2895            self.assertEqual(data1, self.data1)
2896            self.assertEqual(data2, self.data2)
2897
2898    def test_many_opens(self):
2899        # Verify that read() and open() promptly close the file descriptor,
2900        # and don't rely on the garbage collector to free resources.
2901        startcount = fd_count()
2902        self.make_test_archive(TESTFN2)
2903        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
2904            for x in range(100):
2905                zipf.read('ones')
2906                with zipf.open('ones') as zopen1:
2907                    pass
2908        self.assertEqual(startcount, fd_count())
2909
2910    def test_write_while_reading(self):
2911        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
2912            zipf.writestr('ones', self.data1)
2913        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
2914            with zipf.open('ones', 'r') as r1:
2915                data1 = r1.read(500)
2916                with zipf.open('twos', 'w') as w1:
2917                    w1.write(self.data2)
2918                data1 += r1.read()
2919        self.assertEqual(data1, self.data1)
2920        with zipfile.ZipFile(TESTFN2) as zipf:
2921            self.assertEqual(zipf.read('twos'), self.data2)
2922
2923    def tearDown(self):
2924        unlink(TESTFN2)
2925
2926
2927class TestWithDirectory(unittest.TestCase):
2928    def setUp(self):
2929        os.mkdir(TESTFN2)
2930
2931    def test_extract_dir(self):
2932        with zipfile.ZipFile(findfile("zipdir.zip", subdir="archivetestdata")) as zipf:
2933            zipf.extractall(TESTFN2)
2934        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2935        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2936        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))
2937
2938    def test_bug_6050(self):
2939        # Extraction should succeed if directories already exist
2940        os.mkdir(os.path.join(TESTFN2, "a"))
2941        self.test_extract_dir()
2942
2943    def test_extract_dir_backslash(self):
2944        zfname = findfile("zipdir_backslash.zip", subdir="archivetestdata")
2945        with zipfile.ZipFile(zfname) as zipf:
2946            zipf.extractall(TESTFN2)
2947        if os.name == 'nt':
2948            self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
2949            self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
2950            self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "a", "b", "c")))
2951            self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "d")))
2952            self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "d", "e")))
2953        else:
2954            self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "a\\b\\c")))
2955            self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "d\\e\\")))
2956            self.assertFalse(os.path.exists(os.path.join(TESTFN2, "a")))
2957            self.assertFalse(os.path.exists(os.path.join(TESTFN2, "d")))
2958
2959    def test_write_dir(self):
2960        dirpath = os.path.join(TESTFN2, "x")
2961        os.mkdir(dirpath)
2962        mode = os.stat(dirpath).st_mode & 0xFFFF
2963        with zipfile.ZipFile(TESTFN, "w") as zipf:
2964            zipf.write(dirpath)
2965            zinfo = zipf.filelist[0]
2966            self.assertTrue(zinfo.filename.endswith("/x/"))
2967            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2968            zipf.write(dirpath, "y")
2969            zinfo = zipf.filelist[1]
2970            self.assertTrue(zinfo.filename, "y/")
2971            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2972        with zipfile.ZipFile(TESTFN, "r") as zipf:
2973            zinfo = zipf.filelist[0]
2974            self.assertTrue(zinfo.filename.endswith("/x/"))
2975            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2976            zinfo = zipf.filelist[1]
2977            self.assertTrue(zinfo.filename, "y/")
2978            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
2979            target = os.path.join(TESTFN2, "target")
2980            os.mkdir(target)
2981            zipf.extractall(target)
2982            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
2983            self.assertEqual(len(os.listdir(target)), 2)
2984
2985    def test_writestr_dir(self):
2986        os.mkdir(os.path.join(TESTFN2, "x"))
2987        with zipfile.ZipFile(TESTFN, "w") as zipf:
2988            zipf.writestr("x/", b'')
2989            zinfo = zipf.filelist[0]
2990            self.assertEqual(zinfo.filename, "x/")
2991            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2992        with zipfile.ZipFile(TESTFN, "r") as zipf:
2993            zinfo = zipf.filelist[0]
2994            self.assertTrue(zinfo.filename.endswith("x/"))
2995            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
2996            target = os.path.join(TESTFN2, "target")
2997            os.mkdir(target)
2998            zipf.extractall(target)
2999            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
3000            self.assertEqual(os.listdir(target), ["x"])
3001
3002    def test_mkdir(self):
3003        with zipfile.ZipFile(TESTFN, "w") as zf:
3004            zf.mkdir("directory")
3005            zinfo = zf.filelist[0]
3006            self.assertEqual(zinfo.filename, "directory/")
3007            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
3008
3009            zf.mkdir("directory2/")
3010            zinfo = zf.filelist[1]
3011            self.assertEqual(zinfo.filename, "directory2/")
3012            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
3013
3014            zf.mkdir("directory3", mode=0o777)
3015            zinfo = zf.filelist[2]
3016            self.assertEqual(zinfo.filename, "directory3/")
3017            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)
3018
3019            old_zinfo = zipfile.ZipInfo("directory4/")
3020            old_zinfo.external_attr = (0o40777 << 16) | 0x10
3021            old_zinfo.CRC = 0
3022            old_zinfo.file_size = 0
3023            old_zinfo.compress_size = 0
3024            zf.mkdir(old_zinfo)
3025            new_zinfo = zf.filelist[3]
3026            self.assertEqual(old_zinfo.filename, "directory4/")
3027            self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr)
3028
3029            target = os.path.join(TESTFN2, "target")
3030            os.mkdir(target)
3031            zf.extractall(target)
3032            self.assertEqual(set(os.listdir(target)), {"directory", "directory2", "directory3", "directory4"})
3033
3034    def test_create_directory_with_write(self):
3035        with zipfile.ZipFile(TESTFN, "w") as zf:
3036            zf.writestr(zipfile.ZipInfo('directory/'), '')
3037
3038            zinfo = zf.filelist[0]
3039            self.assertEqual(zinfo.filename, "directory/")
3040
3041            directory = os.path.join(TESTFN2, "directory2")
3042            os.mkdir(directory)
3043            mode = os.stat(directory).st_mode & 0xFFFF
3044            zf.write(directory, arcname="directory2/")
3045            zinfo = zf.filelist[1]
3046            self.assertEqual(zinfo.filename, "directory2/")
3047            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
3048
3049            target = os.path.join(TESTFN2, "target")
3050            os.mkdir(target)
3051            zf.extractall(target)
3052
3053            self.assertEqual(set(os.listdir(target)), {"directory", "directory2"})
3054
3055    def test_root_folder_in_zipfile(self):
3056        """
3057        gh-112795: Some tools or self constructed codes will add '/' folder to
3058        the zip file, this is a strange behavior, but we should support it.
3059        """
3060        in_memory_file = io.BytesIO()
3061        zf = zipfile.ZipFile(in_memory_file, "w")
3062        zf.mkdir('/')
3063        zf.writestr('./a.txt', 'aaa')
3064        zf.extractall(TESTFN2)
3065
3066    def tearDown(self):
3067        rmtree(TESTFN2)
3068        if os.path.exists(TESTFN):
3069            unlink(TESTFN)
3070
3071
3072class ZipInfoTests(unittest.TestCase):
3073    def test_from_file(self):
3074        zi = zipfile.ZipInfo.from_file(__file__)
3075        self.assertEqual(posixpath.basename(zi.filename), 'test_core.py')
3076        self.assertFalse(zi.is_dir())
3077        self.assertEqual(zi.file_size, os.path.getsize(__file__))
3078
3079    def test_from_file_pathlike(self):
3080        zi = zipfile.ZipInfo.from_file(FakePath(__file__))
3081        self.assertEqual(posixpath.basename(zi.filename), 'test_core.py')
3082        self.assertFalse(zi.is_dir())
3083        self.assertEqual(zi.file_size, os.path.getsize(__file__))
3084
3085    def test_from_file_bytes(self):
3086        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
3087        self.assertEqual(posixpath.basename(zi.filename), 'test')
3088        self.assertFalse(zi.is_dir())
3089        self.assertEqual(zi.file_size, os.path.getsize(__file__))
3090
3091    def test_from_file_fileno(self):
3092        with open(__file__, 'rb') as f:
3093            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
3094            self.assertEqual(posixpath.basename(zi.filename), 'test')
3095            self.assertFalse(zi.is_dir())
3096            self.assertEqual(zi.file_size, os.path.getsize(__file__))
3097
3098    def test_from_dir(self):
3099        dirpath = os.path.dirname(os.path.abspath(__file__))
3100        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
3101        self.assertEqual(zi.filename, 'stdlib_tests/')
3102        self.assertTrue(zi.is_dir())
3103        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
3104        self.assertEqual(zi.file_size, 0)
3105
3106    def test_compresslevel_property(self):
3107        zinfo = zipfile.ZipInfo("xxx")
3108        self.assertFalse(zinfo._compresslevel)
3109        self.assertFalse(zinfo.compress_level)
3110        zinfo._compresslevel = 99  # test the legacy @property.setter
3111        self.assertEqual(zinfo.compress_level, 99)
3112        self.assertEqual(zinfo._compresslevel, 99)
3113        zinfo.compress_level = 8
3114        self.assertEqual(zinfo.compress_level, 8)
3115        self.assertEqual(zinfo._compresslevel, 8)
3116
3117
3118class CommandLineTest(unittest.TestCase):
3119
3120    def zipfilecmd(self, *args, **kwargs):
3121        rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args,
3122                                                      **kwargs)
3123        return out.replace(os.linesep.encode(), b'\n')
3124
3125    def zipfilecmd_failure(self, *args):
3126        return script_helper.assert_python_failure('-m', 'zipfile', *args)
3127
3128    def test_bad_use(self):
3129        rc, out, err = self.zipfilecmd_failure()
3130        self.assertEqual(out, b'')
3131        self.assertIn(b'usage', err.lower())
3132        self.assertIn(b'error', err.lower())
3133        self.assertIn(b'required', err.lower())
3134        rc, out, err = self.zipfilecmd_failure('-l', '')
3135        self.assertEqual(out, b'')
3136        self.assertNotEqual(err.strip(), b'')
3137
3138    def test_test_command(self):
3139        zip_name = findfile('zipdir.zip', subdir='archivetestdata')
3140        for opt in '-t', '--test':
3141            out = self.zipfilecmd(opt, zip_name)
3142            self.assertEqual(out.rstrip(), b'Done testing')
3143        zip_name = findfile('testtar.tar')
3144        rc, out, err = self.zipfilecmd_failure('-t', zip_name)
3145        self.assertEqual(out, b'')
3146
3147    def test_list_command(self):
3148        zip_name = findfile('zipdir.zip', subdir='archivetestdata')
3149        t = io.StringIO()
3150        with zipfile.ZipFile(zip_name, 'r') as tf:
3151            tf.printdir(t)
3152        expected = t.getvalue().encode('ascii', 'backslashreplace')
3153        for opt in '-l', '--list':
3154            out = self.zipfilecmd(opt, zip_name,
3155                                  PYTHONIOENCODING='ascii:backslashreplace')
3156            self.assertEqual(out, expected)
3157
3158    @requires_zlib()
3159    def test_create_command(self):
3160        self.addCleanup(unlink, TESTFN)
3161        with open(TESTFN, 'w', encoding='utf-8') as f:
3162            f.write('test 1')
3163        os.mkdir(TESTFNDIR)
3164        self.addCleanup(rmtree, TESTFNDIR)
3165        with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f:
3166            f.write('test 2')
3167        files = [TESTFN, TESTFNDIR]
3168        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
3169        for opt in '-c', '--create':
3170            try:
3171                out = self.zipfilecmd(opt, TESTFN2, *files)
3172                self.assertEqual(out, b'')
3173                with zipfile.ZipFile(TESTFN2) as zf:
3174                    self.assertEqual(zf.namelist(), namelist)
3175                    self.assertEqual(zf.read(namelist[0]), b'test 1')
3176                    self.assertEqual(zf.read(namelist[2]), b'test 2')
3177            finally:
3178                unlink(TESTFN2)
3179
3180    def test_extract_command(self):
3181        zip_name = findfile('zipdir.zip', subdir='archivetestdata')
3182        for opt in '-e', '--extract':
3183            with temp_dir() as extdir:
3184                out = self.zipfilecmd(opt, zip_name, extdir)
3185                self.assertEqual(out, b'')
3186                with zipfile.ZipFile(zip_name) as zf:
3187                    for zi in zf.infolist():
3188                        path = os.path.join(extdir,
3189                                    zi.filename.replace('/', os.sep))
3190                        if zi.is_dir():
3191                            self.assertTrue(os.path.isdir(path))
3192                        else:
3193                            self.assertTrue(os.path.isfile(path))
3194                            with open(path, 'rb') as f:
3195                                self.assertEqual(f.read(), zf.read(zi))
3196
3197
3198class TestExecutablePrependedZip(unittest.TestCase):
3199    """Test our ability to open zip files with an executable prepended."""
3200
3201    def setUp(self):
3202        self.exe_zip = findfile('exe_with_zip', subdir='archivetestdata')
3203        self.exe_zip64 = findfile('exe_with_z64', subdir='archivetestdata')
3204
3205    def _test_zip_works(self, name):
3206        # bpo28494 sanity check: ensure is_zipfile works on these.
3207        self.assertTrue(zipfile.is_zipfile(name),
3208                        f'is_zipfile failed on {name}')
3209        # Ensure we can operate on these via ZipFile.
3210        with zipfile.ZipFile(name) as zipfp:
3211            for n in zipfp.namelist():
3212                data = zipfp.read(n)
3213                self.assertIn(b'FAVORITE_NUMBER', data)
3214
3215    def test_read_zip_with_exe_prepended(self):
3216        self._test_zip_works(self.exe_zip)
3217
3218    def test_read_zip64_with_exe_prepended(self):
3219        self._test_zip_works(self.exe_zip64)
3220
3221    @unittest.skipUnless(sys.executable, 'sys.executable required.')
3222    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
3223                         'Test relies on #!/bin/bash working.')
3224    @requires_subprocess()
3225    def test_execute_zip2(self):
3226        output = subprocess.check_output([self.exe_zip, sys.executable])
3227        self.assertIn(b'number in executable: 5', output)
3228
3229    @unittest.skipUnless(sys.executable, 'sys.executable required.')
3230    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
3231                         'Test relies on #!/bin/bash working.')
3232    @requires_subprocess()
3233    def test_execute_zip64(self):
3234        output = subprocess.check_output([self.exe_zip64, sys.executable])
3235        self.assertIn(b'number in executable: 5', output)
3236
3237
3238class EncodedMetadataTests(unittest.TestCase):
3239    file_names = ['\u4e00', '\u4e8c', '\u4e09']  # Han 'one', 'two', 'three'
3240    file_content = [
3241        "This is pure ASCII.\n".encode('ascii'),
3242        # This is modern Japanese. (UTF-8)
3243        "\u3053\u308c\u306f\u73fe\u4ee3\u7684\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('utf-8'),
3244        # This is obsolete Japanese. (Shift JIS)
3245        "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'),
3246    ]
3247
3248    def setUp(self):
3249        self.addCleanup(unlink, TESTFN)
3250        # Create .zip of 3 members with Han names encoded in Shift JIS.
3251        # Each name is 1 Han character encoding to 2 bytes in Shift JIS.
3252        # The ASCII names are arbitrary as long as they are length 2 and
3253        # not otherwise contained in the zip file.
3254        # Data elements are encoded bytes (ascii, utf-8, shift_jis).
3255        placeholders = ["n1", "n2"] + self.file_names[2:]
3256        with zipfile.ZipFile(TESTFN, mode="w") as tf:
3257            for temp, content in zip(placeholders, self.file_content):
3258                tf.writestr(temp, content, zipfile.ZIP_STORED)
3259        # Hack in the Shift JIS names with flag bit 11 (UTF-8) unset.
3260        with open(TESTFN, "rb") as tf:
3261            data = tf.read()
3262        for name, temp in zip(self.file_names, placeholders[:2]):
3263            data = data.replace(temp.encode('ascii'),
3264                                name.encode('shift_jis'))
3265        with open(TESTFN, "wb") as tf:
3266            tf.write(data)
3267
3268    def _test_read(self, zipfp, expected_names, expected_content):
3269        # Check the namelist
3270        names = zipfp.namelist()
3271        self.assertEqual(sorted(names), sorted(expected_names))
3272
3273        # Check infolist
3274        infos = zipfp.infolist()
3275        names = [zi.filename for zi in infos]
3276        self.assertEqual(sorted(names), sorted(expected_names))
3277
3278        # check getinfo
3279        for name, content in zip(expected_names, expected_content):
3280            info = zipfp.getinfo(name)
3281            self.assertEqual(info.filename, name)
3282            self.assertEqual(info.file_size, len(content))
3283            self.assertEqual(zipfp.read(name), content)
3284
3285    def test_read_with_metadata_encoding(self):
3286        # Read the ZIP archive with correct metadata_encoding
3287        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp:
3288            self._test_read(zipfp, self.file_names, self.file_content)
3289
3290    def test_read_without_metadata_encoding(self):
3291        # Read the ZIP archive without metadata_encoding
3292        expected_names = [name.encode('shift_jis').decode('cp437')
3293                          for name in self.file_names[:2]] + self.file_names[2:]
3294        with zipfile.ZipFile(TESTFN, "r") as zipfp:
3295            self._test_read(zipfp, expected_names, self.file_content)
3296
3297    def test_read_with_incorrect_metadata_encoding(self):
3298        # Read the ZIP archive with incorrect metadata_encoding
3299        expected_names = [name.encode('shift_jis').decode('koi8-u')
3300                          for name in self.file_names[:2]] + self.file_names[2:]
3301        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp:
3302            self._test_read(zipfp, expected_names, self.file_content)
3303
3304    def test_read_with_unsuitable_metadata_encoding(self):
3305        # Read the ZIP archive with metadata_encoding unsuitable for
3306        # decoding metadata
3307        with self.assertRaises(UnicodeDecodeError):
3308            zipfile.ZipFile(TESTFN, "r", metadata_encoding='ascii')
3309        with self.assertRaises(UnicodeDecodeError):
3310            zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8')
3311
3312    def test_read_after_append(self):
3313        newname = '\u56db'  # Han 'four'
3314        expected_names = [name.encode('shift_jis').decode('cp437')
3315                          for name in self.file_names[:2]] + self.file_names[2:]
3316        expected_names.append(newname)
3317        expected_content = (*self.file_content, b"newcontent")
3318
3319        with zipfile.ZipFile(TESTFN, "a") as zipfp:
3320            zipfp.writestr(newname, "newcontent")
3321            self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names))
3322
3323        with zipfile.ZipFile(TESTFN, "r") as zipfp:
3324            self._test_read(zipfp, expected_names, expected_content)
3325
3326        with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp:
3327            self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names))
3328            for i, (name, content) in enumerate(zip(expected_names, expected_content)):
3329                info = zipfp.getinfo(name)
3330                self.assertEqual(info.filename, name)
3331                self.assertEqual(info.file_size, len(content))
3332                if i < 2:
3333                    with self.assertRaises(zipfile.BadZipFile):
3334                        zipfp.read(name)
3335                else:
3336                    self.assertEqual(zipfp.read(name), content)
3337
3338    def test_write_with_metadata_encoding(self):
3339        ZF = zipfile.ZipFile
3340        for mode in ("w", "x", "a"):
3341            with self.assertRaisesRegex(ValueError,
3342                                        "^metadata_encoding is only"):
3343                ZF("nonesuch.zip", mode, metadata_encoding="shift_jis")
3344
3345    def test_cli_with_metadata_encoding(self):
3346        errmsg = "Non-conforming encodings not supported with -c."
3347        args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"]
3348        with captured_stdout() as stdout:
3349            with captured_stderr() as stderr:
3350                self.assertRaises(SystemExit, zipfile.main, args)
3351        self.assertEqual(stdout.getvalue(), "")
3352        self.assertIn(errmsg, stderr.getvalue())
3353
3354        with captured_stdout() as stdout:
3355            zipfile.main(["--metadata-encoding=shift_jis", "-t", TESTFN])
3356        listing = stdout.getvalue()
3357
3358        with captured_stdout() as stdout:
3359            zipfile.main(["--metadata-encoding=shift_jis", "-l", TESTFN])
3360        listing = stdout.getvalue()
3361        for name in self.file_names:
3362            self.assertIn(name, listing)
3363
3364    def test_cli_with_metadata_encoding_extract(self):
3365        os.mkdir(TESTFN2)
3366        self.addCleanup(rmtree, TESTFN2)
3367        # Depending on locale, extracted file names can be not encodable
3368        # with the filesystem encoding.
3369        for fn in self.file_names:
3370            try:
3371                os.stat(os.path.join(TESTFN2, fn))
3372            except OSError:
3373                pass
3374            except UnicodeEncodeError:
3375                self.skipTest(f'cannot encode file name {fn!r}')
3376
3377        zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2])
3378        listing = os.listdir(TESTFN2)
3379        for name in self.file_names:
3380            self.assertIn(name, listing)
3381
3382
3383class StripExtraTests(unittest.TestCase):
3384    # Note: all of the "z" characters are technically invalid, but up
3385    # to 3 bytes at the end of the extra will be passed through as they
3386    # are too short to encode a valid extra.
3387
3388    ZIP64_EXTRA = 1
3389
3390    def test_no_data(self):
3391        s = struct.Struct("<HH")
3392        a = s.pack(self.ZIP64_EXTRA, 0)
3393        b = s.pack(2, 0)
3394        c = s.pack(3, 0)
3395
3396        self.assertEqual(b'', zipfile._Extra.strip(a, (self.ZIP64_EXTRA,)))
3397        self.assertEqual(b, zipfile._Extra.strip(b, (self.ZIP64_EXTRA,)))
3398        self.assertEqual(
3399            b+b"z", zipfile._Extra.strip(b+b"z", (self.ZIP64_EXTRA,)))
3400
3401        self.assertEqual(b+c, zipfile._Extra.strip(a+b+c, (self.ZIP64_EXTRA,)))
3402        self.assertEqual(b+c, zipfile._Extra.strip(b+a+c, (self.ZIP64_EXTRA,)))
3403        self.assertEqual(b+c, zipfile._Extra.strip(b+c+a, (self.ZIP64_EXTRA,)))
3404
3405    def test_with_data(self):
3406        s = struct.Struct("<HH")
3407        a = s.pack(self.ZIP64_EXTRA, 1) + b"a"
3408        b = s.pack(2, 2) + b"bb"
3409        c = s.pack(3, 3) + b"ccc"
3410
3411        self.assertEqual(b"", zipfile._Extra.strip(a, (self.ZIP64_EXTRA,)))
3412        self.assertEqual(b, zipfile._Extra.strip(b, (self.ZIP64_EXTRA,)))
3413        self.assertEqual(
3414            b+b"z", zipfile._Extra.strip(b+b"z", (self.ZIP64_EXTRA,)))
3415
3416        self.assertEqual(b+c, zipfile._Extra.strip(a+b+c, (self.ZIP64_EXTRA,)))
3417        self.assertEqual(b+c, zipfile._Extra.strip(b+a+c, (self.ZIP64_EXTRA,)))
3418        self.assertEqual(b+c, zipfile._Extra.strip(b+c+a, (self.ZIP64_EXTRA,)))
3419
3420    def test_multiples(self):
3421        s = struct.Struct("<HH")
3422        a = s.pack(self.ZIP64_EXTRA, 1) + b"a"
3423        b = s.pack(2, 2) + b"bb"
3424
3425        self.assertEqual(b"", zipfile._Extra.strip(a+a, (self.ZIP64_EXTRA,)))
3426        self.assertEqual(b"", zipfile._Extra.strip(a+a+a, (self.ZIP64_EXTRA,)))
3427        self.assertEqual(
3428            b"z", zipfile._Extra.strip(a+a+b"z", (self.ZIP64_EXTRA,)))
3429        self.assertEqual(
3430            b+b"z", zipfile._Extra.strip(a+a+b+b"z", (self.ZIP64_EXTRA,)))
3431
3432        self.assertEqual(b, zipfile._Extra.strip(a+a+b, (self.ZIP64_EXTRA,)))
3433        self.assertEqual(b, zipfile._Extra.strip(a+b+a, (self.ZIP64_EXTRA,)))
3434        self.assertEqual(b, zipfile._Extra.strip(b+a+a, (self.ZIP64_EXTRA,)))
3435
3436    def test_too_short(self):
3437        self.assertEqual(b"", zipfile._Extra.strip(b"", (self.ZIP64_EXTRA,)))
3438        self.assertEqual(b"z", zipfile._Extra.strip(b"z", (self.ZIP64_EXTRA,)))
3439        self.assertEqual(
3440            b"zz", zipfile._Extra.strip(b"zz", (self.ZIP64_EXTRA,)))
3441        self.assertEqual(
3442            b"zzz", zipfile._Extra.strip(b"zzz", (self.ZIP64_EXTRA,)))
3443
3444
3445if __name__ == "__main__":
3446    unittest.main()
3447