import unittest
from test import support
import binascii
import copy
import pickle
import random
import sys
from test.support import bigmemtest, _1G, _4G

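# support.import_module() raises unittest.SkipTest when the zlib extension
# module is unavailable, so the whole test module is skipped on such builds.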
zlib = support.import_module('zlib')

requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')


class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        # Test that the major version of the actual library in use matches the
        # major version that we were compiled against. We can't guarantee that
        # the minor versions will match (even on the machine on which the module
        # was compiled), and the API is stable between minor versions, so
        # testing only the major versions avoids spurious failures.
        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])


class ChecksumTestCase(unittest.TestCase):
    # checksum test cases
    def test_crc32start(self):
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        self.assertEqual(zlib.crc32(b"", 0), 0)
        self.assertEqual(zlib.crc32(b"", 1), 1)
        self.assertEqual(zlib.crc32(b"", 432), 432)

    def test_adler32start(self):
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        self.assertEqual(zlib.adler32(b"", 0), 0)
        self.assertEqual(zlib.adler32(b"", 1), 1)
        self.assertEqual(zlib.adler32(b"", 432), 432)

    def test_penguins(self):
        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)

        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
        self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))

    def test_crc32_adler32_unsigned(self):
        foo = b'abcdefghijklmnop'
        # explicitly test unsigned behavior
        self.assertEqual(zlib.crc32(foo), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        foo = b'abcdefghijklmnop'
        crc = 2486878355
        self.assertEqual(binascii.crc32(foo), crc)
        self.assertEqual(zlib.crc32(foo), crc)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))


# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)


class ExceptionTestCase(unittest.TestCase):
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)


class BaseCompressTestCase(object):
    def check_big_compress_buffer(self, size, compress_func):
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random data, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None


class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression in one go (whole message compression)
    def test_speech(self):
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
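        # CustomInt (defined at the bottom of this file) implements
        # __index__(), so it is accepted as the bufsize argument here.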
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None


class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
    # Test compression object
    def test_pair(self):
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            self.assertEqual(x1 + x2, datazip)
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)

    def test_keywords(self):
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        dco = zlib.decompressobj()
        y1 = dco.decompress(b''.join(bufs))
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"

    def test_decompincflush(self):
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, b"", -1)
        self.assertEqual(b'', dco.unconsumed_tail)

    def test_maxlen_large(self):
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)

    def test_maxlen_custom(self):
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
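        # CustomInt.__index__() returns 100, so at most the first 100 bytes
        # of the decompressed data are returned.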
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])

    def test_clear_unconsumed_tail(self):
        # Issue #12050: calling decompress() without providing max_length
        # should clear the unconsumed_tail attribute.
        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
        dco = zlib.decompressobj()
        ddata = dco.decompress(cdata, 1)
        ddata += dco.decompress(dco.unconsumed_tail)
        self.assertEqual(dco.unconsumed_tail, b"")

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ver >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                try:
                    obj = zlib.compressobj( level )
                    a = obj.compress( data[:3000] )
                    b = obj.flush( sync )
                    c = obj.compress( data[3000:] )
                    d = obj.flush()
                except:
                    print("Error for flush mode={}, level={}"
                          .format(sync, level))
                    raise
                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
                         'requires zlib.Z_SYNC_FLUSH')
    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        # Testing on 17K of "random" data

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of data
        # Generate a reproducible pseudo-random data stream.
        # (random.WichmannHill, the RNG of the original bug report, no longer
        # exists in Python 3, so random.Random is used directly.)
        gen = random.Random()
        gen.seed(1)
        data = gen.randbytes(17 * 1024)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.assertTrue(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), b"") # Returns nothing

    def test_dictionary(self):
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)

    def test_dictionary_streaming(self):
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])

    def test_decompress_incomplete_stream(self):
        # This is 'foo', deflated
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
        # For the record
        self.assertEqual(zlib.decompress(x), b'foo')
        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
        # Omitting the stream end works with decompressor objects
        # (see issue #8672).
        dco = zlib.decompressobj()
        y = dco.decompress(x[:-5])
        y += dco.flush()
        self.assertEqual(y, b'foo')

    def test_decompress_eof(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.decompress(x[-5:])
        self.assertTrue(dco.eof)
        dco.flush()
        self.assertTrue(dco.eof)

    def test_decompress_eof_incomplete_stream(self):
        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
        dco = zlib.decompressobj()
        self.assertFalse(dco.eof)
        dco.decompress(x[:-5])
        self.assertFalse(dco.eof)
        dco.flush()
        self.assertFalse(dco.eof)

    def test_decompress_unused_data(self):
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)

    # issue27164
    def test_decompress_raw_with_dictionary(self):
        zdict = b'abcdefghijklmnopqrstuvwxyz'
        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        comp = co.compress(zdict) + co.flush()
        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
        uncomp = dco.decompress(comp) + dco.flush()
        self.assertEqual(zdict, uncomp)

    def test_flush_with_freed_input(self):
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        del data
        data = zlib.compress(input2)
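        # decompress(data, 1) above produced only input1[:1], so flush()
        # should return the remaining bytes of input1.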
        self.assertEqual(dco.flush(), input1[1:])

    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])

    def test_flush_custom_length(self):
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
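        # CustomInt.__index__() returns 100, used here as the initial size of
        # flush()'s output buffer; all remaining bytes are still returned.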
        self.assertEqual(dco.flush(CustomInt()), input[1:])

    @requires_Compress_copy
    def test_compresscopy(self):
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)

    @requires_Compress_copy
    def test_badcompresscopy(self):
        # Test copying a compression object in an inconsistent state
        c = zlib.compressobj()
        c.compress(HAMLET_SCENE)
        c.flush()
        self.assertRaises(ValueError, c.copy)
        self.assertRaises(ValueError, copy.copy, c)
        self.assertRaises(ValueError, copy.deepcopy, c)

    @requires_Decompress_copy
    def test_decompresscopy(self):
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)

    @requires_Decompress_copy
    def test_baddecompresscopy(self):
        # Test copying a compression object in an inconsistent state
        data = zlib.compress(HAMLET_SCENE)
        d = zlib.decompressobj()
        d.decompress(data)
        d.flush()
        self.assertRaises(ValueError, d.copy)
        self.assertRaises(ValueError, copy.copy, d)
        self.assertRaises(ValueError, copy.deepcopy, d)

    def test_compresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)

    def test_decompresspickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises((TypeError, pickle.PicklingError)):
                pickle.dumps(zlib.decompressobj(), proto)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            comp = uncomp = data = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            unused = comp = do = None

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            comp = uncomp = data = None

    def test_wbits(self):
        # wbits=0 is only supported since zlib v1.2.3.5.
        # Normalize the runtime version string: treat "1.2.3" as "1.2.3.0",
        # and handle suffixed forms such as "1.2.0-linux", "1.2.0.f",
        # "1.2.0.f-linux".
        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
        if len(v) < 4:
            v.append('0')
        elif not v[-1].isnumeric():
            v[-1] = '0'

        v = tuple(map(int, v))
        supports_wbits_0 = v >= (1, 2, 3, 5)

        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)


def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source"""
    if seed is not None:
        generator.seed(seed)
    sources = source.split('\n')
    return [generator.choice(sources) for n in range(number)]
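# Note: choose_lines() expects a text (str) source, so an illustrative call
# against the bytes constant below would be, for example:
#     choose_lines(HAMLET_SCENE.decode('ascii'), 25, seed=1)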


HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""


class CustomInt:
    def __index__(self):
        return 100


if __name__ == "__main__":
    unittest.main()