• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import io
3import os
4import importlib.util
5import posixpath
6import time
7import struct
8import zipfile
9import unittest
10
11
12from tempfile import TemporaryFile
13from random import randint, random, getrandbits
14
15from test.support import script_helper
16from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir,
17                          requires_zlib, requires_bz2, requires_lzma,
18                          captured_stdout, check_warnings)
19
# Extra scratch names derived from the test.support TESTFN base name.
TESTFN2 = TESTFN + "2"
TESTFNDIR = TESTFN + "d"
# Number of generated lines in the fixture data built by the setUpClass hooks.
FIXEDTEST_SIZE = 1000
DATAFILES_DIR = 'zipfile_datafiles'

# NOTE(review): looks like (archive member path, text content) pairs; the
# consumers are not visible in this part of the file -- confirm.
SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'),
                   ('ziptest2dir/_ziptest2', 'qawsedrftg'),
                   ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'),
                   ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')]
29
def getrandbytes(size):
    # Draw 8 random bits per requested byte and serialise the resulting
    # integer as exactly `size` little-endian bytes.
    bit_count = 8 * size
    value = getrandbits(bit_count)
    return value.to_bytes(size, 'little')
32
def get_files(test):
    # Yield each kind of archive target in turn: a file name, then a real
    # temporary file object, then an in-memory buffer.  After the consumer
    # is done with a file object, verify that it was left open.
    yield TESTFN2
    for factory in (TemporaryFile, io.BytesIO):
        with factory() as fileobj:
            yield fileobj
            test.assertFalse(fileobj.closed)
41
42class AbstractTestsWithSourceFile:
43    @classmethod
44    def setUpClass(cls):
45        cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" %
46                              (i, random()), "ascii")
47                        for i in range(FIXEDTEST_SIZE)]
48        cls.data = b''.join(cls.line_gen)
49
50    def setUp(self):
51        # Make a source file with some lines
52        with open(TESTFN, "wb") as fp:
53            fp.write(self.data)
54
55    def make_test_archive(self, f, compression):
56        # Create the ZIP archive
57        with zipfile.ZipFile(f, "w", compression) as zipfp:
58            zipfp.write(TESTFN, "another.name")
59            zipfp.write(TESTFN, TESTFN)
60            zipfp.writestr("strfile", self.data)
61            with zipfp.open('written-open-w', mode='w') as f:
62                for line in self.line_gen:
63                    f.write(line)
64
65    def zip_test(self, f, compression):
66        self.make_test_archive(f, compression)
67
68        # Read the ZIP archive
69        with zipfile.ZipFile(f, "r", compression) as zipfp:
70            self.assertEqual(zipfp.read(TESTFN), self.data)
71            self.assertEqual(zipfp.read("another.name"), self.data)
72            self.assertEqual(zipfp.read("strfile"), self.data)
73
74            # Print the ZIP directory
75            fp = io.StringIO()
76            zipfp.printdir(file=fp)
77            directory = fp.getvalue()
78            lines = directory.splitlines()
79            self.assertEqual(len(lines), 5) # Number of files + header
80
81            self.assertIn('File Name', lines[0])
82            self.assertIn('Modified', lines[0])
83            self.assertIn('Size', lines[0])
84
85            fn, date, time_, size = lines[1].split()
86            self.assertEqual(fn, 'another.name')
87            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
88            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
89            self.assertEqual(size, str(len(self.data)))
90
91            # Check the namelist
92            names = zipfp.namelist()
93            self.assertEqual(len(names), 4)
94            self.assertIn(TESTFN, names)
95            self.assertIn("another.name", names)
96            self.assertIn("strfile", names)
97            self.assertIn("written-open-w", names)
98
99            # Check infolist
100            infos = zipfp.infolist()
101            names = [i.filename for i in infos]
102            self.assertEqual(len(names), 4)
103            self.assertIn(TESTFN, names)
104            self.assertIn("another.name", names)
105            self.assertIn("strfile", names)
106            self.assertIn("written-open-w", names)
107            for i in infos:
108                self.assertEqual(i.file_size, len(self.data))
109
110            # check getinfo
111            for nm in (TESTFN, "another.name", "strfile", "written-open-w"):
112                info = zipfp.getinfo(nm)
113                self.assertEqual(info.filename, nm)
114                self.assertEqual(info.file_size, len(self.data))
115
116            # Check that testzip doesn't raise an exception
117            zipfp.testzip()
118
119    def test_basic(self):
120        for f in get_files(self):
121            self.zip_test(f, self.compression)
122
123    def zip_open_test(self, f, compression):
124        self.make_test_archive(f, compression)
125
126        # Read the ZIP archive
127        with zipfile.ZipFile(f, "r", compression) as zipfp:
128            zipdata1 = []
129            with zipfp.open(TESTFN) as zipopen1:
130                while True:
131                    read_data = zipopen1.read(256)
132                    if not read_data:
133                        break
134                    zipdata1.append(read_data)
135
136            zipdata2 = []
137            with zipfp.open("another.name") as zipopen2:
138                while True:
139                    read_data = zipopen2.read(256)
140                    if not read_data:
141                        break
142                    zipdata2.append(read_data)
143
144            self.assertEqual(b''.join(zipdata1), self.data)
145            self.assertEqual(b''.join(zipdata2), self.data)
146
147    def test_open(self):
148        for f in get_files(self):
149            self.zip_open_test(f, self.compression)
150
151    def zip_random_open_test(self, f, compression):
152        self.make_test_archive(f, compression)
153
154        # Read the ZIP archive
155        with zipfile.ZipFile(f, "r", compression) as zipfp:
156            zipdata1 = []
157            with zipfp.open(TESTFN) as zipopen1:
158                while True:
159                    read_data = zipopen1.read(randint(1, 1024))
160                    if not read_data:
161                        break
162                    zipdata1.append(read_data)
163
164            self.assertEqual(b''.join(zipdata1), self.data)
165
166    def test_random_open(self):
167        for f in get_files(self):
168            self.zip_random_open_test(f, self.compression)
169
170    def zip_read1_test(self, f, compression):
171        self.make_test_archive(f, compression)
172
173        # Read the ZIP archive
174        with zipfile.ZipFile(f, "r") as zipfp, \
175             zipfp.open(TESTFN) as zipopen:
176            zipdata = []
177            while True:
178                read_data = zipopen.read1(-1)
179                if not read_data:
180                    break
181                zipdata.append(read_data)
182
183        self.assertEqual(b''.join(zipdata), self.data)
184
185    def test_read1(self):
186        for f in get_files(self):
187            self.zip_read1_test(f, self.compression)
188
189    def zip_read1_10_test(self, f, compression):
190        self.make_test_archive(f, compression)
191
192        # Read the ZIP archive
193        with zipfile.ZipFile(f, "r") as zipfp, \
194             zipfp.open(TESTFN) as zipopen:
195            zipdata = []
196            while True:
197                read_data = zipopen.read1(10)
198                self.assertLessEqual(len(read_data), 10)
199                if not read_data:
200                    break
201                zipdata.append(read_data)
202
203        self.assertEqual(b''.join(zipdata), self.data)
204
205    def test_read1_10(self):
206        for f in get_files(self):
207            self.zip_read1_10_test(f, self.compression)
208
209    def zip_readline_read_test(self, f, compression):
210        self.make_test_archive(f, compression)
211
212        # Read the ZIP archive
213        with zipfile.ZipFile(f, "r") as zipfp, \
214             zipfp.open(TESTFN) as zipopen:
215            data = b''
216            while True:
217                read = zipopen.readline()
218                if not read:
219                    break
220                data += read
221
222                read = zipopen.read(100)
223                if not read:
224                    break
225                data += read
226
227        self.assertEqual(data, self.data)
228
229    def test_readline_read(self):
230        # Issue #7610: calls to readline() interleaved with calls to read().
231        for f in get_files(self):
232            self.zip_readline_read_test(f, self.compression)
233
234    def zip_readline_test(self, f, compression):
235        self.make_test_archive(f, compression)
236
237        # Read the ZIP archive
238        with zipfile.ZipFile(f, "r") as zipfp:
239            with zipfp.open(TESTFN) as zipopen:
240                for line in self.line_gen:
241                    linedata = zipopen.readline()
242                    self.assertEqual(linedata, line)
243
244    def test_readline(self):
245        for f in get_files(self):
246            self.zip_readline_test(f, self.compression)
247
248    def zip_readlines_test(self, f, compression):
249        self.make_test_archive(f, compression)
250
251        # Read the ZIP archive
252        with zipfile.ZipFile(f, "r") as zipfp:
253            with zipfp.open(TESTFN) as zipopen:
254                ziplines = zipopen.readlines()
255            for line, zipline in zip(self.line_gen, ziplines):
256                self.assertEqual(zipline, line)
257
258    def test_readlines(self):
259        for f in get_files(self):
260            self.zip_readlines_test(f, self.compression)
261
262    def zip_iterlines_test(self, f, compression):
263        self.make_test_archive(f, compression)
264
265        # Read the ZIP archive
266        with zipfile.ZipFile(f, "r") as zipfp:
267            with zipfp.open(TESTFN) as zipopen:
268                for line, zipline in zip(self.line_gen, zipopen):
269                    self.assertEqual(zipline, line)
270
271    def test_iterlines(self):
272        for f in get_files(self):
273            self.zip_iterlines_test(f, self.compression)
274
275    def test_low_compression(self):
276        """Check for cases where compressed data is larger than original."""
277        # Create the ZIP archive
278        with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp:
279            zipfp.writestr("strfile", '12')
280
281        # Get an open object for strfile
282        with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp:
283            with zipfp.open("strfile") as openobj:
284                self.assertEqual(openobj.read(1), b'1')
285                self.assertEqual(openobj.read(1), b'2')
286
287    def test_writestr_compression(self):
288        zipfp = zipfile.ZipFile(TESTFN2, "w")
289        zipfp.writestr("b.txt", "hello world", compress_type=self.compression)
290        info = zipfp.getinfo('b.txt')
291        self.assertEqual(info.compress_type, self.compression)
292
293    def test_read_return_size(self):
294        # Issue #9837: ZipExtFile.read() shouldn't return more bytes
295        # than requested.
296        for test_size in (1, 4095, 4096, 4097, 16384):
297            file_size = test_size + 1
298            junk = getrandbytes(file_size)
299            with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf:
300                zipf.writestr('foo', junk)
301                with zipf.open('foo', 'r') as fp:
302                    buf = fp.read(test_size)
303                    self.assertEqual(len(buf), test_size)
304
305    def test_truncated_zipfile(self):
306        fp = io.BytesIO()
307        with zipfile.ZipFile(fp, mode='w') as zipf:
308            zipf.writestr('strfile', self.data, compress_type=self.compression)
309            end_offset = fp.tell()
310        zipfiledata = fp.getvalue()
311
312        fp = io.BytesIO(zipfiledata)
313        with zipfile.ZipFile(fp) as zipf:
314            with zipf.open('strfile') as zipopen:
315                fp.truncate(end_offset - 20)
316                with self.assertRaises(EOFError):
317                    zipopen.read()
318
319        fp = io.BytesIO(zipfiledata)
320        with zipfile.ZipFile(fp) as zipf:
321            with zipf.open('strfile') as zipopen:
322                fp.truncate(end_offset - 20)
323                with self.assertRaises(EOFError):
324                    while zipopen.read(100):
325                        pass
326
327        fp = io.BytesIO(zipfiledata)
328        with zipfile.ZipFile(fp) as zipf:
329            with zipf.open('strfile') as zipopen:
330                fp.truncate(end_offset - 20)
331                with self.assertRaises(EOFError):
332                    while zipopen.read1(100):
333                        pass
334
335    def test_repr(self):
336        fname = 'file.name'
337        for f in get_files(self):
338            with zipfile.ZipFile(f, 'w', self.compression) as zipfp:
339                zipfp.write(TESTFN, fname)
340                r = repr(zipfp)
341                self.assertIn("mode='w'", r)
342
343            with zipfile.ZipFile(f, 'r') as zipfp:
344                r = repr(zipfp)
345                if isinstance(f, str):
346                    self.assertIn('filename=%r' % f, r)
347                else:
348                    self.assertIn('file=%r' % f, r)
349                self.assertIn("mode='r'", r)
350                r = repr(zipfp.getinfo(fname))
351                self.assertIn('filename=%r' % fname, r)
352                self.assertIn('filemode=', r)
353                self.assertIn('file_size=', r)
354                if self.compression != zipfile.ZIP_STORED:
355                    self.assertIn('compress_type=', r)
356                    self.assertIn('compress_size=', r)
357                with zipfp.open(fname) as zipopen:
358                    r = repr(zipopen)
359                    self.assertIn('name=%r' % fname, r)
360                    self.assertIn("mode='r'", r)
361                    if self.compression != zipfile.ZIP_STORED:
362                        self.assertIn('compress_type=', r)
363                self.assertIn('[closed]', repr(zipopen))
364            self.assertIn('[closed]', repr(zipfp))
365
366    def tearDown(self):
367        unlink(TESTFN)
368        unlink(TESTFN2)
369
370
371class StoredTestsWithSourceFile(AbstractTestsWithSourceFile,
372                                unittest.TestCase):
373    compression = zipfile.ZIP_STORED
374    test_low_compression = None
375
376    def zip_test_writestr_permissions(self, f, compression):
377        # Make sure that writestr and open(... mode='w') create files with
378        # mode 0600, when they are passed a name rather than a ZipInfo
379        # instance.
380
381        self.make_test_archive(f, compression)
382        with zipfile.ZipFile(f, "r") as zipfp:
383            zinfo = zipfp.getinfo('strfile')
384            self.assertEqual(zinfo.external_attr, 0o600 << 16)
385
386            zinfo2 = zipfp.getinfo('written-open-w')
387            self.assertEqual(zinfo2.external_attr, 0o600 << 16)
388
389    def test_writestr_permissions(self):
390        for f in get_files(self):
391            self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
392
393    def test_absolute_arcnames(self):
394        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
395            zipfp.write(TESTFN, "/absolute")
396
397        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
398            self.assertEqual(zipfp.namelist(), ["absolute"])
399
400    def test_append_to_zip_file(self):
401        """Test appending to an existing zipfile."""
402        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
403            zipfp.write(TESTFN, TESTFN)
404
405        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
406            zipfp.writestr("strfile", self.data)
407            self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
408
409    def test_append_to_non_zip_file(self):
410        """Test appending to an existing file that is not a zipfile."""
411        # NOTE: this test fails if len(d) < 22 because of the first
412        # line "fpin.seek(-22, 2)" in _EndRecData
413        data = b'I am not a ZipFile!'*10
414        with open(TESTFN2, 'wb') as f:
415            f.write(data)
416
417        with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
418            zipfp.write(TESTFN, TESTFN)
419
420        with open(TESTFN2, 'rb') as f:
421            f.seek(len(data))
422            with zipfile.ZipFile(f, "r") as zipfp:
423                self.assertEqual(zipfp.namelist(), [TESTFN])
424                self.assertEqual(zipfp.read(TESTFN), self.data)
425        with open(TESTFN2, 'rb') as f:
426            self.assertEqual(f.read(len(data)), data)
427            zipfiledata = f.read()
428        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
429            self.assertEqual(zipfp.namelist(), [TESTFN])
430            self.assertEqual(zipfp.read(TESTFN), self.data)
431
432    def test_read_concatenated_zip_file(self):
433        with io.BytesIO() as bio:
434            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
435                zipfp.write(TESTFN, TESTFN)
436            zipfiledata = bio.getvalue()
437        data = b'I am not a ZipFile!'*10
438        with open(TESTFN2, 'wb') as f:
439            f.write(data)
440            f.write(zipfiledata)
441
442        with zipfile.ZipFile(TESTFN2) as zipfp:
443            self.assertEqual(zipfp.namelist(), [TESTFN])
444            self.assertEqual(zipfp.read(TESTFN), self.data)
445
446    def test_append_to_concatenated_zip_file(self):
447        with io.BytesIO() as bio:
448            with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp:
449                zipfp.write(TESTFN, TESTFN)
450            zipfiledata = bio.getvalue()
451        data = b'I am not a ZipFile!'*1000000
452        with open(TESTFN2, 'wb') as f:
453            f.write(data)
454            f.write(zipfiledata)
455
456        with zipfile.ZipFile(TESTFN2, 'a') as zipfp:
457            self.assertEqual(zipfp.namelist(), [TESTFN])
458            zipfp.writestr('strfile', self.data)
459
460        with open(TESTFN2, 'rb') as f:
461            self.assertEqual(f.read(len(data)), data)
462            zipfiledata = f.read()
463        with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp:
464            self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile'])
465            self.assertEqual(zipfp.read(TESTFN), self.data)
466            self.assertEqual(zipfp.read('strfile'), self.data)
467
468    def test_ignores_newline_at_end(self):
469        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
470            zipfp.write(TESTFN, TESTFN)
471        with open(TESTFN2, 'a') as f:
472            f.write("\r\n\00\00\00")
473        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
474            self.assertIsInstance(zipfp, zipfile.ZipFile)
475
476    def test_ignores_stuff_appended_past_comments(self):
477        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
478            zipfp.comment = b"this is a comment"
479            zipfp.write(TESTFN, TESTFN)
480        with open(TESTFN2, 'a') as f:
481            f.write("abcdef\r\n")
482        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
483            self.assertIsInstance(zipfp, zipfile.ZipFile)
484            self.assertEqual(zipfp.comment, b"this is a comment")
485
486    def test_write_default_name(self):
487        """Check that calling ZipFile.write without arcname specified
488        produces the expected result."""
489        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
490            zipfp.write(TESTFN)
491            with open(TESTFN, "rb") as f:
492                self.assertEqual(zipfp.read(TESTFN), f.read())
493
494    def test_write_to_readonly(self):
495        """Check that trying to call write() on a readonly ZipFile object
496        raises a ValueError."""
497        with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
498            zipfp.writestr("somefile.txt", "bogus")
499
500        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
501            self.assertRaises(ValueError, zipfp.write, TESTFN)
502
503        with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
504            with self.assertRaises(ValueError):
505                zipfp.open(TESTFN, mode='w')
506
507    def test_add_file_before_1980(self):
508        # Set atime and mtime to 1970-01-01
509        os.utime(TESTFN, (0, 0))
510        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
511            self.assertRaises(ValueError, zipfp.write, TESTFN)
512
513
@requires_zlib
class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile,
                                 unittest.TestCase):
    # Runs the shared AbstractTestsWithSourceFile tests with deflate
    # compression, plus one deflate-specific test.
    compression = zipfile.ZIP_DEFLATED

    def test_per_file_compression(self):
        """Check that files within a Zip archive can have different
        compression options."""
        members = (('storeme', zipfile.ZIP_STORED),
                   ('deflateme', zipfile.ZIP_DEFLATED))
        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
            for arcname, method in members:
                zipfp.write(TESTFN, arcname, method)
            # Each member must report the method it was written with.
            for arcname, method in members:
                info = zipfp.getinfo(arcname)
                self.assertEqual(info.compress_type, method)
529
@requires_bz2
class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile,
                               unittest.TestCase):
    # Runs the shared AbstractTestsWithSourceFile tests with bzip2 compression.
    compression = zipfile.ZIP_BZIP2
534
@requires_lzma
class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile,
                              unittest.TestCase):
    # Runs the shared AbstractTestsWithSourceFile tests with LZMA compression.
    compression = zipfile.ZIP_LZMA
539
540
541class AbstractTestZip64InSmallFiles:
542    # These tests test the ZIP64 functionality without using large files,
543    # see test_zipfile64 for proper tests.
544
545    @classmethod
546    def setUpClass(cls):
547        line_gen = (bytes("Test of zipfile line %d." % i, "ascii")
548                    for i in range(0, FIXEDTEST_SIZE))
549        cls.data = b'\n'.join(line_gen)
550
551    def setUp(self):
552        self._limit = zipfile.ZIP64_LIMIT
553        self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT
554        zipfile.ZIP64_LIMIT = 1000
555        zipfile.ZIP_FILECOUNT_LIMIT = 9
556
557        # Make a source file with some lines
558        with open(TESTFN, "wb") as fp:
559            fp.write(self.data)
560
561    def zip_test(self, f, compression):
562        # Create the ZIP archive
563        with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
564            zipfp.write(TESTFN, "another.name")
565            zipfp.write(TESTFN, TESTFN)
566            zipfp.writestr("strfile", self.data)
567
568        # Read the ZIP archive
569        with zipfile.ZipFile(f, "r", compression) as zipfp:
570            self.assertEqual(zipfp.read(TESTFN), self.data)
571            self.assertEqual(zipfp.read("another.name"), self.data)
572            self.assertEqual(zipfp.read("strfile"), self.data)
573
574            # Print the ZIP directory
575            fp = io.StringIO()
576            zipfp.printdir(fp)
577
578            directory = fp.getvalue()
579            lines = directory.splitlines()
580            self.assertEqual(len(lines), 4) # Number of files + header
581
582            self.assertIn('File Name', lines[0])
583            self.assertIn('Modified', lines[0])
584            self.assertIn('Size', lines[0])
585
586            fn, date, time_, size = lines[1].split()
587            self.assertEqual(fn, 'another.name')
588            self.assertTrue(time.strptime(date, '%Y-%m-%d'))
589            self.assertTrue(time.strptime(time_, '%H:%M:%S'))
590            self.assertEqual(size, str(len(self.data)))
591
592            # Check the namelist
593            names = zipfp.namelist()
594            self.assertEqual(len(names), 3)
595            self.assertIn(TESTFN, names)
596            self.assertIn("another.name", names)
597            self.assertIn("strfile", names)
598
599            # Check infolist
600            infos = zipfp.infolist()
601            names = [i.filename for i in infos]
602            self.assertEqual(len(names), 3)
603            self.assertIn(TESTFN, names)
604            self.assertIn("another.name", names)
605            self.assertIn("strfile", names)
606            for i in infos:
607                self.assertEqual(i.file_size, len(self.data))
608
609            # check getinfo
610            for nm in (TESTFN, "another.name", "strfile"):
611                info = zipfp.getinfo(nm)
612                self.assertEqual(info.filename, nm)
613                self.assertEqual(info.file_size, len(self.data))
614
615            # Check that testzip doesn't raise an exception
616            zipfp.testzip()
617
618    def test_basic(self):
619        for f in get_files(self):
620            self.zip_test(f, self.compression)
621
622    def test_too_many_files(self):
623        # This test checks that more than 64k files can be added to an archive,
624        # and that the resulting archive can be read properly by ZipFile
625        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
626                               allowZip64=True)
627        zipf.debug = 100
628        numfiles = 15
629        for i in range(numfiles):
630            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
631        self.assertEqual(len(zipf.namelist()), numfiles)
632        zipf.close()
633
634        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
635        self.assertEqual(len(zipf2.namelist()), numfiles)
636        for i in range(numfiles):
637            content = zipf2.read("foo%08d" % i).decode('ascii')
638            self.assertEqual(content, "%d" % (i**3 % 57))
639        zipf2.close()
640
641    def test_too_many_files_append(self):
642        zipf = zipfile.ZipFile(TESTFN, "w", self.compression,
643                               allowZip64=False)
644        zipf.debug = 100
645        numfiles = 9
646        for i in range(numfiles):
647            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
648        self.assertEqual(len(zipf.namelist()), numfiles)
649        with self.assertRaises(zipfile.LargeZipFile):
650            zipf.writestr("foo%08d" % numfiles, b'')
651        self.assertEqual(len(zipf.namelist()), numfiles)
652        zipf.close()
653
654        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
655                               allowZip64=False)
656        zipf.debug = 100
657        self.assertEqual(len(zipf.namelist()), numfiles)
658        with self.assertRaises(zipfile.LargeZipFile):
659            zipf.writestr("foo%08d" % numfiles, b'')
660        self.assertEqual(len(zipf.namelist()), numfiles)
661        zipf.close()
662
663        zipf = zipfile.ZipFile(TESTFN, "a", self.compression,
664                               allowZip64=True)
665        zipf.debug = 100
666        self.assertEqual(len(zipf.namelist()), numfiles)
667        numfiles2 = 15
668        for i in range(numfiles, numfiles2):
669            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))
670        self.assertEqual(len(zipf.namelist()), numfiles2)
671        zipf.close()
672
673        zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression)
674        self.assertEqual(len(zipf2.namelist()), numfiles2)
675        for i in range(numfiles2):
676            content = zipf2.read("foo%08d" % i).decode('ascii')
677            self.assertEqual(content, "%d" % (i**3 % 57))
678        zipf2.close()
679
680    def tearDown(self):
681        zipfile.ZIP64_LIMIT = self._limit
682        zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit
683        unlink(TESTFN)
684        unlink(TESTFN2)
685
686
class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                  unittest.TestCase):
    # Runs the shared AbstractTestZip64InSmallFiles tests without
    # compression, plus checks that only make sense for stored members.
    compression = zipfile.ZIP_STORED

    def large_file_exception_test(self, f, compression):
        # write() of the source file must be refused when ZIP64 is disabled.
        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
            with self.assertRaises(zipfile.LargeZipFile):
                zipfp.write(TESTFN, "another.name")

    def large_file_exception_test2(self, f, compression):
        # writestr() of the same data must be refused as well.
        with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp:
            with self.assertRaises(zipfile.LargeZipFile):
                zipfp.writestr("another.name", self.data)

    def test_large_file_exception(self):
        # Both refusal checks, against every target from get_files().
        for target in get_files(self):
            self.large_file_exception_test(target, zipfile.ZIP_STORED)
            self.large_file_exception_test2(target, zipfile.ZIP_STORED)

    def test_absolute_arcnames(self):
        # An arcname with a leading slash must be listed without it.
        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
                             allowZip64=True) as writer:
            writer.write(TESTFN, "/absolute")

        with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as reader:
            stored_names = reader.namelist()
            self.assertEqual(stored_names, ["absolute"])
713
@requires_zlib
class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                   unittest.TestCase):
    # Runs the shared AbstractTestZip64InSmallFiles tests with deflate
    # compression.
    compression = zipfile.ZIP_DEFLATED
718
@requires_bz2
class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                 unittest.TestCase):
    # Runs the shared AbstractTestZip64InSmallFiles tests with bzip2
    # compression.
    compression = zipfile.ZIP_BZIP2
723
@requires_lzma
class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles,
                                unittest.TestCase):
    # Runs the shared AbstractTestZip64InSmallFiles tests with LZMA
    # compression.
    compression = zipfile.ZIP_LZMA
728
729
730class PyZipFileTests(unittest.TestCase):
731    def assertCompiledIn(self, name, namelist):
732        if name + 'o' not in namelist:
733            self.assertIn(name + 'c', namelist)
734
735    def requiresWriteAccess(self, path):
736        # effective_ids unavailable on windows
737        if not os.access(path, os.W_OK,
738                         effective_ids=os.access in os.supports_effective_ids):
739            self.skipTest('requires write access to the installed location')
740        filename = os.path.join(path, 'test_zipfile.try')
741        try:
742            fd = os.open(filename, os.O_WRONLY | os.O_CREAT)
743            os.close(fd)
744        except Exception:
745            self.skipTest('requires write access to the installed location')
746        unlink(filename)
747
748    def test_write_pyfile(self):
749        self.requiresWriteAccess(os.path.dirname(__file__))
750        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
751            fn = __file__
752            if fn.endswith('.pyc'):
753                path_split = fn.split(os.sep)
754                if os.altsep is not None:
755                    path_split.extend(fn.split(os.altsep))
756                if '__pycache__' in path_split:
757                    fn = importlib.util.source_from_cache(fn)
758                else:
759                    fn = fn[:-1]
760
761            zipfp.writepy(fn)
762
763            bn = os.path.basename(fn)
764            self.assertNotIn(bn, zipfp.namelist())
765            self.assertCompiledIn(bn, zipfp.namelist())
766
767        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
768            fn = __file__
769            if fn.endswith('.pyc'):
770                fn = fn[:-1]
771
772            zipfp.writepy(fn, "testpackage")
773
774            bn = "%s/%s" % ("testpackage", os.path.basename(fn))
775            self.assertNotIn(bn, zipfp.namelist())
776            self.assertCompiledIn(bn, zipfp.namelist())
777
778    def test_write_python_package(self):
779        import email
780        packagedir = os.path.dirname(email.__file__)
781        self.requiresWriteAccess(packagedir)
782
783        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
784            zipfp.writepy(packagedir)
785
786            # Check for a couple of modules at different levels of the
787            # hierarchy
788            names = zipfp.namelist()
789            self.assertCompiledIn('email/__init__.py', names)
790            self.assertCompiledIn('email/mime/text.py', names)
791
792    def test_write_filtered_python_package(self):
793        import test
794        packagedir = os.path.dirname(test.__file__)
795        self.requiresWriteAccess(packagedir)
796
797        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
798
799            # first make sure that the test folder gives error messages
800            # (on the badsyntax_... files)
801            with captured_stdout() as reportSIO:
802                zipfp.writepy(packagedir)
803            reportStr = reportSIO.getvalue()
804            self.assertTrue('SyntaxError' in reportStr)
805
806            # then check that the filter works on the whole package
807            with captured_stdout() as reportSIO:
808                zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
809            reportStr = reportSIO.getvalue()
810            self.assertTrue('SyntaxError' not in reportStr)
811
812            # then check that the filter works on individual files
813            def filter(path):
814                return not os.path.basename(path).startswith("bad")
815            with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
816                zipfp.writepy(packagedir, filterfunc=filter)
817            reportStr = reportSIO.getvalue()
818            if reportStr:
819                print(reportStr)
820            self.assertTrue('SyntaxError' not in reportStr)
821
822    def test_write_with_optimization(self):
823        import email
824        packagedir = os.path.dirname(email.__file__)
825        self.requiresWriteAccess(packagedir)
826        optlevel = 1 if __debug__ else 0
827        ext = '.pyc'
828
829        with TemporaryFile() as t, \
830             zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp:
831            zipfp.writepy(packagedir)
832
833            names = zipfp.namelist()
834            self.assertIn('email/__init__' + ext, names)
835            self.assertIn('email/mime/text' + ext, names)
836
837    def test_write_python_directory(self):
838        os.mkdir(TESTFN2)
839        try:
840            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
841                fp.write("print(42)\n")
842
843            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
844                fp.write("print(42 * 42)\n")
845
846            with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp:
847                fp.write("bla bla bla\n")
848
849            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
850                zipfp.writepy(TESTFN2)
851
852                names = zipfp.namelist()
853                self.assertCompiledIn('mod1.py', names)
854                self.assertCompiledIn('mod2.py', names)
855                self.assertNotIn('mod2.txt', names)
856
857        finally:
858            rmtree(TESTFN2)
859
860    def test_write_python_directory_filtered(self):
861        os.mkdir(TESTFN2)
862        try:
863            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
864                fp.write("print(42)\n")
865
866            with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp:
867                fp.write("print(42 * 42)\n")
868
869            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
870                zipfp.writepy(TESTFN2, filterfunc=lambda fn:
871                                                  not fn.endswith('mod2.py'))
872
873                names = zipfp.namelist()
874                self.assertCompiledIn('mod1.py', names)
875                self.assertNotIn('mod2.py', names)
876
877        finally:
878            rmtree(TESTFN2)
879
880    def test_write_non_pyfile(self):
881        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
882            with open(TESTFN, 'w') as f:
883                f.write('most definitely not a python file')
884            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
885            unlink(TESTFN)
886
887    def test_write_pyfile_bad_syntax(self):
888        os.mkdir(TESTFN2)
889        try:
890            with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp:
891                fp.write("Bad syntax in python file\n")
892
893            with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
894                # syntax errors are printed to stdout
895                with captured_stdout() as s:
896                    zipfp.writepy(os.path.join(TESTFN2, "mod1.py"))
897
898                self.assertIn("SyntaxError", s.getvalue())
899
900                # as it will not have compiled the python file, it will
901                # include the .py file not .pyc
902                names = zipfp.namelist()
903                self.assertIn('mod1.py', names)
904                self.assertNotIn('mod1.pyc', names)
905
906        finally:
907            rmtree(TESTFN2)
908
909
910class ExtractTests(unittest.TestCase):
911    def test_extract(self):
912        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
913            for fpath, fdata in SMALL_TEST_DATA:
914                zipfp.writestr(fpath, fdata)
915
916        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
917            for fpath, fdata in SMALL_TEST_DATA:
918                writtenfile = zipfp.extract(fpath)
919
920                # make sure it was written to the right place
921                correctfile = os.path.join(os.getcwd(), fpath)
922                correctfile = os.path.normpath(correctfile)
923
924                self.assertEqual(writtenfile, correctfile)
925
926                # make sure correct data is in correct file
927                with open(writtenfile, "rb") as f:
928                    self.assertEqual(fdata.encode(), f.read())
929
930                unlink(writtenfile)
931
932        # remove the test file subdirectories
933        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
934
935    def test_extract_all(self):
936        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
937            for fpath, fdata in SMALL_TEST_DATA:
938                zipfp.writestr(fpath, fdata)
939
940        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
941            zipfp.extractall()
942            for fpath, fdata in SMALL_TEST_DATA:
943                outfile = os.path.join(os.getcwd(), fpath)
944
945                with open(outfile, "rb") as f:
946                    self.assertEqual(fdata.encode(), f.read())
947
948                unlink(outfile)
949
950        # remove the test file subdirectories
951        rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
952
953    def check_file(self, filename, content):
954        self.assertTrue(os.path.isfile(filename))
955        with open(filename, 'rb') as f:
956            self.assertEqual(f.read(), content)
957
958    def test_sanitize_windows_name(self):
959        san = zipfile.ZipFile._sanitize_windows_name
960        # Passing pathsep in allows this test to work regardless of platform.
961        self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z')
962        self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i')
963        self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r')
964
965    def test_extract_hackers_arcnames_common_cases(self):
966        common_hacknames = [
967            ('../foo/bar', 'foo/bar'),
968            ('foo/../bar', 'foo/bar'),
969            ('foo/../../bar', 'foo/bar'),
970            ('foo/bar/..', 'foo/bar'),
971            ('./../foo/bar', 'foo/bar'),
972            ('/foo/bar', 'foo/bar'),
973            ('/foo/../bar', 'foo/bar'),
974            ('/foo/../../bar', 'foo/bar'),
975        ]
976        self._test_extract_hackers_arcnames(common_hacknames)
977
978    @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.')
979    def test_extract_hackers_arcnames_windows_only(self):
980        """Test combination of path fixing and windows name sanitization."""
981        windows_hacknames = [
982            (r'..\foo\bar', 'foo/bar'),
983            (r'..\/foo\/bar', 'foo/bar'),
984            (r'foo/\..\/bar', 'foo/bar'),
985            (r'foo\/../\bar', 'foo/bar'),
986            (r'C:foo/bar', 'foo/bar'),
987            (r'C:/foo/bar', 'foo/bar'),
988            (r'C://foo/bar', 'foo/bar'),
989            (r'C:\foo\bar', 'foo/bar'),
990            (r'//conky/mountpoint/foo/bar', 'foo/bar'),
991            (r'\\conky\mountpoint\foo\bar', 'foo/bar'),
992            (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
993            (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
994            (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'),
995            (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'),
996            (r'//?/C:/foo/bar', 'foo/bar'),
997            (r'\\?\C:\foo\bar', 'foo/bar'),
998            (r'C:/../C:/foo/bar', 'C_/foo/bar'),
999            (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'),
1000            ('../../foo../../ba..r', 'foo/ba..r'),
1001        ]
1002        self._test_extract_hackers_arcnames(windows_hacknames)
1003
1004    @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.')
1005    def test_extract_hackers_arcnames_posix_only(self):
1006        posix_hacknames = [
1007            ('//foo/bar', 'foo/bar'),
1008            ('../../foo../../ba..r', 'foo../ba..r'),
1009            (r'foo/..\bar', r'foo/..\bar'),
1010        ]
1011        self._test_extract_hackers_arcnames(posix_hacknames)
1012
1013    def _test_extract_hackers_arcnames(self, hacknames):
1014        for arcname, fixedname in hacknames:
1015            content = b'foobar' + arcname.encode()
1016            with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
1017                zinfo = zipfile.ZipInfo()
1018                # preserve backslashes
1019                zinfo.filename = arcname
1020                zinfo.external_attr = 0o600 << 16
1021                zipfp.writestr(zinfo, content)
1022
1023            arcname = arcname.replace(os.sep, "/")
1024            targetpath = os.path.join('target', 'subdir', 'subsub')
1025            correctfile = os.path.join(targetpath, *fixedname.split('/'))
1026
1027            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1028                writtenfile = zipfp.extract(arcname, targetpath)
1029                self.assertEqual(writtenfile, correctfile,
1030                                 msg='extract %r: %r != %r' %
1031                                 (arcname, writtenfile, correctfile))
1032            self.check_file(correctfile, content)
1033            rmtree('target')
1034
1035            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1036                zipfp.extractall(targetpath)
1037            self.check_file(correctfile, content)
1038            rmtree('target')
1039
1040            correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
1041
1042            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1043                writtenfile = zipfp.extract(arcname)
1044                self.assertEqual(writtenfile, correctfile,
1045                                 msg="extract %r" % arcname)
1046            self.check_file(correctfile, content)
1047            rmtree(fixedname.split('/')[0])
1048
1049            with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
1050                zipfp.extractall()
1051            self.check_file(correctfile, content)
1052            rmtree(fixedname.split('/')[0])
1053
1054            unlink(TESTFN2)
1055
1056
1057class OtherTests(unittest.TestCase):
1058    def test_open_via_zip_info(self):
1059        # Create the ZIP archive
1060        with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
1061            zipfp.writestr("name", "foo")
1062            with self.assertWarns(UserWarning):
1063                zipfp.writestr("name", "bar")
1064            self.assertEqual(zipfp.namelist(), ["name"] * 2)
1065
1066        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1067            infos = zipfp.infolist()
1068            data = b""
1069            for info in infos:
1070                with zipfp.open(info) as zipopen:
1071                    data += zipopen.read()
1072            self.assertIn(data, {b"foobar", b"barfoo"})
1073            data = b""
1074            for info in infos:
1075                data += zipfp.read(info)
1076            self.assertIn(data, {b"foobar", b"barfoo"})
1077
1078    def test_writestr_extended_local_header_issue1202(self):
1079        with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
1080            for data in 'abcdefghijklmnop':
1081                zinfo = zipfile.ZipInfo(data)
1082                zinfo.flag_bits |= 0x08  # Include an extended local header.
1083                orig_zip.writestr(zinfo, data)
1084
1085    def test_close(self):
1086        """Check that the zipfile is closed after the 'with' block."""
1087        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1088            for fpath, fdata in SMALL_TEST_DATA:
1089                zipfp.writestr(fpath, fdata)
1090                self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1091        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1092
1093        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1094            self.assertIsNotNone(zipfp.fp, 'zipfp is not open')
1095        self.assertIsNone(zipfp.fp, 'zipfp is not closed')
1096
1097    def test_close_on_exception(self):
1098        """Check that the zipfile is closed if an exception is raised in the
1099        'with' block."""
1100        with zipfile.ZipFile(TESTFN2, "w") as zipfp:
1101            for fpath, fdata in SMALL_TEST_DATA:
1102                zipfp.writestr(fpath, fdata)
1103
1104        try:
1105            with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
1106                raise zipfile.BadZipFile()
1107        except zipfile.BadZipFile:
1108            self.assertIsNone(zipfp2.fp, 'zipfp is not closed')
1109
1110    def test_unsupported_version(self):
1111        # File has an extract_version of 120
1112        data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00'
1113                b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00'
1114                b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00'
1115                b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06'
1116                b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00')
1117
1118        self.assertRaises(NotImplementedError, zipfile.ZipFile,
1119                          io.BytesIO(data), 'r')
1120
1121    @requires_zlib
1122    def test_read_unicode_filenames(self):
1123        # bug #10801
1124        fname = findfile('zip_cp437_header.zip')
1125        with zipfile.ZipFile(fname) as zipfp:
1126            for name in zipfp.namelist():
1127                zipfp.open(name).close()
1128
1129    def test_write_unicode_filenames(self):
1130        with zipfile.ZipFile(TESTFN, "w") as zf:
1131            zf.writestr("foo.txt", "Test for unicode filename")
1132            zf.writestr("\xf6.txt", "Test for unicode filename")
1133            self.assertIsInstance(zf.infolist()[0].filename, str)
1134
1135        with zipfile.ZipFile(TESTFN, "r") as zf:
1136            self.assertEqual(zf.filelist[0].filename, "foo.txt")
1137            self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
1138
1139    def test_exclusive_create_zip_file(self):
1140        """Test exclusive creating a new zipfile."""
1141        unlink(TESTFN2)
1142        filename = 'testfile.txt'
1143        content = b'hello, world. this is some content.'
1144        with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp:
1145            zipfp.writestr(filename, content)
1146        with self.assertRaises(FileExistsError):
1147            zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED)
1148        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
1149            self.assertEqual(zipfp.namelist(), [filename])
1150            self.assertEqual(zipfp.read(filename), content)
1151
1152    def test_create_non_existent_file_for_append(self):
1153        if os.path.exists(TESTFN):
1154            os.unlink(TESTFN)
1155
1156        filename = 'testfile.txt'
1157        content = b'hello, world. this is some content.'
1158
1159        try:
1160            with zipfile.ZipFile(TESTFN, 'a') as zf:
1161                zf.writestr(filename, content)
1162        except OSError:
1163            self.fail('Could not append data to a non-existent zip file.')
1164
1165        self.assertTrue(os.path.exists(TESTFN))
1166
1167        with zipfile.ZipFile(TESTFN, 'r') as zf:
1168            self.assertEqual(zf.read(filename), content)
1169
    def test_close_erroneous_file(self):
        # This test checks that the ZipFile constructor closes the file object
        # it opens if there's an error in the file.  If it doesn't, the
        # traceback holds a reference to the ZipFile object and, indirectly,
        # the file object.
        # On Windows, this causes the os.unlink() call to fail because the
        # underlying file is still open.  This is SF bug #412214.
        #
        # Note that the body below makes no assertion of its own; a leaked
        # handle is expected to show up when the file is removed afterwards
        # (tearDown unlinks TESTFN).
        with open(TESTFN, "w") as fp:
            fp.write("this is not a legal zip file\n")
        try:
            zf = zipfile.ZipFile(TESTFN)
        except zipfile.BadZipFile:
            # Expected outcome for a non-zip file; nothing else to check here.
            pass
1184
1185    def test_is_zip_erroneous_file(self):
1186        """Check that is_zipfile() correctly identifies non-zip files."""
1187        # - passing a filename
1188        with open(TESTFN, "w") as fp:
1189            fp.write("this is not a legal zip file\n")
1190        self.assertFalse(zipfile.is_zipfile(TESTFN))
1191        # - passing a file object
1192        with open(TESTFN, "rb") as fp:
1193            self.assertFalse(zipfile.is_zipfile(fp))
1194        # - passing a file-like object
1195        fp = io.BytesIO()
1196        fp.write(b"this is not a legal zip file\n")
1197        self.assertFalse(zipfile.is_zipfile(fp))
1198        fp.seek(0, 0)
1199        self.assertFalse(zipfile.is_zipfile(fp))
1200
1201    def test_damaged_zipfile(self):
1202        """Check that zipfiles with missing bytes at the end raise BadZipFile."""
1203        # - Create a valid zip file
1204        fp = io.BytesIO()
1205        with zipfile.ZipFile(fp, mode="w") as zipf:
1206            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1207        zipfiledata = fp.getvalue()
1208
1209        # - Now create copies of it missing the last N bytes and make sure
1210        #   a BadZipFile exception is raised when we try to open it
1211        for N in range(len(zipfiledata)):
1212            fp = io.BytesIO(zipfiledata[:N])
1213            self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp)
1214
1215    def test_is_zip_valid_file(self):
1216        """Check that is_zipfile() correctly identifies zip files."""
1217        # - passing a filename
1218        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1219            zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
1220
1221        self.assertTrue(zipfile.is_zipfile(TESTFN))
1222        # - passing a file object
1223        with open(TESTFN, "rb") as fp:
1224            self.assertTrue(zipfile.is_zipfile(fp))
1225            fp.seek(0, 0)
1226            zip_contents = fp.read()
1227        # - passing a file-like object
1228        fp = io.BytesIO()
1229        fp.write(zip_contents)
1230        self.assertTrue(zipfile.is_zipfile(fp))
1231        fp.seek(0, 0)
1232        self.assertTrue(zipfile.is_zipfile(fp))
1233
1234    def test_non_existent_file_raises_OSError(self):
1235        # make sure we don't raise an AttributeError when a partially-constructed
1236        # ZipFile instance is finalized; this tests for regression on SF tracker
1237        # bug #403871.
1238
1239        # The bug we're testing for caused an AttributeError to be raised
1240        # when a ZipFile instance was created for a file that did not
1241        # exist; the .fp member was not initialized but was needed by the
1242        # __del__() method.  Since the AttributeError is in the __del__(),
1243        # it is ignored, but the user should be sufficiently annoyed by
1244        # the message on the output that regression will be noticed
1245        # quickly.
1246        self.assertRaises(OSError, zipfile.ZipFile, TESTFN)
1247
1248    def test_empty_file_raises_BadZipFile(self):
1249        f = open(TESTFN, 'w')
1250        f.close()
1251        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1252
1253        with open(TESTFN, 'w') as fp:
1254            fp.write("short file")
1255        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN)
1256
1257    def test_closed_zip_raises_ValueError(self):
1258        """Verify that testzip() doesn't swallow inappropriate exceptions."""
1259        data = io.BytesIO()
1260        with zipfile.ZipFile(data, mode="w") as zipf:
1261            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1262
1263        # This is correct; calling .read on a closed ZipFile should raise
1264        # a ValueError, and so should calling .testzip.  An earlier
1265        # version of .testzip would swallow this exception (and any other)
1266        # and report that the first file in the archive was corrupt.
1267        self.assertRaises(ValueError, zipf.read, "foo.txt")
1268        self.assertRaises(ValueError, zipf.open, "foo.txt")
1269        self.assertRaises(ValueError, zipf.testzip)
1270        self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus")
1271        with open(TESTFN, 'w') as f:
1272            f.write('zipfile test data')
1273        self.assertRaises(ValueError, zipf.write, TESTFN)
1274
1275    def test_bad_constructor_mode(self):
1276        """Check that bad modes passed to ZipFile constructor are caught."""
1277        self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q")
1278
1279    def test_bad_open_mode(self):
1280        """Check that bad modes passed to ZipFile.open are caught."""
1281        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1282            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1283
1284        with zipfile.ZipFile(TESTFN, mode="r") as zipf:
1285            # read the data to make sure the file is there
1286            zipf.read("foo.txt")
1287            self.assertRaises(ValueError, zipf.open, "foo.txt", "q")
1288            # universal newlines support is removed
1289            self.assertRaises(ValueError, zipf.open, "foo.txt", "U")
1290            self.assertRaises(ValueError, zipf.open, "foo.txt", "rU")
1291
1292    def test_read0(self):
1293        """Check that calling read(0) on a ZipExtFile object returns an empty
1294        string and doesn't advance file pointer."""
1295        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1296            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1297            # read the data to make sure the file is there
1298            with zipf.open("foo.txt") as f:
1299                for i in range(FIXEDTEST_SIZE):
1300                    self.assertEqual(f.read(0), b'')
1301
1302                self.assertEqual(f.read(), b"O, for a Muse of Fire!")
1303
1304    def test_open_non_existent_item(self):
1305        """Check that attempting to call open() for an item that doesn't
1306        exist in the archive raises a RuntimeError."""
1307        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1308            self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
1309
1310    def test_bad_compression_mode(self):
1311        """Check that bad compression methods passed to ZipFile.open are
1312        caught."""
1313        self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1)
1314
1315    def test_unsupported_compression(self):
1316        # data is declared as shrunk, but actually deflated
1317        data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00'
1318                b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01'
1319                b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00'
1320                b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
1321                b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00'
1322                b'/\x00\x00\x00!\x00\x00\x00\x00\x00')
1323        with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
1324            self.assertRaises(NotImplementedError, zipf.open, 'x')
1325
1326    def test_null_byte_in_filename(self):
1327        """Check that a filename containing a null byte is properly
1328        terminated."""
1329        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1330            zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
1331            self.assertEqual(zipf.namelist(), ['foo.txt'])
1332
1333    def test_struct_sizes(self):
1334        """Check that ZIP internal structure sizes are calculated correctly."""
1335        self.assertEqual(zipfile.sizeEndCentDir, 22)
1336        self.assertEqual(zipfile.sizeCentralDir, 46)
1337        self.assertEqual(zipfile.sizeEndCentDir64, 56)
1338        self.assertEqual(zipfile.sizeEndCentDir64Locator, 20)
1339
1340    def test_comments(self):
1341        """Check that comments on the archive are handled properly."""
1342
1343        # check default comment is empty
1344        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1345            self.assertEqual(zipf.comment, b'')
1346            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1347
1348        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1349            self.assertEqual(zipfr.comment, b'')
1350
1351        # check a simple short comment
1352        comment = b'Bravely taking to his feet, he beat a very brave retreat.'
1353        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1354            zipf.comment = comment
1355            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1356        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1357            self.assertEqual(zipf.comment, comment)
1358
1359        # check a comment of max length
1360        comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
1361        comment2 = comment2.encode("ascii")
1362        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1363            zipf.comment = comment2
1364            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1365
1366        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1367            self.assertEqual(zipfr.comment, comment2)
1368
1369        # check a comment that is too long is truncated
1370        with zipfile.ZipFile(TESTFN, mode="w") as zipf:
1371            with self.assertWarns(UserWarning):
1372                zipf.comment = comment2 + b'oops'
1373            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1374        with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
1375            self.assertEqual(zipfr.comment, comment2)
1376
1377        # check that comments are correctly modified in append mode
1378        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1379            zipf.comment = b"original comment"
1380            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1381        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1382            zipf.comment = b"an updated comment"
1383        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1384            self.assertEqual(zipf.comment, b"an updated comment")
1385
1386        # check that comments are correctly shortened in append mode
1387        with zipfile.ZipFile(TESTFN,mode="w") as zipf:
1388            zipf.comment = b"original comment that's longer"
1389            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1390        with zipfile.ZipFile(TESTFN,mode="a") as zipf:
1391            zipf.comment = b"shorter comment"
1392        with zipfile.ZipFile(TESTFN,mode="r") as zipf:
1393            self.assertEqual(zipf.comment, b"shorter comment")
1394
1395    def test_unicode_comment(self):
1396        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1397            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1398            with self.assertRaises(TypeError):
1399                zipf.comment = "this is an error"
1400
1401    def test_change_comment_in_empty_archive(self):
1402        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1403            self.assertFalse(zipf.filelist)
1404            zipf.comment = b"this is a comment"
1405        with zipfile.ZipFile(TESTFN, "r") as zipf:
1406            self.assertEqual(zipf.comment, b"this is a comment")
1407
1408    def test_change_comment_in_nonempty_archive(self):
1409        with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf:
1410            zipf.writestr("foo.txt", "O, for a Muse of Fire!")
1411        with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf:
1412            self.assertTrue(zipf.filelist)
1413            zipf.comment = b"this is a comment"
1414        with zipfile.ZipFile(TESTFN, "r") as zipf:
1415            self.assertEqual(zipf.comment, b"this is a comment")
1416
1417    def test_empty_zipfile(self):
1418        # Check that creating a file in 'w' or 'a' mode and closing without
1419        # adding any files to the archives creates a valid empty ZIP file
1420        zipf = zipfile.ZipFile(TESTFN, mode="w")
1421        zipf.close()
1422        try:
1423            zipf = zipfile.ZipFile(TESTFN, mode="r")
1424        except zipfile.BadZipFile:
1425            self.fail("Unable to create empty ZIP file in 'w' mode")
1426
1427        zipf = zipfile.ZipFile(TESTFN, mode="a")
1428        zipf.close()
1429        try:
1430            zipf = zipfile.ZipFile(TESTFN, mode="r")
1431        except:
1432            self.fail("Unable to create empty ZIP file in 'a' mode")
1433
1434    def test_open_empty_file(self):
1435        # Issue 1710703: Check that opening a file with less than 22 bytes
1436        # raises a BadZipFile exception (rather than the previously unhelpful
1437        # OSError)
1438        f = open(TESTFN, 'w')
1439        f.close()
1440        self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r')
1441
1442    def test_create_zipinfo_before_1980(self):
1443        self.assertRaises(ValueError,
1444                          zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0))
1445
1446    def test_zipfile_with_short_extra_field(self):
1447        """If an extra field in the header is less than 4 bytes, skip it."""
1448        zipdata = (
1449            b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e'
1450            b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab'
1451            b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00'
1452            b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00'
1453            b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00'
1454            b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00'
1455            b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00'
1456        )
1457        with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf:
1458            # testzip returns the name of the first corrupt file, or None
1459            self.assertIsNone(zipf.testzip())
1460
1461    def test_open_conflicting_handles(self):
1462        # It's only possible to open one writable file handle at a time
1463        msg1 = b"It's fun to charter an accountant!"
1464        msg2 = b"And sail the wide accountant sea"
1465        msg3 = b"To find, explore the funds offshore"
1466        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
1467            with zipf.open('foo', mode='w') as w2:
1468                w2.write(msg1)
1469            with zipf.open('bar', mode='w') as w1:
1470                with self.assertRaises(ValueError):
1471                    zipf.open('handle', mode='w')
1472                with self.assertRaises(ValueError):
1473                    zipf.open('foo', mode='r')
1474                with self.assertRaises(ValueError):
1475                    zipf.writestr('str', 'abcde')
1476                with self.assertRaises(ValueError):
1477                    zipf.write(__file__, 'file')
1478                with self.assertRaises(ValueError):
1479                    zipf.close()
1480                w1.write(msg2)
1481            with zipf.open('baz', mode='w') as w2:
1482                w2.write(msg3)
1483
1484        with zipfile.ZipFile(TESTFN2, 'r') as zipf:
1485            self.assertEqual(zipf.read('foo'), msg1)
1486            self.assertEqual(zipf.read('bar'), msg2)
1487            self.assertEqual(zipf.read('baz'), msg3)
1488            self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
1489
1490    def tearDown(self):
1491        unlink(TESTFN)
1492        unlink(TESTFN2)
1493
1494
class AbstractBadCrcTests:
    # Mixin for the per-compression test cases: a concrete class provides
    # ``zip_with_bad_crc``, the raw bytes of an archive whose member 'afile'
    # is expected to fail its CRC check.

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        archive = io.BytesIO(self.zip_with_bad_crc)
        with zipfile.ZipFile(archive, mode="r") as zipf:
            # testzip() gives the first corrupt member's name, or None
            first_bad = zipf.testzip()
        self.assertEqual('afile', first_bad)

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        zipdata = self.zip_with_bad_crc

        # Whole-member read through ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with self.assertRaises(zipfile.BadZipFile):
                zipf.read('afile')

        # Whole-member read through ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                with self.assertRaises(zipfile.BadZipFile):
                    corrupt_file.read()

        # Two bytes at a time, so that the buffering logic is exercised
        with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf:
            with zipf.open('afile', 'r') as corrupt_file:
                corrupt_file.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    chunk = corrupt_file.read(2)
                    while chunk:
                        chunk = corrupt_file.read(2)
1524
1525
class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the bad-CRC checks inherited from AbstractBadCrcTests against a
    # ZIP_STORED fixture.
    compression = zipfile.ZIP_STORED
    # Raw bytes of a pre-built archive.  The inherited tests read its member
    # 'afile' and expect the CRC verification to fail.
    zip_with_bad_crc = (
        b'PK\003\004\024\0\0\0\0\0 \213\212;:r'
        b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af'
        b'ilehello,AworldP'
        b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:'
        b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0'
        b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi'
        b'lePK\005\006\0\0\0\0\001\0\001\0003\000'
        b'\0\0/\0\0\0\0\0')
1537
# NOTE(review): requires_zlib is imported from test.support; presumably it
# skips this class when zlib is unavailable -- confirm against test.support.
@requires_zlib
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the bad-CRC checks inherited from AbstractBadCrcTests against a
    # ZIP_DEFLATED fixture.
    compression = zipfile.ZIP_DEFLATED
    # Raw bytes of a pre-built archive.  The inherited tests read its member
    # 'afile' and expect the CRC verification to fail.
    # NOTE(review): the literal b'FAKE' appears to sit where the CRC field
    # would be -- confirm against the ZIP header layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
1550
# NOTE(review): requires_bz2 is imported from test.support; presumably it
# skips this class when bz2 is unavailable -- confirm against test.support.
@requires_bz2
class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the bad-CRC checks inherited from AbstractBadCrcTests against a
    # ZIP_BZIP2 fixture.
    compression = zipfile.ZIP_BZIP2
    # Raw bytes of a pre-built archive.  The inherited tests read its member
    # 'afile' and expect the CRC verification to fail.
    # NOTE(review): the literal b'FAKE' appears to sit where the CRC field
    # would be -- confirm against the ZIP header layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA'
        b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ileBZh91AY&SY\xd4\xa8\xca'
        b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5'
        b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f'
        b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14'
        b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8'
        b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK'
        b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00'
        b'\x00\x00\x00\x00')
1566
# NOTE(review): requires_lzma is imported from test.support; presumably it
# skips this class when lzma is unavailable -- confirm against test.support.
@requires_lzma
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    # Runs the bad-CRC checks inherited from AbstractBadCrcTests against a
    # ZIP_LZMA fixture.
    compression = zipfile.ZIP_LZMA
    # Raw bytes of a pre-built archive.  The inherited tests read its member
    # 'afile' and expect the CRC verification to fail.
    # NOTE(review): the literal b'FAKE' appears to sit where the CRC field
    # would be -- confirm against the ZIP header layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
1580
1581
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # First pre-generated encrypted archive; holds the member "test.txt".
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00'
    )
    # Second pre-generated encrypted archive; holds the member "zero".
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00'
    )

    # Expected decrypted contents of "test.txt" and "zero" respectively.
    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def setUp(self):
        # Write each fixture to disk and open it for reading as
        # self.zip / self.zip2.
        fixtures = (("zip", TESTFN, self.data),
                    ("zip2", TESTFN2, self.data2))
        for attr, path, payload in fixtures:
            with open(path, "wb") as fp:
                fp.write(payload)
            setattr(self, attr, zipfile.ZipFile(path, "r"))

    def tearDown(self):
        # Close each archive before deleting the file underneath it.
        for archive, path in ((self.zip, TESTFN), (self.zip2, TESTFN2)):
            archive.close()
            os.unlink(path)

    def test_no_password(self):
        # Reading an encrypted member without a password must raise
        # RuntimeError.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            with self.assertRaises(RuntimeError):
                archive.read(member)

    def test_bad_password(self):
        # A wrong password must raise RuntimeError as well.
        for archive, member in ((self.zip, "test.txt"), (self.zip2, "zero")):
            archive.setpassword(b"perl")
            with self.assertRaises(RuntimeError):
                archive.read(member)

    @requires_zlib
    def test_good_password(self):
        # With the right password the decrypted bytes match the known
        # plaintext.
        cases = ((self.zip, b"python", "test.txt", self.plain),
                 (self.zip2, b"12345", "zero", self.plain2))
        for archive, password, member, expected in cases:
            archive.setpassword(password)
            self.assertEqual(archive.read(member), expected)

    def test_unicode_password(self):
        # Passwords given as str instead of bytes are rejected with
        # TypeError by every entry point tried here.
        with self.assertRaises(TypeError):
            self.zip.setpassword("unicode")
        with self.assertRaises(TypeError):
            self.zip.read("test.txt", "python")
        with self.assertRaises(TypeError):
            self.zip.open("test.txt", pwd="python")
        with self.assertRaises(TypeError):
            self.zip.extract("test.txt", pwd="python")
1646
class AbstractTestsWithRandomBinaryFiles:
    # Mixin that round-trips a randomly generated binary payload through an
    # archive.  Concrete classes supply the ``compression`` attribute.

    @classmethod
    def setUpClass(cls):
        # Build the payload once per class: a random number of random
        # floats, each packed with the '<f' struct format.
        datacount = randint(16, 64)*1024 + randint(1, 1024)
        packed = []
        for _ in range(datacount):
            packed.append(struct.pack('<f', random()*randint(-1000, 1000)))
        cls.data = b''.join(packed)

    def setUp(self):
        # Write the binary payload to the source file that gets archived
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def tearDown(self):
        for leftover in (TESTFN, TESTFN2):
            unlink(leftover)

    def make_test_archive(self, f, compression):
        # Create the ZIP archive: the source file is stored twice, once under
        # "another.name" and once under its own name.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for arcname in ("another.name", TESTFN):
                zipfp.write(TESTFN, arcname)

    def zip_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the archive back; both members must equal the payload
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            testdata = zipfp.read(TESTFN)
            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_read(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read both members through open() in fixed 256-byte chunks
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                # iter() keeps calling read() until it returns b'' (EOF)
                testdata1 = b''.join(iter(lambda: zipopen1.read(256), b''))

            with zipfp.open("another.name") as zipopen2:
                testdata2 = b''.join(iter(lambda: zipopen2.read(256), b''))

            for testdata in (testdata1, testdata2):
                self.assertEqual(len(testdata), len(self.data))
                self.assertEqual(testdata, self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def zip_random_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read one member through open() in chunks of random size
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as zipopen1:
                testdata = b''.join(
                    iter(lambda: zipopen1.read(randint(1, 1024)), b''))

            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open(self):
        for f in get_files(self):
            self.zip_random_open_test(f, self.compression)
1736
1737
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Runs the inherited random-binary round-trip tests with ZIP_STORED.
    compression = zipfile.ZIP_STORED
1741
# NOTE(review): requires_zlib is imported from test.support; presumably it
# skips this class when zlib is unavailable -- confirm against test.support.
@requires_zlib
class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                        unittest.TestCase):
    # Runs the inherited random-binary round-trip tests with ZIP_DEFLATED.
    compression = zipfile.ZIP_DEFLATED
1746
# NOTE(review): requires_bz2 is imported from test.support; presumably it
# skips this class when bz2 is unavailable -- confirm against test.support.
@requires_bz2
class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                      unittest.TestCase):
    # Runs the inherited random-binary round-trip tests with ZIP_BZIP2.
    compression = zipfile.ZIP_BZIP2
1751
# NOTE(review): requires_lzma is imported from test.support; presumably it
# skips this class when lzma is unavailable -- confirm against test.support.
@requires_lzma
class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                     unittest.TestCase):
    # Runs the inherited random-binary round-trip tests with ZIP_LZMA.
    compression = zipfile.ZIP_LZMA
1756
1757
# Provide the tell() method but not seek()
class Tellable:
    """Write-only wrapper around *fp* that keeps its own byte counter.

    Exposes write(), tell() and flush() and deliberately no seek().
    """

    def __init__(self, fp):
        self.offset = 0
        self.fp = fp

    def write(self, data):
        # Advance the counter by whatever the wrapped stream says it wrote.
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        # Counts only bytes written through this wrapper, not the wrapped
        # stream's real position.
        return self.offset

    def flush(self):
        self.fp.flush()
1774
class Unseekable:
    """Write-only wrapper around *fp* exposing just write() and flush().

    Neither tell() nor seek() is defined.
    """

    def __init__(self, fp):
        self.fp = fp

    def write(self, data):
        # Pass the wrapped stream's return value straight through.
        count = self.fp.write(data)
        return count

    def flush(self):
        self.fp.flush()
1784
class UnseekableTests(unittest.TestCase):
    # Every test writes an archive through three kinds of stream -- the
    # BufferedWriter itself, a Tellable wrapper and an Unseekable wrapper --
    # after three unrelated bytes have already been written, then reads the
    # archive back from the underlying BytesIO.

    def test_writestr(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipfp:
                    for name, payload in members:
                        zipfp.writestr(name, payload)
                # The archive must begin right after the pre-existing bytes
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for name, payload in members:
                        with zipf.open(name) as zopen:
                            self.assertEqual(zopen.read(), payload)

    def test_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipfp:
                    self.addCleanup(unlink, TESTFN)
                    # The same scratch file is rewritten before each add
                    for name, payload in members:
                        with open(TESTFN, 'wb') as f2:
                            f2.write(payload)
                        zipfp.write(TESTFN, name)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw, mode='r') as zipf:
                    for name, payload in members:
                        with zipf.open(name) as zopen:
                            self.assertEqual(zopen.read(), payload)

    def test_open_write(self):
        members = (('ones', b'111'), ('twos', b'222'))
        for wrapper in (lambda f: f), Tellable, Unseekable:
            with self.subTest(wrapper=wrapper):
                raw = io.BytesIO()
                raw.write(b'abc')
                buffered = io.BufferedWriter(raw)
                with zipfile.ZipFile(wrapper(buffered), 'w',
                                     zipfile.ZIP_STORED) as zipf:
                    for name, payload in members:
                        with zipf.open(name, 'w') as zopen:
                            zopen.write(payload)
                self.assertEqual(raw.getvalue()[:5], b'abcPK')
                with zipfile.ZipFile(raw) as zipf:
                    for name, payload in members:
                        self.assertEqual(zipf.read(name), payload)
1838
1839
# NOTE(review): requires_zlib is imported from test.support; presumably it
# skips this class when zlib is unavailable -- confirm against test.support.
@requires_zlib
class TestsWithMultipleOpens(unittest.TestCase):
    # Checks that several member handles opened on one ZipFile, and mixed
    # reads and writes on one ZipFile, do not disturb each other.  The exact
    # order of the read()/write() calls is what each test is about.

    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes followed by 10000
        # random bytes each.
        cls.data1 = b'111' + getrandbytes(10000)
        cls.data2 = b'222' + getrandbytes(10000)

    def make_test_archive(self, f):
        # Create the ZIP archive
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', self.data1)
            zipfp.writestr('twos', self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                # Two handles on the same member, read alternately.
                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                # Handles on two different members, read alternately.
                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
                    # The second member is opened only after the first one
                    # has already been partly read.
                    with zipf.open('twos') as zopen2:
                        data2 = zopen2.read(500)
                        data1 += zopen1.read()
                        data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        # Member handles opened while the ZipFile was open must still be
        # readable after the ZipFile's with-block has exited.
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as stack:
                with zipfile.ZipFile(f, 'r') as zipf:
                    # The ExitStack, not the ZipFile block, owns the handles.
                    zopen1 = stack.enter_context(zipf.open('ones'))
                    zopen2 = stack.enter_context(zipf.open('twos'))
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read()
                data2 += zopen2.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        # A member can be read back from a ZipFile that is still open in
        # 'w' mode, and the finished archive is intact afterwards.
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
            self.assertEqual(data1, self.data1[:500])
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        # Writing a new member while a read handle on an earlier member is
        # still open must leave both members intact.
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                with zipf.open('ones') as zopen1:
                    zopen1.read(500)
                    zipf.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for x in range(100):
                zipf.read('ones')
                with zipf.open('ones') as zopen1:
                    pass
        # NOTE(review): presumably relies on the OS handing out low
        # descriptor numbers when earlier ones were released -- a leak of
        # one descriptor per iteration would push fileno() past 100.
        with open(os.devnull) as f:
            self.assertLess(f.fileno(), 100)

    def test_write_while_reading(self):
        # In 'a' mode, a member may be written through open(..., 'w') while
        # a read handle on another member is part-way through its data.
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            with zipf.open('ones', 'r') as r1:
                data1 = r1.read(500)
                with zipf.open('twos', 'w') as w1:
                    w1.write(self.data2)
                # Resume the interrupted read after the write has finished.
                data1 += r1.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)
1964
1965
class TestWithDirectory(unittest.TestCase):
    # Tests for directory entries: extracting them, adding them with
    # write() and adding them with writestr().  TESTFN2 is used as a scratch
    # directory and TESTFN as the archive file.

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        # zipdir.zip must unpack into the nested layout a/b/c.
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            # Added under its own path: the entry name ends in "/x/".
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            # Added under an explicit arcname: the entry name is "y/".
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # assertEqual, not assertTrue: assertTrue(name, "y/") would use
            # "y/" as the failure message and pass for any non-empty name.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        # The same facts must hold after re-reading the archive from disk.
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            # A name ending in "/" written with empty data is a directory.
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def tearDown(self):
        rmtree(TESTFN2)
        if os.path.exists(TESTFN):
            unlink(TESTFN)
2029
2030
2031class ZipInfoTests(unittest.TestCase):
2032    def test_from_file(self):
2033        zi = zipfile.ZipInfo.from_file(__file__)
2034        self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py')
2035        self.assertFalse(zi.is_dir())
2036
2037    def test_from_dir(self):
2038        dirpath = os.path.dirname(os.path.abspath(__file__))
2039        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
2040        self.assertEqual(zi.filename, 'stdlib_tests/')
2041        self.assertTrue(zi.is_dir())
2042        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
2043        self.assertEqual(zi.file_size, 0)
2044
2045
class CommandLineTest(unittest.TestCase):
    # Tests for the ``python -m zipfile`` command-line interface.

    def zipfilecmd(self, *args, **kwargs):
        # Run ``python -m zipfile`` through assert_python_ok and return its
        # stdout with the platform line separator replaced by b'\n'.
        result = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                **kwargs)
        rc, out, err = result
        native_eol = os.linesep.encode()
        return out.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        # Run ``python -m zipfile`` through assert_python_failure and pass
        # its result on unchanged.
        outcome = script_helper.assert_python_failure('-m', 'zipfile', *args)
        return outcome

    def test_test_command(self):
        good_zip = findfile('zipdir.zip')
        out = self.zipfilecmd('-t', good_zip)
        self.assertEqual(out.rstrip(), b'Done testing')
        # Pointing -t at a tar file must fail and print nothing on stdout
        not_a_zip = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(out, b'')

    def test_list_command(self):
        zip_name = findfile('zipdir.zip')
        # The expected listing is whatever printdir() produces in-process
        listing = io.StringIO()
        with zipfile.ZipFile(zip_name, 'r') as tf:
            tf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        out = self.zipfilecmd('-l', zip_name,
                              PYTHONIOENCODING='ascii:backslashreplace')
        self.assertEqual(out, expected)

    @requires_zlib
    def test_create_command(self):
        # Input: one plain file plus one directory holding a file
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as f:
            f.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested, 'w') as f:
            f.write('test 2')
        files = [TESTFN, TESTFNDIR]
        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        try:
            out = self.zipfilecmd('-c', TESTFN2, *files)
            self.assertEqual(out, b'')
            with zipfile.ZipFile(TESTFN2) as zf:
                self.assertEqual(zf.namelist(), namelist)
                self.assertEqual(zf.read(namelist[0]), b'test 1')
                self.assertEqual(zf.read(namelist[2]), b'test 2')
        finally:
            unlink(TESTFN2)

    def test_extract_command(self):
        zip_name = findfile('zipdir.zip')
        with temp_dir() as extdir:
            out = self.zipfilecmd('-e', zip_name, extdir)
            self.assertEqual(out, b'')
            # Every entry must exist on disk with the right kind/content
            with zipfile.ZipFile(zip_name) as zf:
                for zi in zf.infolist():
                    relative = zi.filename.replace('/', os.sep)
                    path = os.path.join(extdir, relative)
                    if not zi.is_dir():
                        self.assertTrue(os.path.isfile(path))
                        with open(path, 'rb') as f:
                            self.assertEqual(f.read(), zf.read(zi))
                    else:
                        self.assertTrue(os.path.isdir(path))
2110
# Run this module's tests through unittest when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
2113