import unittest
from test import support
import binascii
import copy
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

zlib = support.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


def _zlib_runtime_version_tuple(zlib_version=None):
    """Return the zlib runtime version as a tuple of ints.

    *zlib_version* defaults to ``zlib.ZLIB_RUNTIME_VERSION``.  Anything
    after the first '-' is dropped, a version with fewer than four
    components gets one '0' appended, and a non-numeric last component is
    replaced by '0'.  So "1.2.3" is registered as (1, 2, 3, 0), and
    "1.2.0-linux", "1.2.0.f" and "1.2.0.f-linux" are all accepted.

    Raises ValueError if a remaining component is not an integer.
    """
    if zlib_version is None:
        zlib_version = zlib.ZLIB_RUNTIME_VERSION
    v = zlib_version.split('-', 1)[0].split('.')
    if len(v) < 4:
        v.append('0')
    elif not v[-1].isnumeric():
        v[-1] = '0'
    return tuple(map(int, v))


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        # The default start values are 0 for crc32 and 1 for adler32.
        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"), zlib.adler32(b"penguin", 1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # The expected values below are all non-negative; several exceed
        # 2**31 - 1, so a signed 32-bit result would not compare equal.
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    # Mixin with the big-buffer checks shared by the one-shot and the
    # object-based test cases; subclasses supply the (de)compress callable.

    def check_big_compress_buffer(self, size, compress_func):
        """Call compress_func on at least *size* bytes of hard-to-compress data."""
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Compress *size* b'x' bytes and check decompress_func restores them."""
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        # The data argument cannot be passed by keyword.
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        # bufsize may be any object implementing __index__
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush)  # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        # The data argument cannot be passed by keyword.
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail,
                             "(A) uct should be b'': not %d long" %
                             len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            # Drain with empty inputs until nothing more comes out.
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail,
                         "(B) uct should be b'': not %d long" %
                         len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # Input not consumed because of the length cap is fed back in.
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        # max_length may be any object implementing __index__
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        # Z_BLOCK has a known failure prior to 1.2.5.3.  Use the shared
        # parser so suffixed versions such as "1.2.0-linux" do not raise.
        if _zlib_runtime_version_tuple() >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                # subTest reports the failing mode/level combination and
                # lets the remaining combinations still run.
                with self.subTest(sync=sync, level=level):
                    obj = zlib.compressobj(level)
                    a = obj.compress(data[:3000])
                    b = obj.flush(sync)
                    c = obj.compress(data[3000:])
                    d = obj.flush()
                    self.assertEqual(zlib.decompress(b''.join([a, b, c, d])),
                                     data, ("Decompress failed: flush "
                                            "mode=%i, level=%i") % (sync, level))

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of "random" data, drawn from a private generator with a
        # fixed seed so the module-level RNG state is left alone.
        gen = random.Random()
        gen.seed(1)
        data = gen.randbytes(17 * 1024)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"")  # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        # length may be any object implementing __index__
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a decompression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 only supported since zlib v1.2.3.5
        supports_wbits_0 = _zlib_runtime_version_tuple() >= (1, 2, 3, 5)

        # zlib container, 15-bit window
        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        # zlib container, 9-bit window
        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        # negative wbits, 15-bit window
        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        # negative wbits, 9-bit window
        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        # wbits = 16 + 15
        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    *source* may be str or bytes-like with a split() method; it is split on
    the newline of the matching type.  When *seed* is not None the
    *generator* is seeded with it first, making the selection reproducible.
    Lines are drawn with replacement via generator.choice().
    """
    if seed is not None:
        generator.seed(seed)
    # Pick a separator of the same type as the source so that bytes corpora
    # work as well as str.
    newline = b'\n' if isinstance(source, (bytes, bytearray)) else '\n'
    sources = source.split(newline)
    return [generator.choice(sources) for _ in range(number)]

# Sample text used as the compression corpus by the tests in this file.
# The tests round-trip it, slice it, and repeat it to make larger inputs.
# NOTE(review): the source this was recovered from had its whitespace
# collapsed, so the exact run of leading spaces on the indented lines below
# is an assumption -- confirm against a pristine copy.  The tests visible in
# this file do not compare against a checksum of this literal, but they do
# rely on its length (it is sliced at [1000:1500], and ten copies must
# exceed zlib.DEF_BUF_SIZE).
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    """Object that is not an int but converts to 100 through __index__.

    The tests in this file pass it as the bufsize, max_length and flush
    length arguments, where it is expected to behave like the integer 100.
    """

    def __index__(self):
        return 100


# Run the tests in this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
