1import unittest 2from test import support 3from test.support import import_helper 4import binascii 5import copy 6import pickle 7import random 8import sys 9from test.support import bigmemtest, _1G, _4G, is_s390x 10 11 12zlib = import_helper.import_module('zlib') 13 14requires_Compress_copy = unittest.skipUnless( 15 hasattr(zlib.compressobj(), "copy"), 16 'requires Compress.copy()') 17requires_Decompress_copy = unittest.skipUnless( 18 hasattr(zlib.decompressobj(), "copy"), 19 'requires Decompress.copy()') 20 21 22def _zlib_runtime_version_tuple(zlib_version=zlib.ZLIB_RUNTIME_VERSION): 23 # Register "1.2.3" as "1.2.3.0" 24 # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux" 25 v = zlib_version.split('-', 1)[0].split('.') 26 if len(v) < 4: 27 v.append('0') 28 elif not v[-1].isnumeric(): 29 v[-1] = '0' 30 return tuple(map(int, v)) 31 32 33ZLIB_RUNTIME_VERSION_TUPLE = _zlib_runtime_version_tuple() 34 35 36# bpo-46623: When a hardware accelerator is used (currently only on s390x), 37# using different ways to compress data with zlib can produce different 38# compressed data. 39# Simplified test_pair() code: 40# 41# def func1(data): 42# return zlib.compress(data) 43# 44# def func2(data) 45# co = zlib.compressobj() 46# x1 = co.compress(data) 47# x2 = co.flush() 48# return x1 + x2 49# 50# On s390x if zlib uses a hardware accelerator, func1() creates a single 51# "final" compressed block whereas func2() produces 3 compressed blocks (the 52# last one is a final block). On other platforms with no accelerator, func1() 53# and func2() produce the same compressed data made of a single (final) 54# compressed block. 55# 56# Only the compressed data is different, the decompression returns the original 57# data: 58# 59# zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data 60# 61# To simplify the skip condition, make the assumption that s390x always has an 62# accelerator, and nothing else has it. 
63HW_ACCELERATED = is_s390x 64 65 66class VersionTestCase(unittest.TestCase): 67 68 def test_library_version(self): 69 # Test that the major version of the actual library in use matches the 70 # major version that we were compiled against. We can't guarantee that 71 # the minor versions will match (even on the machine on which the module 72 # was compiled), and the API is stable between minor versions, so 73 # testing only the major versions avoids spurious failures. 74 self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0]) 75 76 77class ChecksumTestCase(unittest.TestCase): 78 # checksum test cases 79 def test_crc32start(self): 80 self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0)) 81 self.assertTrue(zlib.crc32(b"abc", 0xffffffff)) 82 83 def test_crc32empty(self): 84 self.assertEqual(zlib.crc32(b"", 0), 0) 85 self.assertEqual(zlib.crc32(b"", 1), 1) 86 self.assertEqual(zlib.crc32(b"", 432), 432) 87 88 def test_adler32start(self): 89 self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1)) 90 self.assertTrue(zlib.adler32(b"abc", 0xffffffff)) 91 92 def test_adler32empty(self): 93 self.assertEqual(zlib.adler32(b"", 0), 0) 94 self.assertEqual(zlib.adler32(b"", 1), 1) 95 self.assertEqual(zlib.adler32(b"", 432), 432) 96 97 def test_penguins(self): 98 self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120) 99 self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94) 100 self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6) 101 self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7) 102 103 self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0)) 104 self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1)) 105 106 def test_crc32_adler32_unsigned(self): 107 foo = b'abcdefghijklmnop' 108 # explicitly test signed behavior 109 self.assertEqual(zlib.crc32(foo), 2486878355) 110 self.assertEqual(zlib.crc32(b'spam'), 1138425661) 111 self.assertEqual(zlib.adler32(foo+foo), 3573550353) 112 self.assertEqual(zlib.adler32(b'spam'), 72286642) 
113 114 def test_same_as_binascii_crc32(self): 115 foo = b'abcdefghijklmnop' 116 crc = 2486878355 117 self.assertEqual(binascii.crc32(foo), crc) 118 self.assertEqual(zlib.crc32(foo), crc) 119 self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam')) 120 121 122# Issue #10276 - check that inputs >=4 GiB are handled correctly. 123class ChecksumBigBufferTestCase(unittest.TestCase): 124 125 @bigmemtest(size=_4G + 4, memuse=1, dry_run=False) 126 def test_big_buffer(self, size): 127 data = b"nyan" * (_1G + 1) 128 self.assertEqual(zlib.crc32(data), 1044521549) 129 self.assertEqual(zlib.adler32(data), 2256789997) 130 131 132class ExceptionTestCase(unittest.TestCase): 133 # make sure we generate some expected errors 134 def test_badlevel(self): 135 # specifying compression level out of range causes an error 136 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib 137 # accepts 0 too) 138 self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10) 139 140 def test_badargs(self): 141 self.assertRaises(TypeError, zlib.adler32) 142 self.assertRaises(TypeError, zlib.crc32) 143 self.assertRaises(TypeError, zlib.compress) 144 self.assertRaises(TypeError, zlib.decompress) 145 for arg in (42, None, '', 'abc', (), []): 146 self.assertRaises(TypeError, zlib.adler32, arg) 147 self.assertRaises(TypeError, zlib.crc32, arg) 148 self.assertRaises(TypeError, zlib.compress, arg) 149 self.assertRaises(TypeError, zlib.decompress, arg) 150 151 def test_badcompressobj(self): 152 # verify failure on building compress object with bad params 153 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) 154 # specifying total bits too large causes an error 155 self.assertRaises(ValueError, 156 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) 157 158 def test_baddecompressobj(self): 159 # verify failure on building decompress object with bad params 160 self.assertRaises(ValueError, zlib.decompressobj, -1) 161 162 def test_decompressobj_badflush(self): 163 # verify failure 
on calling decompressobj.flush with bad params 164 self.assertRaises(ValueError, zlib.decompressobj().flush, 0) 165 self.assertRaises(ValueError, zlib.decompressobj().flush, -1) 166 167 @support.cpython_only 168 def test_overflow(self): 169 with self.assertRaisesRegex(OverflowError, 'int too large'): 170 zlib.decompress(b'', 15, sys.maxsize + 1) 171 with self.assertRaisesRegex(OverflowError, 'int too large'): 172 zlib.decompressobj().decompress(b'', sys.maxsize + 1) 173 with self.assertRaisesRegex(OverflowError, 'int too large'): 174 zlib.decompressobj().flush(sys.maxsize + 1) 175 176 @support.cpython_only 177 def test_disallow_instantiation(self): 178 # Ensure that the type disallows instantiation (bpo-43916) 179 support.check_disallow_instantiation(self, type(zlib.compressobj())) 180 support.check_disallow_instantiation(self, type(zlib.decompressobj())) 181 182 183class BaseCompressTestCase(object): 184 def check_big_compress_buffer(self, size, compress_func): 185 _1M = 1024 * 1024 186 # Generate 10 MiB worth of random, and expand it by repeating it. 187 # The assumption is that zlib's memory is not big enough to exploit 188 # such spread out redundancy. 
189 data = random.randbytes(_1M * 10) 190 data = data * (size // len(data) + 1) 191 try: 192 compress_func(data) 193 finally: 194 # Release memory 195 data = None 196 197 def check_big_decompress_buffer(self, size, decompress_func): 198 data = b'x' * size 199 try: 200 compressed = zlib.compress(data, 1) 201 finally: 202 # Release memory 203 data = None 204 data = decompress_func(compressed) 205 # Sanity check 206 try: 207 self.assertEqual(len(data), size) 208 self.assertEqual(len(data.strip(b'x')), 0) 209 finally: 210 data = None 211 212 213class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 214 # Test compression in one go (whole message compression) 215 def test_speech(self): 216 x = zlib.compress(HAMLET_SCENE) 217 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 218 219 def test_keywords(self): 220 x = zlib.compress(HAMLET_SCENE, level=3) 221 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 222 with self.assertRaises(TypeError): 223 zlib.compress(data=HAMLET_SCENE, level=3) 224 self.assertEqual(zlib.decompress(x, 225 wbits=zlib.MAX_WBITS, 226 bufsize=zlib.DEF_BUF_SIZE), 227 HAMLET_SCENE) 228 229 def test_speech128(self): 230 # compress more data 231 data = HAMLET_SCENE * 128 232 x = zlib.compress(data) 233 # With hardware acceleration, the compressed bytes 234 # might not be identical. 
235 if not HW_ACCELERATED: 236 self.assertEqual(zlib.compress(bytearray(data)), x) 237 for ob in x, bytearray(x): 238 self.assertEqual(zlib.decompress(ob), data) 239 240 def test_incomplete_stream(self): 241 # A useful error message is given 242 x = zlib.compress(HAMLET_SCENE) 243 self.assertRaisesRegex(zlib.error, 244 "Error -5 while decompressing data: incomplete or truncated stream", 245 zlib.decompress, x[:-1]) 246 247 # Memory use of the following functions takes into account overallocation 248 249 @bigmemtest(size=_1G + 1024 * 1024, memuse=3) 250 def test_big_compress_buffer(self, size): 251 compress = lambda s: zlib.compress(s, 1) 252 self.check_big_compress_buffer(size, compress) 253 254 @bigmemtest(size=_1G + 1024 * 1024, memuse=2) 255 def test_big_decompress_buffer(self, size): 256 self.check_big_decompress_buffer(size, zlib.decompress) 257 258 @bigmemtest(size=_4G, memuse=1) 259 def test_large_bufsize(self, size): 260 # Test decompress(bufsize) parameter greater than the internal limit 261 data = HAMLET_SCENE * 10 262 compressed = zlib.compress(data, 1) 263 self.assertEqual(zlib.decompress(compressed, 15, size), data) 264 265 def test_custom_bufsize(self): 266 data = HAMLET_SCENE * 10 267 compressed = zlib.compress(data, 1) 268 self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data) 269 270 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 271 @bigmemtest(size=_4G + 100, memuse=4) 272 def test_64bit_compress(self, size): 273 data = b'x' * size 274 try: 275 comp = zlib.compress(data, 0) 276 self.assertEqual(zlib.decompress(comp), data) 277 finally: 278 comp = data = None 279 280 281class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 282 # Test compression object 283 def test_pair(self): 284 # straightforward compress/decompress objects 285 datasrc = HAMLET_SCENE * 128 286 datazip = zlib.compress(datasrc) 287 # should compress both bytes and bytearray data 288 for data in (datasrc, bytearray(datasrc)): 289 
co = zlib.compressobj() 290 x1 = co.compress(data) 291 x2 = co.flush() 292 self.assertRaises(zlib.error, co.flush) # second flush should not work 293 # With hardware acceleration, the compressed bytes might not 294 # be identical. 295 if not HW_ACCELERATED: 296 self.assertEqual(x1 + x2, datazip) 297 for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))): 298 dco = zlib.decompressobj() 299 y1 = dco.decompress(v1 + v2) 300 y2 = dco.flush() 301 self.assertEqual(data, y1 + y2) 302 self.assertIsInstance(dco.unconsumed_tail, bytes) 303 self.assertIsInstance(dco.unused_data, bytes) 304 305 def test_keywords(self): 306 level = 2 307 method = zlib.DEFLATED 308 wbits = -12 309 memLevel = 9 310 strategy = zlib.Z_FILTERED 311 co = zlib.compressobj(level=level, 312 method=method, 313 wbits=wbits, 314 memLevel=memLevel, 315 strategy=strategy, 316 zdict=b"") 317 do = zlib.decompressobj(wbits=wbits, zdict=b"") 318 with self.assertRaises(TypeError): 319 co.compress(data=HAMLET_SCENE) 320 with self.assertRaises(TypeError): 321 do.decompress(data=zlib.compress(HAMLET_SCENE)) 322 x = co.compress(HAMLET_SCENE) + co.flush() 323 y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush() 324 self.assertEqual(HAMLET_SCENE, y) 325 326 def test_compressoptions(self): 327 # specify lots of options to compressobj() 328 level = 2 329 method = zlib.DEFLATED 330 wbits = -12 331 memLevel = 9 332 strategy = zlib.Z_FILTERED 333 co = zlib.compressobj(level, method, wbits, memLevel, strategy) 334 x1 = co.compress(HAMLET_SCENE) 335 x2 = co.flush() 336 dco = zlib.decompressobj(wbits) 337 y1 = dco.decompress(x1 + x2) 338 y2 = dco.flush() 339 self.assertEqual(HAMLET_SCENE, y1 + y2) 340 341 def test_compressincremental(self): 342 # compress object in steps, decompress object as one-shot 343 data = HAMLET_SCENE * 128 344 co = zlib.compressobj() 345 bufs = [] 346 for i in range(0, len(data), 256): 347 bufs.append(co.compress(data[i:i+256])) 348 bufs.append(co.flush()) 349 combuf = b''.join(bufs) 350 
351 dco = zlib.decompressobj() 352 y1 = dco.decompress(b''.join(bufs)) 353 y2 = dco.flush() 354 self.assertEqual(data, y1 + y2) 355 356 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): 357 # compress object in steps, decompress object in steps 358 source = source or HAMLET_SCENE 359 data = source * 128 360 co = zlib.compressobj() 361 bufs = [] 362 for i in range(0, len(data), cx): 363 bufs.append(co.compress(data[i:i+cx])) 364 bufs.append(co.flush()) 365 combuf = b''.join(bufs) 366 367 decombuf = zlib.decompress(combuf) 368 # Test type of return value 369 self.assertIsInstance(decombuf, bytes) 370 371 self.assertEqual(data, decombuf) 372 373 dco = zlib.decompressobj() 374 bufs = [] 375 for i in range(0, len(combuf), dcx): 376 bufs.append(dco.decompress(combuf[i:i+dcx])) 377 self.assertEqual(b'', dco.unconsumed_tail, ######## 378 "(A) uct should be b'': not %d long" % 379 len(dco.unconsumed_tail)) 380 self.assertEqual(b'', dco.unused_data) 381 if flush: 382 bufs.append(dco.flush()) 383 else: 384 while True: 385 chunk = dco.decompress(b'') 386 if chunk: 387 bufs.append(chunk) 388 else: 389 break 390 self.assertEqual(b'', dco.unconsumed_tail, ######## 391 "(B) uct should be b'': not %d long" % 392 len(dco.unconsumed_tail)) 393 self.assertEqual(b'', dco.unused_data) 394 self.assertEqual(data, b''.join(bufs)) 395 # Failure means: "decompressobj with init options failed" 396 397 def test_decompincflush(self): 398 self.test_decompinc(flush=True) 399 400 def test_decompimax(self, source=None, cx=256, dcx=64): 401 # compress in steps, decompress in length-restricted steps 402 source = source or HAMLET_SCENE 403 # Check a decompression object with max_length specified 404 data = source * 128 405 co = zlib.compressobj() 406 bufs = [] 407 for i in range(0, len(data), cx): 408 bufs.append(co.compress(data[i:i+cx])) 409 bufs.append(co.flush()) 410 combuf = b''.join(bufs) 411 self.assertEqual(data, zlib.decompress(combuf), 412 'compressed data failure') 413 414 
dco = zlib.decompressobj() 415 bufs = [] 416 cb = combuf 417 while cb: 418 #max_length = 1 + len(cb)//10 419 chunk = dco.decompress(cb, dcx) 420 self.assertFalse(len(chunk) > dcx, 421 'chunk too big (%d>%d)' % (len(chunk), dcx)) 422 bufs.append(chunk) 423 cb = dco.unconsumed_tail 424 bufs.append(dco.flush()) 425 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') 426 427 def test_decompressmaxlen(self, flush=False): 428 # Check a decompression object with max_length specified 429 data = HAMLET_SCENE * 128 430 co = zlib.compressobj() 431 bufs = [] 432 for i in range(0, len(data), 256): 433 bufs.append(co.compress(data[i:i+256])) 434 bufs.append(co.flush()) 435 combuf = b''.join(bufs) 436 self.assertEqual(data, zlib.decompress(combuf), 437 'compressed data failure') 438 439 dco = zlib.decompressobj() 440 bufs = [] 441 cb = combuf 442 while cb: 443 max_length = 1 + len(cb)//10 444 chunk = dco.decompress(cb, max_length) 445 self.assertFalse(len(chunk) > max_length, 446 'chunk too big (%d>%d)' % (len(chunk),max_length)) 447 bufs.append(chunk) 448 cb = dco.unconsumed_tail 449 if flush: 450 bufs.append(dco.flush()) 451 else: 452 while chunk: 453 chunk = dco.decompress(b'', max_length) 454 self.assertFalse(len(chunk) > max_length, 455 'chunk too big (%d>%d)' % (len(chunk),max_length)) 456 bufs.append(chunk) 457 self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved') 458 459 def test_decompressmaxlenflush(self): 460 self.test_decompressmaxlen(flush=True) 461 462 def test_maxlenmisc(self): 463 # Misc tests of max_length 464 dco = zlib.decompressobj() 465 self.assertRaises(ValueError, dco.decompress, b"", -1) 466 self.assertEqual(b'', dco.unconsumed_tail) 467 468 def test_maxlen_large(self): 469 # Sizes up to sys.maxsize should be accepted, although zlib is 470 # internally limited to expressing sizes with unsigned int 471 data = HAMLET_SCENE * 10 472 self.assertGreater(len(data), zlib.DEF_BUF_SIZE) 473 compressed = zlib.compress(data, 1) 474 dco = 
zlib.decompressobj() 475 self.assertEqual(dco.decompress(compressed, sys.maxsize), data) 476 477 def test_maxlen_custom(self): 478 data = HAMLET_SCENE * 10 479 compressed = zlib.compress(data, 1) 480 dco = zlib.decompressobj() 481 self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100]) 482 483 def test_clear_unconsumed_tail(self): 484 # Issue #12050: calling decompress() without providing max_length 485 # should clear the unconsumed_tail attribute. 486 cdata = b"x\x9cKLJ\x06\x00\x02M\x01" # "abc" 487 dco = zlib.decompressobj() 488 ddata = dco.decompress(cdata, 1) 489 ddata += dco.decompress(dco.unconsumed_tail) 490 self.assertEqual(dco.unconsumed_tail, b"") 491 492 def test_flushes(self): 493 # Test flush() with the various options, using all the 494 # different levels in order to provide more variations. 495 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH', 496 'Z_PARTIAL_FLUSH'] 497 498 # Z_BLOCK has a known failure prior to 1.2.5.3 499 if ZLIB_RUNTIME_VERSION_TUPLE >= (1, 2, 5, 3): 500 sync_opt.append('Z_BLOCK') 501 502 sync_opt = [getattr(zlib, opt) for opt in sync_opt 503 if hasattr(zlib, opt)] 504 data = HAMLET_SCENE * 8 505 506 for sync in sync_opt: 507 for level in range(10): 508 with self.subTest(sync=sync, level=level): 509 obj = zlib.compressobj( level ) 510 a = obj.compress( data[:3000] ) 511 b = obj.flush( sync ) 512 c = obj.compress( data[3000:] ) 513 d = obj.flush() 514 self.assertEqual(zlib.decompress(b''.join([a,b,c,d])), 515 data, ("Decompress failed: flush " 516 "mode=%i, level=%i") % (sync, level)) 517 del obj 518 519 @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), 520 'requires zlib.Z_SYNC_FLUSH') 521 def test_odd_flush(self): 522 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 523 import random 524 # Testing on 17K of "random" data 525 526 # Create compressor and decompressor objects 527 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 528 dco = zlib.decompressobj() 529 530 # Try 17K of data 531 # 
generate random data stream 532 data = random.randbytes(17 * 1024) 533 534 # compress, sync-flush, and decompress 535 first = co.compress(data) 536 second = co.flush(zlib.Z_SYNC_FLUSH) 537 expanded = dco.decompress(first + second) 538 539 # if decompressed data is different from the input data, choke. 540 self.assertEqual(expanded, data, "17K random source doesn't match") 541 542 def test_empty_flush(self): 543 # Test that calling .flush() on unused objects works. 544 # (Bug #1083110 -- calling .flush() on decompress objects 545 # caused a core dump.) 546 547 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 548 self.assertTrue(co.flush()) # Returns a zlib header 549 dco = zlib.decompressobj() 550 self.assertEqual(dco.flush(), b"") # Returns nothing 551 552 def test_dictionary(self): 553 h = HAMLET_SCENE 554 # Build a simulated dictionary out of the words in HAMLET. 555 words = h.split() 556 random.shuffle(words) 557 zdict = b''.join(words) 558 # Use it to compress HAMLET. 559 co = zlib.compressobj(zdict=zdict) 560 cd = co.compress(h) + co.flush() 561 # Verify that it will decompress with the dictionary. 562 dco = zlib.decompressobj(zdict=zdict) 563 self.assertEqual(dco.decompress(cd) + dco.flush(), h) 564 # Verify that it fails when not given the dictionary. 565 dco = zlib.decompressobj() 566 self.assertRaises(zlib.error, dco.decompress, cd) 567 568 def test_dictionary_streaming(self): 569 # This simulates the reuse of a compressor object for compressing 570 # several separate data streams. 
571 co = zlib.compressobj(zdict=HAMLET_SCENE) 572 do = zlib.decompressobj(zdict=HAMLET_SCENE) 573 piece = HAMLET_SCENE[1000:1500] 574 d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH) 575 d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH) 576 d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH) 577 self.assertEqual(do.decompress(d0), piece) 578 self.assertEqual(do.decompress(d1), piece[100:]) 579 self.assertEqual(do.decompress(d2), piece[:-100]) 580 581 def test_decompress_incomplete_stream(self): 582 # This is 'foo', deflated 583 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 584 # For the record 585 self.assertEqual(zlib.decompress(x), b'foo') 586 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 587 # Omitting the stream end works with decompressor objects 588 # (see issue #8672). 589 dco = zlib.decompressobj() 590 y = dco.decompress(x[:-5]) 591 y += dco.flush() 592 self.assertEqual(y, b'foo') 593 594 def test_decompress_eof(self): 595 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' 596 dco = zlib.decompressobj() 597 self.assertFalse(dco.eof) 598 dco.decompress(x[:-5]) 599 self.assertFalse(dco.eof) 600 dco.decompress(x[-5:]) 601 self.assertTrue(dco.eof) 602 dco.flush() 603 self.assertTrue(dco.eof) 604 605 def test_decompress_eof_incomplete_stream(self): 606 x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' # 'foo' 607 dco = zlib.decompressobj() 608 self.assertFalse(dco.eof) 609 dco.decompress(x[:-5]) 610 self.assertFalse(dco.eof) 611 dco.flush() 612 self.assertFalse(dco.eof) 613 614 def test_decompress_unused_data(self): 615 # Repeated calls to decompress() after EOF should accumulate data in 616 # dco.unused_data, instead of just storing the arg to the last call. 
617 source = b'abcdefghijklmnopqrstuvwxyz' 618 remainder = b'0123456789' 619 y = zlib.compress(source) 620 x = y + remainder 621 for maxlen in 0, 1000: 622 for step in 1, 2, len(y), len(x): 623 dco = zlib.decompressobj() 624 data = b'' 625 for i in range(0, len(x), step): 626 if i < len(y): 627 self.assertEqual(dco.unused_data, b'') 628 if maxlen == 0: 629 data += dco.decompress(x[i : i + step]) 630 self.assertEqual(dco.unconsumed_tail, b'') 631 else: 632 data += dco.decompress( 633 dco.unconsumed_tail + x[i : i + step], maxlen) 634 data += dco.flush() 635 self.assertTrue(dco.eof) 636 self.assertEqual(data, source) 637 self.assertEqual(dco.unconsumed_tail, b'') 638 self.assertEqual(dco.unused_data, remainder) 639 640 # issue27164 641 def test_decompress_raw_with_dictionary(self): 642 zdict = b'abcdefghijklmnopqrstuvwxyz' 643 co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) 644 comp = co.compress(zdict) + co.flush() 645 dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict) 646 uncomp = dco.decompress(comp) + dco.flush() 647 self.assertEqual(zdict, uncomp) 648 649 def test_flush_with_freed_input(self): 650 # Issue #16411: decompressor accesses input to last decompress() call 651 # in flush(), even if this object has been freed in the meanwhile. 
652 input1 = b'abcdefghijklmnopqrstuvwxyz' 653 input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM' 654 data = zlib.compress(input1) 655 dco = zlib.decompressobj() 656 dco.decompress(data, 1) 657 del data 658 data = zlib.compress(input2) 659 self.assertEqual(dco.flush(), input1[1:]) 660 661 @bigmemtest(size=_4G, memuse=1) 662 def test_flush_large_length(self, size): 663 # Test flush(length) parameter greater than internal limit UINT_MAX 664 input = HAMLET_SCENE * 10 665 data = zlib.compress(input, 1) 666 dco = zlib.decompressobj() 667 dco.decompress(data, 1) 668 self.assertEqual(dco.flush(size), input[1:]) 669 670 def test_flush_custom_length(self): 671 input = HAMLET_SCENE * 10 672 data = zlib.compress(input, 1) 673 dco = zlib.decompressobj() 674 dco.decompress(data, 1) 675 self.assertEqual(dco.flush(CustomInt()), input[1:]) 676 677 @requires_Compress_copy 678 def test_compresscopy(self): 679 # Test copying a compression object 680 data0 = HAMLET_SCENE 681 data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") 682 for func in lambda c: c.copy(), copy.copy, copy.deepcopy: 683 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 684 bufs0 = [] 685 bufs0.append(c0.compress(data0)) 686 687 c1 = func(c0) 688 bufs1 = bufs0[:] 689 690 bufs0.append(c0.compress(data0)) 691 bufs0.append(c0.flush()) 692 s0 = b''.join(bufs0) 693 694 bufs1.append(c1.compress(data1)) 695 bufs1.append(c1.flush()) 696 s1 = b''.join(bufs1) 697 698 self.assertEqual(zlib.decompress(s0),data0+data0) 699 self.assertEqual(zlib.decompress(s1),data0+data1) 700 701 @requires_Compress_copy 702 def test_badcompresscopy(self): 703 # Test copying a compression object in an inconsistent state 704 c = zlib.compressobj() 705 c.compress(HAMLET_SCENE) 706 c.flush() 707 self.assertRaises(ValueError, c.copy) 708 self.assertRaises(ValueError, copy.copy, c) 709 self.assertRaises(ValueError, copy.deepcopy, c) 710 711 @requires_Decompress_copy 712 def test_decompresscopy(self): 713 # Test copying a decompression object 714 data = 
HAMLET_SCENE 715 comp = zlib.compress(data) 716 # Test type of return value 717 self.assertIsInstance(comp, bytes) 718 719 for func in lambda c: c.copy(), copy.copy, copy.deepcopy: 720 d0 = zlib.decompressobj() 721 bufs0 = [] 722 bufs0.append(d0.decompress(comp[:32])) 723 724 d1 = func(d0) 725 bufs1 = bufs0[:] 726 727 bufs0.append(d0.decompress(comp[32:])) 728 s0 = b''.join(bufs0) 729 730 bufs1.append(d1.decompress(comp[32:])) 731 s1 = b''.join(bufs1) 732 733 self.assertEqual(s0,s1) 734 self.assertEqual(s0,data) 735 736 @requires_Decompress_copy 737 def test_baddecompresscopy(self): 738 # Test copying a compression object in an inconsistent state 739 data = zlib.compress(HAMLET_SCENE) 740 d = zlib.decompressobj() 741 d.decompress(data) 742 d.flush() 743 self.assertRaises(ValueError, d.copy) 744 self.assertRaises(ValueError, copy.copy, d) 745 self.assertRaises(ValueError, copy.deepcopy, d) 746 747 def test_compresspickle(self): 748 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 749 with self.assertRaises((TypeError, pickle.PicklingError)): 750 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto) 751 752 def test_decompresspickle(self): 753 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 754 with self.assertRaises((TypeError, pickle.PicklingError)): 755 pickle.dumps(zlib.decompressobj(), proto) 756 757 # Memory use of the following functions takes into account overallocation 758 759 @bigmemtest(size=_1G + 1024 * 1024, memuse=3) 760 def test_big_compress_buffer(self, size): 761 c = zlib.compressobj(1) 762 compress = lambda s: c.compress(s) + c.flush() 763 self.check_big_compress_buffer(size, compress) 764 765 @bigmemtest(size=_1G + 1024 * 1024, memuse=2) 766 def test_big_decompress_buffer(self, size): 767 d = zlib.decompressobj() 768 decompress = lambda s: d.decompress(s) + d.flush() 769 self.check_big_decompress_buffer(size, decompress) 770 771 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 772 @bigmemtest(size=_4G + 100, memuse=4) 
773 def test_64bit_compress(self, size): 774 data = b'x' * size 775 co = zlib.compressobj(0) 776 do = zlib.decompressobj() 777 try: 778 comp = co.compress(data) + co.flush() 779 uncomp = do.decompress(comp) + do.flush() 780 self.assertEqual(uncomp, data) 781 finally: 782 comp = uncomp = data = None 783 784 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 785 @bigmemtest(size=_4G + 100, memuse=3) 786 def test_large_unused_data(self, size): 787 data = b'abcdefghijklmnop' 788 unused = b'x' * size 789 comp = zlib.compress(data) + unused 790 do = zlib.decompressobj() 791 try: 792 uncomp = do.decompress(comp) + do.flush() 793 self.assertEqual(unused, do.unused_data) 794 self.assertEqual(uncomp, data) 795 finally: 796 unused = comp = do = None 797 798 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 799 @bigmemtest(size=_4G + 100, memuse=5) 800 def test_large_unconsumed_tail(self, size): 801 data = b'x' * size 802 do = zlib.decompressobj() 803 try: 804 comp = zlib.compress(data, 0) 805 uncomp = do.decompress(comp, 1) + do.flush() 806 self.assertEqual(uncomp, data) 807 self.assertEqual(do.unconsumed_tail, b'') 808 finally: 809 comp = uncomp = data = None 810 811 def test_wbits(self): 812 # wbits=0 only supported since zlib v1.2.3.5 813 supports_wbits_0 = ZLIB_RUNTIME_VERSION_TUPLE >= (1, 2, 3, 5) 814 815 co = zlib.compressobj(level=1, wbits=15) 816 zlib15 = co.compress(HAMLET_SCENE) + co.flush() 817 self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE) 818 if supports_wbits_0: 819 self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE) 820 self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE) 821 with self.assertRaisesRegex(zlib.error, 'invalid window size'): 822 zlib.decompress(zlib15, 14) 823 dco = zlib.decompressobj(wbits=32 + 15) 824 self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE) 825 dco = zlib.decompressobj(wbits=14) 826 with self.assertRaisesRegex(zlib.error, 'invalid window size'): 827 
dco.decompress(zlib15) 828 829 co = zlib.compressobj(level=1, wbits=9) 830 zlib9 = co.compress(HAMLET_SCENE) + co.flush() 831 self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE) 832 self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE) 833 if supports_wbits_0: 834 self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE) 835 self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE) 836 dco = zlib.decompressobj(wbits=32 + 9) 837 self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE) 838 839 co = zlib.compressobj(level=1, wbits=-15) 840 deflate15 = co.compress(HAMLET_SCENE) + co.flush() 841 self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE) 842 dco = zlib.decompressobj(wbits=-15) 843 self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE) 844 845 co = zlib.compressobj(level=1, wbits=-9) 846 deflate9 = co.compress(HAMLET_SCENE) + co.flush() 847 self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE) 848 self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE) 849 dco = zlib.decompressobj(wbits=-9) 850 self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE) 851 852 co = zlib.compressobj(level=1, wbits=16 + 15) 853 gzip = co.compress(HAMLET_SCENE) + co.flush() 854 self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE) 855 self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE) 856 dco = zlib.decompressobj(32 + 15) 857 self.assertEqual(dco.decompress(gzip), HAMLET_SCENE) 858 859 for wbits in (-15, 15, 31): 860 with self.subTest(wbits=wbits): 861 expected = HAMLET_SCENE 862 actual = zlib.decompress( 863 zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits 864 ) 865 self.assertEqual(expected, actual) 866 867def choose_lines(source, number, seed=None, generator=random): 868 """Return a list of number lines randomly chosen from the source""" 869 if seed is not None: 870 generator.seed(seed) 871 sources = source.split('\n') 872 return [generator.choice(sources) for n in range(number)] 873 874 875HAMLET_SCENE = b""" 
876LAERTES 877 878 O, fear me not. 879 I stay too long: but here my father comes. 880 881 Enter POLONIUS 882 883 A double blessing is a double grace, 884 Occasion smiles upon a second leave. 885 886LORD POLONIUS 887 888 Yet here, Laertes! aboard, aboard, for shame! 889 The wind sits in the shoulder of your sail, 890 And you are stay'd for. There; my blessing with thee! 891 And these few precepts in thy memory 892 See thou character. Give thy thoughts no tongue, 893 Nor any unproportioned thought his act. 894 Be thou familiar, but by no means vulgar. 895 Those friends thou hast, and their adoption tried, 896 Grapple them to thy soul with hoops of steel; 897 But do not dull thy palm with entertainment 898 Of each new-hatch'd, unfledged comrade. Beware 899 Of entrance to a quarrel, but being in, 900 Bear't that the opposed may beware of thee. 901 Give every man thy ear, but few thy voice; 902 Take each man's censure, but reserve thy judgment. 903 Costly thy habit as thy purse can buy, 904 But not express'd in fancy; rich, not gaudy; 905 For the apparel oft proclaims the man, 906 And they in France of the best rank and station 907 Are of a most select and generous chief in that. 908 Neither a borrower nor a lender be; 909 For loan oft loses both itself and friend, 910 And borrowing dulls the edge of husbandry. 911 This above all: to thine ownself be true, 912 And it must follow, as the night the day, 913 Thou canst not then be false to any man. 914 Farewell: my blessing season this in thee! 915 916LAERTES 917 918 Most humbly do I take my leave, my lord. 919 920LORD POLONIUS 921 922 The time invites you; go; your servants tend. 923 924LAERTES 925 926 Farewell, Ophelia; and remember well 927 What I have said to you. 928 929OPHELIA 930 931 'Tis in my memory lock'd, 932 And you yourself shall keep the key of it. 933 934LAERTES 935 936 Farewell. 
937""" 938 939 940class ZlibDecompressorTest(unittest.TestCase): 941 # Test adopted from test_bz2.py 942 TEXT = HAMLET_SCENE 943 DATA = zlib.compress(HAMLET_SCENE) 944 BAD_DATA = b"Not a valid deflate block" 945 BIG_TEXT = DATA * ((128 * 1024 // len(DATA)) + 1) 946 BIG_DATA = zlib.compress(BIG_TEXT) 947 948 def test_Constructor(self): 949 self.assertRaises(TypeError, zlib._ZlibDecompressor, "ASDA") 950 self.assertRaises(TypeError, zlib._ZlibDecompressor, -15, "notbytes") 951 self.assertRaises(TypeError, zlib._ZlibDecompressor, -15, b"bytes", 5) 952 953 def testDecompress(self): 954 zlibd = zlib._ZlibDecompressor() 955 self.assertRaises(TypeError, zlibd.decompress) 956 text = zlibd.decompress(self.DATA) 957 self.assertEqual(text, self.TEXT) 958 959 def testDecompressChunks10(self): 960 zlibd = zlib._ZlibDecompressor() 961 text = b'' 962 n = 0 963 while True: 964 str = self.DATA[n*10:(n+1)*10] 965 if not str: 966 break 967 text += zlibd.decompress(str) 968 n += 1 969 self.assertEqual(text, self.TEXT) 970 971 def testDecompressUnusedData(self): 972 zlibd = zlib._ZlibDecompressor() 973 unused_data = b"this is unused data" 974 text = zlibd.decompress(self.DATA+unused_data) 975 self.assertEqual(text, self.TEXT) 976 self.assertEqual(zlibd.unused_data, unused_data) 977 978 def testEOFError(self): 979 zlibd = zlib._ZlibDecompressor() 980 text = zlibd.decompress(self.DATA) 981 self.assertRaises(EOFError, zlibd.decompress, b"anything") 982 self.assertRaises(EOFError, zlibd.decompress, b"") 983 984 @support.skip_if_pgo_task 985 @bigmemtest(size=_4G + 100, memuse=3.3) 986 def testDecompress4G(self, size): 987 # "Test zlib._ZlibDecompressor.decompress() with >4GiB input" 988 blocksize = min(10 * 1024 * 1024, size) 989 block = random.randbytes(blocksize) 990 try: 991 data = block * ((size-1) // blocksize + 1) 992 compressed = zlib.compress(data) 993 zlibd = zlib._ZlibDecompressor() 994 decompressed = zlibd.decompress(compressed) 995 self.assertTrue(decompressed == data) 996 
        finally:
            # Drop the multi-gigabyte objects promptly so the memory is
            # released before the next bigmem test runs.
            data = None
            compressed = None
            decompressed = None

    def testPickle(self):
        # Decompressor objects must refuse pickling under every protocol.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(zlib._ZlibDecompressor(), proto)

    def testDecompressorChunksMaxsize(self):
        # max_length caps each decompress() result; needs_input/eof track
        # whether more input or more output remains.
        zlibd = zlib._ZlibDecompressor()
        max_length = 100
        out = []

        # Feed some input
        len_ = len(self.BIG_DATA) - 64
        out.append(zlibd.decompress(self.BIG_DATA[:len_],
                                    max_length=max_length))
        self.assertFalse(zlibd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data without providing more input
        out.append(zlibd.decompress(b'', max_length=max_length))
        self.assertFalse(zlibd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data while providing more input
        out.append(zlibd.decompress(self.BIG_DATA[len_:],
                                    max_length=max_length))
        self.assertLessEqual(len(out[-1]), max_length)

        # Retrieve remaining uncompressed data
        while not zlibd.eof:
            out.append(zlibd.decompress(b'', max_length=max_length))
            self.assertLessEqual(len(out[-1]), max_length)

        out = b"".join(out)
        self.assertEqual(out, self.BIG_TEXT)
        self.assertEqual(zlibd.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Test reusing input buffer after moving existing
        # contents to beginning
        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create input buffer and fill it
        self.assertEqual(zlibd.decompress(self.DATA[:100],
                                          max_length=0), b'')

        # Retrieve some results, freeing capacity at beginning
        # of input buffer
        out.append(zlibd.decompress(b'', 2))

        # Add more data that fits into input buffer after
        # moving existing data to beginning
        out.append(zlibd.decompress(self.DATA[100:105], 15))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[105:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_2(self):
        # Test reusing input buffer by appending data at the
        # end right away
        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create input buffer and empty it
        self.assertEqual(zlibd.decompress(self.DATA[:200],
                                          max_length=0), b'')
        out.append(zlibd.decompress(b''))

        # Fill buffer with new data
        out.append(zlibd.decompress(self.DATA[200:280], 2))

        # Append some more data, not enough to require resize
        out.append(zlibd.decompress(self.DATA[280:300], 2))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_3(self):
        # Test reusing input buffer after extending it

        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create almost full input buffer
        out.append(zlibd.decompress(self.DATA[:200], 5))

        # Add even more data to it, requiring resize
        out.append(zlibd.decompress(self.DATA[200:300], 5))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_failure(self):
        # Garbage input must raise, and the object must stay usable enough
        # to raise again rather than crash.
        zlibd = zlib._ZlibDecompressor()
        self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
        # Previously, a second call could crash due to internal inconsistency
        self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)

    @support.refcount_test
    def test_refleaks_in___init__(self):
        # NOTE(review): relies on sys.gettotalrefcount, so this only runs
        # where refcount tracking is available (the decorator handles the
        # skip). Re-running __init__ must not leak references.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        zlibd = zlib._ZlibDecompressor()
        refs_before = gettotalrefcount()
        for i in range(100):
            zlibd.__init__()
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)


class CustomInt:
    # Minimal object implementing only the __index__ protocol.
    def __index__(self):
        # Fixed value; presumably passed where an operator.index()-style
        # integer argument is expected — confirm against callers outside
        # this chunk.
        return 100


if __name__ == "__main__":
    unittest.main()