• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the gzip module.
2"""
3
4import unittest
5from test import test_support
6import os
7import io
8import struct
# The gzip module is obtained through test_support.import_module() instead of
# a plain import statement.
# NOTE(review): presumably this lets the whole test be skipped when gzip (or
# its zlib dependency) is unavailable -- confirm against test_support.
gzip = test_support.import_module('gzip')

# Sample payload that most tests write and read back.  test_metadata
# hard-codes the CRC32 of this exact text, so it must not be edited.
data1 = """  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second payload; test_append writes it after data1 in append mode.
data2 = """/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""
22
23
class TestGzip(unittest.TestCase):
    """Exercise gzip.GzipFile and gzip.open against a scratch file.

    Every test works on ``self.filename`` (``test_support.TESTFN`` unless a
    test rebinds it).  ``setUp`` and ``tearDown`` both unlink that path.
    """

    filename = test_support.TESTFN

    def setUp(self):
        # Start from a clean slate even if an earlier run left the file behind.
        test_support.unlink(self.filename)

    def tearDown(self):
        test_support.unlink(self.filename)

    def write_and_read_back(self, data, mode='b'):
        """Write *data* through GzipFile and check that it reads back intact.

        *data* is passed to memoryview(), so it must support the buffer
        interface.  *mode* is the suffix appended to 'w' and 'r' when the
        file is opened.
        """
        b_data = memoryview(data).tobytes()
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        # write() must report the number of raw bytes it consumed.
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    @test_support.requires_unicode
    def test_unicode_filename(self):
        unicode_filename = test_support.TESTFN_UNICODE
        try:
            unicode_filename.encode(test_support.TESTFN_ENCODING)
        except (UnicodeError, TypeError):
            self.skipTest("Requires unicode filenames support")
        # Rebind the path so that tearDown() removes the unicode-named file.
        self.filename = unicode_filename
        with gzip.GzipFile(unicode_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(unicode_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with open(unicode_filename, 'rb') as fobj, \
             gzip.GzipFile(fileobj=fobj, mode="rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_write(self):
        # Also used by many other tests to create the file they read.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(UnicodeEncodeError):
                f.write(u'\xff')
            with self.assertRaises(TypeError):
                f.write([1])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read_universal_newlines(self):
        # Issue #5148: Reading breaks when mode contains 'U'.
        self.test_write()
        with gzip.GzipFile(self.filename, 'rU') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        f.close()
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        f.close()
        with self.assertRaises(ValueError):
            f.write('')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.open(self.filename, 'wb', 9) as f:
            f.write('a')
        for _ in range(200):
            with gzip.open(self.filename, "ab", 9) as f: # append
                f.write('a')

        # Try reading the file in fixed-size chunks until read() returns an
        # empty string; the chunks are joined once at the end.
        chunks = []
        with gzip.open(self.filename, "rb") as zgfile:
            while True:
                ztxt = zgfile.read(8192)
                if not ztxt:
                    break
                chunks.append(ztxt)
        self.assertEqual(''.join(chunks), 'a'*201)

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = list(r)

        self.assertEqual(lines, 50 * data1.splitlines(True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                line = f.readline(line_length)
                # A limit of 0 always yields an empty string, so only an
                # empty result for a non-zero limit signals end of file.
                if not line and line_length != 0:
                    break
                self.assertLessEqual(len(line), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()
        expected = 50 * data1.splitlines(True)

        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.readlines(), expected)

        # With a size hint the lines come back in several batches; put
        # together they must still be the complete list of lines.
        collected = []
        with gzip.GzipFile(self.filename, 'rb') as f:
            while True:
                batch = f.readlines(150)
                if batch == []:
                    break
                collected.extend(batch)
        self.assertEqual(collected, expected)

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                # Re-read at most the first 10 bytes of the line.
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        # 10 bytes read + 10 bytes skipped relative to the current position.
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write('GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')

    def test_1647484(self):
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, '\x1f\x8b') # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, '\x08') # deflate

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set

            # MTIME is an unsigned 32-bit little-endian field.
            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<I', mtime))

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, '\x02') # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            expected = self.filename.encode('Latin-1') + '\x00'
            nameBytes = fRead.read(len(expected))
            self.assertEqual(nameBytes, expected)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')

            # ISIZE is likewise an unsigned 32-bit little-endian field.
            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<I', len(data1)))

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # __enter__ on a closed file must raise an exception.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the with-block must reach the caller.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1 // 0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write("\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
        # should not embed the fake filename "<fdopen>" in the output file.
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                self.assertEqual(g.name, "")

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')
341
def test_main(verbose=None):
    """Run the TestGzip test case through test_support.run_unittest().

    The *verbose* argument is accepted but not consulted by this function.
    """
    test_support.run_unittest(TestGzip)
344
# When the file is executed as a script, run the suite via test_main().
if __name__ == "__main__":
    test_main(verbose=True)
347