1"""Test script for the gzip module. 2""" 3 4import array 5import functools 6import io 7import os 8import pathlib 9import struct 10import sys 11import unittest 12from subprocess import PIPE, Popen 13from test.support import import_helper 14from test.support import os_helper 15from test.support import _4G, bigmemtest 16from test.support.script_helper import assert_python_ok, assert_python_failure 17 18gzip = import_helper.import_module('gzip') 19 20data1 = b""" int length=DEFAULTALLOC, err = Z_OK; 21 PyObject *RetVal; 22 int flushmode = Z_FINISH; 23 unsigned long start_total_out; 24 25""" 26 27data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */ 28/* See http://www.gzip.org/zlib/ 29/* See http://www.winimage.com/zLibDll for Windows */ 30""" 31 32 33TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir' 34 35 36class UnseekableIO(io.BytesIO): 37 def seekable(self): 38 return False 39 40 def tell(self): 41 raise io.UnsupportedOperation 42 43 def seek(self, *args): 44 raise io.UnsupportedOperation 45 46 47class BaseTest(unittest.TestCase): 48 filename = os_helper.TESTFN 49 50 def setUp(self): 51 os_helper.unlink(self.filename) 52 53 def tearDown(self): 54 os_helper.unlink(self.filename) 55 56 57class TestGzip(BaseTest): 58 def write_and_read_back(self, data, mode='b'): 59 b_data = bytes(data) 60 with gzip.GzipFile(self.filename, 'w'+mode) as f: 61 l = f.write(data) 62 self.assertEqual(l, len(b_data)) 63 with gzip.GzipFile(self.filename, 'r'+mode) as f: 64 self.assertEqual(f.read(), b_data) 65 66 def test_write(self): 67 with gzip.GzipFile(self.filename, 'wb') as f: 68 f.write(data1 * 50) 69 70 # Try flush and fileno. 71 f.flush() 72 f.fileno() 73 if hasattr(os, 'fsync'): 74 os.fsync(f.fileno()) 75 f.close() 76 77 # Test multiple close() calls. 78 f.close() 79 80 def test_write_read_with_pathlike_file(self): 81 filename = pathlib.Path(self.filename) 82 with gzip.GzipFile(filename, 'w') as f: 83 f.write(data1 * 50) 84 self.assertIsInstance(f.name, str) 85 with gzip.GzipFile(filename, 'a') as f: 86 f.write(data1) 87 with gzip.GzipFile(filename) as f: 88 d = f.read() 89 self.assertEqual(d, data1 * 51) 90 self.assertIsInstance(f.name, str) 91 92 # The following test_write_xy methods test that write accepts 93 # the corresponding bytes-like object type as input 94 # and that the data written equals bytes(xy) in all cases. 95 def test_write_memoryview(self): 96 self.write_and_read_back(memoryview(data1 * 50)) 97 m = memoryview(bytes(range(256))) 98 data = m.cast('B', shape=[8,8,4]) 99 self.write_and_read_back(data) 100 101 def test_write_bytearray(self): 102 self.write_and_read_back(bytearray(data1 * 50)) 103 104 def test_write_array(self): 105 self.write_and_read_back(array.array('I', data1 * 40)) 106 107 def test_write_incompatible_type(self): 108 # Test that non-bytes-like types raise TypeError. 109 # Issue #21560: attempts to write incompatible types 110 # should not affect the state of the fileobject 111 with gzip.GzipFile(self.filename, 'wb') as f: 112 with self.assertRaises(TypeError): 113 f.write('') 114 with self.assertRaises(TypeError): 115 f.write([]) 116 f.write(data1) 117 with gzip.GzipFile(self.filename, 'rb') as f: 118 self.assertEqual(f.read(), data1) 119 120 def test_read(self): 121 self.test_write() 122 # Try reading. 123 with gzip.GzipFile(self.filename, 'r') as f: 124 d = f.read() 125 self.assertEqual(d, data1*50) 126 127 def test_read1(self): 128 self.test_write() 129 blocks = [] 130 nread = 0 131 with gzip.GzipFile(self.filename, 'r') as f: 132 while True: 133 d = f.read1() 134 if not d: 135 break 136 blocks.append(d) 137 nread += len(d) 138 # Check that position was updated correctly (see issue10791). 139 self.assertEqual(f.tell(), nread) 140 self.assertEqual(b''.join(blocks), data1 * 50) 141 142 @bigmemtest(size=_4G, memuse=1) 143 def test_read_large(self, size): 144 # Read chunk size over UINT_MAX should be supported, despite zlib's 145 # limitation per low-level call 146 compressed = gzip.compress(data1, compresslevel=1) 147 f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') 148 self.assertEqual(f.read(size), data1) 149 150 def test_io_on_closed_object(self): 151 # Test that I/O operations on closed GzipFile objects raise a 152 # ValueError, just like the corresponding functions on file objects. 153 154 # Write to a file, open it for reading, then close it. 155 self.test_write() 156 f = gzip.GzipFile(self.filename, 'r') 157 fileobj = f.fileobj 158 self.assertFalse(fileobj.closed) 159 f.close() 160 self.assertTrue(fileobj.closed) 161 with self.assertRaises(ValueError): 162 f.read(1) 163 with self.assertRaises(ValueError): 164 f.seek(0) 165 with self.assertRaises(ValueError): 166 f.tell() 167 # Open the file for writing, then close it. 168 f = gzip.GzipFile(self.filename, 'w') 169 fileobj = f.fileobj 170 self.assertFalse(fileobj.closed) 171 f.close() 172 self.assertTrue(fileobj.closed) 173 with self.assertRaises(ValueError): 174 f.write(b'') 175 with self.assertRaises(ValueError): 176 f.flush() 177 178 def test_append(self): 179 self.test_write() 180 # Append to the previous file 181 with gzip.GzipFile(self.filename, 'ab') as f: 182 f.write(data2 * 15) 183 184 with gzip.GzipFile(self.filename, 'rb') as f: 185 d = f.read() 186 self.assertEqual(d, (data1*50) + (data2*15)) 187 188 def test_many_append(self): 189 # Bug #1074261 was triggered when reading a file that contained 190 # many, many members. Create such a file and verify that reading it 191 # works. 192 with gzip.GzipFile(self.filename, 'wb', 9) as f: 193 f.write(b'a') 194 for i in range(0, 200): 195 with gzip.GzipFile(self.filename, "ab", 9) as f: # append 196 f.write(b'a') 197 198 # Try reading the file 199 with gzip.GzipFile(self.filename, "rb") as zgfile: 200 contents = b"" 201 while 1: 202 ztxt = zgfile.read(8192) 203 contents += ztxt 204 if not ztxt: break 205 self.assertEqual(contents, b'a'*201) 206 207 def test_exclusive_write(self): 208 with gzip.GzipFile(self.filename, 'xb') as f: 209 f.write(data1 * 50) 210 with gzip.GzipFile(self.filename, 'rb') as f: 211 self.assertEqual(f.read(), data1 * 50) 212 with self.assertRaises(FileExistsError): 213 gzip.GzipFile(self.filename, 'xb') 214 215 def test_buffered_reader(self): 216 # Issue #7471: a GzipFile can be wrapped in a BufferedReader for 217 # performance. 218 self.test_write() 219 220 with gzip.GzipFile(self.filename, 'rb') as f: 221 with io.BufferedReader(f) as r: 222 lines = [line for line in r] 223 224 self.assertEqual(lines, 50 * data1.splitlines(keepends=True)) 225 226 def test_readline(self): 227 self.test_write() 228 # Try .readline() with varying line lengths 229 230 with gzip.GzipFile(self.filename, 'rb') as f: 231 line_length = 0 232 while 1: 233 L = f.readline(line_length) 234 if not L and line_length != 0: break 235 self.assertTrue(len(L) <= line_length) 236 line_length = (line_length + 1) % 50 237 238 def test_readlines(self): 239 self.test_write() 240 # Try .readlines() 241 242 with gzip.GzipFile(self.filename, 'rb') as f: 243 L = f.readlines() 244 245 with gzip.GzipFile(self.filename, 'rb') as f: 246 while 1: 247 L = f.readlines(150) 248 if L == []: break 249 250 def test_seek_read(self): 251 self.test_write() 252 # Try seek, read test 253 254 with gzip.GzipFile(self.filename) as f: 255 while 1: 256 oldpos = f.tell() 257 line1 = f.readline() 258 if not line1: break 259 newpos = f.tell() 260 f.seek(oldpos) # negative seek 261 if len(line1)>10: 262 amount = 10 263 else: 264 amount = len(line1) 265 line2 = f.read(amount) 266 self.assertEqual(line1[:amount], line2) 267 f.seek(newpos) # positive seek 268 269 def test_seek_whence(self): 270 self.test_write() 271 # Try seek(whence=1), read test 272 273 with gzip.GzipFile(self.filename) as f: 274 f.read(10) 275 f.seek(10, whence=1) 276 y = f.read(10) 277 self.assertEqual(y, data1[20:30]) 278 279 def test_seek_write(self): 280 # Try seek, write test 281 with gzip.GzipFile(self.filename, 'w') as f: 282 for pos in range(0, 256, 16): 283 f.seek(pos) 284 f.write(b'GZ\n') 285 286 def test_mode(self): 287 self.test_write() 288 with gzip.GzipFile(self.filename, 'r') as f: 289 self.assertEqual(f.myfileobj.mode, 'rb') 290 os_helper.unlink(self.filename) 291 with gzip.GzipFile(self.filename, 'x') as f: 292 self.assertEqual(f.myfileobj.mode, 'xb') 293 294 def test_1647484(self): 295 for mode in ('wb', 'rb'): 296 with gzip.GzipFile(self.filename, mode) as f: 297 self.assertTrue(hasattr(f, "name")) 298 self.assertEqual(f.name, self.filename) 299 300 def test_paddedfile_getattr(self): 301 self.test_write() 302 with gzip.GzipFile(self.filename, 'rb') as f: 303 self.assertTrue(hasattr(f.fileobj, "name")) 304 self.assertEqual(f.fileobj.name, self.filename) 305 306 def test_mtime(self): 307 mtime = 123456789 308 with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite: 309 fWrite.write(data1) 310 with gzip.GzipFile(self.filename) as fRead: 311 self.assertTrue(hasattr(fRead, 'mtime')) 312 self.assertIsNone(fRead.mtime) 313 dataRead = fRead.read() 314 self.assertEqual(dataRead, data1) 315 self.assertEqual(fRead.mtime, mtime) 316 317 def test_metadata(self): 318 mtime = 123456789 319 320 with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite: 321 fWrite.write(data1) 322 323 with open(self.filename, 'rb') as fRead: 324 # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html 325 326 idBytes = fRead.read(2) 327 self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID 328 329 cmByte = fRead.read(1) 330 self.assertEqual(cmByte, b'\x08') # deflate 331 332 try: 333 expectedname = self.filename.encode('Latin-1') + b'\x00' 334 expectedflags = b'\x08' # only the FNAME flag is set 335 except UnicodeEncodeError: 336 expectedname = b'' 337 expectedflags = b'\x00' 338 339 flagsByte = fRead.read(1) 340 self.assertEqual(flagsByte, expectedflags) 341 342 mtimeBytes = fRead.read(4) 343 self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian 344 345 xflByte = fRead.read(1) 346 self.assertEqual(xflByte, b'\x02') # maximum compression 347 348 osByte = fRead.read(1) 349 self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent) 350 351 # Since the FNAME flag is set, the zero-terminated filename follows. 352 # RFC 1952 specifies that this is the name of the input file, if any. 353 # However, the gzip module defaults to storing the name of the output 354 # file in this field. 355 nameBytes = fRead.read(len(expectedname)) 356 self.assertEqual(nameBytes, expectedname) 357 358 # Since no other flags were set, the header ends here. 359 # Rather than process the compressed data, let's seek to the trailer. 360 fRead.seek(os.stat(self.filename).st_size - 8) 361 362 crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1] 363 self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83') 364 365 isizeBytes = fRead.read(4) 366 self.assertEqual(isizeBytes, struct.pack('<i', len(data1))) 367 368 def test_metadata_ascii_name(self): 369 self.filename = os_helper.TESTFN_ASCII 370 self.test_metadata() 371 372 def test_compresslevel_metadata(self): 373 # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html 374 # specifically, discussion of XFL in section 2.3.1 375 cases = [ 376 ('fast', 1, b'\x04'), 377 ('best', 9, b'\x02'), 378 ('tradeoff', 6, b'\x00'), 379 ] 380 xflOffset = 8 381 382 for (name, level, expectedXflByte) in cases: 383 with self.subTest(name): 384 fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level) 385 with fWrite: 386 fWrite.write(data1) 387 with open(self.filename, 'rb') as fRead: 388 fRead.seek(xflOffset) 389 xflByte = fRead.read(1) 390 self.assertEqual(xflByte, expectedXflByte) 391 392 def test_with_open(self): 393 # GzipFile supports the context management protocol 394 with gzip.GzipFile(self.filename, "wb") as f: 395 f.write(b"xxx") 396 f = gzip.GzipFile(self.filename, "rb") 397 f.close() 398 try: 399 with f: 400 pass 401 except ValueError: 402 pass 403 else: 404 self.fail("__enter__ on a closed file didn't raise an exception") 405 try: 406 with gzip.GzipFile(self.filename, "wb") as f: 407 1/0 408 except ZeroDivisionError: 409 pass 410 else: 411 self.fail("1/0 didn't raise an exception") 412 413 def test_zero_padded_file(self): 414 with gzip.GzipFile(self.filename, "wb") as f: 415 f.write(data1 * 50) 416 417 # Pad the file with zeroes 418 with open(self.filename, "ab") as f: 419 f.write(b"\x00" * 50) 420 421 with gzip.GzipFile(self.filename, "rb") as f: 422 d = f.read() 423 self.assertEqual(d, data1 * 50, "Incorrect data in file") 424 425 def test_gzip_BadGzipFile_exception(self): 426 self.assertTrue(issubclass(gzip.BadGzipFile, OSError)) 427 428 def test_bad_gzip_file(self): 429 with open(self.filename, 'wb') as file: 430 file.write(data1 * 50) 431 with gzip.GzipFile(self.filename, 'r') as file: 432 self.assertRaises(gzip.BadGzipFile, file.readlines) 433 434 def test_non_seekable_file(self): 435 uncompressed = data1 * 50 436 buf = UnseekableIO() 437 with gzip.GzipFile(fileobj=buf, mode="wb") as f: 438 f.write(uncompressed) 439 compressed = buf.getvalue() 440 buf = UnseekableIO(compressed) 441 with gzip.GzipFile(fileobj=buf, mode="rb") as f: 442 self.assertEqual(f.read(), uncompressed) 443 444 def test_peek(self): 445 uncompressed = data1 * 200 446 with gzip.GzipFile(self.filename, "wb") as f: 447 f.write(uncompressed) 448 449 def sizes(): 450 while True: 451 for n in range(5, 50, 10): 452 yield n 453 454 with gzip.GzipFile(self.filename, "rb") as f: 455 f.max_read_chunk = 33 456 nread = 0 457 for n in sizes(): 458 s = f.peek(n) 459 if s == b'': 460 break 461 self.assertEqual(f.read(len(s)), s) 462 nread += len(s) 463 self.assertEqual(f.read(100), b'') 464 self.assertEqual(nread, len(uncompressed)) 465 466 def test_textio_readlines(self): 467 # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile. 468 lines = (data1 * 50).decode("ascii").splitlines(keepends=True) 469 self.test_write() 470 with gzip.GzipFile(self.filename, 'r') as f: 471 with io.TextIOWrapper(f, encoding="ascii") as t: 472 self.assertEqual(t.readlines(), lines) 473 474 def test_fileobj_from_fdopen(self): 475 # Issue #13781: Opening a GzipFile for writing fails when using a 476 # fileobj created with os.fdopen(). 477 fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT) 478 with os.fdopen(fd, "wb") as f: 479 with gzip.GzipFile(fileobj=f, mode="w") as g: 480 pass 481 482 def test_fileobj_mode(self): 483 gzip.GzipFile(self.filename, "wb").close() 484 with open(self.filename, "r+b") as f: 485 with gzip.GzipFile(fileobj=f, mode='r') as g: 486 self.assertEqual(g.mode, gzip.READ) 487 with gzip.GzipFile(fileobj=f, mode='w') as g: 488 self.assertEqual(g.mode, gzip.WRITE) 489 with gzip.GzipFile(fileobj=f, mode='a') as g: 490 self.assertEqual(g.mode, gzip.WRITE) 491 with gzip.GzipFile(fileobj=f, mode='x') as g: 492 self.assertEqual(g.mode, gzip.WRITE) 493 with self.assertRaises(ValueError): 494 gzip.GzipFile(fileobj=f, mode='z') 495 for mode in "rb", "r+b": 496 with open(self.filename, mode) as f: 497 with gzip.GzipFile(fileobj=f) as g: 498 self.assertEqual(g.mode, gzip.READ) 499 for mode in "wb", "ab", "xb": 500 if "x" in mode: 501 os_helper.unlink(self.filename) 502 with open(self.filename, mode) as f: 503 with self.assertWarns(FutureWarning): 504 g = gzip.GzipFile(fileobj=f) 505 with g: 506 self.assertEqual(g.mode, gzip.WRITE) 507 508 def test_bytes_filename(self): 509 str_filename = self.filename 510 try: 511 bytes_filename = str_filename.encode("ascii") 512 except UnicodeEncodeError: 513 self.skipTest("Temporary file name needs to be ASCII") 514 with gzip.GzipFile(bytes_filename, "wb") as f: 515 f.write(data1 * 50) 516 with gzip.GzipFile(bytes_filename, "rb") as f: 517 self.assertEqual(f.read(), data1 * 50) 518 # Sanity check that we are actually operating on the right file. 519 with gzip.GzipFile(str_filename, "rb") as f: 520 self.assertEqual(f.read(), data1 * 50) 521 522 def test_decompress_limited(self): 523 """Decompressed data buffering should be limited""" 524 bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9) 525 self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE) 526 527 bomb = io.BytesIO(bomb) 528 decomp = gzip.GzipFile(fileobj=bomb) 529 self.assertEqual(decomp.read(1), b'\0') 530 max_decomp = 1 + io.DEFAULT_BUFFER_SIZE 531 self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp, 532 "Excessive amount of data was decompressed") 533 534 # Testing compress/decompress shortcut functions 535 536 def test_compress(self): 537 for data in [data1, data2]: 538 for args in [(), (1,), (6,), (9,)]: 539 datac = gzip.compress(data, *args) 540 self.assertEqual(type(datac), bytes) 541 with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f: 542 self.assertEqual(f.read(), data) 543 544 def test_compress_mtime(self): 545 mtime = 123456789 546 for data in [data1, data2]: 547 for args in [(), (1,), (6,), (9,)]: 548 with self.subTest(data=data, args=args): 549 datac = gzip.compress(data, *args, mtime=mtime) 550 self.assertEqual(type(datac), bytes) 551 with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f: 552 f.read(1) # to set mtime attribute 553 self.assertEqual(f.mtime, mtime) 554 555 def test_decompress(self): 556 for data in (data1, data2): 557 buf = io.BytesIO() 558 with gzip.GzipFile(fileobj=buf, mode="wb") as f: 559 f.write(data) 560 self.assertEqual(gzip.decompress(buf.getvalue()), data) 561 # Roundtrip with compress 562 datac = gzip.compress(data) 563 self.assertEqual(gzip.decompress(datac), data) 564 565 def test_read_truncated(self): 566 data = data1*50 567 # Drop the CRC (4 bytes) and file size (4 bytes). 568 truncated = gzip.compress(data)[:-8] 569 with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f: 570 self.assertRaises(EOFError, f.read) 571 with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f: 572 self.assertEqual(f.read(len(data)), data) 573 self.assertRaises(EOFError, f.read, 1) 574 # Incomplete 10-byte header. 575 for i in range(2, 10): 576 with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f: 577 self.assertRaises(EOFError, f.read, 1) 578 579 def test_read_with_extra(self): 580 # Gzip data with an extra field 581 gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff' 582 b'\x05\x00Extra' 583 b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00') 584 with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f: 585 self.assertEqual(f.read(), b'Test') 586 587 def test_prepend_error(self): 588 # See issue #20875 589 with gzip.open(self.filename, "wb") as f: 590 f.write(data1) 591 with gzip.open(self.filename, "rb") as f: 592 f._buffer.raw._fp.prepend() 593 594 def test_issue44439(self): 595 q = array.array('Q', [1, 2, 3, 4, 5]) 596 LENGTH = len(q) * q.itemsize 597 598 with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f: 599 self.assertEqual(f.write(q), LENGTH) 600 self.assertEqual(f.tell(), LENGTH) 601 602 603class TestOpen(BaseTest): 604 def test_binary_modes(self): 605 uncompressed = data1 * 50 606 607 with gzip.open(self.filename, "wb") as f: 608 f.write(uncompressed) 609 with open(self.filename, "rb") as f: 610 file_data = gzip.decompress(f.read()) 611 self.assertEqual(file_data, uncompressed) 612 613 with gzip.open(self.filename, "rb") as f: 614 self.assertEqual(f.read(), uncompressed) 615 616 with gzip.open(self.filename, "ab") as f: 617 f.write(uncompressed) 618 with open(self.filename, "rb") as f: 619 file_data = gzip.decompress(f.read()) 620 self.assertEqual(file_data, uncompressed * 2) 621 622 with self.assertRaises(FileExistsError): 623 gzip.open(self.filename, "xb") 624 os_helper.unlink(self.filename) 625 with gzip.open(self.filename, "xb") as f: 626 f.write(uncompressed) 627 with open(self.filename, "rb") as f: 628 file_data = gzip.decompress(f.read()) 629 self.assertEqual(file_data, uncompressed) 630 631 def test_pathlike_file(self): 632 filename = pathlib.Path(self.filename) 633 with gzip.open(filename, "wb") as f: 634 f.write(data1 * 50) 635 with gzip.open(filename, "ab") as f: 636 f.write(data1) 637 with gzip.open(filename) as f: 638 self.assertEqual(f.read(), data1 * 51) 639 640 def test_implicit_binary_modes(self): 641 # Test implicit binary modes (no "b" or "t" in mode string). 642 uncompressed = data1 * 50 643 644 with gzip.open(self.filename, "w") as f: 645 f.write(uncompressed) 646 with open(self.filename, "rb") as f: 647 file_data = gzip.decompress(f.read()) 648 self.assertEqual(file_data, uncompressed) 649 650 with gzip.open(self.filename, "r") as f: 651 self.assertEqual(f.read(), uncompressed) 652 653 with gzip.open(self.filename, "a") as f: 654 f.write(uncompressed) 655 with open(self.filename, "rb") as f: 656 file_data = gzip.decompress(f.read()) 657 self.assertEqual(file_data, uncompressed * 2) 658 659 with self.assertRaises(FileExistsError): 660 gzip.open(self.filename, "x") 661 os_helper.unlink(self.filename) 662 with gzip.open(self.filename, "x") as f: 663 f.write(uncompressed) 664 with open(self.filename, "rb") as f: 665 file_data = gzip.decompress(f.read()) 666 self.assertEqual(file_data, uncompressed) 667 668 def test_text_modes(self): 669 uncompressed = data1.decode("ascii") * 50 670 uncompressed_raw = uncompressed.replace("\n", os.linesep) 671 with gzip.open(self.filename, "wt", encoding="ascii") as f: 672 f.write(uncompressed) 673 with open(self.filename, "rb") as f: 674 file_data = gzip.decompress(f.read()).decode("ascii") 675 self.assertEqual(file_data, uncompressed_raw) 676 with gzip.open(self.filename, "rt", encoding="ascii") as f: 677 self.assertEqual(f.read(), uncompressed) 678 with gzip.open(self.filename, "at", encoding="ascii") as f: 679 f.write(uncompressed) 680 with open(self.filename, "rb") as f: 681 file_data = gzip.decompress(f.read()).decode("ascii") 682 self.assertEqual(file_data, uncompressed_raw * 2) 683 684 def test_fileobj(self): 685 uncompressed_bytes = data1 * 50 686 uncompressed_str = uncompressed_bytes.decode("ascii") 687 compressed = gzip.compress(uncompressed_bytes) 688 with gzip.open(io.BytesIO(compressed), "r") as f: 689 self.assertEqual(f.read(), uncompressed_bytes) 690 with gzip.open(io.BytesIO(compressed), "rb") as f: 691 self.assertEqual(f.read(), uncompressed_bytes) 692 with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f: 693 self.assertEqual(f.read(), uncompressed_str) 694 695 def test_bad_params(self): 696 # Test invalid parameter combinations. 697 with self.assertRaises(TypeError): 698 gzip.open(123.456) 699 with self.assertRaises(ValueError): 700 gzip.open(self.filename, "wbt") 701 with self.assertRaises(ValueError): 702 gzip.open(self.filename, "xbt") 703 with self.assertRaises(ValueError): 704 gzip.open(self.filename, "rb", encoding="utf-8") 705 with self.assertRaises(ValueError): 706 gzip.open(self.filename, "rb", errors="ignore") 707 with self.assertRaises(ValueError): 708 gzip.open(self.filename, "rb", newline="\n") 709 710 def test_encoding(self): 711 # Test non-default encoding. 712 uncompressed = data1.decode("ascii") * 50 713 uncompressed_raw = uncompressed.replace("\n", os.linesep) 714 with gzip.open(self.filename, "wt", encoding="utf-16") as f: 715 f.write(uncompressed) 716 with open(self.filename, "rb") as f: 717 file_data = gzip.decompress(f.read()).decode("utf-16") 718 self.assertEqual(file_data, uncompressed_raw) 719 with gzip.open(self.filename, "rt", encoding="utf-16") as f: 720 self.assertEqual(f.read(), uncompressed) 721 722 def test_encoding_error_handler(self): 723 # Test with non-default encoding error handler. 724 with gzip.open(self.filename, "wb") as f: 725 f.write(b"foo\xffbar") 726 with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \ 727 as f: 728 self.assertEqual(f.read(), "foobar") 729 730 def test_newline(self): 731 # Test with explicit newline (universal newline mode disabled). 732 uncompressed = data1.decode("ascii") * 50 733 with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f: 734 f.write(uncompressed) 735 with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f: 736 self.assertEqual(f.readlines(), [uncompressed]) 737 738 739def create_and_remove_directory(directory): 740 def decorator(function): 741 @functools.wraps(function) 742 def wrapper(*args, **kwargs): 743 os.makedirs(directory) 744 try: 745 return function(*args, **kwargs) 746 finally: 747 os_helper.rmtree(directory) 748 return wrapper 749 return decorator 750 751 752class TestCommandLine(unittest.TestCase): 753 data = b'This is a simple test with gzip' 754 755 def test_decompress_stdin_stdout(self): 756 with io.BytesIO() as bytes_io: 757 with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file: 758 gzip_file.write(self.data) 759 760 args = sys.executable, '-m', 'gzip', '-d' 761 with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc: 762 out, err = proc.communicate(bytes_io.getvalue()) 763 764 self.assertEqual(err, b'') 765 self.assertEqual(out, self.data) 766 767 @create_and_remove_directory(TEMPDIR) 768 def test_decompress_infile_outfile(self): 769 gzipname = os.path.join(TEMPDIR, 'testgzip.gz') 770 self.assertFalse(os.path.exists(gzipname)) 771 772 with gzip.open(gzipname, mode='wb') as fp: 773 fp.write(self.data) 774 rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname) 775 776 with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped: 777 self.assertEqual(gunziped.read(), self.data) 778 779 self.assertTrue(os.path.exists(gzipname)) 780 self.assertEqual(rc, 0) 781 self.assertEqual(out, b'') 782 self.assertEqual(err, b'') 783 784 def test_decompress_infile_outfile_error(self): 785 rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out') 786 self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip()) 787 self.assertEqual(rc, 1) 788 self.assertEqual(out, b'') 789 790 @create_and_remove_directory(TEMPDIR) 791 def test_compress_stdin_outfile(self): 792 args = sys.executable, '-m', 'gzip' 793 with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc: 794 out, err = proc.communicate(self.data) 795 796 self.assertEqual(err, b'') 797 self.assertEqual(out[:2], b"\x1f\x8b") 798 799 @create_and_remove_directory(TEMPDIR) 800 def test_compress_infile_outfile_default(self): 801 local_testgzip = os.path.join(TEMPDIR, 'testgzip') 802 gzipname = local_testgzip + '.gz' 803 self.assertFalse(os.path.exists(gzipname)) 804 805 with open(local_testgzip, 'wb') as fp: 806 fp.write(self.data) 807 808 rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip) 809 810 self.assertTrue(os.path.exists(gzipname)) 811 self.assertEqual(out, b'') 812 self.assertEqual(err, b'') 813 814 @create_and_remove_directory(TEMPDIR) 815 def test_compress_infile_outfile(self): 816 for compress_level in ('--fast', '--best'): 817 with self.subTest(compress_level=compress_level): 818 local_testgzip = os.path.join(TEMPDIR, 'testgzip') 819 gzipname = local_testgzip + '.gz' 820 self.assertFalse(os.path.exists(gzipname)) 821 822 with open(local_testgzip, 'wb') as fp: 823 fp.write(self.data) 824 825 rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip) 826 827 self.assertTrue(os.path.exists(gzipname)) 828 self.assertEqual(out, b'') 829 self.assertEqual(err, b'') 830 os.remove(gzipname) 831 self.assertFalse(os.path.exists(gzipname)) 832 833 def test_compress_fast_best_are_exclusive(self): 834 rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best') 835 self.assertIn(b"error: argument --best: not allowed with argument --fast", err) 836 self.assertEqual(out, b'') 837 838 def test_decompress_cannot_have_flags_compression(self): 839 rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d') 840 self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err) 841 self.assertEqual(out, b'') 842 843 844if __name__ == "__main__": 845 unittest.main() 846