"""Test script for the gzip module.
"""

import array
import functools
import io
import os
import pathlib
import struct
import sys
import unittest
from subprocess import PIPE, Popen
from test.support import import_helper
from test.support import os_helper
from test.support import _4G, bigmemtest
from test.support.script_helper import assert_python_ok, assert_python_failure

18gzip = import_helper.import_module('gzip')
19
20data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
21  PyObject *RetVal;
22  int flushmode = Z_FINISH;
23  unsigned long start_total_out;
24
25"""
26
27data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
28/* See http://www.gzip.org/zlib/
29/* See http://www.winimage.com/zLibDll for Windows */
30"""
31
32
33TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
34
35
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that refuses every positioning operation.

    Behaves like io.BytesIO for reads and writes, but reports itself as
    non-seekable and raises io.UnsupportedOperation from tell() and seek().
    """

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation


47class BaseTest(unittest.TestCase):
48    filename = os_helper.TESTFN
49
50    def setUp(self):
51        os_helper.unlink(self.filename)
52
53    def tearDown(self):
54        os_helper.unlink(self.filename)
55
56
class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    def write_and_read_back(self, data, mode='b'):
        # Helper: write *data* (any bytes-like object) through GzipFile,
        # check the length reported by write(), then read the file back and
        # compare it with bytes(data).
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        # Also used by many other tests to create the fixture file
        # containing data1 * 50.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is closed after the check.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(0, 200):
            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
                f.write(b'a')

        # Try reading the file
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            contents = b""
            while 1:
                ztxt = zgfile.read(8192)
                contents += ztxt
                if not ztxt: break
        self.assertEqual(contents, b'a'*201)

    def test_exclusive_write(self):
        # Mode 'xb' creates the file, and must fail if it already exists.
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while 1:
                L = f.readline(line_length)
                # readline(0) always returns b'', so an empty result only
                # signals EOF when a non-zero size was requested.
                if not L and line_length != 0: break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            L = f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            while 1:
                L = f.readlines(150)
                if L == []: break

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while 1:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1: break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                if len(line1)>10:
                    amount = 10
                else:
                    amount = len(line1)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        # The underlying file object is opened in the matching binary mode.
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        os_helper.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        # GzipFile exposes a "name" attribute in both write and read mode.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            self.assertTrue(hasattr(fRead, 'mtime'))
            # mtime is only populated once data has been read.
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08') # deflate

            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08' # only the FNAME flag is set
            except UnicodeEncodeError:
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02') # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_metadata_ascii_name(self):
        # Re-run test_metadata with os_helper.TESTFN_ASCII as the file name.
        self.filename = os_helper.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # __enter__ on a closed file must raise ValueError.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the block must propagate out of it.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Reading a file that is not gzip data must raise BadGzipFile.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        # Round-trip through a file object that cannot seek() or tell().
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                pass

    def test_fileobj_mode(self):
        # GzipFile.mode is gzip.READ or gzip.WRITE, taken from the explicit
        # mode argument or, failing that, from the file object's own mode.
        gzip.GzipFile(self.filename, "wb").close()
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                os_helper.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a context manager so the GzipFile is closed after the check.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertEqual(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1) # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()

    def test_issue44439(self):
        # write() and tell() must count bytes, not array items, for
        # multi-byte item types.
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)


class TestOpen(BaseTest):
    """Tests for the module-level gzip.open() function."""

    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation fails while the file exists, succeeds once it
        # has been removed.
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # Text mode translates "\n" to os.linesep on write, so the raw
        # decompressed payload is compared against the translated text.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        # gzip.open() also accepts an existing file object instead of a path.
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f:
            f.write(uncompressed)
        with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])


739def create_and_remove_directory(directory):
740    def decorator(function):
741        @functools.wraps(function)
742        def wrapper(*args, **kwargs):
743            os.makedirs(directory)
744            try:
745                return function(*args, **kwargs)
746            finally:
747                os_helper.rmtree(directory)
748        return wrapper
749    return decorator
750
751
class TestCommandLine(unittest.TestCase):
    """Tests for the ``python -m gzip`` command-line interface."""

    # Payload compressed and decompressed by the tests below.
    data = b'This is a simple test with gzip'

    def test_decompress_stdin_stdout(self):
        # "-d" with no file argument: compressed data on stdin,
        # decompressed data on stdout.
        with io.BytesIO() as bytes_io:
            with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file:
                gzip_file.write(self.data)

            args = sys.executable, '-m', 'gzip', '-d'
            with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
                out, err = proc.communicate(bytes_io.getvalue())

        self.assertEqual(err, b'')
        self.assertEqual(out, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(gzipname))

        with gzip.open(gzipname, mode='wb') as fp:
            fp.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)

        # The output file is the input name with the ".gz" suffix removed.
        with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped:
            self.assertEqual(gunziped.read(), self.data)

        # The compressed input file is left in place.
        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
        self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip())
        self.assertEqual(rc, 1)
        self.assertEqual(out, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        args = sys.executable, '-m', 'gzip'
        with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            out, err = proc.communicate(self.data)

        self.assertEqual(err, b'')
        # Output must start with the gzip magic number.
        self.assertEqual(out[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        local_testgzip = os.path.join(TEMPDIR, 'testgzip')
        gzipname = local_testgzip + '.gz'
        self.assertFalse(os.path.exists(gzipname))

        with open(local_testgzip, 'wb') as fp:
            fp.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip)

        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                local_testgzip = os.path.join(TEMPDIR, 'testgzip')
                gzipname = local_testgzip + '.gz'
                self.assertFalse(os.path.exists(gzipname))

                with open(local_testgzip, 'wb') as fp:
                    fp.write(self.data)

                rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip)

                self.assertTrue(os.path.exists(gzipname))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the output so the next sub-test starts clean.
                os.remove(gzipname)
                self.assertFalse(os.path.exists(gzipname))

    def test_compress_fast_best_are_exclusive(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err)
        self.assertEqual(out, b'')


# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()