1import unittest 2from test import test_support as support 3from test.test_support import TESTFN, run_unittest, import_module, unlink, requires 4import binascii 5import pickle 6import random 7from test.test_support import precisionbigmemtest, _1G, _4G 8import sys 9 10try: 11 import mmap 12except ImportError: 13 mmap = None 14 15zlib = import_module('zlib') 16 17requires_Compress_copy = unittest.skipUnless( 18 hasattr(zlib.compressobj(), "copy"), 19 'requires Compress.copy()') 20requires_Decompress_copy = unittest.skipUnless( 21 hasattr(zlib.decompressobj(), "copy"), 22 'requires Decompress.copy()') 23 24 25class ChecksumTestCase(unittest.TestCase): 26 # checksum test cases 27 def test_crc32start(self): 28 self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) 29 self.assertTrue(zlib.crc32("abc", 0xffffffff)) 30 31 def test_crc32empty(self): 32 self.assertEqual(zlib.crc32("", 0), 0) 33 self.assertEqual(zlib.crc32("", 1), 1) 34 self.assertEqual(zlib.crc32("", 432), 432) 35 36 def test_adler32start(self): 37 self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) 38 self.assertTrue(zlib.adler32("abc", 0xffffffff)) 39 40 def test_adler32empty(self): 41 self.assertEqual(zlib.adler32("", 0), 0) 42 self.assertEqual(zlib.adler32("", 1), 1) 43 self.assertEqual(zlib.adler32("", 432), 432) 44 45 def assertEqual32(self, seen, expected): 46 # 32-bit values masked -- checksums on 32- vs 64- bit machines 47 # This is important if bit 31 (0x08000000L) is set. 48 self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) 49 50 def test_penguins(self): 51 self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) 52 self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) 53 self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) 54 self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) 55 56 self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) 57 self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) 58 59 def test_abcdefghijklmnop(self): 60 """test issue1202 compliance: signed crc32, adler32 in 2.x""" 61 foo = 'abcdefghijklmnop' 62 # explicitly test signed behavior 63 self.assertEqual(zlib.crc32(foo), -1808088941) 64 self.assertEqual(zlib.crc32('spam'), 1138425661) 65 self.assertEqual(zlib.adler32(foo+foo), -721416943) 66 self.assertEqual(zlib.adler32('spam'), 72286642) 67 68 def test_same_as_binascii_crc32(self): 69 foo = 'abcdefghijklmnop' 70 self.assertEqual(binascii.crc32(foo), zlib.crc32(foo)) 71 self.assertEqual(binascii.crc32('spam'), zlib.crc32('spam')) 72 73 def test_negative_crc_iv_input(self): 74 # The range of valid input values for the crc state should be 75 # -2**31 through 2**32-1 to allow inputs artifically constrained 76 # to a signed 32-bit integer. 77 self.assertEqual(zlib.crc32('ham', -1), zlib.crc32('ham', 0xffffffffL)) 78 self.assertEqual(zlib.crc32('spam', -3141593), 79 zlib.crc32('spam', 0xffd01027L)) 80 self.assertEqual(zlib.crc32('spam', -(2**31)), 81 zlib.crc32('spam', (2**31))) 82 83 84# Issue #10276 - check that inputs >=4GB are handled correctly. 85class ChecksumBigBufferTestCase(unittest.TestCase): 86 87 @precisionbigmemtest(size=_4G + 4, memuse=1, dry_run=False) 88 def test_big_buffer(self, size): 89 data = b"nyan" * (_1G + 1) 90 self.assertEqual(zlib.crc32(data) & 0xFFFFFFFF, 1044521549) 91 self.assertEqual(zlib.adler32(data) & 0xFFFFFFFF, 2256789997) 92 93 94class ExceptionTestCase(unittest.TestCase): 95 # make sure we generate some expected errors 96 def test_badlevel(self): 97 # specifying compression level out of range causes an error 98 # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib 99 # accepts 0 too) 100 self.assertRaises(zlib.error, zlib.compress, 'ERROR', 10) 101 102 def test_badcompressobj(self): 103 # verify failure on building compress object with bad params 104 self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) 105 # specifying total bits too large causes an error 106 self.assertRaises(ValueError, 107 zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1) 108 109 def test_baddecompressobj(self): 110 # verify failure on building decompress object with bad params 111 self.assertRaises(ValueError, zlib.decompressobj, -1) 112 113 def test_decompressobj_badflush(self): 114 # verify failure on calling decompressobj.flush with bad params 115 self.assertRaises(ValueError, zlib.decompressobj().flush, 0) 116 self.assertRaises(ValueError, zlib.decompressobj().flush, -1) 117 118 @support.cpython_only 119 def test_overflow(self): 120 with self.assertRaisesRegexp(OverflowError, 'int too large'): 121 zlib.decompress(b'', 15, sys.maxsize + 1) 122 with self.assertRaisesRegexp(OverflowError, 'int too large'): 123 zlib.decompressobj().decompress(b'', sys.maxsize + 1) 124 with self.assertRaisesRegexp(OverflowError, 'int too large'): 125 zlib.decompressobj().flush(sys.maxsize + 1) 126 127 128class BaseCompressTestCase(object): 129 def check_big_compress_buffer(self, size, compress_func): 130 _1M = 1024 * 1024 131 fmt = "%%0%dx" % (2 * _1M) 132 # Generate 10MB worth of random, and expand it by repeating it. 133 # The assumption is that zlib's memory is not big enough to exploit 134 # such spread out redundancy. 135 data = ''.join([binascii.a2b_hex(fmt % random.getrandbits(8 * _1M)) 136 for i in range(10)]) 137 data = data * (size // len(data) + 1) 138 try: 139 compress_func(data) 140 finally: 141 # Release memory 142 data = None 143 144 def check_big_decompress_buffer(self, size, decompress_func): 145 data = 'x' * size 146 try: 147 compressed = zlib.compress(data, 1) 148 finally: 149 # Release memory 150 data = None 151 data = decompress_func(compressed) 152 # Sanity check 153 try: 154 self.assertEqual(len(data), size) 155 self.assertEqual(len(data.strip('x')), 0) 156 finally: 157 data = None 158 159 160class CompressTestCase(BaseCompressTestCase, unittest.TestCase): 161 # Test compression in one go (whole message compression) 162 def test_speech(self): 163 x = zlib.compress(HAMLET_SCENE) 164 self.assertEqual(zlib.decompress(x), HAMLET_SCENE) 165 166 def test_speech128(self): 167 # compress more data 168 data = HAMLET_SCENE * 128 169 x = zlib.compress(data) 170 self.assertEqual(zlib.decompress(x), data) 171 172 def test_incomplete_stream(self): 173 # A useful error message is given 174 x = zlib.compress(HAMLET_SCENE) 175 self.assertRaisesRegexp(zlib.error, 176 "Error -5 while decompressing data: incomplete or truncated stream", 177 zlib.decompress, x[:-1]) 178 179 # Memory use of the following functions takes into account overallocation 180 181 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 182 def test_big_compress_buffer(self, size): 183 compress = lambda s: zlib.compress(s, 1) 184 self.check_big_compress_buffer(size, compress) 185 186 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 187 def test_big_decompress_buffer(self, size): 188 self.check_big_decompress_buffer(size, zlib.decompress) 189 190 @precisionbigmemtest(size=_4G, memuse=1) 191 def test_large_bufsize(self, size): 192 # Test decompress(bufsize) parameter greater than the internal limit 193 data = HAMLET_SCENE * 10 194 compressed = zlib.compress(data, 1) 195 self.assertEqual(zlib.decompress(compressed, 15, size), data) 196 197 def test_custom_bufsize(self): 198 data = HAMLET_SCENE * 10 199 compressed = zlib.compress(data, 1) 200 self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data) 201 202 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 203 @precisionbigmemtest(size=_4G + 100, memuse=4) 204 def test_64bit_compress(self, size): 205 data = b'x' * size 206 try: 207 comp = zlib.compress(data, 0) 208 self.assertEqual(zlib.decompress(comp), data) 209 finally: 210 comp = data = None 211 212 213class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): 214 # Test compression object 215 def test_pair(self): 216 # straightforward compress/decompress objects 217 data = HAMLET_SCENE * 128 218 co = zlib.compressobj() 219 x1 = co.compress(data) 220 x2 = co.flush() 221 self.assertRaises(zlib.error, co.flush) # second flush should not work 222 dco = zlib.decompressobj() 223 y1 = dco.decompress(x1 + x2) 224 y2 = dco.flush() 225 self.assertEqual(data, y1 + y2) 226 227 def test_compressoptions(self): 228 # specify lots of options to compressobj() 229 level = 2 230 method = zlib.DEFLATED 231 wbits = -12 232 memlevel = 9 233 strategy = zlib.Z_FILTERED 234 co = zlib.compressobj(level, method, wbits, memlevel, strategy) 235 x1 = co.compress(HAMLET_SCENE) 236 x2 = co.flush() 237 dco = zlib.decompressobj(wbits) 238 y1 = dco.decompress(x1 + x2) 239 y2 = dco.flush() 240 self.assertEqual(HAMLET_SCENE, y1 + y2) 241 242 def test_compressincremental(self): 243 # compress object in steps, decompress object as one-shot 244 data = HAMLET_SCENE * 128 245 co = zlib.compressobj() 246 bufs = [] 247 for i in range(0, len(data), 256): 248 bufs.append(co.compress(data[i:i+256])) 249 bufs.append(co.flush()) 250 combuf = ''.join(bufs) 251 252 dco = zlib.decompressobj() 253 y1 = dco.decompress(''.join(bufs)) 254 y2 = dco.flush() 255 self.assertEqual(data, y1 + y2) 256 257 def test_decompinc(self, flush=False, source=None, cx=256, dcx=64): 258 # compress object in steps, decompress object in steps 259 source = source or HAMLET_SCENE 260 data = source * 128 261 co = zlib.compressobj() 262 bufs = [] 263 for i in range(0, len(data), cx): 264 bufs.append(co.compress(data[i:i+cx])) 265 bufs.append(co.flush()) 266 combuf = ''.join(bufs) 267 268 self.assertEqual(data, zlib.decompress(combuf)) 269 270 dco = zlib.decompressobj() 271 bufs = [] 272 for i in range(0, len(combuf), dcx): 273 bufs.append(dco.decompress(combuf[i:i+dcx])) 274 self.assertEqual('', dco.unconsumed_tail, ######## 275 "(A) uct should be '': not %d long" % 276 len(dco.unconsumed_tail)) 277 if flush: 278 bufs.append(dco.flush()) 279 else: 280 while True: 281 chunk = dco.decompress('') 282 if chunk: 283 bufs.append(chunk) 284 else: 285 break 286 self.assertEqual('', dco.unconsumed_tail, ######## 287 "(B) uct should be '': not %d long" % 288 len(dco.unconsumed_tail)) 289 self.assertEqual(data, ''.join(bufs)) 290 # Failure means: "decompressobj with init options failed" 291 292 def test_decompincflush(self): 293 self.test_decompinc(flush=True) 294 295 def test_decompimax(self, source=None, cx=256, dcx=64): 296 # compress in steps, decompress in length-restricted steps 297 source = source or HAMLET_SCENE 298 # Check a decompression object with max_length specified 299 data = source * 128 300 co = zlib.compressobj() 301 bufs = [] 302 for i in range(0, len(data), cx): 303 bufs.append(co.compress(data[i:i+cx])) 304 bufs.append(co.flush()) 305 combuf = ''.join(bufs) 306 self.assertEqual(data, zlib.decompress(combuf), 307 'compressed data failure') 308 309 dco = zlib.decompressobj() 310 bufs = [] 311 cb = combuf 312 while cb: 313 #max_length = 1 + len(cb)//10 314 chunk = dco.decompress(cb, dcx) 315 self.assertFalse(len(chunk) > dcx, 316 'chunk too big (%d>%d)' % (len(chunk), dcx)) 317 bufs.append(chunk) 318 cb = dco.unconsumed_tail 319 bufs.append(dco.flush()) 320 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 321 322 def test_decompressmaxlen(self, flush=False): 323 # Check a decompression object with max_length specified 324 data = HAMLET_SCENE * 128 325 co = zlib.compressobj() 326 bufs = [] 327 for i in range(0, len(data), 256): 328 bufs.append(co.compress(data[i:i+256])) 329 bufs.append(co.flush()) 330 combuf = ''.join(bufs) 331 self.assertEqual(data, zlib.decompress(combuf), 332 'compressed data failure') 333 334 dco = zlib.decompressobj() 335 bufs = [] 336 cb = combuf 337 while cb: 338 max_length = 1 + len(cb)//10 339 chunk = dco.decompress(cb, max_length) 340 self.assertFalse(len(chunk) > max_length, 341 'chunk too big (%d>%d)' % (len(chunk),max_length)) 342 bufs.append(chunk) 343 cb = dco.unconsumed_tail 344 if flush: 345 bufs.append(dco.flush()) 346 else: 347 while chunk: 348 chunk = dco.decompress('', max_length) 349 self.assertFalse(len(chunk) > max_length, 350 'chunk too big (%d>%d)' % (len(chunk),max_length)) 351 bufs.append(chunk) 352 self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved') 353 354 def test_decompressmaxlenflush(self): 355 self.test_decompressmaxlen(flush=True) 356 357 def test_maxlenmisc(self): 358 # Misc tests of max_length 359 dco = zlib.decompressobj() 360 self.assertRaises(ValueError, dco.decompress, "", -1) 361 self.assertEqual('', dco.unconsumed_tail) 362 363 def test_maxlen_large(self): 364 # Sizes up to sys.maxsize should be accepted, although zlib is 365 # internally limited to expressing sizes with unsigned int 366 data = HAMLET_SCENE * 10 367 DEFAULTALLOC = 16 * 1024 368 self.assertGreater(len(data), DEFAULTALLOC) 369 compressed = zlib.compress(data, 1) 370 dco = zlib.decompressobj() 371 self.assertEqual(dco.decompress(compressed, sys.maxsize), data) 372 373 def test_maxlen_custom(self): 374 data = HAMLET_SCENE * 10 375 compressed = zlib.compress(data, 1) 376 dco = zlib.decompressobj() 377 self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100]) 378 379 def test_clear_unconsumed_tail(self): 380 # Issue #12050: calling decompress() without providing max_length 381 # should clear the unconsumed_tail attribute. 382 cdata = "x\x9cKLJ\x06\x00\x02M\x01" # "abc" 383 dco = zlib.decompressobj() 384 ddata = dco.decompress(cdata, 1) 385 ddata += dco.decompress(dco.unconsumed_tail) 386 self.assertEqual(dco.unconsumed_tail, "") 387 388 def test_flushes(self): 389 # Test flush() with the various options, using all the 390 # different levels in order to provide more variations. 391 sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH'] 392 sync_opt = [getattr(zlib, opt) for opt in sync_opt 393 if hasattr(zlib, opt)] 394 data = HAMLET_SCENE * 8 395 396 for sync in sync_opt: 397 for level in range(10): 398 obj = zlib.compressobj( level ) 399 a = obj.compress( data[:3000] ) 400 b = obj.flush( sync ) 401 c = obj.compress( data[3000:] ) 402 d = obj.flush() 403 self.assertEqual(zlib.decompress(''.join([a,b,c,d])), 404 data, ("Decompress failed: flush " 405 "mode=%i, level=%i") % (sync, level)) 406 del obj 407 408 @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'), 409 'requires zlib.Z_SYNC_FLUSH') 410 def test_odd_flush(self): 411 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 412 import random 413 # Testing on 17K of "random" data 414 415 # Create compressor and decompressor objects 416 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 417 dco = zlib.decompressobj() 418 419 # Try 17K of data 420 # generate random data stream 421 try: 422 # In 2.3 and later, WichmannHill is the RNG of the bug report 423 gen = random.WichmannHill() 424 except AttributeError: 425 try: 426 # 2.2 called it Random 427 gen = random.Random() 428 except AttributeError: 429 # others might simply have a single RNG 430 gen = random 431 gen.seed(1) 432 data = genblock(1, 17 * 1024, generator=gen) 433 434 # compress, sync-flush, and decompress 435 first = co.compress(data) 436 second = co.flush(zlib.Z_SYNC_FLUSH) 437 expanded = dco.decompress(first + second) 438 439 # if decompressed data is different from the input data, choke. 440 self.assertEqual(expanded, data, "17K random source doesn't match") 441 442 def test_empty_flush(self): 443 # Test that calling .flush() on unused objects works. 444 # (Bug #1083110 -- calling .flush() on decompress objects 445 # caused a core dump.) 446 447 co = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 448 self.assertTrue(co.flush()) # Returns a zlib header 449 dco = zlib.decompressobj() 450 self.assertEqual(dco.flush(), "") # Returns nothing 451 452 def test_decompress_incomplete_stream(self): 453 # This is 'foo', deflated 454 x = 'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E' 455 # For the record 456 self.assertEqual(zlib.decompress(x), 'foo') 457 self.assertRaises(zlib.error, zlib.decompress, x[:-5]) 458 # Omitting the stream end works with decompressor objects 459 # (see issue #8672). 460 dco = zlib.decompressobj() 461 y = dco.decompress(x[:-5]) 462 y += dco.flush() 463 self.assertEqual(y, 'foo') 464 465 def test_flush_with_freed_input(self): 466 # Issue #16411: decompressor accesses input to last decompress() call 467 # in flush(), even if this object has been freed in the meanwhile. 468 input1 = 'abcdefghijklmnopqrstuvwxyz' 469 input2 = 'QWERTYUIOPASDFGHJKLZXCVBNM' 470 data = zlib.compress(input1) 471 dco = zlib.decompressobj() 472 dco.decompress(data, 1) 473 del data 474 data = zlib.compress(input2) 475 self.assertEqual(dco.flush(), input1[1:]) 476 477 @precisionbigmemtest(size=_4G, memuse=1) 478 def test_flush_large_length(self, size): 479 # Test flush(length) parameter greater than internal limit UINT_MAX 480 input = HAMLET_SCENE * 10 481 data = zlib.compress(input, 1) 482 dco = zlib.decompressobj() 483 dco.decompress(data, 1) 484 self.assertEqual(dco.flush(size), input[1:]) 485 486 def test_flush_custom_length(self): 487 input = HAMLET_SCENE * 10 488 data = zlib.compress(input, 1) 489 dco = zlib.decompressobj() 490 dco.decompress(data, 1) 491 self.assertEqual(dco.flush(CustomInt()), input[1:]) 492 493 @requires_Compress_copy 494 def test_compresscopy(self): 495 # Test copying a compression object 496 data0 = HAMLET_SCENE 497 data1 = HAMLET_SCENE.swapcase() 498 c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) 499 bufs0 = [] 500 bufs0.append(c0.compress(data0)) 501 502 c1 = c0.copy() 503 bufs1 = bufs0[:] 504 505 bufs0.append(c0.compress(data0)) 506 bufs0.append(c0.flush()) 507 s0 = ''.join(bufs0) 508 509 bufs1.append(c1.compress(data1)) 510 bufs1.append(c1.flush()) 511 s1 = ''.join(bufs1) 512 513 self.assertEqual(zlib.decompress(s0),data0+data0) 514 self.assertEqual(zlib.decompress(s1),data0+data1) 515 516 @requires_Compress_copy 517 def test_badcompresscopy(self): 518 # Test copying a compression object in an inconsistent state 519 c = zlib.compressobj() 520 c.compress(HAMLET_SCENE) 521 c.flush() 522 self.assertRaises(ValueError, c.copy) 523 524 def test_decompress_unused_data(self): 525 # Repeated calls to decompress() after EOF should accumulate data in 526 # dco.unused_data, instead of just storing the arg to the last call. 527 source = b'abcdefghijklmnopqrstuvwxyz' 528 remainder = b'0123456789' 529 y = zlib.compress(source) 530 x = y + remainder 531 for maxlen in 0, 1000: 532 for step in 1, 2, len(y), len(x): 533 dco = zlib.decompressobj() 534 data = b'' 535 for i in range(0, len(x), step): 536 if i < len(y): 537 self.assertEqual(dco.unused_data, b'') 538 if maxlen == 0: 539 data += dco.decompress(x[i : i + step]) 540 self.assertEqual(dco.unconsumed_tail, b'') 541 else: 542 data += dco.decompress( 543 dco.unconsumed_tail + x[i : i + step], maxlen) 544 data += dco.flush() 545 self.assertEqual(data, source) 546 self.assertEqual(dco.unconsumed_tail, b'') 547 self.assertEqual(dco.unused_data, remainder) 548 549 @requires_Decompress_copy 550 def test_decompresscopy(self): 551 # Test copying a decompression object 552 data = HAMLET_SCENE 553 comp = zlib.compress(data) 554 555 d0 = zlib.decompressobj() 556 bufs0 = [] 557 bufs0.append(d0.decompress(comp[:32])) 558 559 d1 = d0.copy() 560 bufs1 = bufs0[:] 561 562 bufs0.append(d0.decompress(comp[32:])) 563 s0 = ''.join(bufs0) 564 565 bufs1.append(d1.decompress(comp[32:])) 566 s1 = ''.join(bufs1) 567 568 self.assertEqual(s0,s1) 569 self.assertEqual(s0,data) 570 571 @requires_Decompress_copy 572 def test_baddecompresscopy(self): 573 # Test copying a compression object in an inconsistent state 574 data = zlib.compress(HAMLET_SCENE) 575 d = zlib.decompressobj() 576 d.decompress(data) 577 d.flush() 578 self.assertRaises(ValueError, d.copy) 579 580 def test_compresspickle(self): 581 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 582 with self.assertRaises((TypeError, pickle.PicklingError)): 583 pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto) 584 585 def test_decompresspickle(self): 586 for proto in range(pickle.HIGHEST_PROTOCOL + 1): 587 with self.assertRaises((TypeError, pickle.PicklingError)): 588 pickle.dumps(zlib.decompressobj(), proto) 589 590 # Memory use of the following functions takes into account overallocation 591 592 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=3) 593 def test_big_compress_buffer(self, size): 594 c = zlib.compressobj(1) 595 compress = lambda s: c.compress(s) + c.flush() 596 self.check_big_compress_buffer(size, compress) 597 598 @precisionbigmemtest(size=_1G + 1024 * 1024, memuse=2) 599 def test_big_decompress_buffer(self, size): 600 d = zlib.decompressobj() 601 decompress = lambda s: d.decompress(s) + d.flush() 602 self.check_big_decompress_buffer(size, decompress) 603 604 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 605 @precisionbigmemtest(size=_4G + 100, memuse=4) 606 def test_64bit_compress(self, size): 607 data = b'x' * size 608 co = zlib.compressobj(0) 609 do = zlib.decompressobj() 610 try: 611 comp = co.compress(data) + co.flush() 612 uncomp = do.decompress(comp) + do.flush() 613 self.assertEqual(uncomp, data) 614 finally: 615 comp = uncomp = data = None 616 617 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 618 @precisionbigmemtest(size=_4G + 100, memuse=3) 619 def test_large_unused_data(self, size): 620 data = b'abcdefghijklmnop' 621 unused = b'x' * size 622 comp = zlib.compress(data) + unused 623 do = zlib.decompressobj() 624 try: 625 uncomp = do.decompress(comp) + do.flush() 626 self.assertEqual(unused, do.unused_data) 627 self.assertEqual(uncomp, data) 628 finally: 629 unused = comp = do = None 630 631 @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 632 @precisionbigmemtest(size=_4G + 100, memuse=5) 633 def test_large_unconsumed_tail(self, size): 634 data = b'x' * size 635 do = zlib.decompressobj() 636 try: 637 comp = zlib.compress(data, 0) 638 uncomp = do.decompress(comp, 1) + do.flush() 639 self.assertEqual(uncomp, data) 640 self.assertEqual(do.unconsumed_tail, b'') 641 finally: 642 comp = uncomp = data = None 643 644 def test_wbits(self): 645 co = zlib.compressobj(1, zlib.DEFLATED, 15) 646 zlib15 = co.compress(HAMLET_SCENE) + co.flush() 647 self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE) 648 self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE) 649 with self.assertRaisesRegexp(zlib.error, 'invalid window size'): 650 zlib.decompress(zlib15, 14) 651 dco = zlib.decompressobj(32 + 15) 652 self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE) 653 dco = zlib.decompressobj(14) 654 with self.assertRaisesRegexp(zlib.error, 'invalid window size'): 655 dco.decompress(zlib15) 656 657 co = zlib.compressobj(1, zlib.DEFLATED, 9) 658 zlib9 = co.compress(HAMLET_SCENE) + co.flush() 659 self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE) 660 self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE) 661 self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE) 662 dco = zlib.decompressobj(32 + 9) 663 self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE) 664 665 co = zlib.compressobj(1, zlib.DEFLATED, -15) 666 deflate15 = co.compress(HAMLET_SCENE) + co.flush() 667 self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE) 668 dco = zlib.decompressobj(-15) 669 self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE) 670 671 co = zlib.compressobj(1, zlib.DEFLATED, -9) 672 deflate9 = co.compress(HAMLET_SCENE) + co.flush() 673 self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE) 674 self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE) 675 dco = zlib.decompressobj(-9) 676 self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE) 677 678 co = zlib.compressobj(1, zlib.DEFLATED, 16 + 15) 679 gzip = co.compress(HAMLET_SCENE) + co.flush() 680 self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE) 681 self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE) 682 dco = zlib.decompressobj(32 + 15) 683 self.assertEqual(dco.decompress(gzip), HAMLET_SCENE) 684 685 686def genblock(seed, length, step=1024, generator=random): 687 """length-byte stream of random data from a seed (in step-byte blocks).""" 688 if seed is not None: 689 generator.seed(seed) 690 randint = generator.randint 691 if length < step or step < 2: 692 step = length 693 blocks = [] 694 for i in range(0, length, step): 695 blocks.append(''.join([chr(randint(0,255)) 696 for x in range(step)])) 697 return ''.join(blocks)[:length] 698 699 700 701def choose_lines(source, number, seed=None, generator=random): 702 """Return a list of number lines randomly chosen from the source""" 703 if seed is not None: 704 generator.seed(seed) 705 sources = source.split('\n') 706 return [generator.choice(sources) for n in range(number)] 707 708 709 710HAMLET_SCENE = """ 711LAERTES 712 713 O, fear me not. 714 I stay too long: but here my father comes. 715 716 Enter POLONIUS 717 718 A double blessing is a double grace, 719 Occasion smiles upon a second leave. 720 721LORD POLONIUS 722 723 Yet here, Laertes! aboard, aboard, for shame! 724 The wind sits in the shoulder of your sail, 725 And you are stay'd for. There; my blessing with thee! 726 And these few precepts in thy memory 727 See thou character. Give thy thoughts no tongue, 728 Nor any unproportioned thought his act. 729 Be thou familiar, but by no means vulgar. 730 Those friends thou hast, and their adoption tried, 731 Grapple them to thy soul with hoops of steel; 732 But do not dull thy palm with entertainment 733 Of each new-hatch'd, unfledged comrade. Beware 734 Of entrance to a quarrel, but being in, 735 Bear't that the opposed may beware of thee. 736 Give every man thy ear, but few thy voice; 737 Take each man's censure, but reserve thy judgment. 738 Costly thy habit as thy purse can buy, 739 But not express'd in fancy; rich, not gaudy; 740 For the apparel oft proclaims the man, 741 And they in France of the best rank and station 742 Are of a most select and generous chief in that. 743 Neither a borrower nor a lender be; 744 For loan oft loses both itself and friend, 745 And borrowing dulls the edge of husbandry. 746 This above all: to thine ownself be true, 747 And it must follow, as the night the day, 748 Thou canst not then be false to any man. 749 Farewell: my blessing season this in thee! 750 751LAERTES 752 753 Most humbly do I take my leave, my lord. 754 755LORD POLONIUS 756 757 The time invites you; go; your servants tend. 758 759LAERTES 760 761 Farewell, Ophelia; and remember well 762 What I have said to you. 763 764OPHELIA 765 766 'Tis in my memory lock'd, 767 And you yourself shall keep the key of it. 768 769LAERTES 770 771 Farewell. 772""" 773 774 775class CustomInt: 776 def __int__(self): 777 return 100 778 779 780def test_main(): 781 run_unittest( 782 ChecksumTestCase, 783 ChecksumBigBufferTestCase, 784 ExceptionTestCase, 785 CompressTestCase, 786 CompressObjectTestCase 787 ) 788 789if __name__ == "__main__": 790 test_main() 791