• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the gzip module.
2"""
3
4import array
5import functools
6import io
7import os
8import pathlib
9import struct
10import sys
11import unittest
12from subprocess import PIPE, Popen
13from test import support
14from test.support import _4G, bigmemtest
15from test.support.script_helper import assert_python_ok, assert_python_failure
16
# Import gzip through the test-support helper rather than a plain import.
# NOTE(review): presumably this skips the whole module when gzip (zlib) is
# unavailable -- confirm against test.support.
gzip = support.import_module('gzip')

# First sample payload (a fragment of C source).  The exact bytes matter:
# test_metadata compares the CRC32 stored in the gzip trailer against a
# hard-coded value computed from this string.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second sample payload, used where a test needs content distinct from data1.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Scratch directory for the command-line tests; it is created and removed
# around each test by the create_and_remove_directory decorator.
TEMPDIR = os.path.abspath(support.TESTFN) + '-gzdir'
33
34
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that refuses every positioning operation.

    Only ``seekable()``, ``seek()`` and ``tell()`` are overridden; everything
    else is inherited from ``io.BytesIO``.
    """

    def seekable(self):
        """Report that random access is not available."""
        return False

    def seek(self, *args):
        """Reject any attempt to reposition the stream."""
        raise io.UnsupportedOperation()

    def tell(self):
        """Reject any attempt to query the stream position."""
        raise io.UnsupportedOperation()
44
45
class BaseTest(unittest.TestCase):
    """Common fixture: make sure the scratch file is absent before and after
    every test."""

    # Path of the scratch file each test reads from / writes to.
    filename = support.TESTFN

    def _discard_scratch_file(self):
        # Remove the scratch file via the test-support unlink helper.
        support.unlink(self.filename)

    def setUp(self):
        self._discard_scratch_file()

    def tearDown(self):
        self._discard_scratch_file()
54
55
class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    def write_and_read_back(self, data, mode='b'):
        """Write *data* through GzipFile and check it reads back unchanged.

        *data* may be any bytes-like object; the expected content is
        ``bytes(data)``.  *mode* is appended to ``'w'`` and ``'r'`` to build
        the two open modes.
        """
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        # Also used by other tests as a helper that leaves data1 * 50 in
        # self.filename.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is closed even on failure.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for _ in range(200):
            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
                f.write(b'a')

        # Try reading the file
        chunks = []
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            while True:
                ztxt = zgfile.read(8192)
                if not ztxt:
                    break
                chunks.append(ztxt)
        self.assertEqual(b"".join(chunks), b'a'*201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                L = f.readline(line_length)
                # readline(0) legitimately returns b'', so an empty result
                # only means end-of-file when a non-zero size was requested.
                if not L and line_length != 0:
                    break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            # Only checks that an unbounded readlines() call completes.
            f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            # Repeated readlines() with a size hint must eventually hit EOF.
            while True:
                L = f.readlines(150)
                if L == []:
                    break

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                # Re-read at most the first 10 bytes of the line just read.
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        support.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            self.assertTrue(hasattr(fRead, 'mtime'))
            # mtime is only populated once data has been read.
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08') # deflate

            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08' # only the FNAME flag is set
            except UnicodeEncodeError:
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            # MTIME and ISIZE are packed as unsigned 32-bit little-endian
            # integers.
            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<I', mtime))

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02') # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<I', len(data1)))

    def test_metadata_ascii_name(self):
        # Re-run test_metadata with the ASCII scratch name from test.support.
        # The instance attribute also redirects tearDown's cleanup.
        self.filename = support.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # __enter__ on a closed file must raise.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the with-block must propagate.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Uncompressed data is not a valid gzip stream.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endless cycle of peek sizes: 5, 15, 25, 35, 45, 5, ...
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # Whatever peek() returned must be what read() yields next.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            # Constructing and closing the GzipFile is the whole check.
            with gzip.GzipFile(fileobj=f, mode="w"):
                pass

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides GzipFile's mode.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without an explicit mode, it is derived from the fileobj's mode.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                support.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        bomb = io.BytesIO(bomb)
        # Use a context manager so the GzipFile is closed even on failure.
        with gzip.GzipFile(fileobj=bomb) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertEqual(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1) # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            # No assertion: the call itself must complete without raising.
            f._buffer.raw._fp.prepend()
592
class TestOpen(BaseTest):
    """Tests for the gzip.open() convenience function.

    Text-mode calls pass ``encoding="ascii"`` explicitly so the tests do not
    depend on the locale's default encoding; the expected values are built
    with the same codec.
    """

    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        # Exclusive creation must fail while the file exists ...
        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        # ... and succeed once it has been removed.
        support.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_pathlike_file(self):
        filename = pathlib.Path(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        support.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # With the default newline handling, "\n" is expected on disk as
        # os.linesep.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        # Text-only keyword arguments are rejected in binary mode.
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii",
                       newline="\n") as f:
            f.write(uncompressed)
        # The data holds no "\r", so with newline="\r" it is a single line.
        with gzip.open(self.filename, "rt", encoding="ascii",
                       newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])
727
728
def create_and_remove_directory(directory):
    """Return a decorator that runs a function inside a temporary *directory*.

    The directory is created with ``os.makedirs`` just before the wrapped
    function is called and removed with ``support.rmtree`` afterwards, whether
    the call returned or raised.  The wrapped function's return value is
    passed through.
    """
    def decorate(target):
        @functools.wraps(target)
        def run_with_directory(*args, **kwargs):
            os.makedirs(directory)
            try:
                result = target(*args, **kwargs)
            finally:
                # Always clean up, even when the wrapped call raised.
                support.rmtree(directory)
            return result
        return run_with_directory
    return decorate
740
741
class TestCommandLine(unittest.TestCase):
    """Exercise the ``python -m gzip`` command-line interface."""

    # Payload compressed / decompressed by every test in this class.
    data = b'This is a simple test with gzip'

    def test_decompress_stdin_stdout(self):
        # Compress in memory, then pipe the result through "-m gzip -d".
        with io.BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode='wb') as writer:
                writer.write(self.data)

            command = (sys.executable, '-m', 'gzip', '-d')
            with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as child:
                stdout, stderr = child.communicate(buffer.getvalue())

        self.assertEqual(stderr, b'')
        self.assertEqual(stdout, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        compressed_path = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(compressed_path))

        with gzip.open(compressed_path, mode='wb') as writer:
            writer.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', compressed_path)

        restored_path = os.path.join(TEMPDIR, "testgzip")
        with open(restored_path, "rb") as restored:
            self.assertEqual(restored.read(), self.data)

        # The compressed input must still exist after decompression.
        self.assertTrue(os.path.exists(compressed_path))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        # A name without the .gz suffix is reported on stdout; the exit
        # status is still 0.
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', 'thisisatest.out')
        self.assertIn(b"filename doesn't end in .gz:", out)
        self.assertEqual(rc, 0)
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        command = (sys.executable, '-m', 'gzip')
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as child:
            stdout, stderr = child.communicate(self.data)

        self.assertEqual(stderr, b'')
        # The output must start with the gzip magic number.
        self.assertEqual(stdout[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        source_path = os.path.join(TEMPDIR, 'testgzip')
        compressed_path = source_path + '.gz'
        self.assertFalse(os.path.exists(compressed_path))

        with open(source_path, 'wb') as writer:
            writer.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', source_path)

        self.assertTrue(os.path.exists(compressed_path))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        # Both paths are the same for every sub-test.
        source_path = os.path.join(TEMPDIR, 'testgzip')
        compressed_path = source_path + '.gz'

        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                self.assertFalse(os.path.exists(compressed_path))

                with open(source_path, 'wb') as writer:
                    writer.write(self.data)

                rc, out, err = assert_python_ok(
                    '-m', 'gzip', compress_level, source_path)

                self.assertTrue(os.path.exists(compressed_path))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the output so the next sub-test starts clean.
                os.remove(compressed_path)
                self.assertFalse(os.path.exists(compressed_path))

    def test_compress_fast_best_are_exclusive(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err)
        self.assertEqual(out, b'')
832
833
def test_main(verbose=None):
    """Run every test class of this module through ``support.run_unittest``.

    *verbose* is accepted but not used by this function.
    """
    test_classes = (TestGzip, TestOpen, TestCommandLine)
    support.run_unittest(*test_classes)
836
837
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)
840