• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.support import import_helper
4import binascii
5import copy
6import pickle
7import random
8import sys
9from test.support import bigmemtest, _1G, _4G
10
11
12zlib = import_helper.import_module('zlib')
13
14requires_Compress_copy = unittest.skipUnless(
15        hasattr(zlib.compressobj(), "copy"),
16        'requires Compress.copy()')
17requires_Decompress_copy = unittest.skipUnless(
18        hasattr(zlib.decompressobj(), "copy"),
19        'requires Decompress.copy()')
20
21
22class VersionTestCase(unittest.TestCase):
23
24    def test_library_version(self):
25        # Test that the major version of the actual library in use matches the
26        # major version that we were compiled against. We can't guarantee that
27        # the minor versions will match (even on the machine on which the module
28        # was compiled), and the API is stable between minor versions, so
29        # testing only the major versions avoids spurious failures.
30        self.assertEqual(zlib.ZLIB_RUNTIME_VERSION[0], zlib.ZLIB_VERSION[0])
31
32
33class ChecksumTestCase(unittest.TestCase):
34    # checksum test cases
35    def test_crc32start(self):
36        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
37        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))
38
39    def test_crc32empty(self):
40        self.assertEqual(zlib.crc32(b"", 0), 0)
41        self.assertEqual(zlib.crc32(b"", 1), 1)
42        self.assertEqual(zlib.crc32(b"", 432), 432)
43
44    def test_adler32start(self):
45        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
46        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))
47
48    def test_adler32empty(self):
49        self.assertEqual(zlib.adler32(b"", 0), 0)
50        self.assertEqual(zlib.adler32(b"", 1), 1)
51        self.assertEqual(zlib.adler32(b"", 432), 432)
52
53    def test_penguins(self):
54        self.assertEqual(zlib.crc32(b"penguin", 0), 0x0e5c1a120)
55        self.assertEqual(zlib.crc32(b"penguin", 1), 0x43b6aa94)
56        self.assertEqual(zlib.adler32(b"penguin", 0), 0x0bcf02f6)
57        self.assertEqual(zlib.adler32(b"penguin", 1), 0x0bd602f7)
58
59        self.assertEqual(zlib.crc32(b"penguin"), zlib.crc32(b"penguin", 0))
60        self.assertEqual(zlib.adler32(b"penguin"),zlib.adler32(b"penguin",1))
61
62    def test_crc32_adler32_unsigned(self):
63        foo = b'abcdefghijklmnop'
64        # explicitly test signed behavior
65        self.assertEqual(zlib.crc32(foo), 2486878355)
66        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
67        self.assertEqual(zlib.adler32(foo+foo), 3573550353)
68        self.assertEqual(zlib.adler32(b'spam'), 72286642)
69
70    def test_same_as_binascii_crc32(self):
71        foo = b'abcdefghijklmnop'
72        crc = 2486878355
73        self.assertEqual(binascii.crc32(foo), crc)
74        self.assertEqual(zlib.crc32(foo), crc)
75        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
76
77
78# Issue #10276 - check that inputs >=4 GiB are handled correctly.
79class ChecksumBigBufferTestCase(unittest.TestCase):
80
81    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
82    def test_big_buffer(self, size):
83        data = b"nyan" * (_1G + 1)
84        self.assertEqual(zlib.crc32(data), 1044521549)
85        self.assertEqual(zlib.adler32(data), 2256789997)
86
87
88class ExceptionTestCase(unittest.TestCase):
89    # make sure we generate some expected errors
90    def test_badlevel(self):
91        # specifying compression level out of range causes an error
92        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
93        # accepts 0 too)
94        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
95
96    def test_badargs(self):
97        self.assertRaises(TypeError, zlib.adler32)
98        self.assertRaises(TypeError, zlib.crc32)
99        self.assertRaises(TypeError, zlib.compress)
100        self.assertRaises(TypeError, zlib.decompress)
101        for arg in (42, None, '', 'abc', (), []):
102            self.assertRaises(TypeError, zlib.adler32, arg)
103            self.assertRaises(TypeError, zlib.crc32, arg)
104            self.assertRaises(TypeError, zlib.compress, arg)
105            self.assertRaises(TypeError, zlib.decompress, arg)
106
107    def test_badcompressobj(self):
108        # verify failure on building compress object with bad params
109        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
110        # specifying total bits too large causes an error
111        self.assertRaises(ValueError,
112                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
113
114    def test_baddecompressobj(self):
115        # verify failure on building decompress object with bad params
116        self.assertRaises(ValueError, zlib.decompressobj, -1)
117
118    def test_decompressobj_badflush(self):
119        # verify failure on calling decompressobj.flush with bad params
120        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
121        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
122
123    @support.cpython_only
124    def test_overflow(self):
125        with self.assertRaisesRegex(OverflowError, 'int too large'):
126            zlib.decompress(b'', 15, sys.maxsize + 1)
127        with self.assertRaisesRegex(OverflowError, 'int too large'):
128            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
129        with self.assertRaisesRegex(OverflowError, 'int too large'):
130            zlib.decompressobj().flush(sys.maxsize + 1)
131
132    @support.cpython_only
133    def test_disallow_instantiation(self):
134        # Ensure that the type disallows instantiation (bpo-43916)
135        support.check_disallow_instantiation(self, type(zlib.compressobj()))
136        support.check_disallow_instantiation(self, type(zlib.decompressobj()))
137
138
139class BaseCompressTestCase(object):
140    def check_big_compress_buffer(self, size, compress_func):
141        _1M = 1024 * 1024
142        # Generate 10 MiB worth of random, and expand it by repeating it.
143        # The assumption is that zlib's memory is not big enough to exploit
144        # such spread out redundancy.
145        data = random.randbytes(_1M * 10)
146        data = data * (size // len(data) + 1)
147        try:
148            compress_func(data)
149        finally:
150            # Release memory
151            data = None
152
153    def check_big_decompress_buffer(self, size, decompress_func):
154        data = b'x' * size
155        try:
156            compressed = zlib.compress(data, 1)
157        finally:
158            # Release memory
159            data = None
160        data = decompress_func(compressed)
161        # Sanity check
162        try:
163            self.assertEqual(len(data), size)
164            self.assertEqual(len(data.strip(b'x')), 0)
165        finally:
166            data = None
167
168
169class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
170    # Test compression in one go (whole message compression)
171    def test_speech(self):
172        x = zlib.compress(HAMLET_SCENE)
173        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
174
175    def test_keywords(self):
176        x = zlib.compress(HAMLET_SCENE, level=3)
177        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
178        with self.assertRaises(TypeError):
179            zlib.compress(data=HAMLET_SCENE, level=3)
180        self.assertEqual(zlib.decompress(x,
181                                         wbits=zlib.MAX_WBITS,
182                                         bufsize=zlib.DEF_BUF_SIZE),
183                         HAMLET_SCENE)
184
185    def test_speech128(self):
186        # compress more data
187        data = HAMLET_SCENE * 128
188        x = zlib.compress(data)
189        self.assertEqual(zlib.compress(bytearray(data)), x)
190        for ob in x, bytearray(x):
191            self.assertEqual(zlib.decompress(ob), data)
192
193    def test_incomplete_stream(self):
194        # A useful error message is given
195        x = zlib.compress(HAMLET_SCENE)
196        self.assertRaisesRegex(zlib.error,
197            "Error -5 while decompressing data: incomplete or truncated stream",
198            zlib.decompress, x[:-1])
199
200    # Memory use of the following functions takes into account overallocation
201
202    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
203    def test_big_compress_buffer(self, size):
204        compress = lambda s: zlib.compress(s, 1)
205        self.check_big_compress_buffer(size, compress)
206
207    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
208    def test_big_decompress_buffer(self, size):
209        self.check_big_decompress_buffer(size, zlib.decompress)
210
211    @bigmemtest(size=_4G, memuse=1)
212    def test_large_bufsize(self, size):
213        # Test decompress(bufsize) parameter greater than the internal limit
214        data = HAMLET_SCENE * 10
215        compressed = zlib.compress(data, 1)
216        self.assertEqual(zlib.decompress(compressed, 15, size), data)
217
218    def test_custom_bufsize(self):
219        data = HAMLET_SCENE * 10
220        compressed = zlib.compress(data, 1)
221        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)
222
223    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
224    @bigmemtest(size=_4G + 100, memuse=4)
225    def test_64bit_compress(self, size):
226        data = b'x' * size
227        try:
228            comp = zlib.compress(data, 0)
229            self.assertEqual(zlib.decompress(comp), data)
230        finally:
231            comp = data = None
232
233
234class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
235    # Test compression object
236    def test_pair(self):
237        # straightforward compress/decompress objects
238        datasrc = HAMLET_SCENE * 128
239        datazip = zlib.compress(datasrc)
240        # should compress both bytes and bytearray data
241        for data in (datasrc, bytearray(datasrc)):
242            co = zlib.compressobj()
243            x1 = co.compress(data)
244            x2 = co.flush()
245            self.assertRaises(zlib.error, co.flush) # second flush should not work
246            self.assertEqual(x1 + x2, datazip)
247        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
248            dco = zlib.decompressobj()
249            y1 = dco.decompress(v1 + v2)
250            y2 = dco.flush()
251            self.assertEqual(data, y1 + y2)
252            self.assertIsInstance(dco.unconsumed_tail, bytes)
253            self.assertIsInstance(dco.unused_data, bytes)
254
255    def test_keywords(self):
256        level = 2
257        method = zlib.DEFLATED
258        wbits = -12
259        memLevel = 9
260        strategy = zlib.Z_FILTERED
261        co = zlib.compressobj(level=level,
262                              method=method,
263                              wbits=wbits,
264                              memLevel=memLevel,
265                              strategy=strategy,
266                              zdict=b"")
267        do = zlib.decompressobj(wbits=wbits, zdict=b"")
268        with self.assertRaises(TypeError):
269            co.compress(data=HAMLET_SCENE)
270        with self.assertRaises(TypeError):
271            do.decompress(data=zlib.compress(HAMLET_SCENE))
272        x = co.compress(HAMLET_SCENE) + co.flush()
273        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
274        self.assertEqual(HAMLET_SCENE, y)
275
276    def test_compressoptions(self):
277        # specify lots of options to compressobj()
278        level = 2
279        method = zlib.DEFLATED
280        wbits = -12
281        memLevel = 9
282        strategy = zlib.Z_FILTERED
283        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
284        x1 = co.compress(HAMLET_SCENE)
285        x2 = co.flush()
286        dco = zlib.decompressobj(wbits)
287        y1 = dco.decompress(x1 + x2)
288        y2 = dco.flush()
289        self.assertEqual(HAMLET_SCENE, y1 + y2)
290
291    def test_compressincremental(self):
292        # compress object in steps, decompress object as one-shot
293        data = HAMLET_SCENE * 128
294        co = zlib.compressobj()
295        bufs = []
296        for i in range(0, len(data), 256):
297            bufs.append(co.compress(data[i:i+256]))
298        bufs.append(co.flush())
299        combuf = b''.join(bufs)
300
301        dco = zlib.decompressobj()
302        y1 = dco.decompress(b''.join(bufs))
303        y2 = dco.flush()
304        self.assertEqual(data, y1 + y2)
305
306    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
307        # compress object in steps, decompress object in steps
308        source = source or HAMLET_SCENE
309        data = source * 128
310        co = zlib.compressobj()
311        bufs = []
312        for i in range(0, len(data), cx):
313            bufs.append(co.compress(data[i:i+cx]))
314        bufs.append(co.flush())
315        combuf = b''.join(bufs)
316
317        decombuf = zlib.decompress(combuf)
318        # Test type of return value
319        self.assertIsInstance(decombuf, bytes)
320
321        self.assertEqual(data, decombuf)
322
323        dco = zlib.decompressobj()
324        bufs = []
325        for i in range(0, len(combuf), dcx):
326            bufs.append(dco.decompress(combuf[i:i+dcx]))
327            self.assertEqual(b'', dco.unconsumed_tail, ########
328                             "(A) uct should be b'': not %d long" %
329                                       len(dco.unconsumed_tail))
330            self.assertEqual(b'', dco.unused_data)
331        if flush:
332            bufs.append(dco.flush())
333        else:
334            while True:
335                chunk = dco.decompress(b'')
336                if chunk:
337                    bufs.append(chunk)
338                else:
339                    break
340        self.assertEqual(b'', dco.unconsumed_tail, ########
341                         "(B) uct should be b'': not %d long" %
342                                       len(dco.unconsumed_tail))
343        self.assertEqual(b'', dco.unused_data)
344        self.assertEqual(data, b''.join(bufs))
345        # Failure means: "decompressobj with init options failed"
346
347    def test_decompincflush(self):
348        self.test_decompinc(flush=True)
349
350    def test_decompimax(self, source=None, cx=256, dcx=64):
351        # compress in steps, decompress in length-restricted steps
352        source = source or HAMLET_SCENE
353        # Check a decompression object with max_length specified
354        data = source * 128
355        co = zlib.compressobj()
356        bufs = []
357        for i in range(0, len(data), cx):
358            bufs.append(co.compress(data[i:i+cx]))
359        bufs.append(co.flush())
360        combuf = b''.join(bufs)
361        self.assertEqual(data, zlib.decompress(combuf),
362                         'compressed data failure')
363
364        dco = zlib.decompressobj()
365        bufs = []
366        cb = combuf
367        while cb:
368            #max_length = 1 + len(cb)//10
369            chunk = dco.decompress(cb, dcx)
370            self.assertFalse(len(chunk) > dcx,
371                    'chunk too big (%d>%d)' % (len(chunk), dcx))
372            bufs.append(chunk)
373            cb = dco.unconsumed_tail
374        bufs.append(dco.flush())
375        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
376
377    def test_decompressmaxlen(self, flush=False):
378        # Check a decompression object with max_length specified
379        data = HAMLET_SCENE * 128
380        co = zlib.compressobj()
381        bufs = []
382        for i in range(0, len(data), 256):
383            bufs.append(co.compress(data[i:i+256]))
384        bufs.append(co.flush())
385        combuf = b''.join(bufs)
386        self.assertEqual(data, zlib.decompress(combuf),
387                         'compressed data failure')
388
389        dco = zlib.decompressobj()
390        bufs = []
391        cb = combuf
392        while cb:
393            max_length = 1 + len(cb)//10
394            chunk = dco.decompress(cb, max_length)
395            self.assertFalse(len(chunk) > max_length,
396                        'chunk too big (%d>%d)' % (len(chunk),max_length))
397            bufs.append(chunk)
398            cb = dco.unconsumed_tail
399        if flush:
400            bufs.append(dco.flush())
401        else:
402            while chunk:
403                chunk = dco.decompress(b'', max_length)
404                self.assertFalse(len(chunk) > max_length,
405                            'chunk too big (%d>%d)' % (len(chunk),max_length))
406                bufs.append(chunk)
407        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
408
409    def test_decompressmaxlenflush(self):
410        self.test_decompressmaxlen(flush=True)
411
412    def test_maxlenmisc(self):
413        # Misc tests of max_length
414        dco = zlib.decompressobj()
415        self.assertRaises(ValueError, dco.decompress, b"", -1)
416        self.assertEqual(b'', dco.unconsumed_tail)
417
418    def test_maxlen_large(self):
419        # Sizes up to sys.maxsize should be accepted, although zlib is
420        # internally limited to expressing sizes with unsigned int
421        data = HAMLET_SCENE * 10
422        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
423        compressed = zlib.compress(data, 1)
424        dco = zlib.decompressobj()
425        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
426
427    def test_maxlen_custom(self):
428        data = HAMLET_SCENE * 10
429        compressed = zlib.compress(data, 1)
430        dco = zlib.decompressobj()
431        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
432
433    def test_clear_unconsumed_tail(self):
434        # Issue #12050: calling decompress() without providing max_length
435        # should clear the unconsumed_tail attribute.
436        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
437        dco = zlib.decompressobj()
438        ddata = dco.decompress(cdata, 1)
439        ddata += dco.decompress(dco.unconsumed_tail)
440        self.assertEqual(dco.unconsumed_tail, b"")
441
442    def test_flushes(self):
443        # Test flush() with the various options, using all the
444        # different levels in order to provide more variations.
445        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
446                    'Z_PARTIAL_FLUSH']
447
448        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
449        # Z_BLOCK has a known failure prior to 1.2.5.3
450        if ver >= (1, 2, 5, 3):
451            sync_opt.append('Z_BLOCK')
452
453        sync_opt = [getattr(zlib, opt) for opt in sync_opt
454                    if hasattr(zlib, opt)]
455        data = HAMLET_SCENE * 8
456
457        for sync in sync_opt:
458            for level in range(10):
459                try:
460                    obj = zlib.compressobj( level )
461                    a = obj.compress( data[:3000] )
462                    b = obj.flush( sync )
463                    c = obj.compress( data[3000:] )
464                    d = obj.flush()
465                except:
466                    print("Error for flush mode={}, level={}"
467                          .format(sync, level))
468                    raise
469                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
470                                 data, ("Decompress failed: flush "
471                                        "mode=%i, level=%i") % (sync, level))
472                del obj
473
474    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
475                         'requires zlib.Z_SYNC_FLUSH')
476    def test_odd_flush(self):
477        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
478        import random
479        # Testing on 17K of "random" data
480
481        # Create compressor and decompressor objects
482        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
483        dco = zlib.decompressobj()
484
485        # Try 17K of data
486        # generate random data stream
487        try:
488            # In 2.3 and later, WichmannHill is the RNG of the bug report
489            gen = random.WichmannHill()
490        except AttributeError:
491            try:
492                # 2.2 called it Random
493                gen = random.Random()
494            except AttributeError:
495                # others might simply have a single RNG
496                gen = random
497        gen.seed(1)
498        data = gen.randbytes(17 * 1024)
499
500        # compress, sync-flush, and decompress
501        first = co.compress(data)
502        second = co.flush(zlib.Z_SYNC_FLUSH)
503        expanded = dco.decompress(first + second)
504
505        # if decompressed data is different from the input data, choke.
506        self.assertEqual(expanded, data, "17K random source doesn't match")
507
508    def test_empty_flush(self):
509        # Test that calling .flush() on unused objects works.
510        # (Bug #1083110 -- calling .flush() on decompress objects
511        # caused a core dump.)
512
513        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
514        self.assertTrue(co.flush())  # Returns a zlib header
515        dco = zlib.decompressobj()
516        self.assertEqual(dco.flush(), b"") # Returns nothing
517
518    def test_dictionary(self):
519        h = HAMLET_SCENE
520        # Build a simulated dictionary out of the words in HAMLET.
521        words = h.split()
522        random.shuffle(words)
523        zdict = b''.join(words)
524        # Use it to compress HAMLET.
525        co = zlib.compressobj(zdict=zdict)
526        cd = co.compress(h) + co.flush()
527        # Verify that it will decompress with the dictionary.
528        dco = zlib.decompressobj(zdict=zdict)
529        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
530        # Verify that it fails when not given the dictionary.
531        dco = zlib.decompressobj()
532        self.assertRaises(zlib.error, dco.decompress, cd)
533
534    def test_dictionary_streaming(self):
535        # This simulates the reuse of a compressor object for compressing
536        # several separate data streams.
537        co = zlib.compressobj(zdict=HAMLET_SCENE)
538        do = zlib.decompressobj(zdict=HAMLET_SCENE)
539        piece = HAMLET_SCENE[1000:1500]
540        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
541        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
542        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
543        self.assertEqual(do.decompress(d0), piece)
544        self.assertEqual(do.decompress(d1), piece[100:])
545        self.assertEqual(do.decompress(d2), piece[:-100])
546
547    def test_decompress_incomplete_stream(self):
548        # This is 'foo', deflated
549        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
550        # For the record
551        self.assertEqual(zlib.decompress(x), b'foo')
552        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
553        # Omitting the stream end works with decompressor objects
554        # (see issue #8672).
555        dco = zlib.decompressobj()
556        y = dco.decompress(x[:-5])
557        y += dco.flush()
558        self.assertEqual(y, b'foo')
559
560    def test_decompress_eof(self):
561        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
562        dco = zlib.decompressobj()
563        self.assertFalse(dco.eof)
564        dco.decompress(x[:-5])
565        self.assertFalse(dco.eof)
566        dco.decompress(x[-5:])
567        self.assertTrue(dco.eof)
568        dco.flush()
569        self.assertTrue(dco.eof)
570
571    def test_decompress_eof_incomplete_stream(self):
572        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
573        dco = zlib.decompressobj()
574        self.assertFalse(dco.eof)
575        dco.decompress(x[:-5])
576        self.assertFalse(dco.eof)
577        dco.flush()
578        self.assertFalse(dco.eof)
579
580    def test_decompress_unused_data(self):
581        # Repeated calls to decompress() after EOF should accumulate data in
582        # dco.unused_data, instead of just storing the arg to the last call.
583        source = b'abcdefghijklmnopqrstuvwxyz'
584        remainder = b'0123456789'
585        y = zlib.compress(source)
586        x = y + remainder
587        for maxlen in 0, 1000:
588            for step in 1, 2, len(y), len(x):
589                dco = zlib.decompressobj()
590                data = b''
591                for i in range(0, len(x), step):
592                    if i < len(y):
593                        self.assertEqual(dco.unused_data, b'')
594                    if maxlen == 0:
595                        data += dco.decompress(x[i : i + step])
596                        self.assertEqual(dco.unconsumed_tail, b'')
597                    else:
598                        data += dco.decompress(
599                                dco.unconsumed_tail + x[i : i + step], maxlen)
600                data += dco.flush()
601                self.assertTrue(dco.eof)
602                self.assertEqual(data, source)
603                self.assertEqual(dco.unconsumed_tail, b'')
604                self.assertEqual(dco.unused_data, remainder)
605
606    # issue27164
607    def test_decompress_raw_with_dictionary(self):
608        zdict = b'abcdefghijklmnopqrstuvwxyz'
609        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
610        comp = co.compress(zdict) + co.flush()
611        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
612        uncomp = dco.decompress(comp) + dco.flush()
613        self.assertEqual(zdict, uncomp)
614
615    def test_flush_with_freed_input(self):
616        # Issue #16411: decompressor accesses input to last decompress() call
617        # in flush(), even if this object has been freed in the meanwhile.
618        input1 = b'abcdefghijklmnopqrstuvwxyz'
619        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
620        data = zlib.compress(input1)
621        dco = zlib.decompressobj()
622        dco.decompress(data, 1)
623        del data
624        data = zlib.compress(input2)
625        self.assertEqual(dco.flush(), input1[1:])
626
627    @bigmemtest(size=_4G, memuse=1)
628    def test_flush_large_length(self, size):
629        # Test flush(length) parameter greater than internal limit UINT_MAX
630        input = HAMLET_SCENE * 10
631        data = zlib.compress(input, 1)
632        dco = zlib.decompressobj()
633        dco.decompress(data, 1)
634        self.assertEqual(dco.flush(size), input[1:])
635
636    def test_flush_custom_length(self):
637        input = HAMLET_SCENE * 10
638        data = zlib.compress(input, 1)
639        dco = zlib.decompressobj()
640        dco.decompress(data, 1)
641        self.assertEqual(dco.flush(CustomInt()), input[1:])
642
643    @requires_Compress_copy
644    def test_compresscopy(self):
645        # Test copying a compression object
646        data0 = HAMLET_SCENE
647        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
648        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
649            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
650            bufs0 = []
651            bufs0.append(c0.compress(data0))
652
653            c1 = func(c0)
654            bufs1 = bufs0[:]
655
656            bufs0.append(c0.compress(data0))
657            bufs0.append(c0.flush())
658            s0 = b''.join(bufs0)
659
660            bufs1.append(c1.compress(data1))
661            bufs1.append(c1.flush())
662            s1 = b''.join(bufs1)
663
664            self.assertEqual(zlib.decompress(s0),data0+data0)
665            self.assertEqual(zlib.decompress(s1),data0+data1)
666
667    @requires_Compress_copy
668    def test_badcompresscopy(self):
669        # Test copying a compression object in an inconsistent state
670        c = zlib.compressobj()
671        c.compress(HAMLET_SCENE)
672        c.flush()
673        self.assertRaises(ValueError, c.copy)
674        self.assertRaises(ValueError, copy.copy, c)
675        self.assertRaises(ValueError, copy.deepcopy, c)
676
677    @requires_Decompress_copy
678    def test_decompresscopy(self):
679        # Test copying a decompression object
680        data = HAMLET_SCENE
681        comp = zlib.compress(data)
682        # Test type of return value
683        self.assertIsInstance(comp, bytes)
684
685        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
686            d0 = zlib.decompressobj()
687            bufs0 = []
688            bufs0.append(d0.decompress(comp[:32]))
689
690            d1 = func(d0)
691            bufs1 = bufs0[:]
692
693            bufs0.append(d0.decompress(comp[32:]))
694            s0 = b''.join(bufs0)
695
696            bufs1.append(d1.decompress(comp[32:]))
697            s1 = b''.join(bufs1)
698
699            self.assertEqual(s0,s1)
700            self.assertEqual(s0,data)
701
702    @requires_Decompress_copy
703    def test_baddecompresscopy(self):
704        # Test copying a compression object in an inconsistent state
705        data = zlib.compress(HAMLET_SCENE)
706        d = zlib.decompressobj()
707        d.decompress(data)
708        d.flush()
709        self.assertRaises(ValueError, d.copy)
710        self.assertRaises(ValueError, copy.copy, d)
711        self.assertRaises(ValueError, copy.deepcopy, d)
712
713    def test_compresspickle(self):
714        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
715            with self.assertRaises((TypeError, pickle.PicklingError)):
716                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
717
718    def test_decompresspickle(self):
719        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
720            with self.assertRaises((TypeError, pickle.PicklingError)):
721                pickle.dumps(zlib.decompressobj(), proto)
722
723    # Memory use of the following functions takes into account overallocation
724
725    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
726    def test_big_compress_buffer(self, size):
727        c = zlib.compressobj(1)
728        compress = lambda s: c.compress(s) + c.flush()
729        self.check_big_compress_buffer(size, compress)
730
731    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
732    def test_big_decompress_buffer(self, size):
733        d = zlib.decompressobj()
734        decompress = lambda s: d.decompress(s) + d.flush()
735        self.check_big_decompress_buffer(size, decompress)
736
737    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
738    @bigmemtest(size=_4G + 100, memuse=4)
739    def test_64bit_compress(self, size):
740        data = b'x' * size
741        co = zlib.compressobj(0)
742        do = zlib.decompressobj()
743        try:
744            comp = co.compress(data) + co.flush()
745            uncomp = do.decompress(comp) + do.flush()
746            self.assertEqual(uncomp, data)
747        finally:
748            comp = uncomp = data = None
749
750    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
751    @bigmemtest(size=_4G + 100, memuse=3)
752    def test_large_unused_data(self, size):
753        data = b'abcdefghijklmnop'
754        unused = b'x' * size
755        comp = zlib.compress(data) + unused
756        do = zlib.decompressobj()
757        try:
758            uncomp = do.decompress(comp) + do.flush()
759            self.assertEqual(unused, do.unused_data)
760            self.assertEqual(uncomp, data)
761        finally:
762            unused = comp = do = None
763
764    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
765    @bigmemtest(size=_4G + 100, memuse=5)
766    def test_large_unconsumed_tail(self, size):
767        data = b'x' * size
768        do = zlib.decompressobj()
769        try:
770            comp = zlib.compress(data, 0)
771            uncomp = do.decompress(comp, 1) + do.flush()
772            self.assertEqual(uncomp, data)
773            self.assertEqual(do.unconsumed_tail, b'')
774        finally:
775            comp = uncomp = data = None
776
777    def test_wbits(self):
778        # wbits=0 only supported since zlib v1.2.3.5
779        # Register "1.2.3" as "1.2.3.0"
780        # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
781        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
782        if len(v) < 4:
783            v.append('0')
784        elif not v[-1].isnumeric():
785            v[-1] = '0'
786
787        v = tuple(map(int, v))
788        supports_wbits_0 = v >= (1, 2, 3, 5)
789
790        co = zlib.compressobj(level=1, wbits=15)
791        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
792        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
793        if supports_wbits_0:
794            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
795        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
796        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
797            zlib.decompress(zlib15, 14)
798        dco = zlib.decompressobj(wbits=32 + 15)
799        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
800        dco = zlib.decompressobj(wbits=14)
801        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
802            dco.decompress(zlib15)
803
804        co = zlib.compressobj(level=1, wbits=9)
805        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
806        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
807        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
808        if supports_wbits_0:
809            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
810        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
811        dco = zlib.decompressobj(wbits=32 + 9)
812        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
813
814        co = zlib.compressobj(level=1, wbits=-15)
815        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
816        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
817        dco = zlib.decompressobj(wbits=-15)
818        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
819
820        co = zlib.compressobj(level=1, wbits=-9)
821        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
822        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
823        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
824        dco = zlib.decompressobj(wbits=-9)
825        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
826
827        co = zlib.compressobj(level=1, wbits=16 + 15)
828        gzip = co.compress(HAMLET_SCENE) + co.flush()
829        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
830        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
831        dco = zlib.decompressobj(32 + 15)
832        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
833
834
835def choose_lines(source, number, seed=None, generator=random):
836    """Return a list of number lines randomly chosen from the source"""
837    if seed is not None:
838        generator.seed(seed)
839    sources = source.split('\n')
840    return [generator.choice(sources) for n in range(number)]
841
842
843HAMLET_SCENE = b"""
844LAERTES
845
846       O, fear me not.
847       I stay too long: but here my father comes.
848
849       Enter POLONIUS
850
851       A double blessing is a double grace,
852       Occasion smiles upon a second leave.
853
854LORD POLONIUS
855
856       Yet here, Laertes! aboard, aboard, for shame!
857       The wind sits in the shoulder of your sail,
858       And you are stay'd for. There; my blessing with thee!
859       And these few precepts in thy memory
860       See thou character. Give thy thoughts no tongue,
861       Nor any unproportioned thought his act.
862       Be thou familiar, but by no means vulgar.
863       Those friends thou hast, and their adoption tried,
864       Grapple them to thy soul with hoops of steel;
865       But do not dull thy palm with entertainment
866       Of each new-hatch'd, unfledged comrade. Beware
867       Of entrance to a quarrel, but being in,
868       Bear't that the opposed may beware of thee.
869       Give every man thy ear, but few thy voice;
870       Take each man's censure, but reserve thy judgment.
871       Costly thy habit as thy purse can buy,
872       But not express'd in fancy; rich, not gaudy;
873       For the apparel oft proclaims the man,
874       And they in France of the best rank and station
875       Are of a most select and generous chief in that.
876       Neither a borrower nor a lender be;
877       For loan oft loses both itself and friend,
878       And borrowing dulls the edge of husbandry.
879       This above all: to thine ownself be true,
880       And it must follow, as the night the day,
881       Thou canst not then be false to any man.
882       Farewell: my blessing season this in thee!
883
884LAERTES
885
886       Most humbly do I take my leave, my lord.
887
888LORD POLONIUS
889
890       The time invites you; go; your servants tend.
891
892LAERTES
893
894       Farewell, Ophelia; and remember well
895       What I have said to you.
896
897OPHELIA
898
899       'Tis in my memory lock'd,
900       And you yourself shall keep the key of it.
901
902LAERTES
903
904       Farewell.
905"""
906
907
908class CustomInt:
909    def __index__(self):
910        return 100
911
912
913if __name__ == "__main__":
914    unittest.main()
915