# Copyright 2016 The Brotli Authors. All rights reserved.
#
# Distributed under MIT license.
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT

import functools
import unittest

from . import _test_utils
import brotli


# Deliberately NOT a TestCase subclass: the test methods defined here must
# not be collected on their own. They only run through the concrete
# per-quality classes further down, which mix this class into a TestCase.
class _TestCompressor(object):

    # Number of bytes handed to the compressor per process() call in the
    # chunked tests.
    CHUNK_SIZE = 2048

    def tearDown(self):
        # Drop the compressor that setUp() of the concrete class created.
        self.compressor = None

    def _check_decompression(self, test_data):
        # Decompress the temp compressed file into the temp uncompressed
        # file, then verify the result against the original input file.
        # NOTE(review): assertFilesMatch is not defined in this class; it is
        # presumably supplied by _test_utils.TestCase in the concrete classes.
        restored_path = _test_utils.get_temp_uncompressed_name(test_data)
        compressed_path = _test_utils.get_temp_compressed_name(test_data)
        with open(restored_path, 'wb') as dst, \
                open(compressed_path, 'rb') as src:
            compressed = src.read()
            dst.write(brotli.decompress(compressed))
        self.assertFilesMatch(restored_path, test_data)

    def _test_single_process(self, test_data):
        # Compress the whole input with one process() call, followed by
        # finish(), and write the result to the temp compressed file.
        compressed_path = _test_utils.get_temp_compressed_name(test_data)
        with open(compressed_path, 'wb') as dst, \
                open(test_data, 'rb') as src:
            payload = src.read()
            dst.write(self.compressor.process(payload))
            dst.write(self.compressor.finish())
        self._check_decompression(test_data)

    def _test_multiple_process(self, test_data):
        # Compress the input in CHUNK_SIZE pieces: one process() call per
        # piece and a single finish() once the input is exhausted.
        compressed_path = _test_utils.get_temp_compressed_name(test_data)
        with open(compressed_path, 'wb') as dst, \
                open(test_data, 'rb') as src:
            # iter() with a sentinel stops at the first empty read (EOF).
            chunks = iter(functools.partial(src.read, self.CHUNK_SIZE), b'')
            for chunk in chunks:
                dst.write(self.compressor.process(chunk))
            dst.write(self.compressor.finish())
        self._check_decompression(test_data)

    def _test_multiple_process_and_flush(self, test_data):
        # Same as the chunked test, but flush() is called after every
        # process() call; finish() is still called once at the end.
        compressed_path = _test_utils.get_temp_compressed_name(test_data)
        with open(compressed_path, 'wb') as dst, \
                open(test_data, 'rb') as src:
            # iter() with a sentinel stops at the first empty read (EOF).
            chunks = iter(functools.partial(src.read, self.CHUNK_SIZE), b'')
            for chunk in chunks:
                dst.write(self.compressor.process(chunk))
                dst.write(self.compressor.flush())
            dst.write(self.compressor.finish())
        self._check_decompression(test_data)


# NOTE(review): presumably turns the _test_* templates above into concrete
# test methods on the class -- confirm against _test_utils.
_test_utils.generate_test_methods(_TestCompressor)


# Concrete configurations: each class runs the shared tests with a
# compressor created at one fixed quality setting.
class TestCompressorQuality1(_TestCompressor, _test_utils.TestCase):

    def setUp(self):
        self.compressor = brotli.Compressor(quality=1)


class TestCompressorQuality6(_TestCompressor, _test_utils.TestCase):

    def setUp(self):
        self.compressor = brotli.Compressor(quality=6)


class TestCompressorQuality9(_TestCompressor, _test_utils.TestCase):

    def setUp(self):
        self.compressor = brotli.Compressor(quality=9)


class TestCompressorQuality11(_TestCompressor, _test_utils.TestCase):

    def setUp(self):
        self.compressor = brotli.Compressor(quality=11)


if __name__ == '__main__':
    unittest.main()