# Test suite for the zlib compression module.
1import unittest
2from test import support
3from test.support import import_helper
4import binascii
5import copy
6import pickle
7import random
8import sys
9from test.support import bigmemtest, _1G, _4G, is_s390x
10
11
# Skip the whole test module when the zlib extension module is missing.
zlib = import_helper.import_module('zlib')

# Compress.copy() / Decompress.copy() are not available on every zlib build;
# these decorators skip copy-related tests when the method is absent.
requires_Compress_copy = unittest.skipUnless(
        hasattr(zlib.compressobj(), "copy"),
        'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
        hasattr(zlib.decompressobj(), "copy"),
        'requires Decompress.copy()')
20
21
22def _zlib_runtime_version_tuple(zlib_version=zlib.ZLIB_RUNTIME_VERSION):
23    # Register "1.2.3" as "1.2.3.0"
24    # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
25    v = zlib_version.split('-', 1)[0].split('.')
26    if len(v) < 4:
27        v.append('0')
28    elif not v[-1].isnumeric():
29        v[-1] = '0'
30    return tuple(map(int, v))
31
32
# Version of the zlib library actually linked at runtime, as a tuple of ints.
ZLIB_RUNTIME_VERSION_TUPLE = _zlib_runtime_version_tuple()
34
35
36# bpo-46623: When a hardware accelerator is used (currently only on s390x),
37# using different ways to compress data with zlib can produce different
38# compressed data.
39# Simplified test_pair() code:
40#
41#   def func1(data):
42#       return zlib.compress(data)
43#
44#   def func2(data)
45#       co = zlib.compressobj()
46#       x1 = co.compress(data)
47#       x2 = co.flush()
48#       return x1 + x2
49#
50# On s390x if zlib uses a hardware accelerator, func1() creates a single
51# "final" compressed block whereas func2() produces 3 compressed blocks (the
52# last one is a final block). On other platforms with no accelerator, func1()
53# and func2() produce the same compressed data made of a single (final)
54# compressed block.
55#
56# Only the compressed data is different, the decompression returns the original
57# data:
58#
59#   zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data
60#
61# To simplify the skip condition, make the assumption that s390x always has an
62# accelerator, and nothing else has it.
# Assume a compression hardware accelerator is present exactly on s390x
# (see the bpo-46623 explanation above).
HW_ACCELERATED = is_s390x
64
65
class VersionTestCase(unittest.TestCase):

    def test_library_version(self):
        """The runtime zlib must share the major version we compiled against.

        Minor versions are allowed to differ: they may not match even on
        the build machine, and the zlib API is stable between minor
        versions, so checking more than the major version would only
        produce spurious failures.
        """
        runtime_major = zlib.ZLIB_RUNTIME_VERSION[0]
        build_major = zlib.ZLIB_VERSION[0]
        self.assertEqual(runtime_major, build_major)
75
76
class ChecksumTestCase(unittest.TestCase):
    """Tests for the crc32() and adler32() checksum functions."""

    def test_crc32start(self):
        # The crc32 start value defaults to 0.
        self.assertEqual(zlib.crc32(b""), zlib.crc32(b"", 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        # Empty input leaves any start value unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.crc32(b"", start), start)

    def test_adler32start(self):
        # The adler32 start value defaults to 1.
        self.assertEqual(zlib.adler32(b""), zlib.adler32(b"", 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        # Empty input leaves any start value unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.adler32(b"", start), start)

    def test_penguins(self):
        # Known-answer checks with both explicit start values.
        word = b"penguin"
        self.assertEqual(zlib.crc32(word, 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(word, 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(word, 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(word, 1), 0x0bd602f7)
        # Defaults: 0 for crc32, 1 for adler32.
        self.assertEqual(zlib.crc32(word), zlib.crc32(word, 0))
        self.assertEqual(zlib.adler32(word), zlib.adler32(word, 1))

    def test_crc32_adler32_unsigned(self):
        # Results are unsigned 32-bit values.
        payload = b'abcdefghijklmnop'
        self.assertEqual(zlib.crc32(payload), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(payload + payload), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # zlib.crc32 and binascii.crc32 must agree.
        payload = b'abcdefghijklmnop'
        expected = 2486878355
        self.assertEqual(binascii.crc32(payload), expected)
        self.assertEqual(zlib.crc32(payload), expected)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
120
121
# Issue #10276 - check that inputs >=4 GiB are handled correctly.
class ChecksumBigBufferTestCase(unittest.TestCase):
    """Checksums over buffers larger than 4 GiB (big-memory run only)."""

    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
    def test_big_buffer(self, size):
        # 4 GiB + 4 bytes of repeating data, with known-answer checksums.
        data = b"nyan" * (_1G + 1)
        self.assertEqual(zlib.crc32(data), 1044521549)
        self.assertEqual(zlib.adler32(data), 2256789997)
130
131
class ExceptionTestCase(unittest.TestCase):
    """Verify that invalid arguments raise the expected exceptions."""
    # make sure we generate some expected errors
    def test_badlevel(self):
        # specifying compression level out of range causes an error
        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
        # accepts 0 too)
        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)

    def test_badargs(self):
        # The functions require a data argument and reject non bytes-like
        # objects (ints, None, str, tuples, lists).
        self.assertRaises(TypeError, zlib.adler32)
        self.assertRaises(TypeError, zlib.crc32)
        self.assertRaises(TypeError, zlib.compress)
        self.assertRaises(TypeError, zlib.decompress)
        for arg in (42, None, '', 'abc', (), []):
            self.assertRaises(TypeError, zlib.adler32, arg)
            self.assertRaises(TypeError, zlib.crc32, arg)
            self.assertRaises(TypeError, zlib.compress, arg)
            self.assertRaises(TypeError, zlib.decompress, arg)

    def test_badcompressobj(self):
        # verify failure on building compress object with bad params
        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
        # specifying total bits too large causes an error
        self.assertRaises(ValueError,
                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)

    def test_baddecompressobj(self):
        # verify failure on building decompress object with bad params
        self.assertRaises(ValueError, zlib.decompressobj, -1)

    def test_decompressobj_badflush(self):
        # verify failure on calling decompressobj.flush with bad params
        # (the length argument must be strictly positive)
        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)

    @support.cpython_only
    def test_overflow(self):
        # Sizes beyond sys.maxsize raise OverflowError instead of being
        # silently truncated.
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompress(b'', 15, sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
        with self.assertRaisesRegex(OverflowError, 'int too large'):
            zlib.decompressobj().flush(sys.maxsize + 1)

    @support.cpython_only
    def test_disallow_instantiation(self):
        # Ensure that the type disallows instantiation (bpo-43916)
        support.check_disallow_instantiation(self, type(zlib.compressobj()))
        support.check_disallow_instantiation(self, type(zlib.decompressobj()))
181
182
class BaseCompressTestCase(object):
    """Shared helpers for the big-memory compression tests below.

    The try/finally blocks that rebind big buffers to None are deliberate:
    they drop references promptly to keep peak memory within the budget
    declared by the @bigmemtest decorators on the callers.
    """

    def check_big_compress_buffer(self, size, compress_func):
        """Run compress_func over ~size bytes of poorly-compressible data."""
        _1M = 1024 * 1024
        # Generate 10 MiB worth of random, and expand it by repeating it.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        data = random.randbytes(_1M * 10)
        data = data * (size // len(data) + 1)
        try:
            compress_func(data)
        finally:
            # Release memory
            data = None

    def check_big_decompress_buffer(self, size, decompress_func):
        """Round-trip a size-byte buffer through decompress_func."""
        data = b'x' * size
        try:
            compressed = zlib.compress(data, 1)
        finally:
            # Release memory
            data = None
        data = decompress_func(compressed)
        # Sanity check
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b'x')), 0)
        finally:
            data = None
211
212
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """One-shot (whole-message) compress()/decompress() tests."""
    # Test compression in one go (whole message compression)
    def test_speech(self):
        """Simple compress/decompress round trip."""
        x = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)

    def test_keywords(self):
        """Keyword arguments are accepted; `data` itself is positional-only."""
        x = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x), HAMLET_SCENE)
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(x,
                                         wbits=zlib.MAX_WBITS,
                                         bufsize=zlib.DEF_BUF_SIZE),
                         HAMLET_SCENE)

    def test_speech128(self):
        # compress more data
        data = HAMLET_SCENE * 128
        x = zlib.compress(data)
        # With hardware acceleration, the compressed bytes
        # might not be identical.
        if not HW_ACCELERATED:
            self.assertEqual(zlib.compress(bytearray(data)), x)
        for ob in x, bytearray(x):
            self.assertEqual(zlib.decompress(ob), data)

    def test_incomplete_stream(self):
        # A useful error message is given
        x = zlib.compress(HAMLET_SCENE)
        self.assertRaisesRegex(zlib.error,
            "Error -5 while decompressing data: incomplete or truncated stream",
            zlib.decompress, x[:-1])

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        """One-shot compression of a >1 GiB buffer."""
        compress = lambda s: zlib.compress(s, 1)
        self.check_big_compress_buffer(size, compress)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        """One-shot decompression producing a >1 GiB buffer."""
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, size), data)

    def test_custom_bufsize(self):
        # bufsize may be any object supporting __index__()
        # (CustomInt is defined elsewhere in this file)
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        self.assertEqual(zlib.decompress(compressed, 15, CustomInt()), data)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        """Round-trip more than 4 GiB (level 0 keeps output predictable)."""
        data = b'x' * size
        try:
            comp = zlib.compress(data, 0)
            self.assertEqual(zlib.decompress(comp), data)
        finally:
            comp = data = None
279
280
281class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
282    # Test compression object
    def test_pair(self):
        """Round trip through compressobj()/decompressobj() objects."""
        # straightforward compress/decompress objects
        datasrc = HAMLET_SCENE * 128
        datazip = zlib.compress(datasrc)
        # should compress both bytes and bytearray data
        for data in (datasrc, bytearray(datasrc)):
            co = zlib.compressobj()
            x1 = co.compress(data)
            x2 = co.flush()
            self.assertRaises(zlib.error, co.flush) # second flush should not work
            # With hardware acceleration, the compressed bytes might not
            # be identical.
            if not HW_ACCELERATED:
                self.assertEqual(x1 + x2, datazip)
        # NOTE: x1/x2/data deliberately leak out of the loop above; the
        # decompression below reuses the last (bytearray) iteration's output.
        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
            dco = zlib.decompressobj()
            y1 = dco.decompress(v1 + v2)
            y2 = dco.flush()
            self.assertEqual(data, y1 + y2)
            self.assertIsInstance(dco.unconsumed_tail, bytes)
            self.assertIsInstance(dco.unused_data, bytes)
304
    def test_keywords(self):
        """compressobj()/decompressobj() accept keyword arguments, but the
        compress()/decompress() methods take their data positionally."""
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memLevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level=level,
                              method=method,
                              wbits=wbits,
                              memLevel=memLevel,
                              strategy=strategy,
                              zdict=b"")
        do = zlib.decompressobj(wbits=wbits, zdict=b"")
        # `data` is positional-only on both methods.
        with self.assertRaises(TypeError):
            co.compress(data=HAMLET_SCENE)
        with self.assertRaises(TypeError):
            do.decompress(data=zlib.compress(HAMLET_SCENE))
        x = co.compress(HAMLET_SCENE) + co.flush()
        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
        self.assertEqual(HAMLET_SCENE, y)
325
326    def test_compressoptions(self):
327        # specify lots of options to compressobj()
328        level = 2
329        method = zlib.DEFLATED
330        wbits = -12
331        memLevel = 9
332        strategy = zlib.Z_FILTERED
333        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
334        x1 = co.compress(HAMLET_SCENE)
335        x2 = co.flush()
336        dco = zlib.decompressobj(wbits)
337        y1 = dco.decompress(x1 + x2)
338        y2 = dco.flush()
339        self.assertEqual(HAMLET_SCENE, y1 + y2)
340
341    def test_compressincremental(self):
342        # compress object in steps, decompress object as one-shot
343        data = HAMLET_SCENE * 128
344        co = zlib.compressobj()
345        bufs = []
346        for i in range(0, len(data), 256):
347            bufs.append(co.compress(data[i:i+256]))
348        bufs.append(co.flush())
349        combuf = b''.join(bufs)
350
351        dco = zlib.decompressobj()
352        y1 = dco.decompress(b''.join(bufs))
353        y2 = dco.flush()
354        self.assertEqual(data, y1 + y2)
355
    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        """Compress in cx-byte steps and decompress in dcx-byte steps.

        With flush=True the tail is retrieved via flush(); otherwise by
        calling decompress(b'') repeatedly until it yields nothing.
        """
        # compress object in steps, decompress object in steps
        source = source or HAMLET_SCENE
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)

        decombuf = zlib.decompress(combuf)
        # Test type of return value
        self.assertIsInstance(decombuf, bytes)

        self.assertEqual(data, decombuf)

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # Without max_length, nothing may remain unconsumed or unused.
            self.assertEqual(b'', dco.unconsumed_tail, ########
                             "(A) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
            self.assertEqual(b'', dco.unused_data)
        if flush:
            bufs.append(dco.flush())
        else:
            while True:
                chunk = dco.decompress(b'')
                if chunk:
                    bufs.append(chunk)
                else:
                    break
        self.assertEqual(b'', dco.unconsumed_tail, ########
                         "(B) uct should be b'': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(b'', dco.unused_data)
        self.assertEqual(data, b''.join(bufs))
        # Failure means: "decompressobj with init options failed"
396
    def test_decompincflush(self):
        """Variant of test_decompinc() that drains the tail via flush()."""
        self.test_decompinc(flush=True)
399
    def test_decompimax(self, source=None, cx=256, dcx=64):
        """Compress in cx-byte steps; decompress with max_length=dcx.

        Each decompress() call may return at most dcx bytes; leftover
        input is re-fed from dco.unconsumed_tail.
        """
        # compress in steps, decompress in length-restricted steps
        source = source or HAMLET_SCENE
        # Check a decompression object with max_length specified
        data = source * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), cx):
            bufs.append(co.compress(data[i:i+cx]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            #max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, dcx)
            self.assertFalse(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
426
    def test_decompressmaxlen(self, flush=False):
        """Decompress with a max_length cap proportional to remaining input.

        With flush=True the tail is drained via flush(); otherwise by
        calling decompress(b'') until it returns an empty chunk.
        """
        # Check a decompression object with max_length specified
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        bufs = []
        for i in range(0, len(data), 256):
            bufs.append(co.compress(data[i:i+256]))
        bufs.append(co.flush())
        combuf = b''.join(bufs)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.assertFalse(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # Keep draining buffered output until decompress(b'') comes
            # back empty (`chunk` still holds the last chunk from above).
            while chunk:
                chunk = dco.decompress(b'', max_length)
                self.assertFalse(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
458
    def test_decompressmaxlenflush(self):
        """Variant of test_decompressmaxlen() that drains via flush()."""
        self.test_decompressmaxlen(flush=True)
461
462    def test_maxlenmisc(self):
463        # Misc tests of max_length
464        dco = zlib.decompressobj()
465        self.assertRaises(ValueError, dco.decompress, b"", -1)
466        self.assertEqual(b'', dco.unconsumed_tail)
467
    def test_maxlen_large(self):
        """max_length values up to sys.maxsize are accepted."""
        # Sizes up to sys.maxsize should be accepted, although zlib is
        # internally limited to expressing sizes with unsigned int
        data = HAMLET_SCENE * 10
        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
476
    def test_maxlen_custom(self):
        """max_length may be any object supporting __index__()."""
        data = HAMLET_SCENE * 10
        compressed = zlib.compress(data, 1)
        dco = zlib.decompressobj()
        # CustomInt is defined elsewhere in this file; presumably its
        # __index__() returns 100, matching the data[:100] expectation
        # below — confirm against its definition.
        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
482
483    def test_clear_unconsumed_tail(self):
484        # Issue #12050: calling decompress() without providing max_length
485        # should clear the unconsumed_tail attribute.
486        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
487        dco = zlib.decompressobj()
488        ddata = dco.decompress(cdata, 1)
489        ddata += dco.decompress(dco.unconsumed_tail)
490        self.assertEqual(dco.unconsumed_tail, b"")
491
    def test_flushes(self):
        """flush() with each sync option, across all compression levels."""
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
                    'Z_PARTIAL_FLUSH']

        # Z_BLOCK has a known failure prior to 1.2.5.3
        if ZLIB_RUNTIME_VERSION_TUPLE >= (1, 2, 5, 3):
            sync_opt.append('Z_BLOCK')

        # Keep only the options this zlib build actually defines.
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                with self.subTest(sync=sync, level=level):
                    obj = zlib.compressobj( level )
                    a = obj.compress( data[:3000] )
                    b = obj.flush( sync )
                    c = obj.compress( data[3000:] )
                    d = obj.flush()
                    self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
                                     data, ("Decompress failed: flush "
                                            "mode=%i, level=%i") % (sync, level))
                    del obj
518
519    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
520                         'requires zlib.Z_SYNC_FLUSH')
521    def test_odd_flush(self):
522        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
523        import random
524        # Testing on 17K of "random" data
525
526        # Create compressor and decompressor objects
527        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
528        dco = zlib.decompressobj()
529
530        # Try 17K of data
531        # generate random data stream
532        data = random.randbytes(17 * 1024)
533
534        # compress, sync-flush, and decompress
535        first = co.compress(data)
536        second = co.flush(zlib.Z_SYNC_FLUSH)
537        expanded = dco.decompress(first + second)
538
539        # if decompressed data is different from the input data, choke.
540        self.assertEqual(expanded, data, "17K random source doesn't match")
541
542    def test_empty_flush(self):
543        # Test that calling .flush() on unused objects works.
544        # (Bug #1083110 -- calling .flush() on decompress objects
545        # caused a core dump.)
546
547        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
548        self.assertTrue(co.flush())  # Returns a zlib header
549        dco = zlib.decompressobj()
550        self.assertEqual(dco.flush(), b"") # Returns nothing
551
    def test_dictionary(self):
        """A preset dictionary is required on both sides of the stream."""
        h = HAMLET_SCENE
        # Build a simulated dictionary out of the words in HAMLET.
        words = h.split()
        random.shuffle(words)
        zdict = b''.join(words)
        # Use it to compress HAMLET.
        co = zlib.compressobj(zdict=zdict)
        cd = co.compress(h) + co.flush()
        # Verify that it will decompress with the dictionary.
        dco = zlib.decompressobj(zdict=zdict)
        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
        # Verify that it fails when not given the dictionary.
        dco = zlib.decompressobj()
        self.assertRaises(zlib.error, dco.decompress, cd)
567
    def test_dictionary_streaming(self):
        """Reuse one compressor (with zdict) for several sync-flushed pieces."""
        # This simulates the reuse of a compressor object for compressing
        # several separate data streams.
        co = zlib.compressobj(zdict=HAMLET_SCENE)
        do = zlib.decompressobj(zdict=HAMLET_SCENE)
        piece = HAMLET_SCENE[1000:1500]
        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
        self.assertEqual(do.decompress(d0), piece)
        self.assertEqual(do.decompress(d1), piece[100:])
        self.assertEqual(do.decompress(d2), piece[:-100])
580
581    def test_decompress_incomplete_stream(self):
582        # This is 'foo', deflated
583        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
584        # For the record
585        self.assertEqual(zlib.decompress(x), b'foo')
586        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
587        # Omitting the stream end works with decompressor objects
588        # (see issue #8672).
589        dco = zlib.decompressobj()
590        y = dco.decompress(x[:-5])
591        y += dco.flush()
592        self.assertEqual(y, b'foo')
593
594    def test_decompress_eof(self):
595        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
596        dco = zlib.decompressobj()
597        self.assertFalse(dco.eof)
598        dco.decompress(x[:-5])
599        self.assertFalse(dco.eof)
600        dco.decompress(x[-5:])
601        self.assertTrue(dco.eof)
602        dco.flush()
603        self.assertTrue(dco.eof)
604
605    def test_decompress_eof_incomplete_stream(self):
606        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
607        dco = zlib.decompressobj()
608        self.assertFalse(dco.eof)
609        dco.decompress(x[:-5])
610        self.assertFalse(dco.eof)
611        dco.flush()
612        self.assertFalse(dco.eof)
613
    def test_decompress_unused_data(self):
        """Data past the stream end accumulates in unused_data across calls."""
        # Repeated calls to decompress() after EOF should accumulate data in
        # dco.unused_data, instead of just storing the arg to the last call.
        source = b'abcdefghijklmnopqrstuvwxyz'
        remainder = b'0123456789'
        y = zlib.compress(source)
        x = y + remainder
        # Exercise both the no-max_length and max_length paths, feeding the
        # input in various step sizes.
        for maxlen in 0, 1000:
            for step in 1, 2, len(y), len(x):
                dco = zlib.decompressobj()
                data = b''
                for i in range(0, len(x), step):
                    if i < len(y):
                        self.assertEqual(dco.unused_data, b'')
                    if maxlen == 0:
                        data += dco.decompress(x[i : i + step])
                        self.assertEqual(dco.unconsumed_tail, b'')
                    else:
                        data += dco.decompress(
                                dco.unconsumed_tail + x[i : i + step], maxlen)
                data += dco.flush()
                self.assertTrue(dco.eof)
                self.assertEqual(data, source)
                self.assertEqual(dco.unconsumed_tail, b'')
                self.assertEqual(dco.unused_data, remainder)
639
640    # issue27164
641    def test_decompress_raw_with_dictionary(self):
642        zdict = b'abcdefghijklmnopqrstuvwxyz'
643        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
644        comp = co.compress(zdict) + co.flush()
645        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
646        uncomp = dco.decompress(comp) + dco.flush()
647        self.assertEqual(zdict, uncomp)
648
    def test_flush_with_freed_input(self):
        """flush() must not read a freed input buffer (issue #16411)."""
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        # Drop the only other reference to `data`, then allocate fresh
        # compressed bytes so a freed buffer would likely be reused.
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])
660
    @bigmemtest(size=_4G, memuse=1)
    def test_flush_large_length(self, size):
        """flush(length) accepts values above zlib's internal UINT_MAX limit."""
        # Test flush(length) parameter greater than internal limit UINT_MAX
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(size), input[1:])
669
    def test_flush_custom_length(self):
        """flush(length) accepts any object supporting __index__()."""
        # CustomInt is defined elsewhere in this file; presumably it indexes
        # to a value large enough for the remaining output — confirm there.
        input = HAMLET_SCENE * 10
        data = zlib.compress(input, 1)
        dco = zlib.decompressobj()
        dco.decompress(data, 1)
        self.assertEqual(dco.flush(CustomInt()), input[1:])
676
    @requires_Compress_copy
    def test_compresscopy(self):
        """A copied compressor continues independently of the original."""
        # Test copying a compression object
        data0 = HAMLET_SCENE
        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
        # Exercise .copy(), copy.copy() and copy.deepcopy() the same way.
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
            bufs0 = []
            bufs0.append(c0.compress(data0))

            c1 = func(c0)
            bufs1 = bufs0[:]

            # Feed different tails to the original and the copy.
            bufs0.append(c0.compress(data0))
            bufs0.append(c0.flush())
            s0 = b''.join(bufs0)

            bufs1.append(c1.compress(data1))
            bufs1.append(c1.flush())
            s1 = b''.join(bufs1)

            self.assertEqual(zlib.decompress(s0),data0+data0)
            self.assertEqual(zlib.decompress(s1),data0+data1)
700
701    @requires_Compress_copy
702    def test_badcompresscopy(self):
703        # Test copying a compression object in an inconsistent state
704        c = zlib.compressobj()
705        c.compress(HAMLET_SCENE)
706        c.flush()
707        self.assertRaises(ValueError, c.copy)
708        self.assertRaises(ValueError, copy.copy, c)
709        self.assertRaises(ValueError, copy.deepcopy, c)
710
    @requires_Decompress_copy
    def test_decompresscopy(self):
        """A copied decompressor continues independently of the original."""
        # Test copying a decompression object
        data = HAMLET_SCENE
        comp = zlib.compress(data)
        # Test type of return value
        self.assertIsInstance(comp, bytes)

        # Exercise .copy(), copy.copy() and copy.deepcopy() the same way.
        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
            d0 = zlib.decompressobj()
            bufs0 = []
            bufs0.append(d0.decompress(comp[:32]))

            d1 = func(d0)
            bufs1 = bufs0[:]

            # Feed the remaining input to both objects independently.
            bufs0.append(d0.decompress(comp[32:]))
            s0 = b''.join(bufs0)

            bufs1.append(d1.decompress(comp[32:]))
            s1 = b''.join(bufs1)

            self.assertEqual(s0,s1)
            self.assertEqual(s0,data)
735
736    @requires_Decompress_copy
737    def test_baddecompresscopy(self):
738        # Test copying a compression object in an inconsistent state
739        data = zlib.compress(HAMLET_SCENE)
740        d = zlib.decompressobj()
741        d.decompress(data)
742        d.flush()
743        self.assertRaises(ValueError, d.copy)
744        self.assertRaises(ValueError, copy.copy, d)
745        self.assertRaises(ValueError, copy.deepcopy, d)
746
747    def test_compresspickle(self):
748        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
749            with self.assertRaises((TypeError, pickle.PicklingError)):
750                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
751
752    def test_decompresspickle(self):
753        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
754            with self.assertRaises((TypeError, pickle.PicklingError)):
755                pickle.dumps(zlib.decompressobj(), proto)
756
    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        """Incremental compressor handles a >1 GiB input buffer."""
        c = zlib.compressobj(1)
        compress = lambda s: c.compress(s) + c.flush()
        self.check_big_compress_buffer(size, compress)
764
    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        """Incremental decompressor handles a >1 GiB output buffer."""
        d = zlib.decompressobj()
        decompress = lambda s: d.decompress(s) + d.flush()
        self.check_big_decompress_buffer(size, decompress)
770
771    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
772    @bigmemtest(size=_4G + 100, memuse=4)
773    def test_64bit_compress(self, size):
774        data = b'x' * size
775        co = zlib.compressobj(0)
776        do = zlib.decompressobj()
777        try:
778            comp = co.compress(data) + co.flush()
779            uncomp = do.decompress(comp) + do.flush()
780            self.assertEqual(uncomp, data)
781        finally:
782            comp = uncomp = data = None
783
784    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
785    @bigmemtest(size=_4G + 100, memuse=3)
786    def test_large_unused_data(self, size):
787        data = b'abcdefghijklmnop'
788        unused = b'x' * size
789        comp = zlib.compress(data) + unused
790        do = zlib.decompressobj()
791        try:
792            uncomp = do.decompress(comp) + do.flush()
793            self.assertEqual(unused, do.unused_data)
794            self.assertEqual(uncomp, data)
795        finally:
796            unused = comp = do = None
797
798    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
799    @bigmemtest(size=_4G + 100, memuse=5)
800    def test_large_unconsumed_tail(self, size):
801        data = b'x' * size
802        do = zlib.decompressobj()
803        try:
804            comp = zlib.compress(data, 0)
805            uncomp = do.decompress(comp, 1) + do.flush()
806            self.assertEqual(uncomp, data)
807            self.assertEqual(do.unconsumed_tail, b'')
808        finally:
809            comp = uncomp = data = None
810
    def test_wbits(self):
        # Exercise the wbits encodings accepted by zlib: positive N selects a
        # zlib container with an N-bit window, negative N a raw deflate
        # stream, 16+N a gzip container, and 32+N auto-detects zlib/gzip on
        # decompression (per the zlib manual's inflateInit2/deflateInit2).
        # wbits=0 only supported since zlib v1.2.3.5
        supports_wbits_0 = ZLIB_RUNTIME_VERSION_TUPLE >= (1, 2, 3, 5)

        # zlib container with the maximum 15-bit window.
        co = zlib.compressobj(level=1, wbits=15)
        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
        # A reader window smaller than the stream's is rejected, both by the
        # one-shot function and by a decompressobj.
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            zlib.decompress(zlib15, 14)
        dco = zlib.decompressobj(wbits=32 + 15)
        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=14)
        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
            dco.decompress(zlib15)

        # zlib container with a reduced 9-bit window; any reader window of at
        # least that size works.
        co = zlib.compressobj(level=1, wbits=9)
        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
        if supports_wbits_0:
            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=32 + 9)
        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)

        # Raw deflate stream (negative wbits), 15-bit window.
        co = zlib.compressobj(level=1, wbits=-15)
        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-15)
        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)

        # Raw deflate stream, 9-bit window; a larger raw reader window works.
        co = zlib.compressobj(level=1, wbits=-9)
        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
        dco = zlib.decompressobj(wbits=-9)
        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)

        # gzip container (16 + N); readable explicitly or via auto-detect.
        co = zlib.compressobj(level=1, wbits=16 + 15)
        gzip = co.compress(HAMLET_SCENE) + co.flush()
        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
        dco = zlib.decompressobj(32 + 15)
        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)

        # The one-shot module-level compress()/decompress() accept wbits too.
        for wbits in (-15, 15, 31):
            with self.subTest(wbits=wbits):
                expected = HAMLET_SCENE
                actual = zlib.decompress(
                    zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits
                )
                self.assertEqual(expected, actual)
866
def choose_lines(source, number, seed=None, generator=random):
    """Return *number* lines picked at random (with replacement) from *source*.

    If *seed* is given, *generator* (the ``random`` module by default) is
    seeded first so the selection is reproducible.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    return [generator.choice(candidates) for _ in range(number)]
873
874
# Large ASCII fixture (a scene from Shakespeare's Hamlet) used throughout
# these tests as the canonical compressible payload.
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
938
939
class ZlibDecompressorTest(unittest.TestCase):
    """Tests for the internal one-shot zlib._ZlibDecompressor type."""
    # Test adopted from test_bz2.py
    TEXT = HAMLET_SCENE                 # uncompressed reference payload
    DATA = zlib.compress(HAMLET_SCENE)  # its zlib-compressed form
    BAD_DATA = b"Not a valid deflate block"
    # A payload well over 128 KiB (built from repeated compressed blocks)
    # plus its compressed form, for the max_length/buffering tests below.
    BIG_TEXT = DATA * ((128 * 1024 // len(DATA)) + 1)
    BIG_DATA = zlib.compress(BIG_TEXT)

    def test_Constructor(self):
        # Bad argument types are rejected with TypeError.
        self.assertRaises(TypeError, zlib._ZlibDecompressor, "ASDA")
        self.assertRaises(TypeError, zlib._ZlibDecompressor, -15, "notbytes")
        self.assertRaises(TypeError, zlib._ZlibDecompressor, -15, b"bytes", 5)

    def testDecompress(self):
        # One-shot decompression of a complete stream round-trips the text.
        zlibd = zlib._ZlibDecompressor()
        self.assertRaises(TypeError, zlibd.decompress)  # data arg is required
        text = zlibd.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressChunks10(self):
        # Feeding the stream in 10-byte chunks must produce the same output
        # as one-shot decompression.
        zlibd = zlib._ZlibDecompressor()
        text = b''
        n = 0
        while True:
            # Renamed from `str` to avoid shadowing the builtin.
            chunk = self.DATA[n*10:(n+1)*10]
            if not chunk:
                break
            text += zlibd.decompress(chunk)
            n += 1
        self.assertEqual(text, self.TEXT)

    def testDecompressUnusedData(self):
        # Bytes past the end of the deflate stream are reported verbatim
        # in .unused_data.
        zlibd = zlib._ZlibDecompressor()
        unused_data = b"this is unused data"
        text = zlibd.decompress(self.DATA+unused_data)
        self.assertEqual(text, self.TEXT)
        self.assertEqual(zlibd.unused_data, unused_data)

    def testEOFError(self):
        # After end-of-stream, any further decompress() call raises EOFError,
        # even with empty input.
        zlibd = zlib._ZlibDecompressor()
        text = zlibd.decompress(self.DATA)
        self.assertRaises(EOFError, zlibd.decompress, b"anything")
        self.assertRaises(EOFError, zlibd.decompress, b"")

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=3.3)
    def testDecompress4G(self, size):
        # "Test zlib._ZlibDecompressor.decompress() with >4GiB input"
        blocksize = min(10 * 1024 * 1024, size)
        block = random.randbytes(blocksize)
        try:
            data = block * ((size-1) // blocksize + 1)
            compressed = zlib.compress(data)
            zlibd = zlib._ZlibDecompressor()
            decompressed = zlibd.decompress(compressed)
            # assertTrue avoids assertEqual building a multi-GiB failure diff.
            self.assertTrue(decompressed == data)
        finally:
            # Release the multi-GiB buffers promptly.
            data = None
            compressed = None
            decompressed = None

    def testPickle(self):
        # Decompressor objects wrap C-level state and must refuse pickling.
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(zlib._ZlibDecompressor(), proto)

    def testDecompressorChunksMaxsize(self):
        # max_length caps each decompress() result; needs_input tracks
        # whether buffered input is still waiting to be consumed.
        zlibd = zlib._ZlibDecompressor()
        max_length = 100
        out = []

        # Feed some input
        len_ = len(self.BIG_DATA) - 64
        out.append(zlibd.decompress(self.BIG_DATA[:len_],
                                  max_length=max_length))
        self.assertFalse(zlibd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data without providing more input
        out.append(zlibd.decompress(b'', max_length=max_length))
        self.assertFalse(zlibd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data while providing more input
        out.append(zlibd.decompress(self.BIG_DATA[len_:],
                                  max_length=max_length))
        self.assertLessEqual(len(out[-1]), max_length)

        # Retrieve remaining uncompressed data
        while not zlibd.eof:
            out.append(zlibd.decompress(b'', max_length=max_length))
            self.assertLessEqual(len(out[-1]), max_length)

        out = b"".join(out)
        self.assertEqual(out, self.BIG_TEXT)
        self.assertEqual(zlibd.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Test reusing input buffer after moving existing
        # contents to beginning
        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create input buffer and fill it
        self.assertEqual(zlibd.decompress(self.DATA[:100],
                                        max_length=0), b'')

        # Retrieve some results, freeing capacity at beginning
        # of input buffer
        out.append(zlibd.decompress(b'', 2))

        # Add more data that fits into input buffer after
        # moving existing data to beginning
        out.append(zlibd.decompress(self.DATA[100:105], 15))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[105:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_2(self):
        # Test reusing input buffer by appending data at the
        # end right away
        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create input buffer and empty it
        self.assertEqual(zlibd.decompress(self.DATA[:200],
                                        max_length=0), b'')
        out.append(zlibd.decompress(b''))

        # Fill buffer with new data
        out.append(zlibd.decompress(self.DATA[200:280], 2))

        # Append some more data, not enough to require resize
        out.append(zlibd.decompress(self.DATA[280:300], 2))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_3(self):
        # Test reusing input buffer after extending it

        zlibd = zlib._ZlibDecompressor()
        out = []

        # Create almost full input buffer
        out.append(zlibd.decompress(self.DATA[:200], 5))

        # Add even more data to it, requiring resize
        out.append(zlibd.decompress(self.DATA[200:300], 5))

        # Decompress rest of data
        out.append(zlibd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_failure(self):
        # Invalid input raises; a repeated call must raise again rather than
        # crash on inconsistent internal state.
        zlibd = zlib._ZlibDecompressor()
        self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
        # Previously, a second call could crash due to internal inconsistency
        self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)

    @support.refcount_test
    def test_refleaks_in___init__(self):
        # Re-running __init__ repeatedly must not leak references
        # (checked against CPython's debug-build total refcount).
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        zlibd = zlib._ZlibDecompressor()
        refs_before = gettotalrefcount()
        for i in range(100):
            zlibd.__init__()
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
1110
1111
class CustomInt:
    """Non-int object convertible via the index protocol (always 100)."""

    def __index__(self):
        # Fixed value; used to check that integer-like arguments are accepted.
        return 100
1115
1116
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
1119