"""Test script for the gzip module.
"""

import array
import functools
import io
import os
import struct
import sys
import unittest
from subprocess import PIPE, Popen
from test.support import import_helper
from test.support import os_helper
from test.support import _4G, bigmemtest, requires_subprocess
from test.support.script_helper import assert_python_ok, assert_python_failure

gzip = import_helper.import_module('gzip')
zlib = import_helper.import_module('zlib')

# NOTE(review): the exact bytes of data1 matter -- test_metadata pins a
# CRC32 computed over it.  Confirm the whitespace against the repository
# copy if that test fails.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Scratch directory used by the command-line tests.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'


class UnseekableIO(io.BytesIO):
    """In-memory byte stream that reports itself as non-seekable.

    seekable() returns False and both tell() and seek() raise
    io.UnsupportedOperation.
    """

    def seekable(self):
        return False

    def tell(self):
        raise io.UnsupportedOperation

    def seek(self, *args):
        raise io.UnsupportedOperation


class BaseTest(unittest.TestCase):
    """Common fixture: remove self.filename before and after every test."""

    filename = os_helper.TESTFN

    def setUp(self):
        os_helper.unlink(self.filename)

    def tearDown(self):
        os_helper.unlink(self.filename)


class TestGzip(BaseTest):
    """Tests for gzip.GzipFile and the compress()/decompress() shortcuts."""

    def write_and_read_back(self, data, mode='b'):
        # Write *data* through GzipFile, check the reported byte count, then
        # read the file back and compare against bytes(data).
        b_data = bytes(data)
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            written = f.write(data)
        self.assertEqual(written, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    def test_write(self):
        # Also used by many other tests to create a file holding data1 * 50.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    def test_write_read_with_pathlike_file(self):
        filename = os_helper.FakePath(self.filename)
        with gzip.GzipFile(filename, 'w') as f:
            f.write(data1 * 50)
        self.assertIsInstance(f.name, str)
        self.assertEqual(f.name, self.filename)
        with gzip.GzipFile(filename, 'a') as f:
            f.write(data1)
        with gzip.GzipFile(filename) as f:
            d = f.read()
        self.assertEqual(d, data1 * 51)
        self.assertIsInstance(f.name, str)
        self.assertEqual(f.name, self.filename)

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))
        # Also exercise a multi-dimensional view.
        m = memoryview(bytes(range(256)))
        data = m.cast('B', shape=[8,8,4])
        self.write_and_read_back(data)

    def test_write_bytearray(self):
        self.write_and_read_back(bytearray(data1 * 50))

    def test_write_array(self):
        self.write_and_read_back(array.array('I', data1 * 40))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(TypeError):
                f.write('')
            with self.assertRaises(TypeError):
                f.write([])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read1(self):
        self.test_write()
        blocks = []
        nread = 0
        with gzip.GzipFile(self.filename, 'r') as f:
            while True:
                d = f.read1()
                if not d:
                    break
                blocks.append(d)
                nread += len(d)
                # Check that position was updated correctly (see issue10791).
                self.assertEqual(f.tell(), nread)
        self.assertEqual(b''.join(blocks), data1 * 50)

    @bigmemtest(size=_4G, memuse=1)
    def test_read_large(self, size):
        # Read chunk size over UINT_MAX should be supported, despite zlib's
        # limitation per low-level call
        compressed = gzip.compress(data1, compresslevel=1)
        # Use a context manager so the GzipFile is always closed.
        with gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb') as f:
            self.assertEqual(f.read(size), data1)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        fileobj = f.fileobj
        self.assertFalse(fileobj.closed)
        f.close()
        self.assertTrue(fileobj.closed)
        with self.assertRaises(ValueError):
            f.write(b'')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.GzipFile(self.filename, 'wb', 9) as f:
            f.write(b'a')
        for i in range(0, 200):
            with gzip.GzipFile(self.filename, "ab", 9) as f:  # append
                f.write(b'a')

        # Try reading the file
        with gzip.GzipFile(self.filename, "rb") as zgfile:
            # Collect the chunks and join once instead of repeated bytes +=.
            chunks = []
            while chunk := zgfile.read(8192):
                chunks.append(chunk)
        self.assertEqual(b"".join(chunks), b'a'*201)

    def test_exclusive_write(self):
        with gzip.GzipFile(self.filename, 'xb') as f:
            f.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
        # The file now exists, so a second exclusive open must fail.
        with self.assertRaises(FileExistsError):
            gzip.GzipFile(self.filename, 'xb')

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                L = f.readline(line_length)
                # readline(0) legitimately returns b'', so an empty result
                # only signals EOF when a non-zero limit was requested.
                if not L and line_length != 0:
                    break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        # Repeated readlines() calls with a size hint must reach EOF.
        with gzip.GzipFile(self.filename, 'rb') as f:
            while True:
                if not f.readlines(150):
                    break

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write(b'GZ\n')

    def test_mode(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')
        # 'x' mode needs the file to be absent.
        os_helper.unlink(self.filename)
        with gzip.GzipFile(self.filename, 'x') as f:
            self.assertEqual(f.myfileobj.mode, 'xb')

    def test_1647484(self):
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_paddedfile_getattr(self):
        self.test_write()
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertTrue(hasattr(f.fileobj, "name"))
            self.assertEqual(f.fileobj.name, self.filename)

    def test_mtime(self):
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            self.assertTrue(hasattr(fRead, 'mtime'))
            # mtime is only populated once data has been read.
            self.assertIsNone(fRead.mtime)
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime=mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, b'\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, b'\x08')  # deflate

            try:
                expectedname = self.filename.encode('Latin-1') + b'\x00'
                expectedflags = b'\x08'  # only the FNAME flag is set
            except UnicodeEncodeError:
                expectedname = b''
                expectedflags = b'\x00'

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, expectedflags)

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime))  # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, b'\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, b'\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            nameBytes = fRead.read(len(expectedname))
            self.assertEqual(nameBytes, expectedname)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_metadata_ascii_name(self):
        # Re-run test_metadata against the ASCII-only test filename.
        self.filename = os_helper.TESTFN_ASCII
        self.test_metadata()

    def test_compresslevel_metadata(self):
        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
        # specifically, discussion of XFL in section 2.3.1
        cases = [
            ('fast', 1, b'\x04'),
            ('best', 9, b'\x02'),
            ('tradeoff', 6, b'\x00'),
        ]
        xflOffset = 8

        for (name, level, expectedXflByte) in cases:
            with self.subTest(name):
                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
                with fWrite:
                    fWrite.write(data1)
                with open(self.filename, 'rb') as fRead:
                    fRead.seek(xflOffset)
                    xflByte = fRead.read(1)
                    self.assertEqual(xflByte, expectedXflByte)

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # __enter__ on a closed file must raise.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the block must propagate out of it.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1/0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write(b"\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_gzip_BadGzipFile_exception(self):
        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))

    def test_bad_gzip_file(self):
        # Reading a file that holds uncompressed data must raise BadGzipFile.
        with open(self.filename, 'wb') as file:
            file.write(data1 * 50)
        with gzip.GzipFile(self.filename, 'r') as file:
            self.assertRaises(gzip.BadGzipFile, file.readlines)

    def test_non_seekable_file(self):
        uncompressed = data1 * 50
        buf = UnseekableIO()
        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
            f.write(uncompressed)
        compressed = buf.getvalue()
        buf = UnseekableIO(compressed)
        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_peek(self):
        uncompressed = data1 * 200
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(uncompressed)

        def sizes():
            # Endlessly cycle through a handful of peek sizes.
            while True:
                for n in range(5, 50, 10):
                    yield n

        with gzip.GzipFile(self.filename, "rb") as f:
            f.max_read_chunk = 33
            nread = 0
            for n in sizes():
                s = f.peek(n)
                if s == b'':
                    break
                # peek() must not consume: read() returns the same bytes.
                self.assertEqual(f.read(len(s)), s)
                nread += len(s)
            self.assertEqual(f.read(100), b'')
            self.assertEqual(nread, len(uncompressed))

    def test_textio_readlines(self):
        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            with io.TextIOWrapper(f, encoding="ascii") as t:
                self.assertEqual(t.readlines(), lines)

    def test_fileobj_with_name(self):
        # For each mode, check the GzipFile attributes while it is open and
        # again after it has been closed (the raw file is still open then).
        with open(self.filename, "xb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="x") as f:
                f.write(b'one')
                self.assertEqual(f.name, raw.name)
                self.assertEqual(f.fileno(), raw.fileno())
                self.assertEqual(f.mode, gzip.WRITE)
                self.assertIs(f.readable(), False)
                self.assertIs(f.writable(), True)
                self.assertIs(f.seekable(), True)
                self.assertIs(f.closed, False)
            self.assertIs(f.closed, True)
            self.assertEqual(f.name, raw.name)
            self.assertRaises(AttributeError, f.fileno)
            self.assertEqual(f.mode, gzip.WRITE)
            self.assertIs(f.readable(), False)
            self.assertIs(f.writable(), True)
            self.assertIs(f.seekable(), True)

        with open(self.filename, "wb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="w") as f:
                f.write(b'two')
                self.assertEqual(f.name, raw.name)
                self.assertEqual(f.fileno(), raw.fileno())
                self.assertEqual(f.mode, gzip.WRITE)
                self.assertIs(f.readable(), False)
                self.assertIs(f.writable(), True)
                self.assertIs(f.seekable(), True)
                self.assertIs(f.closed, False)
            self.assertIs(f.closed, True)
            self.assertEqual(f.name, raw.name)
            self.assertRaises(AttributeError, f.fileno)
            self.assertEqual(f.mode, gzip.WRITE)
            self.assertIs(f.readable(), False)
            self.assertIs(f.writable(), True)
            self.assertIs(f.seekable(), True)

        with open(self.filename, "ab") as raw:
            with gzip.GzipFile(fileobj=raw, mode="a") as f:
                f.write(b'three')
                self.assertEqual(f.name, raw.name)
                self.assertEqual(f.fileno(), raw.fileno())
                self.assertEqual(f.mode, gzip.WRITE)
                self.assertIs(f.readable(), False)
                self.assertIs(f.writable(), True)
                self.assertIs(f.seekable(), True)
                self.assertIs(f.closed, False)
            self.assertIs(f.closed, True)
            self.assertEqual(f.name, raw.name)
            self.assertRaises(AttributeError, f.fileno)
            self.assertEqual(f.mode, gzip.WRITE)
            self.assertIs(f.readable(), False)
            self.assertIs(f.writable(), True)
            self.assertIs(f.seekable(), True)

        with open(self.filename, "rb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="r") as f:
                # "w" truncated 'one'; "a" then appended a second member.
                self.assertEqual(f.read(), b'twothree')
                self.assertEqual(f.name, raw.name)
                self.assertEqual(f.fileno(), raw.fileno())
                self.assertEqual(f.mode, gzip.READ)
                self.assertIs(f.readable(), True)
                self.assertIs(f.writable(), False)
                self.assertIs(f.seekable(), True)
                self.assertIs(f.closed, False)
            self.assertIs(f.closed, True)
            self.assertEqual(f.name, raw.name)
            self.assertRaises(AttributeError, f.fileno)
            self.assertEqual(f.mode, gzip.READ)
            self.assertIs(f.readable(), True)
            self.assertIs(f.writable(), False)
            self.assertIs(f.seekable(), True)

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Opening a GzipFile for writing fails when using a
        # fileobj created with os.fdopen().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        with os.fdopen(fd, "xb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="x") as f:
                f.write(b'one')
                self.assertEqual(f.name, '')
                self.assertEqual(f.fileno(), raw.fileno())
            self.assertIs(f.closed, True)
            self.assertEqual(f.name, '')
            self.assertRaises(AttributeError, f.fileno)

        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
        with os.fdopen(fd, "wb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="w") as f:
                f.write(b'two')
                self.assertEqual(f.name, '')
                self.assertEqual(f.fileno(), raw.fileno())
            self.assertEqual(f.name, '')
            self.assertRaises(AttributeError, f.fileno)

        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT | os.O_APPEND)
        with os.fdopen(fd, "ab") as raw:
            with gzip.GzipFile(fileobj=raw, mode="a") as f:
                f.write(b'three')
                self.assertEqual(f.name, '')
                self.assertEqual(f.fileno(), raw.fileno())
            self.assertEqual(f.name, '')
            self.assertRaises(AttributeError, f.fileno)

        fd = os.open(self.filename, os.O_RDONLY)
        with os.fdopen(fd, "rb") as raw:
            with gzip.GzipFile(fileobj=raw, mode="r") as f:
                self.assertEqual(f.read(), b'twothree')
                self.assertEqual(f.name, '')
                self.assertEqual(f.fileno(), raw.fileno())
            self.assertEqual(f.name, '')
            self.assertRaises(AttributeError, f.fileno)

    def test_fileobj_mode(self):
        self.assertEqual(gzip.READ, 'rb')
        self.assertEqual(gzip.WRITE, 'wb')
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides the GzipFile mode.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='x') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(ValueError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without a mode argument, the fileobj's own mode is consulted.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab", "xb":
            if "x" in mode:
                os_helper.unlink(self.filename)
            with open(self.filename, mode) as f:
                with self.assertWarns(FutureWarning):
                    g = gzip.GzipFile(fileobj=f)
                with g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_bytes_filename(self):
        str_filename = self.filename
        bytes_filename = os.fsencode(str_filename)
        with gzip.GzipFile(bytes_filename, "wb") as f:
            f.write(data1 * 50)
        self.assertEqual(f.name, bytes_filename)
        with gzip.GzipFile(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        self.assertEqual(f.name, bytes_filename)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(str_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        self.assertEqual(f.name, str_filename)

    def test_fileobj_without_name(self):
        bio = io.BytesIO()
        with gzip.GzipFile(fileobj=bio, mode='wb') as f:
            f.write(data1 * 50)
            self.assertEqual(f.name, '')
            self.assertRaises(io.UnsupportedOperation, f.fileno)
            self.assertEqual(f.mode, gzip.WRITE)
            self.assertIs(f.readable(), False)
            self.assertIs(f.writable(), True)
            self.assertIs(f.seekable(), True)
            self.assertIs(f.closed, False)
        self.assertIs(f.closed, True)
        self.assertEqual(f.name, '')
        self.assertRaises(AttributeError, f.fileno)
        self.assertEqual(f.mode, gzip.WRITE)
        self.assertIs(f.readable(), False)
        self.assertIs(f.writable(), True)
        self.assertIs(f.seekable(), True)

        bio.seek(0)
        with gzip.GzipFile(fileobj=bio, mode='rb') as f:
            self.assertEqual(f.read(), data1 * 50)
            self.assertEqual(f.name, '')
            self.assertRaises(io.UnsupportedOperation, f.fileno)
            self.assertEqual(f.mode, gzip.READ)
            self.assertIs(f.readable(), True)
            self.assertIs(f.writable(), False)
            self.assertIs(f.seekable(), True)
            self.assertIs(f.closed, False)
        self.assertIs(f.closed, True)
        self.assertEqual(f.name, '')
        self.assertRaises(AttributeError, f.fileno)
        self.assertEqual(f.mode, gzip.READ)
        self.assertIs(f.readable(), True)
        self.assertIs(f.writable(), False)
        self.assertIs(f.seekable(), True)

    def test_fileobj_and_filename(self):
        # An explicit filename is reported as the name even though the data
        # goes through fileobj.
        filename2 = self.filename + 'new'
        with (open(self.filename, 'wb') as fileobj,
              gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='wb') as f):
            f.write(data1 * 50)
            self.assertEqual(f.name, filename2)
        with (open(self.filename, 'rb') as fileobj,
              gzip.GzipFile(fileobj=fileobj, filename=filename2, mode='rb') as f):
            self.assertEqual(f.read(), data1 * 50)
            self.assertEqual(f.name, filename2)
        # Sanity check that we are actually operating on the right file.
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1 * 50)
            self.assertEqual(f.name, self.filename)

    def test_decompress_limited(self):
        """Decompressed data buffering should be limited"""
        bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

        # Use a context manager so the GzipFile is always closed.
        with gzip.GzipFile(fileobj=io.BytesIO(bomb)) as decomp:
            self.assertEqual(decomp.read(1), b'\0')
            max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
            self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
                "Excessive amount of data was decompressed")

    # Testing compress/decompress shortcut functions

    def test_compress(self):
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                datac = gzip.compress(data, *args)
                self.assertIs(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)

    def test_compress_mtime(self):
        mtime = 123456789
        for data in [data1, data2]:
            for args in [(), (1,), (6,), (9,)]:
                with self.subTest(data=data, args=args):
                    datac = gzip.compress(data, *args, mtime=mtime)
                    self.assertIs(type(datac), bytes)
                    with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                        f.read(1)  # to set mtime attribute
                        self.assertEqual(f.mtime, mtime)

    def test_compress_correct_level(self):
        for mtime in (0, 42):
            with self.subTest(mtime=mtime):
                nocompress = gzip.compress(data1, compresslevel=0, mtime=mtime)
                yescompress = gzip.compress(data1, compresslevel=1, mtime=mtime)
                # Level 0 stores the payload verbatim; level 1 must not.
                self.assertIn(data1, nocompress)
                self.assertNotIn(data1, yescompress)

    def test_issue112346(self):
        # The OS byte should be 255, this should not change between Python versions.
        for mtime in (0, 42):
            with self.subTest(mtime=mtime):
                compress = gzip.compress(data1, compresslevel=1, mtime=mtime)
                self.assertEqual(
                    struct.unpack("<IxB", compress[4:10]),
                    (mtime, 255),
                    "Gzip header does not properly set either mtime or OS byte."
                )

    def test_decompress(self):
        for data in (data1, data2):
            buf = io.BytesIO()
            with gzip.GzipFile(fileobj=buf, mode="wb") as f:
                f.write(data)
            self.assertEqual(gzip.decompress(buf.getvalue()), data)
            # Roundtrip with compress
            datac = gzip.compress(data)
            self.assertEqual(gzip.decompress(datac), data)

    def test_decompress_truncated_trailer(self):
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-4])

    def test_decompress_missing_trailer(self):
        compressed_data = gzip.compress(data1)
        self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])

    def test_read_truncated(self):
        data = data1*50
        # Drop the CRC (4 bytes) and file size (4 bytes).
        truncated = gzip.compress(data)[:-8]
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with gzip.GzipFile(fileobj=io.BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(data)), data)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 10-byte header.
        for i in range(2, 10):
            with gzip.GzipFile(fileobj=io.BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_prepend_error(self):
        # See issue #20875
        with gzip.open(self.filename, "wb") as f:
            f.write(data1)
        with gzip.open(self.filename, "rb") as f:
            f._buffer.raw._fp.prepend()

    def test_issue44439(self):
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        # write() and tell() must count bytes, not array items.
        with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)

    def test_flush_flushes_compressor(self):
        # See issue GH-105808.
        b = io.BytesIO()
        message = b"important message here."
        with gzip.GzipFile(fileobj=b, mode='w') as f:
            f.write(message)
            f.flush()
            partial_data = b.getvalue()
        full_data = b.getvalue()
        self.assertEqual(gzip.decompress(full_data), message)
        # The partial data should contain the gzip header and the complete
        # message, but not the end-of-stream markers (so we can't just
        # decompress it directly).
        with self.assertRaises(EOFError):
            gzip.decompress(partial_data)
        d = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
        f = io.BytesIO(partial_data)
        gzip._read_gzip_header(f)
        read_message = d.decompress(f.read())
        self.assertEqual(read_message, message)

    def test_flush_modes(self):
        # Make sure the argument to flush is properly passed to the
        # zlib.compressobj; see issue GH-105808.
        class FakeCompressor:
            # Records every mode passed to flush().
            def __init__(self):
                self.modes = []
            def compress(self, data):
                return b''
            def flush(self, mode=-1):
                self.modes.append(mode)
                return b''
        b = io.BytesIO()
        fc = FakeCompressor()
        with gzip.GzipFile(fileobj=b, mode='w') as f:
            f.compress = fc
            f.flush()
            f.flush(50)
            f.flush(zlib_mode=100)
            # The implicit close will also flush the compressor.
        expected_modes = [
            zlib.Z_SYNC_FLUSH,
            50,
            100,
            -1,
        ]
        self.assertEqual(fc.modes, expected_modes)

    def test_write_seek_write(self):
        # Make sure that offset is up-to-date before seeking
        # See issue GH-108111
        b = io.BytesIO()
        message = b"important message here."
        with gzip.GzipFile(fileobj=b, mode='w') as f:
            f.write(message)
            f.seek(len(message))
            f.write(message)
        data = b.getvalue()
        self.assertEqual(gzip.decompress(data), message * 2)


class TestOpen(BaseTest):
    """Tests for the gzip.open() convenience function."""

    def test_binary_modes(self):
        uncompressed = data1 * 50

        with gzip.open(self.filename, "wb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "rb") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "ab") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "xb")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "xb") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_pathlike_file(self):
        filename = os_helper.FakePath(self.filename)
        with gzip.open(filename, "wb") as f:
            f.write(data1 * 50)
        self.assertEqual(f.name, self.filename)
        with gzip.open(filename, "ab") as f:
            f.write(data1)
        self.assertEqual(f.name, self.filename)
        with gzip.open(filename) as f:
            self.assertEqual(f.read(), data1 * 51)
        self.assertEqual(f.name, self.filename)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        uncompressed = data1 * 50

        with gzip.open(self.filename, "w") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

        with gzip.open(self.filename, "r") as f:
            self.assertEqual(f.read(), uncompressed)

        with gzip.open(self.filename, "a") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed * 2)

        with self.assertRaises(FileExistsError):
            gzip.open(self.filename, "x")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "x") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read())
            self.assertEqual(file_data, uncompressed)

    def test_text_modes(self):
        uncompressed = data1.decode("ascii") * 50
        # Text mode translates "\n" to the platform line separator on write.
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed)
        with gzip.open(self.filename, "at", encoding="ascii") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("ascii")
            self.assertEqual(file_data, uncompressed_raw * 2)

    def test_fileobj(self):
        uncompressed_bytes = data1 * 50
        uncompressed_str = uncompressed_bytes.decode("ascii")
        compressed = gzip.compress(uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "r") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rb") as f:
            self.assertEqual(f.read(), uncompressed_bytes)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as f:
            self.assertEqual(f.read(), uncompressed_str)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        with self.assertRaises(TypeError):
            gzip.open(123.456)
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "wbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "xbt")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", encoding="utf-8")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", errors="ignore")
        with self.assertRaises(ValueError):
            gzip.open(self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        uncompressed = data1.decode("ascii") * 50
        uncompressed_raw = uncompressed.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as f:
            f.write(uncompressed)
        with open(self.filename, "rb") as f:
            file_data = gzip.decompress(f.read()).decode("utf-16")
            self.assertEqual(file_data, uncompressed_raw)
        with gzip.open(self.filename, "rt", encoding="utf-16") as f:
            self.assertEqual(f.read(), uncompressed)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with gzip.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        uncompressed = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as f:
            f.write(uncompressed)
        with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as f:
            self.assertEqual(f.readlines(), [uncompressed])


def create_and_remove_directory(directory):
    """Return a decorator that runs the wrapped function inside *directory*'s lifetime.

    The directory is created with os.makedirs() before the call and removed
    with os_helper.rmtree() afterwards, whether or not the call raises.
    """
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            os.makedirs(directory)
            try:
                return function(*args, **kwargs)
            finally:
                os_helper.rmtree(directory)
        return wrapper
    return decorator


class TestCommandLine(unittest.TestCase):
    """Tests for the ``python -m gzip`` command-line interface."""

    data = b'This is a simple test with gzip'

    @requires_subprocess()
    def test_decompress_stdin_stdout(self):
        with io.BytesIO() as bytes_io:
            with gzip.GzipFile(fileobj=bytes_io, mode='wb') as gzip_file:
                gzip_file.write(self.data)

            args = sys.executable, '-m', 'gzip', '-d'
            with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
                out, err = proc.communicate(bytes_io.getvalue())

        self.assertEqual(err, b'')
        self.assertEqual(out, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(gzipname))

        with gzip.open(gzipname, mode='wb') as fp:
            fp.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)

        with open(os.path.join(TEMPDIR, "testgzip"), "rb") as gunziped:
            self.assertEqual(gunziped.read(), self.data)

        # The compressed input file is left in place.
        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
        self.assertEqual(b"filename doesn't end in .gz: 'thisisatest.out'", err.strip())
        self.assertEqual(rc, 1)
        self.assertEqual(out, b'')

    @requires_subprocess()
    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        args = sys.executable, '-m', 'gzip'
        with Popen(args, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            out, err = proc.communicate(self.data)

        self.assertEqual(err, b'')
        # Output must start with the gzip magic number.
        self.assertEqual(out[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        local_testgzip = os.path.join(TEMPDIR, 'testgzip')
        gzipname = local_testgzip + '.gz'
        self.assertFalse(os.path.exists(gzipname))

        with open(local_testgzip, 'wb') as fp:
            fp.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', local_testgzip)

        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                local_testgzip = os.path.join(TEMPDIR, 'testgzip')
                gzipname = local_testgzip + '.gz'
                self.assertFalse(os.path.exists(gzipname))

                with open(local_testgzip, 'wb') as fp:
                    fp.write(self.data)

                rc, out, err = assert_python_ok('-m', 'gzip', compress_level, local_testgzip)

                self.assertTrue(os.path.exists(gzipname))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the output so the next subtest starts clean.
                os.remove(gzipname)
                self.assertFalse(os.path.exists(gzipname))

    def test_compress_fast_best_are_exclusive(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        self.assertIn(b"error: argument --best: not allowed with argument --fast", err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
1093 self.assertIn(b'error: argument -d/--decompress: not allowed with argument --fast', err) 1094 self.assertEqual(out, b'') 1095 1096 1097if __name__ == "__main__": 1098 unittest.main() 1099