import unittest
from test import support
from test.support import import_helper
import binascii
import copy
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G


zlib = import_helper.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo + foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                          zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)

    @support.cpython_only
    def test_disallow_instantiation(self):
        # Ensure that the type disallows instantiation (bpo-43916)
        support.check_disallow_instantiation(self, type(zlib.compressobj()))
        support.check_disallow_instantiation(self, type(zlib.decompressobj()))


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush)  # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                             'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                             'chunk too big (%d>%d)' % (len(chunk), max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                                 'chunk too big (%d>%d)' % (len(chunk), max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"  # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # generate random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = gen.randbytes(17 * 1024)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0), data0 + data0)
            self.assertEqual(zlib.decompress(s1), data0 + data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0, s1)
            self.assertEqual(s0, data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 is only supported since zlib v1.2.3.5.
        # Normalize the runtime version so that "1.2.3" compares as "1.2.3.0"
        # and suffixed versions such as "1.2.0-linux", "1.2.0.f" and
        # "1.2.0.f-linux" are handled as well.
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]


HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()