• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the gzip module.
2"""
3
4import unittest
5from test import test_support
6import os
7import io
8import struct
9import tempfile
10gzip = test_support.import_module('gzip')
11
12data1 = """  int length=DEFAULTALLOC, err = Z_OK;
13  PyObject *RetVal;
14  int flushmode = Z_FINISH;
15  unsigned long start_total_out;
16
17"""
18
19data2 = """/* zlibmodule.c -- gzip-compatible data compression */
20/* See http://www.gzip.org/zlib/
21/* See http://www.winimage.com/zLibDll for Windows */
22"""
23
24
25class TestGzip(unittest.TestCase):
26    filename = test_support.TESTFN
27
28    def setUp(self):
29        test_support.unlink(self.filename)
30
31    def tearDown(self):
32        test_support.unlink(self.filename)
33
34    def write_and_read_back(self, data, mode='b'):
35        b_data = memoryview(data).tobytes()
36        with gzip.GzipFile(self.filename, 'w'+mode) as f:
37            l = f.write(data)
38        self.assertEqual(l, len(b_data))
39        with gzip.GzipFile(self.filename, 'r'+mode) as f:
40            self.assertEqual(f.read(), b_data)
41
42    @test_support.requires_unicode
43    def test_unicode_filename(self):
44        unicode_filename = test_support.TESTFN_UNICODE
45        try:
46            unicode_filename.encode(test_support.TESTFN_ENCODING)
47        except (UnicodeError, TypeError):
48            self.skipTest("Requires unicode filenames support")
49        self.filename = unicode_filename
50        with gzip.GzipFile(unicode_filename, "wb") as f:
51            f.write(data1 * 50)
52        with gzip.GzipFile(unicode_filename, "rb") as f:
53            self.assertEqual(f.read(), data1 * 50)
54        # Sanity check that we are actually operating on the right file.
55        with open(unicode_filename, 'rb') as fobj, \
56             gzip.GzipFile(fileobj=fobj, mode="rb") as f:
57            self.assertEqual(f.read(), data1 * 50)
58
59    def test_write(self):
60        with gzip.GzipFile(self.filename, 'wb') as f:
61            f.write(data1 * 50)
62
63            # Try flush and fileno.
64            f.flush()
65            f.fileno()
66            if hasattr(os, 'fsync'):
67                os.fsync(f.fileno())
68            f.close()
69
70        # Test multiple close() calls.
71        f.close()
72
73    # The following test_write_xy methods test that write accepts
74    # the corresponding bytes-like object type as input
75    # and that the data written equals bytes(xy) in all cases.
76    def test_write_memoryview(self):
77        self.write_and_read_back(memoryview(data1 * 50))
78
79    def test_write_incompatible_type(self):
80        # Test that non-bytes-like types raise TypeError.
81        # Issue #21560: attempts to write incompatible types
82        # should not affect the state of the fileobject
83        with gzip.GzipFile(self.filename, 'wb') as f:
84            with self.assertRaises(UnicodeEncodeError):
85                f.write(u'\xff')
86            with self.assertRaises(TypeError):
87                f.write([1])
88            f.write(data1)
89        with gzip.GzipFile(self.filename, 'rb') as f:
90            self.assertEqual(f.read(), data1)
91
92    def test_read(self):
93        self.test_write()
94        # Try reading.
95        with gzip.GzipFile(self.filename, 'r') as f:
96            d = f.read()
97        self.assertEqual(d, data1*50)
98
99    def test_read_universal_newlines(self):
100        # Issue #5148: Reading breaks when mode contains 'U'.
101        self.test_write()
102        with gzip.GzipFile(self.filename, 'rU') as f:
103            d = f.read()
104        self.assertEqual(d, data1*50)
105
106    def test_io_on_closed_object(self):
107        # Test that I/O operations on closed GzipFile objects raise a
108        # ValueError, just like the corresponding functions on file objects.
109
110        # Write to a file, open it for reading, then close it.
111        self.test_write()
112        f = gzip.GzipFile(self.filename, 'r')
113        f.close()
114        with self.assertRaises(ValueError):
115            f.read(1)
116        with self.assertRaises(ValueError):
117            f.seek(0)
118        with self.assertRaises(ValueError):
119            f.tell()
120        # Open the file for writing, then close it.
121        f = gzip.GzipFile(self.filename, 'w')
122        f.close()
123        with self.assertRaises(ValueError):
124            f.write('')
125        with self.assertRaises(ValueError):
126            f.flush()
127
128    def test_append(self):
129        self.test_write()
130        # Append to the previous file
131        with gzip.GzipFile(self.filename, 'ab') as f:
132            f.write(data2 * 15)
133
134        with gzip.GzipFile(self.filename, 'rb') as f:
135            d = f.read()
136        self.assertEqual(d, (data1*50) + (data2*15))
137
138    def test_many_append(self):
139        # Bug #1074261 was triggered when reading a file that contained
140        # many, many members.  Create such a file and verify that reading it
141        # works.
142        with gzip.open(self.filename, 'wb', 9) as f:
143            f.write('a')
144        for i in range(0, 200):
145            with gzip.open(self.filename, "ab", 9) as f: # append
146                f.write('a')
147
148        # Try reading the file
149        with gzip.open(self.filename, "rb") as zgfile:
150            contents = ""
151            while 1:
152                ztxt = zgfile.read(8192)
153                contents += ztxt
154                if not ztxt: break
155        self.assertEqual(contents, 'a'*201)
156
157    def test_buffered_reader(self):
158        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
159        # performance.
160        self.test_write()
161
162        with gzip.GzipFile(self.filename, 'rb') as f:
163            with io.BufferedReader(f) as r:
164                lines = [line for line in r]
165
166        self.assertEqual(lines, 50 * data1.splitlines(True))
167
168    def test_readline(self):
169        self.test_write()
170        # Try .readline() with varying line lengths
171
172        with gzip.GzipFile(self.filename, 'rb') as f:
173            line_length = 0
174            while 1:
175                L = f.readline(line_length)
176                if not L and line_length != 0: break
177                self.assertTrue(len(L) <= line_length)
178                line_length = (line_length + 1) % 50
179
180    def test_readlines(self):
181        self.test_write()
182        # Try .readlines()
183
184        with gzip.GzipFile(self.filename, 'rb') as f:
185            L = f.readlines()
186
187        with gzip.GzipFile(self.filename, 'rb') as f:
188            while 1:
189                L = f.readlines(150)
190                if L == []: break
191
192    def test_seek_read(self):
193        self.test_write()
194        # Try seek, read test
195
196        with gzip.GzipFile(self.filename) as f:
197            while 1:
198                oldpos = f.tell()
199                line1 = f.readline()
200                if not line1: break
201                newpos = f.tell()
202                f.seek(oldpos)  # negative seek
203                if len(line1)>10:
204                    amount = 10
205                else:
206                    amount = len(line1)
207                line2 = f.read(amount)
208                self.assertEqual(line1[:amount], line2)
209                f.seek(newpos)  # positive seek
210
211    def test_seek_whence(self):
212        self.test_write()
213        # Try seek(whence=1), read test
214
215        with gzip.GzipFile(self.filename) as f:
216            f.read(10)
217            f.seek(10, whence=1)
218            y = f.read(10)
219        self.assertEqual(y, data1[20:30])
220
221    def test_seek_write(self):
222        # Try seek, write test
223        with gzip.GzipFile(self.filename, 'w') as f:
224            for pos in range(0, 256, 16):
225                f.seek(pos)
226                f.write('GZ\n')
227
228    def test_mode(self):
229        self.test_write()
230        with gzip.GzipFile(self.filename, 'r') as f:
231            self.assertEqual(f.myfileobj.mode, 'rb')
232
233    def test_1647484(self):
234        for mode in ('wb', 'rb'):
235            with gzip.GzipFile(self.filename, mode) as f:
236                self.assertTrue(hasattr(f, "name"))
237                self.assertEqual(f.name, self.filename)
238
239    def test_mtime(self):
240        mtime = 123456789
241        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
242            fWrite.write(data1)
243        with gzip.GzipFile(self.filename) as fRead:
244            dataRead = fRead.read()
245            self.assertEqual(dataRead, data1)
246            self.assertTrue(hasattr(fRead, 'mtime'))
247            self.assertEqual(fRead.mtime, mtime)
248
249    def test_metadata(self):
250        mtime = 123456789
251
252        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
253            fWrite.write(data1)
254
255        with open(self.filename, 'rb') as fRead:
256            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
257
258            idBytes = fRead.read(2)
259            self.assertEqual(idBytes, '\x1f\x8b') # gzip ID
260
261            cmByte = fRead.read(1)
262            self.assertEqual(cmByte, '\x08') # deflate
263
264            flagsByte = fRead.read(1)
265            self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set
266
267            mtimeBytes = fRead.read(4)
268            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
269
270            xflByte = fRead.read(1)
271            self.assertEqual(xflByte, '\x02') # maximum compression
272
273            osByte = fRead.read(1)
274            self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)
275
276            # Since the FNAME flag is set, the zero-terminated filename follows.
277            # RFC 1952 specifies that this is the name of the input file, if any.
278            # However, the gzip module defaults to storing the name of the output
279            # file in this field.
280            expected = self.filename.encode('Latin-1') + '\x00'
281            nameBytes = fRead.read(len(expected))
282            self.assertEqual(nameBytes, expected)
283
284            # Since no other flags were set, the header ends here.
285            # Rather than process the compressed data, let's seek to the trailer.
286            fRead.seek(os.stat(self.filename).st_size - 8)
287
288            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
289            self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')
290
291            isizeBytes = fRead.read(4)
292            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
293
294    def test_with_open(self):
295        # GzipFile supports the context management protocol
296        with gzip.GzipFile(self.filename, "wb") as f:
297            f.write(b"xxx")
298        f = gzip.GzipFile(self.filename, "rb")
299        f.close()
300        try:
301            with f:
302                pass
303        except ValueError:
304            pass
305        else:
306            self.fail("__enter__ on a closed file didn't raise an exception")
307        try:
308            with gzip.GzipFile(self.filename, "wb") as f:
309                1 // 0
310        except ZeroDivisionError:
311            pass
312        else:
313            self.fail("1 // 0 didn't raise an exception")
314
315    def test_zero_padded_file(self):
316        with gzip.GzipFile(self.filename, "wb") as f:
317            f.write(data1 * 50)
318
319        # Pad the file with zeroes
320        with open(self.filename, "ab") as f:
321            f.write("\x00" * 50)
322
323        with gzip.GzipFile(self.filename, "rb") as f:
324            d = f.read()
325            self.assertEqual(d, data1 * 50, "Incorrect data in file")
326
327    def test_fileobj_from_fdopen(self):
328        # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
329        # should not embed the fake filename "<fdopen>" in the output file.
330        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
331        with os.fdopen(fd, "wb") as f:
332            with gzip.GzipFile(fileobj=f, mode="w") as g:
333                self.assertEqual(g.name, "")
334
335    def test_fileobj_from_io_open(self):
336        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
337        with io.open(fd, "wb") as f:
338            with gzip.GzipFile(fileobj=f, mode="w") as g:
339                self.assertEqual(g.name, "")
340
341    def test_fileobj_mode(self):
342        gzip.GzipFile(self.filename, "wb").close()
343        with open(self.filename, "r+b") as f:
344            with gzip.GzipFile(fileobj=f, mode='r') as g:
345                self.assertEqual(g.mode, gzip.READ)
346            with gzip.GzipFile(fileobj=f, mode='w') as g:
347                self.assertEqual(g.mode, gzip.WRITE)
348            with gzip.GzipFile(fileobj=f, mode='a') as g:
349                self.assertEqual(g.mode, gzip.WRITE)
350            with self.assertRaises(IOError):
351                gzip.GzipFile(fileobj=f, mode='z')
352        for mode in "rb", "r+b":
353            with open(self.filename, mode) as f:
354                with gzip.GzipFile(fileobj=f) as g:
355                    self.assertEqual(g.mode, gzip.READ)
356        for mode in "wb", "ab":
357            with open(self.filename, mode) as f:
358                with gzip.GzipFile(fileobj=f) as g:
359                    self.assertEqual(g.mode, gzip.WRITE)
360
361    def test_read_with_extra(self):
362        # Gzip data with an extra field
363        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
364                  b'\x05\x00Extra'
365                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
366        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
367            self.assertEqual(f.read(), b'Test')
368
369    def test_fileobj_without_name(self):
370        # Issue #33038: GzipFile should not assume that file objects that have
371        # a .name attribute use a non-None value.
372        with tempfile.SpooledTemporaryFile() as f:
373            with gzip.GzipFile(fileobj=f, mode='wb') as archive:
374                archive.write(b'data')
375                self.assertEqual(archive.name, '')
376
377def test_main(verbose=None):
378    test_support.run_unittest(TestGzip)
379
380if __name__ == "__main__":
381    test_main(verbose=True)
382