• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""Test script for the gzip module.
"""
3
4import array
5import functools
6import io
7import os
8import struct
9import sys
10import unittest
11from subprocess import PIPE, Popen
12from test.support import import_helper
13from test.support import os_helper
14from test.support import _4G, bigmemtest, requires_subprocess
15from test.support.script_helper import assert_python_ok, assert_python_failure
16
17gzip = import_helper.import_module('gzip')
18zlib = import_helper.import_module('zlib')
19
# Sample payload (a fragment of C source text).  The tests below compress it,
# usually repeated (e.g. ``data1 * 50``), and compare what is read back.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# A second, different sample payload; the tests below use it for appended
# data and for the compress()/decompress() round trips.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Absolute form of the test file name with a '-gzdir' suffix appended.
# NOTE(review): not referenced in the visible part of this file -- presumably
# used by tests further down; confirm.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
34
35
class UnseekableIO(io.BytesIO):
    """A BytesIO variant that refuses every positioning operation.

    seekable() reports False, and both seek() and tell() raise
    io.UnsupportedOperation.
    """

    def seekable(self):
        # Advertise that random access is unavailable.
        return False

    def seek(self, *args):
        raise io.UnsupportedOperation()

    def tell(self):
        raise io.UnsupportedOperation()
45
46
class BaseTest(unittest.TestCase):
    """Shared fixture: os_helper.unlink() is called on ``filename`` both
    before and after every test."""

    filename = os_helper.TESTFN

    def _unlink_test_file(self):
        # Reads self.filename at call time, so a test that rebinds the
        # attribute on the instance gets that file handled instead.
        os_helper.unlink(self.filename)

    def setUp(self):
        self._unlink_test_file()

    def tearDown(self):
        self._unlink_test_file()
55
56
57class TestGzip(BaseTest):
58    def write_and_read_back(self, data, mode='b'):
59        b_data = bytes(data)
60        with gzip.GzipFile(self.filename, 'w'+mode) as f:
61            l = f.write(data)
62        self.assertEqual(l, len(b_data))
63        with gzip.GzipFile(self.filename, 'r'+mode) as f:
64            self.assertEqual(f.read(), b_data)
65
66    def test_write(self):
67        with gzip.GzipFile(self.filename, 'wb') as f:
68            f.write(data1 * 50)
69
70            # Try flush and fileno.
71            f.flush()
72            f.fileno()
73            if hasattr(os, 'fsync'):
74                os.fsync(f.fileno())
75            f.close()
76
77        # Test multiple close() calls.
78        f.close()
79
80    def test_write_read_with_pathlike_file(self):
81        filename = os_helper.FakePath(self.filename)
82        with gzip.GzipFile(filename, 'w') as f:
83            f.write(data1 * 50)
84        self.assertIsInstance(f.name, str)
85        self.assertEqual(f.name, self.filename)
86        with gzip.GzipFile(filename, 'a') as f:
87            f.write(data1)
88        with gzip.GzipFile(filename) as f:
89            d = f.read()
90        self.assertEqual(d, data1 * 51)
91        self.assertIsInstance(f.name, str)
92        self.assertEqual(f.name, self.filename)
93
    # Each of the following test_write_<type> methods checks that write()
    # accepts the corresponding bytes-like object type as input and that
    # the data written equals bytes(<object>).
97    def test_write_memoryview(self):
98        self.write_and_read_back(memoryview(data1 * 50))
99        m = memoryview(bytes(range(256)))
100        data = m.cast('B', shape=[8,8,4])
101        self.write_and_read_back(data)
102
103    def test_write_bytearray(self):
104        self.write_and_read_back(bytearray(data1 * 50))
105
106    def test_write_array(self):
107        self.write_and_read_back(array.array('I', data1 * 40))
108
109    def test_write_incompatible_type(self):
110        # Test that non-bytes-like types raise TypeError.
111        # Issue #21560: attempts to write incompatible types
112        # should not affect the state of the fileobject
113        with gzip.GzipFile(self.filename, 'wb') as f:
114            with self.assertRaises(TypeError):
115                f.write('')
116            with self.assertRaises(TypeError):
117                f.write([])
118            f.write(data1)
119        with gzip.GzipFile(self.filename, 'rb') as f:
120            self.assertEqual(f.read(), data1)
121
122    def test_read(self):
123        self.test_write()
124        # Try reading.
125        with gzip.GzipFile(self.filename, 'r') as f:
126            d = f.read()
127        self.assertEqual(d, data1*50)
128
129    def test_read1(self):
130        self.test_write()
131        blocks = []
132        nread = 0
133        with gzip.GzipFile(self.filename, 'r') as f:
134            while True:
135                d = f.read1()
136                if not d:
137                    break
138                blocks.append(d)
139                nread += len(d)
140                # Check that position was updated correctly (see issue10791).
141                self.assertEqual(f.tell(), nread)
142        self.assertEqual(b''.join(blocks), data1 * 50)
143
144    @bigmemtest(size=_4G, memuse=1)
145    def test_read_large(self, size):
146        # Read chunk size over UINT_MAX should be supported, despite zlib's
147        # limitation per low-level call
148        compressed = gzip.compress(data1, compresslevel=1)
149        f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb')
150        self.assertEqual(f.read(size), data1)
151
152    def test_io_on_closed_object(self):
153        # Test that I/O operations on closed GzipFile objects raise a
154        # ValueError, just like the corresponding functions on file objects.
155
156        # Write to a file, open it for reading, then close it.
157        self.test_write()
158        f = gzip.GzipFile(self.filename, 'r')
159        fileobj = f.fileobj
160        self.assertFalse(fileobj.closed)
161        f.close()
162        self.assertTrue(fileobj.closed)
163        with self.assertRaises(ValueError):
164            f.read(1)
165        with self.assertRaises(ValueError):
166            f.seek(0)
167        with self.assertRaises(ValueError):
168            f.tell()
169        # Open the file for writing, then close it.
170        f = gzip.GzipFile(self.filename, 'w')
171        fileobj = f.fileobj
172        self.assertFalse(fileobj.closed)
173        f.close()
174        self.assertTrue(fileobj.closed)
175        with self.assertRaises(ValueError):
176            f.write(b'')
177        with self.assertRaises(ValueError):
178            f.flush()
179
180    def test_append(self):
181        self.test_write()
182        # Append to the previous file
183        with gzip.GzipFile(self.filename, 'ab') as f:
184            f.write(data2 * 15)
185
186        with gzip.GzipFile(self.filename, 'rb') as f:
187            d = f.read()
188        self.assertEqual(d, (data1*50) + (data2*15))
189
190    def test_many_append(self):
191        # Bug #1074261 was triggered when reading a file that contained
192        # many, many members.  Create such a file and verify that reading it
193        # works.
194        with gzip.GzipFile(self.filename, 'wb', 9) as f:
195            f.write(b'a')
196        for i in range(0, 200):
197            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
198                f.write(b'a')
199
200        # Try reading the file
201        with gzip.GzipFile(self.filename, "rb") as zgfile:
202            contents = b""
203            while 1:
204                ztxt = zgfile.read(8192)
205                contents += ztxt
206                if not ztxt: break
207        self.assertEqual(contents, b'a'*201)
208
209    def test_exclusive_write(self):
210        with gzip.GzipFile(self.filename, 'xb') as f:
211            f.write(data1 * 50)
212        with gzip.GzipFile(self.filename, 'rb') as f:
213            self.assertEqual(f.read(), data1 * 50)
214        with self.assertRaises(FileExistsError):
215            gzip.GzipFile(self.filename, 'xb')
216
217    def test_buffered_reader(self):
218        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
219        # performance.
220        self.test_write()
221
222        with gzip.GzipFile(self.filename, 'rb') as f:
223            with io.BufferedReader(f) as r:
224                lines = [line for line in r]
225
226        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
227
228    def test_readline(self):
229        self.test_write()
230        # Try .readline() with varying line lengths
231
232        with gzip.GzipFile(self.filename, 'rb') as f:
233            line_length = 0
234            while 1:
235                L = f.readline(line_length)
236                if not L and line_length != 0: break
237                self.assertTrue(len(L) <= line_length)
238                line_length = (line_length + 1) % 50
239
240    def test_readlines(self):
241        self.test_write()
242        # Try .readlines()
243
244        with gzip.GzipFile(self.filename, 'rb') as f:
245            L = f.readlines()
246
247        with gzip.GzipFile(self.filename, 'rb') as f:
248            while 1:
249                L = f.readlines(150)
250                if L == []: break
251
252    def test_seek_read(self):
253        self.test_write()
254        # Try seek, read test
255
256        with gzip.GzipFile(self.filename) as f:
257            while 1:
258                oldpos = f.tell()
259                line1 = f.readline()
260                if not line1: break
261                newpos = f.tell()
262                f.seek(oldpos)  # negative seek
263                if len(line1)>10:
264                    amount = 10
265                else:
266                    amount = len(line1)
267                line2 = f.read(amount)
268                self.assertEqual(line1[:amount], line2)
269                f.seek(newpos)  # positive seek
270
271    def test_seek_whence(self):
272        self.test_write()
273        # Try seek(whence=1), read test
274
275        with gzip.GzipFile(self.filename) as f:
276            f.read(10)
277            f.seek(10, whence=1)
278            y = f.read(10)
279        self.assertEqual(y, data1[20:30])
280
281    def test_seek_write(self):
282        # Try seek, write test
283        with gzip.GzipFile(self.filename, 'w') as f:
284            for pos in range(0, 256, 16):
285                f.seek(pos)
286                f.write(b'GZ\n')
287
288    def test_mode(self):
289        self.test_write()
290        with gzip.GzipFile(self.filename, 'r') as f:
291            self.assertEqual(f.myfileobj.mode, 'rb')
292        os_helper.unlink(self.filename)
293        with gzip.GzipFile(self.filename, 'x') as f:
294            self.assertEqual(f.myfileobj.mode, 'xb')
295
296    def test_1647484(self):
297        for mode in ('wb', 'rb'):
298            with gzip.GzipFile(self.filename, mode) as f:
299                self.assertTrue(hasattr(f, "name"))
300                self.assertEqual(f.name, self.filename)
301
302    def test_paddedfile_getattr(self):
303        self.test_write()
304        with gzip.GzipFile(self.filename, 'rb') as f:
305            self.assertTrue(hasattr(f.fileobj, "name"))
306            self.assertEqual(f.fileobj.name, self.filename)
307
308    def test_mtime(self):
309        mtime = 123456789
310        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
311            fWrite.write(data1)
312        with gzip.GzipFile(self.filename) as fRead:
313            self.assertTrue(hasattr(fRead, 'mtime'))
314            self.assertIsNone(fRead.mtime)
315            dataRead = fRead.read()
316            self.assertEqual(dataRead, data1)
317            self.assertEqual(fRead.mtime, mtime)
318
319    def test_metadata(self):
320        mtime = 123456789
321
322        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
323            fWrite.write(data1)
324
325        with open(self.filename, 'rb') as fRead:
326            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
327
328            idBytes = fRead.read(2)
329            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID
330
331            cmByte = fRead.read(1)
332            self.assertEqual(cmByte, b'\x08') # deflate
333
334            try:
335                expectedname = self.filename.encode('Latin-1') + b'\x00'
336                expectedflags = b'\x08' # only the FNAME flag is set
337            except UnicodeEncodeError:
338                expectedname = b''
339                expectedflags = b'\x00'
340
341            flagsByte = fRead.read(1)
342            self.assertEqual(flagsByte, expectedflags)
343
344            mtimeBytes = fRead.read(4)
345            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
346
347            xflByte = fRead.read(1)
348            self.assertEqual(xflByte, b'\x02') # maximum compression
349
350            osByte = fRead.read(1)
351            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)
352
353            # Since the FNAME flag is set, the zero-terminated filename follows.
354            # RFC 1952 specifies that this is the name of the input file, if any.
355            # However, the gzip module defaults to storing the name of the output
356            # file in this field.
357            nameBytes = fRead.read(len(expectedname))
358            self.assertEqual(nameBytes, expectedname)
359
360            # Since no other flags were set, the header ends here.
361            # Rather than process the compressed data, let's seek to the trailer.
362            fRead.seek(os.stat(self.filename).st_size - 8)
363
364            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
365            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')
366
367            isizeBytes = fRead.read(4)
368            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
369
370    def test_metadata_ascii_name(self):
371        self.filename = os_helper.TESTFN_ASCII
372        self.test_metadata()
373
374    def test_compresslevel_metadata(self):
375        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
376        # specifically, discussion of XFL in section 2.3.1
377        cases = [
378            ('fast', 1, b'\x04'),
379            ('best', 9, b'\x02'),
380            ('tradeoff', 6, b'\x00'),
381        ]
382        xflOffset = 8
383
384        for (name, level, expectedXflByte) in cases:
385            with self.subTest(name):
386                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
387                with fWrite:
388                    fWrite.write(data1)
389                with open(self.filename, 'rb') as fRead:
390                    fRead.seek(xflOffset)
391                    xflByte = fRead.read(1)
392                    self.assertEqual(xflByte, expectedXflByte)
393
394    def test_with_open(self):
395        # GzipFile supports the context management protocol
396        with gzip.GzipFile(self.filename, "wb") as f:
397            f.write(b"xxx")
398        f = gzip.GzipFile(self.filename, "rb")
399        f.close()
400        try:
401            with f:
402                pass
403        except ValueError:
404            pass
405        else:
406            self.fail("__enter__ on a closed file didn't raise an exception")
407        try:
408            with gzip.GzipFile(self.filename, "wb") as f:
409                1/0
410        except ZeroDivisionError:
411            pass
412        else:
413            self.fail("1/0 didn't raise an exception")
414
415    def test_zero_padded_file(self):
416        with gzip.GzipFile(self.filename, "wb") as f:
417            f.write(data1 * 50)
418
419        # Pad the file with zeroes
420        with open(self.filename, "ab") as f:
421            f.write(b"\x00" * 50)
422
423        with gzip.GzipFile(self.filename, "rb") as f:
424            d = f.read()
425            self.assertEqual(d, data1 * 50, "Incorrect data in file")
426
427    def test_gzip_BadGzipFile_exception(self):
428        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))
429
430    def test_bad_gzip_file(self):
431        with open(self.filename, 'wb') as file:
432            file.write(data1 * 50)
433        with gzip.GzipFile(self.filename, 'r') as file:
434            self.assertRaises(gzip.BadGzipFile, file.readlines)
435
436    def test_non_seekable_file(self):
437        uncompressed = data1 * 50
438        buf = UnseekableIO()
439        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
440            f.write(uncompressed)
441        compressed = buf.getvalue()
442        buf = UnseekableIO(compressed)
443        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
444            self.assertEqual(f.read(), uncompressed)
445
446    def test_peek(self):
447        uncompressed = data1 * 200
448        with gzip.GzipFile(self.filename, "wb") as f:
449            f.write(uncompressed)
450
451        def sizes():
452            while True:
453                for n in range(5, 50, 10):
454                    yield n
455
456        with gzip.GzipFile(self.filename, "rb") as f:
457            f.max_read_chunk = 33
458            nread = 0
459            for n in sizes():
460                s = f.peek(n)
461                if s == b'':
462                    break
463                self.assertEqual(f.read(len(s)), s)
464                nread += len(s)
465            self.assertEqual(f.read(100), b'')
466            self.assertEqual(nread, len(uncompressed))
467
468    def test_textio_readlines(self):
469        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
470        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
471        self.test_write()
472        with gzip.GzipFile(self.filename, 'r') as f:
473            with io.TextIOWrapper(f, encoding="ascii") as t:
474                self.assertEqual(t.readlines(), lines)
475
476    def test_fileobj_with_name(self):
477        with open(self.filename, "xb") as raw:
478            with gzip.GzipFile(fileobj=raw, mode="x") as f:
479                f.write(b'one')
480                self.assertEqual(f.name, raw.name)
481                self.assertEqual(f.fileno(), raw.fileno())
482                self.assertEqual(f.mode, gzip.WRITE)
483                self.assertIs(f.readable(), False)
484                self.assertIs(f.writable(), True)
485                self.assertIs(f.seekable(), True)
486                self.assertIs(f.closed, False)
487            self.assertIs(f.closed, True)
488            self.assertEqual(f.name, raw.name)
489            self.assertRaises(AttributeError, f.fileno)
490            self.assertEqual(f.mode, gzip.WRITE)
491            self.assertIs(f.readable(), False)
492            self.assertIs(f.writable(), True)
493            self.assertIs(f.seekable(), True)
494
495        with open(self.filename, "wb") as raw:
496            with gzip.GzipFile(fileobj=raw, mode="w") as f:
497                f.write(b'two')
498                self.assertEqual(f.name, raw.name)
499                self.assertEqual(f.fileno(), raw.fileno())
500                self.assertEqual(f.mode, gzip.WRITE)
501                self.assertIs(f.readable(), False)
502                self.assertIs(f.writable(), True)
503                self.assertIs(f.seekable(), True)
504                self.assertIs(f.closed, False)
505            self.assertIs(f.closed, True)
506            self.assertEqual(f.name, raw.name)
507            self.assertRaises(AttributeError, f.fileno)
508            self.assertEqual(f.mode, gzip.WRITE)
509            self.assertIs(f.readable(), False)
510            self.assertIs(f.writable(), True)
511            self.assertIs(f.seekable(), True)
512
513        with open(self.filename, "ab") as raw:
514            with gzip.GzipFile(fileobj=raw, mode="a") as f:
515                f.write(b'three')
516                self.assertEqual(f.name, raw.name)
517                self.assertEqual(f.fileno(), raw.fileno())
518                self.assertEqual(f.mode, gzip.WRITE)
519                self.assertIs(f.readable(), False)
520                self.assertIs(f.writable(), True)
521                self.assertIs(f.seekable(), True)
522                self.assertIs(f.closed, False)
523            self.assertIs(f.closed, True)
524            self.assertEqual(f.name, raw.name)
525            self.assertRaises(AttributeError, f.fileno)
526            self.assertEqual(f.mode, gzip.WRITE)
527            self.assertIs(f.readable(), False)
528            self.assertIs(f.writable(), True)
529            self.assertIs(f.seekable(), True)
530
531        with open(self.filename, "rb") as raw:
532            with gzip.GzipFile(fileobj=raw, mode="r") as f:
533                self.assertEqual(f.read(), b'twothree')
534                self.assertEqual(f.name, raw.name)
535                self.assertEqual(f.fileno(), raw.fileno())
536                self.assertEqual(f.mode, gzip.READ)
537                self.assertIs(f.readable(), True)
538                self.assertIs(f.writable(), False)
539                self.assertIs(f.seekable(), True)
540                self.assertIs(f.closed, False)
541            self.assertIs(f.closed, True)
542            self.assertEqual(f.name, raw.name)
543            self.assertRaises(AttributeError, f.fileno)
544            self.assertEqual(f.mode, gzip.READ)
545            self.assertIs(f.readable(), True)
546            self.assertIs(f.writable(), False)
547            self.assertIs(f.seekable(), True)
548
549    def test_fileobj_from_fdopen(self):
550        # Issue #13781: Opening a GzipFile for writing fails when using a
551        # fileobj created with os.fdopen().
552        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
553        with os.fdopen(fd, "xb") as raw:
554            with gzip.GzipFile(fileobj=raw, mode="x") as f:
555                f.write(b'one')
556                self.assertEqual(f.name, '')
557                self.assertEqual(f.fileno(), raw.fileno())
558            self.assertIs(f.closed, True)
559            self.assertEqual(f.name, '')
560            self.assertRaises(AttributeError, f.fileno)
561
562        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
563        with os.fdopen(fd, "wb") as raw:
564            with gzip.GzipFile(fileobj=raw, mode="w") as f:
565                f.write(b'two')
566                self.assertEqual(f.name, '')
567                self.assertEqual(f.fileno(), raw.fileno())
568            self.assertEqual(f.name, '')
569            self.assertRaises(AttributeError, f.fileno)
570
571        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
572        with os.fdopen(fd, "ab") as raw:
573            with gzip.GzipFile(fileobj=raw, mode="a") as f:
574                f.write(b'three')
575                self.assertEqual(f.name, '')
576                self.assertEqual(f.fileno(), raw.fileno())
577            self.assertEqual(f.name, '')
578            self.assertRaises(AttributeError, f.fileno)
579
580        fd = os.open(self.filename, os.O_RDONLY)
581        with os.fdopen(fd, "rb") as raw:
582            with gzip.GzipFile(fileobj=raw, mode="r") as f:
583                self.assertEqual(f.read(), b'twothree')
584                self.assertEqual(f.name, '')
585                self.assertEqual(f.fileno(), raw.fileno())
586            self.assertEqual(f.name, '')
587            self.assertRaises(AttributeError, f.fileno)
588
589    def test_fileobj_mode(self):
590        self.assertEqual(gzip.READ, 'rb')
591        self.assertEqual(gzip.WRITE, 'wb')
592        gzip.GzipFile(self.filename, "wb").close()
593        with open(self.filename, "r+b") as f:
594            with gzip.GzipFile(fileobj=f, mode='r') as g:
595                self.assertEqual(g.mode, gzip.READ)
596            with gzip.GzipFile(fileobj=f, mode='w') as g:
597                self.assertEqual(g.mode, gzip.WRITE)
598            with gzip.GzipFile(fileobj=f, mode='a') as g:
599                self.assertEqual(g.mode, gzip.WRITE)
600            with gzip.GzipFile(fileobj=f, mode='x') as g:
601                self.assertEqual(g.mode, gzip.WRITE)
602            with self.assertRaises(ValueError):
603                gzip.GzipFile(fileobj=f, mode='z')
604        for mode in "rb", "r+b":
605            with open(self.filename, mode) as f:
606                with gzip.GzipFile(fileobj=f) as g:
607                    self.assertEqual(g.mode, gzip.READ)
608        for mode in "wb", "ab", "xb":
609            if "x" in mode:
610                os_helper.unlink(self.filename)
611            with open(self.filename, mode) as f:
612                with self.assertWarns(FutureWarning):
613                    g = gzip.GzipFile(fileobj=f)
614                with g:
615                    self.assertEqual(g.mode, gzip.WRITE)
616
617    def test_bytes_filename(self):
618        str_filename = self.filename
619        bytes_filename = os.fsencode(str_filename)
620        with gzip.GzipFile(bytes_filename, "wb") as f:
621            f.write(data1 * 50)
622        self.assertEqual(f.name, bytes_filename)
623        with gzip.GzipFile(bytes_filename, "rb") as f:
624            self.assertEqual(f.read(), data1 * 50)
625        self.assertEqual(f.name, bytes_filename)
626        # Sanity check that we are actually operating on the right file.
627        with gzip.GzipFile(str_filename, "rb") as f:
628            self.assertEqual(f.read(), data1 * 50)
629        self.assertEqual(f.name, str_filename)
630
631    def test_fileobj_without_name(self):
632        bio = io.BytesIO()
633        with gzip.GzipFile(fileobj=bio, mode='wb') as f:
634            f.write(data1 * 50)
635            self.assertEqual(f.name, '')
636            self.assertRaises(io.UnsupportedOperation, f.fileno)
637            self.assertEqual(f.mode, gzip.WRITE)
638            self.assertIs(f.readable(), False)
639            self.assertIs(f.writable(), True)
640            self.assertIs(f.seekable(), True)
641            self.assertIs(f.closed, False)
642        self.assertIs(f.closed, True)
643        self.assertEqual(f.name, '')
644        self.assertRaises(AttributeError, f.fileno)
645        self.assertEqual(f.mode, gzip.WRITE)
646        self.assertIs(f.readable(), False)
647        self.assertIs(f.writable(), True)
648        self.assertIs(f.seekable(), True)
649
650        bio.seek(0)
651        with gzip.GzipFile(fileobj=bio, mode='rb') as f:
652            self.assertEqual(f.read(), data1 * 50)
653            self.assertEqual(f.name, '')
654            self.assertRaises(io.UnsupportedOperation, f.fileno)
655            self.assertEqual(f.mode, gzip.READ)
656            self.assertIs(f.readable(), True)
657            self.assertIs(f.writable(), False)
658            self.assertIs(f.seekable(), True)
659            self.assertIs(f.closed, False)
660        self.assertIs(f.closed, True)
661        self.assertEqual(f.name, '')
662        self.assertRaises(AttributeError, f.fileno)
663        self.assertEqual(f.mode, gzip.READ)
664        self.assertIs(f.readable(), True)
665        self.assertIs(f.writable(), False)
666        self.assertIs(f.seekable(), True)
667
668    def test_fileobj_and_filename(self):
669        filename2 = self.filename + 'new'
670        with (open(self.filename, 'wb') as fileobj,
671              gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='wb') as f):
672            f.write(data1 * 50)
673            self.assertEqual(f.name, filename2)
674        with (open(self.filename, 'rb') as fileobj,
675              gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='rb') as f):
676            self.assertEqual(f.read(), data1 * 50)
677            self.assertEqual(f.name, filename2)
678        # Sanity check that we are actually operating on the right file.
679        with gzip.GzipFile(self.filename, 'rb') as f:
680            self.assertEqual(f.read(), data1 * 50)
681            self.assertEqual(f.name, self.filename)
682
683    def test_decompress_limited(self):
684        """Decompressed data buffering should be limited"""
685        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
686        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
687
688        bomb = io.BytesIO(bomb)
689        decomp = gzip.GzipFile(fileobj=bomb)
690        self.assertEqual(decomp.read(1), b'\0')
691        max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
692        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
693            "Excessive amount of data was decompressed")
694
    # Tests for the module-level compress() and decompress() shortcut functions
696
697    def test_compress(self):
698        for data in [data1, data2]:
699            for args in [(), (1,), (6,), (9,)]:
700                datac = gzip.compress(data, *args)
701                self.assertEqual(type(datac), bytes)
702                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
703                    self.assertEqual(f.read(), data)
704
705    def test_compress_mtime(self):
706        mtime = 123456789
707        for data in [data1, data2]:
708            for args in [(), (1,), (6,), (9,)]:
709                with self.subTest(data=data, args=args):
710                    datac = gzip.compress(data, *args, mtime=mtime)
711                    self.assertEqual(type(datac), bytes)
712                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
713                        f.read(1) # to set mtime attribute
714                        self.assertEqual(f.mtime, mtime)
715
716    def test_compress_correct_level(self):
717        for mtime in (0, 42):
718            with self.subTest(mtime=mtime):
719                nocompress = gzip.compress(data1, compresslevel=0, mtime=mtime)
720                yescompress = gzip.compress(data1, compresslevel=1, mtime=mtime)
721                self.assertIn(data1, nocompress)
722                self.assertNotIn(data1, yescompress)
723
724    def test_issue112346(self):
725        # The OS byte should be 255, this should not change between Python versions.
726        for mtime in (0, 42):
727            with self.subTest(mtime=mtime):
728                compress = gzip.compress(data1, compresslevel=1, mtime=mtime)
729                self.assertEqual(
730                    struct.unpack("<IxB", compress[4:10]),
731                    (mtime, 255),
732                    "Gzip header does not properly set either mtime or OS byte."
733                )
734
735    def test_decompress(self):
736        for data in (data1, data2):
737            buf = io.BytesIO()
738            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
739                f.write(data)
740            self.assertEqual(gzip.decompress(buf.getvalue()), data)
741            # Roundtrip with compress
742            datac = gzip.compress(data)
743            self.assertEqual(gzip.decompress(datac), data)
744
745    def test_decompress_truncated_trailer(self):
746        compressed_data = gzip.compress(data1)
747        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])
748
749    def test_decompress_missing_trailer(self):
750        compressed_data = gzip.compress(data1)
751        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
752
753    def test_read_truncated(self):
754        data = data1*50
755        # Drop the CRC (4 bytes) and file size (4 bytes).
756        truncated = gzip.compress(data)[:-8]
757        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
758            self.assertRaises(EOFError, f.read)
759        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
760            self.assertEqual(f.read(len(data)), data)
761            self.assertRaises(EOFError, f.read, 1)
762        # Incomplete 10-byte header.
763        for i in range(2, 10):
764            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
765                self.assertRaises(EOFError, f.read, 1)
766
767    def test_read_with_extra(self):
768        # Gzip data with an extra field
769        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
770                  b'\x05\x00Extra'
771                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
772        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
773            self.assertEqual(f.read(), b'Test')
774
775    def test_prepend_error(self):
776        # See issue #20875
777        with gzip.open(self.filename, "wb") as f:
778            f.write(data1)
779        with gzip.open(self.filename, "rb") as f:
780            f._buffer.raw._fp.prepend()
781
782    def test_issue44439(self):
783        q = array.array('Q', [1, 2, 3, 4, 5])
784        LENGTH = len(q) * q.itemsize
785
786        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
787            self.assertEqual(f.write(q), LENGTH)
788            self.assertEqual(f.tell(), LENGTH)
789
790    def test_flush_flushes_compressor(self):
791        # See issue GH-105808.
792        b = io.BytesIO()
793        message = b"important message here."
794        with gzip.GzipFile(fileobj=b, mode='w') as f:
795            f.write(message)
796            f.flush()
797            partial_data = b.getvalue()
798        full_data = b.getvalue()
799        self.assertEqual(gzip.decompress(full_data), message)
800        # The partial data should contain the gzip header and the complete
801        # message, but not the end-of-stream markers (so we can't just
802        # decompress it directly).
803        with self.assertRaises(EOFError):
804            gzip.decompress(partial_data)
805        d = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
806        f = io.BytesIO(partial_data)
807        gzip._read_gzip_header(f)
808        read_message = d.decompress(f.read())
809        self.assertEqual(read_message, message)
810
811    def test_flush_modes(self):
812        # Make sure the argument to flush is properly passed to the
813        # zlib.compressobj; see issue GH-105808.
814        class FakeCompressor:
815            def __init__(self):
816                self.modes = []
817            def compress(self, data):
818                return b''
819            def flush(self, mode=-1):
820                self.modes.append(mode)
821                return b''
822        b = io.BytesIO()
823        fc = FakeCompressor()
824        with gzip.GzipFile(fileobj=b, mode='w') as f:
825            f.compress = fc
826            f.flush()
827            f.flush(50)
828            f.flush(zlib_mode=100)
829        # The implicit close will also flush the compressor.
830        expected_modes = [
831            zlib.Z_SYNC_FLUSH,
832            50,
833            100,
834            -1,
835        ]
836        self.assertEqual(fc.modes, expected_modes)
837
838    def test_write_seek_write(self):
839        # Make sure that offset is up-to-date before seeking
840        # See issue GH-108111
841        b = io.BytesIO()
842        message = b"important message here."
843        with gzip.GzipFile(fileobj=b, mode='w') as f:
844            f.write(message)
845            f.seek(len(message))
846            f.write(message)
847        data = b.getvalue()
848        self.assertEqual(gzip.decompress(data), message * 2)
849
850
class TestOpen(BaseTest):
    # Tests for gzip.open() in binary, implicit-binary and text modes,
    # with both file names and file objects.

    def _decompress_file(self):
        """Return the gzip-decompressed bytes stored in self.filename."""
        with open(self.filename, "rb") as raw:
            return gzip.decompress(raw.read())

    def _check_binary_modes(self, suffix):
        """Exercise the w/r/a/x modes of gzip.open() with *suffix* appended.

        *suffix* is "b" for explicit binary modes and "" for the implicit
        ones; the sequence of operations is otherwise the same.
        """
        payload = data1 * 50

        with gzip.open(self.filename, "w" + suffix) as writer:
            writer.write(payload)
        self.assertEqual(self._decompress_file(), payload)

        with gzip.open(self.filename, "r" + suffix) as reader:
            self.assertEqual(reader.read(), payload)

        with gzip.open(self.filename, "a" + suffix) as appender:
            appender.write(payload)
        self.assertEqual(self._decompress_file(), payload * 2)

        # Exclusive creation must refuse an existing file...
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x" + suffix)
        # ...and succeed once the file is gone.
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "x" + suffix) as creator:
            creator.write(payload)
        self.assertEqual(self._decompress_file(), payload)

    def test_binary_modes(self):
        self._check_binary_modes("b")

    def test_pathlike_file(self):
        # gzip.open() must accept a path-like object, and the resulting
        # file's name must be the plain file name.
        path_like = os_helper.FakePath(self.filename)
        with gzip.open(path_like, "wb") as writer:
            writer.write(data1 * 50)
        self.assertEqual(writer.name, self.filename)
        with gzip.open(path_like, "ab") as appender:
            appender.write(data1)
        self.assertEqual(appender.name, self.filename)
        with gzip.open(path_like) as reader:
            self.assertEqual(reader.read(), data1 * 51)
        self.assertEqual(reader.name, self.filename)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        self._check_binary_modes("")

    def test_text_modes(self):
        text = data1.decode("ascii") * 50
        # What ends up in the file uses the platform line separator.
        text_on_disk = text.replace("\n", os.linesep)

        with gzip.open(self.filename, "wt", encoding="ascii") as writer:
            writer.write(text)
        stored = self._decompress_file().decode("ascii")
        self.assertEqual(stored, text_on_disk)

        with gzip.open(self.filename, "rt", encoding="ascii") as reader:
            self.assertEqual(reader.read(), text)

        with gzip.open(self.filename, "at", encoding="ascii") as appender:
            appender.write(text)
        stored = self._decompress_file().decode("ascii")
        self.assertEqual(stored, text_on_disk * 2)

    def test_fileobj(self):
        # gzip.open() must accept an already-open file object in place of
        # a file name.
        raw_bytes = data1 * 50
        raw_text = raw_bytes.decode("ascii")
        compressed = gzip.compress(raw_bytes)

        for binary_mode in ("r", "rb"):
            with gzip.open(io.BytesIO(compressed), binary_mode) as reader:
                self.assertEqual(reader.read(), raw_bytes)

        source = io.BytesIO(compressed)
        with gzip.open(source, "rt", encoding="ascii") as reader:
            self.assertEqual(reader.read(), raw_text)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)

        rejected = (
            ("wbt", {}),
            ("xbt", {}),
            ("rb", {"encoding": "utf-8"}),
            ("rb", {"errors": "ignore"}),
            ("rb", {"newline": "\n"}),
        )
        for mode, text_options in rejected:
            with self.assertRaises(ValueError):
                gzip.open(self.filename, mode, **text_options)

    def test_encoding(self):
        # Test non-default encoding.
        text = data1.decode("ascii") * 50
        text_on_disk = text.replace("\n", os.linesep)

        with gzip.open(self.filename, "wt", encoding="utf-16") as writer:
            writer.write(text)
        stored = self._decompress_file().decode("utf-16")
        self.assertEqual(stored, text_on_disk)

        with gzip.open(self.filename, "rt", encoding="utf-16") as reader:
            self.assertEqual(reader.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as writer:
            writer.write(b"foo\xffbar")
        lenient_reader = gzip.open(
            self.filename, "rt", encoding="ascii", errors="ignore"
        )
        with lenient_reader as reader:
            self.assertEqual(reader.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = data1.decode("ascii") * 50
        writer = gzip.open(
            self.filename, "wt", encoding="ascii", newline="\n"
        )
        with writer:
            writer.write(text)
        reader = gzip.open(
            self.filename, "rt", encoding="ascii", newline="\r"
        )
        with reader:
            # With "\r" as the only line terminator the text is one line.
            self.assertEqual(reader.readlines(), [text])
988
989
def create_and_remove_directory(directory):
    """Return a decorator that runs the wrapped callable inside *directory*'s lifetime.

    The directory is created with os.makedirs() before the call and removed
    with os_helper.rmtree() afterwards, whether the call returns or raises.
    The wrapped callable's return value is passed through.
    """
    def decorate(func):
        @functools.wraps(func)
        def run_with_directory(*args, **kwargs):
            os.makedirs(directory)
            try:
                result = func(*args, **kwargs)
            finally:
                # Always clean up, even when the wrapped callable fails.
                os_helper.rmtree(directory)
            return result
        return run_with_directory
    return decorate
1001
1002
class TestCommandLine(unittest.TestCase):
    # Tests for the "python -m gzip" command-line interface.

    # Payload compressed and decompressed by every test in this class.
    data = b'This is a simple test with gzip'

    @requires_subprocess()
    def test_decompress_stdin_stdout(self):
        # "-d" with no file argument: compressed data in on stdin,
        # original data out on stdout.
        with io.BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode='wb') as writer:
                writer.write(self.data)
            compressed = buffer.getvalue()

        command = (sys.executable, '-m', 'gzip', '-d')
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            stdout, stderr = proc.communicate(compressed)

        self.assertEqual(stderr, b'')
        self.assertEqual(stdout, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        # "-d FILE.gz" writes FILE next to the input and keeps FILE.gz.
        archive_path = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(archive_path))

        with gzip.open(archive_path, mode='wb') as writer:
            writer.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', archive_path)

        extracted_path = os.path.join(TEMPDIR, "testgzip")
        with open(extracted_path, "rb") as extracted:
            self.assertEqual(extracted.read(), self.data)

        self.assertTrue(os.path.exists(archive_path))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        # "-d" on a name without the .gz suffix fails with exit status 1.
        rc, out, err = assert_python_failure(
            '-m', 'gzip', '-d', 'thisisatest.out'
        )
        expected_message = b"filename doesn't end in .gz: 'thisisatest.out'"
        self.assertEqual(expected_message, err.strip())
        self.assertEqual(rc, 1)
        self.assertEqual(out, b'')

    @requires_subprocess()
    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        # No arguments: data in on stdin, compressed output on stdout.
        command = (sys.executable, '-m', 'gzip')
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            stdout, stderr = proc.communicate(self.data)

        self.assertEqual(stderr, b'')
        # Only the first two bytes of the output are checked.
        self.assertEqual(stdout[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        # Compressing FILE with no level option creates FILE.gz.
        source_path = os.path.join(TEMPDIR, 'testgzip')
        archive_path = source_path + '.gz'
        self.assertFalse(os.path.exists(archive_path))

        with open(source_path, 'wb') as source:
            source.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', source_path)

        self.assertTrue(os.path.exists(archive_path))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        # Compressing FILE with --fast or --best creates FILE.gz.
        for level_flag in ('--fast', '--best'):
            with self.subTest(compress_level=level_flag):
                source_path = os.path.join(TEMPDIR, 'testgzip')
                archive_path = source_path + '.gz'
                self.assertFalse(os.path.exists(archive_path))

                with open(source_path, 'wb') as source:
                    source.write(self.data)

                rc, out, err = assert_python_ok(
                    '-m', 'gzip', level_flag, source_path
                )

                self.assertTrue(os.path.exists(archive_path))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the archive so the next flag starts clean.
                os.remove(archive_path)
                self.assertFalse(os.path.exists(archive_path))

    def test_compress_fast_best_are_exclusive(self):
        # --fast and --best cannot be combined.
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        expected_error = (
            b"error: argument --best: not allowed with argument --fast"
        )
        self.assertIn(expected_error, err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        # A compression-level flag cannot be combined with -d.
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        expected_error = (
            b'error: argument -d/--decompress: not allowed with argument --fast'
        )
        self.assertIn(expected_error, err)
        self.assertEqual(out, b'')
1095
1096
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
1099