• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Tests of the full ZIP64 functionality of zipfile.
# The support.requires call is the only reason for keeping this separate
# from test_zipfile.
from test import support

# XXX(nnorwitz): this test is disabled by requiring the 'extralargefile'
# resource, which doesn't exist.  The test generally takes over 30 minutes
# to run and requires more disk space than most of the buildbots have.
support.requires(
        'extralargefile',
        'test requires loads of disk-space bytes and a long time to run'
    )

import os
import sys
import time
import unittest
import zipfile

from tempfile import TemporaryFile

from test.support import TESTFN, requires_zlib

# Second scratch file name, derived from TESTFN.
TESTFN2 = TESTFN + "2"

# How much time in seconds can pass before we print a 'Still working' message.
_PRINT_WORKING_MSG_INTERVAL = 60

class TestsWithSourceFile(unittest.TestCase):
    """Round-trip about 6 GiB of raw data through a ZIP archive.

    The payload generated in setUp() is stored many times over, first in an
    archive backed by an anonymous temporary file and then in one created by
    name at TESTFN2; every member is read back and compared with the payload.
    """

    def setUp(self):
        # Create test data: one million numbered lines joined by newlines.
        line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
        self.data = '\n'.join(line_gen).encode('ascii')

        # And write it to a file.  The context manager closes the handle
        # even when the write itself fails.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)

    def _print_still_working(self, action, num, filecount, next_time):
        """Print a progress line once *next_time* has been reached.

        action is the verb shown in the message ('writing' or 'reading').
        Returns the deadline for the following message: a fresh one if a
        message was printed, otherwise *next_time* unchanged.
        """
        if next_time <= time.monotonic():
            next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
            # Written to sys.__stdout__ rather than sys.stdout, as the
            # original code did.
            print((
               '  zipTest still %s %d of %d, be patient...' %
               (action, num, filecount)), file=sys.__stdout__)
            sys.__stdout__.flush()
        return next_time

    def zipTest(self, f, compression):
        """Fill an archive at *f* with copies of self.data and verify them.

        f is whatever zipfile.ZipFile accepts as its first argument (the
        tests pass an open temporary file and a file name); compression is
        the zipfile compression constant to use.
        """
        # It will contain enough copies of self.data to reach about 6 GiB of
        # raw data to store.
        filecount = 6*1024**3 // len(self.data)

        next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL

        # Create the ZIP archive.  Using the archive as a context manager
        # guarantees it is closed even if a write fails part-way through.
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for num in range(filecount):
                zipfp.writestr("testfn%d" % num, self.data)
                # Print still working message since this test can be really
                # slow.
                next_time = self._print_still_working(
                    'writing', num, filecount, next_time)

        # Read the ZIP archive.  A failed assertion no longer leaves the
        # archive open.
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            for num in range(filecount):
                self.assertEqual(zipfp.read("testfn%d" % num), self.data)
                # Print still working message since this test can be really
                # slow.
                next_time = self._print_still_working(
                    'reading', num, filecount, next_time)

    def _zip_temp_then_named(self, compression):
        """Run zipTest on a temporary file, then on TESTFN2."""
        # Try the temp file first.  If we do TESTFN2 first, then it hogs
        # gigabytes of disk space for the duration of the test.
        with TemporaryFile() as f:
            self.zipTest(f, compression)
            # zipTest must not close a file object handed to it.
            self.assertFalse(f.closed)
        self.zipTest(TESTFN2, compression)

    def testStored(self):
        self._zip_temp_then_named(zipfile.ZIP_STORED)

    @requires_zlib
    def testDeflated(self):
        self._zip_temp_then_named(zipfile.ZIP_DEFLATED)

    def tearDown(self):
        # support.unlink() tolerates a missing file, so no separate
        # existence check is needed; this also matches OtherTests.tearDown.
        for fname in TESTFN, TESTFN2:
            support.unlink(fname)


class OtherTests(unittest.TestCase):
    """Tests for archives holding more than 64k members."""

    def _write_members(self, zipf, indices):
        """Add one small member to *zipf* for every index in *indices*.

        Member i is named "foo%08d" % i and contains the decimal text of
        i**3 % 57, which _check_members() recomputes when reading back.
        """
        for i in indices:
            zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57))

    def _check_members(self, count):
        """Reopen TESTFN for reading and verify members 0 .. count-1."""
        with zipfile.ZipFile(TESTFN, mode="r") as zipf2:
            self.assertEqual(len(zipf2.namelist()), count)
            for i in range(count):
                content = zipf2.read("foo%08d" % i).decode('ascii')
                self.assertEqual(content, "%d" % (i**3 % 57))

    def testMoreThan64kFiles(self):
        # This test checks that more than 64k files can be added to an archive,
        # and that the resulting archive can be read properly by ZipFile
        numfiles = (1 << 16) * 3//2
        # The context manager closes the archive even if an assertion fails.
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=True) as zipf:
            zipf.debug = 100
            self._write_members(zipf, range(numfiles))
            self.assertEqual(len(zipf.namelist()), numfiles)

        self._check_members(numfiles)

    def testMoreThan64kFilesAppend(self):
        # Fill an archive with (1 << 16) - 1 members while ZIP64 is
        # disallowed, check that one more member is refused both right away
        # and after reopening in append mode, then append past the limit
        # with ZIP64 allowed and verify everything.
        numfiles = (1 << 16) - 1
        with zipfile.ZipFile(TESTFN, mode="w", allowZip64=False) as zipf:
            zipf.debug = 100
            self._write_members(zipf, range(numfiles))
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            # The rejected member must not have been recorded.
            self.assertEqual(len(zipf.namelist()), numfiles)

        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=False) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            with self.assertRaises(zipfile.LargeZipFile):
                zipf.writestr("foo%08d" % numfiles, b'')
            self.assertEqual(len(zipf.namelist()), numfiles)

        numfiles2 = (1 << 16) * 3//2
        with zipfile.ZipFile(TESTFN, mode="a", allowZip64=True) as zipf:
            zipf.debug = 100
            self.assertEqual(len(zipf.namelist()), numfiles)
            self._write_members(zipf, range(numfiles, numfiles2))
            self.assertEqual(len(zipf.namelist()), numfiles2)

        self._check_members(numfiles2)

    def tearDown(self):
        support.unlink(TESTFN)
        support.unlink(TESTFN2)

# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()