"""Test script for the gzip module.
"""

import unittest
from test import test_support
import os
import io
import struct
import tempfile
gzip = test_support.import_module('gzip')

# Sample payloads written to and read back from gzip files by the tests.
# NOTE(review): the whitespace inside data1 was reconstructed after the file
# had been collapsed; test_metadata pins the CRC32 and length of data1, so
# confirm this literal byte-for-byte against the upstream copy.
data1 = """  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

data2 = """/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


class TestGzip(unittest.TestCase):
    # Every test works on the same scratch file; setUp/tearDown remove it so
    # that tests do not see each other's output.
    filename = test_support.TESTFN

    def setUp(self):
        test_support.unlink(self.filename)

    def tearDown(self):
        test_support.unlink(self.filename)

    def write_and_read_back(self, data, mode='b'):
        # Helper: write `data` through GzipFile, check that write() reports
        # the number of bytes in the buffer, then read the file back and
        # compare against the raw bytes of `data`.
        b_data = memoryview(data).tobytes()
        with gzip.GzipFile(self.filename, 'w'+mode) as f:
            l = f.write(data)
        self.assertEqual(l, len(b_data))
        with gzip.GzipFile(self.filename, 'r'+mode) as f:
            self.assertEqual(f.read(), b_data)

    @test_support.requires_unicode
    def test_unicode_filename(self):
        unicode_filename = test_support.TESTFN_UNICODE
        try:
            unicode_filename.encode(test_support.TESTFN_ENCODING)
        except (UnicodeError, TypeError):
            self.skipTest("Requires unicode filenames support")
        # Point self.filename at the unicode name so tearDown removes it.
        self.filename = unicode_filename
        with gzip.GzipFile(unicode_filename, "wb") as f:
            f.write(data1 * 50)
        with gzip.GzipFile(unicode_filename, "rb") as f:
            self.assertEqual(f.read(), data1 * 50)
        # Sanity check that we are actually operating on the right file.
        with open(unicode_filename, 'rb') as fobj, \
             gzip.GzipFile(fileobj=fobj, mode="rb") as f:
            self.assertEqual(f.read(), data1 * 50)

    def test_write(self):
        # Also used by many other tests to create the file they read.
        with gzip.GzipFile(self.filename, 'wb') as f:
            f.write(data1 * 50)

            # Try flush and fileno.
            f.flush()
            f.fileno()
            if hasattr(os, 'fsync'):
                os.fsync(f.fileno())
            f.close()

        # Test multiple close() calls.
        f.close()

    # The following test_write_xy methods test that write accepts
    # the corresponding bytes-like object type as input
    # and that the data written equals bytes(xy) in all cases.
    def test_write_memoryview(self):
        self.write_and_read_back(memoryview(data1 * 50))

    def test_write_incompatible_type(self):
        # Test that non-bytes-like types raise TypeError.
        # Issue #21560: attempts to write incompatible types
        # should not affect the state of the fileobject
        with gzip.GzipFile(self.filename, 'wb') as f:
            with self.assertRaises(UnicodeEncodeError):
                f.write(u'\xff')
            with self.assertRaises(TypeError):
                f.write([1])
            f.write(data1)
        with gzip.GzipFile(self.filename, 'rb') as f:
            self.assertEqual(f.read(), data1)

    def test_read(self):
        self.test_write()
        # Try reading.
        with gzip.GzipFile(self.filename, 'r') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_read_universal_newlines(self):
        # Issue #5148: Reading breaks when mode contains 'U'.
        self.test_write()
        with gzip.GzipFile(self.filename, 'rU') as f:
            d = f.read()
        self.assertEqual(d, data1*50)

    def test_io_on_closed_object(self):
        # Test that I/O operations on closed GzipFile objects raise a
        # ValueError, just like the corresponding functions on file objects.

        # Write to a file, open it for reading, then close it.
        self.test_write()
        f = gzip.GzipFile(self.filename, 'r')
        f.close()
        with self.assertRaises(ValueError):
            f.read(1)
        with self.assertRaises(ValueError):
            f.seek(0)
        with self.assertRaises(ValueError):
            f.tell()
        # Open the file for writing, then close it.
        f = gzip.GzipFile(self.filename, 'w')
        f.close()
        with self.assertRaises(ValueError):
            f.write('')
        with self.assertRaises(ValueError):
            f.flush()

    def test_append(self):
        self.test_write()
        # Append to the previous file
        with gzip.GzipFile(self.filename, 'ab') as f:
            f.write(data2 * 15)

        with gzip.GzipFile(self.filename, 'rb') as f:
            d = f.read()
        self.assertEqual(d, (data1*50) + (data2*15))

    def test_many_append(self):
        # Bug #1074261 was triggered when reading a file that contained
        # many, many members.  Create such a file and verify that reading it
        # works.
        with gzip.open(self.filename, 'wb', 9) as f:
            f.write('a')
        for i in range(0, 200):
            with gzip.open(self.filename, "ab", 9) as f:  # append
                f.write('a')

        # Try reading the file
        with gzip.open(self.filename, "rb") as zgfile:
            # Collect the chunks and join once instead of growing a string
            # with += on every iteration.
            chunks = []
            while True:
                ztxt = zgfile.read(8192)
                if not ztxt:
                    break
                chunks.append(ztxt)
        self.assertEqual(''.join(chunks), 'a'*201)

    def test_buffered_reader(self):
        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
        # performance.
        self.test_write()

        with gzip.GzipFile(self.filename, 'rb') as f:
            with io.BufferedReader(f) as r:
                lines = [line for line in r]

        self.assertEqual(lines, 50 * data1.splitlines(True))

    def test_readline(self):
        self.test_write()
        # Try .readline() with varying line lengths

        with gzip.GzipFile(self.filename, 'rb') as f:
            line_length = 0
            while True:
                L = f.readline(line_length)
                # A size of 0 always yields an empty result, so an empty
                # read only counts as end-of-file when the size is non-zero.
                if not L and line_length != 0:
                    break
                self.assertLessEqual(len(L), line_length)
                line_length = (line_length + 1) % 50

    def test_readlines(self):
        self.test_write()
        # Try .readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            f.readlines()

        with gzip.GzipFile(self.filename, 'rb') as f:
            while True:
                L = f.readlines(150)
                if L == []:
                    break

    def test_seek_read(self):
        self.test_write()
        # Try seek, read test

        with gzip.GzipFile(self.filename) as f:
            while True:
                oldpos = f.tell()
                line1 = f.readline()
                if not line1:
                    break
                newpos = f.tell()
                f.seek(oldpos)  # negative seek
                # Re-read at most the first 10 bytes of the line just read.
                amount = min(len(line1), 10)
                line2 = f.read(amount)
                self.assertEqual(line1[:amount], line2)
                f.seek(newpos)  # positive seek

    def test_seek_whence(self):
        self.test_write()
        # Try seek(whence=1), read test

        with gzip.GzipFile(self.filename) as f:
            f.read(10)
            f.seek(10, whence=1)
            y = f.read(10)
        self.assertEqual(y, data1[20:30])

    def test_seek_write(self):
        # Try seek, write test
        with gzip.GzipFile(self.filename, 'w') as f:
            for pos in range(0, 256, 16):
                f.seek(pos)
                f.write('GZ\n')

    def test_mode(self):
        # The file opened by GzipFile for mode 'r' must report mode 'rb'.
        self.test_write()
        with gzip.GzipFile(self.filename, 'r') as f:
            self.assertEqual(f.myfileobj.mode, 'rb')

    def test_1647484(self):
        # GzipFile objects expose a `name` attribute equal to the filename
        # they were opened with, for both writing and reading.
        for mode in ('wb', 'rb'):
            with gzip.GzipFile(self.filename, mode) as f:
                self.assertTrue(hasattr(f, "name"))
                self.assertEqual(f.name, self.filename)

    def test_mtime(self):
        # The mtime passed when writing is available as `mtime` after the
        # file has been read back.
        mtime = 123456789
        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)
        with gzip.GzipFile(self.filename) as fRead:
            dataRead = fRead.read()
            self.assertEqual(dataRead, data1)
            self.assertTrue(hasattr(fRead, 'mtime'))
            self.assertEqual(fRead.mtime, mtime)

    def test_metadata(self):
        mtime = 123456789

        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
            fWrite.write(data1)

        with open(self.filename, 'rb') as fRead:
            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html

            idBytes = fRead.read(2)
            self.assertEqual(idBytes, '\x1f\x8b')  # gzip ID

            cmByte = fRead.read(1)
            self.assertEqual(cmByte, '\x08')  # deflate

            flagsByte = fRead.read(1)
            self.assertEqual(flagsByte, '\x08')  # only the FNAME flag is set

            mtimeBytes = fRead.read(4)
            self.assertEqual(mtimeBytes, struct.pack('<i', mtime))  # little-endian

            xflByte = fRead.read(1)
            self.assertEqual(xflByte, '\x02')  # maximum compression

            osByte = fRead.read(1)
            self.assertEqual(osByte, '\xff')  # OS "unknown" (OS-independent)

            # Since the FNAME flag is set, the zero-terminated filename follows.
            # RFC 1952 specifies that this is the name of the input file, if any.
            # However, the gzip module defaults to storing the name of the output
            # file in this field.
            expected = self.filename.encode('Latin-1') + '\x00'
            nameBytes = fRead.read(len(expected))
            self.assertEqual(nameBytes, expected)

            # Since no other flags were set, the header ends here.
            # Rather than process the compressed data, let's seek to the trailer.
            fRead.seek(os.stat(self.filename).st_size - 8)

            crc32Bytes = fRead.read(4)  # CRC32 of uncompressed data [data1]
            self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')

            isizeBytes = fRead.read(4)
            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))

    def test_with_open(self):
        # GzipFile supports the context management protocol
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(b"xxx")
        f = gzip.GzipFile(self.filename, "rb")
        f.close()
        # Entering a closed file must raise ValueError.
        with self.assertRaises(ValueError):
            with f:
                pass
        # An exception raised inside the with block must propagate out.
        with self.assertRaises(ZeroDivisionError):
            with gzip.GzipFile(self.filename, "wb") as f:
                1 // 0

    def test_zero_padded_file(self):
        with gzip.GzipFile(self.filename, "wb") as f:
            f.write(data1 * 50)

        # Pad the file with zeroes
        with open(self.filename, "ab") as f:
            f.write("\x00" * 50)

        with gzip.GzipFile(self.filename, "rb") as f:
            d = f.read()
            self.assertEqual(d, data1 * 50, "Incorrect data in file")

    def test_fileobj_from_fdopen(self):
        # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
        # should not embed the fake filename "<fdopen>" in the output file.
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with os.fdopen(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                self.assertEqual(g.name, "")

    def test_fileobj_from_io_open(self):
        # Same check as test_fileobj_from_fdopen, but with the descriptor
        # wrapped by io.open().
        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
        with io.open(fd, "wb") as f:
            with gzip.GzipFile(fileobj=f, mode="w") as g:
                self.assertEqual(g.name, "")

    def test_fileobj_mode(self):
        gzip.GzipFile(self.filename, "wb").close()
        # An explicit mode argument decides GzipFile.mode, whatever mode the
        # underlying file object was opened with.
        with open(self.filename, "r+b") as f:
            with gzip.GzipFile(fileobj=f, mode='r') as g:
                self.assertEqual(g.mode, gzip.READ)
            with gzip.GzipFile(fileobj=f, mode='w') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with gzip.GzipFile(fileobj=f, mode='a') as g:
                self.assertEqual(g.mode, gzip.WRITE)
            with self.assertRaises(IOError):
                gzip.GzipFile(fileobj=f, mode='z')
        # Without an explicit mode, GzipFile.mode follows the file object's
        # own mode.
        for mode in "rb", "r+b":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.READ)
        for mode in "wb", "ab":
            with open(self.filename, mode) as f:
                with gzip.GzipFile(fileobj=f) as g:
                    self.assertEqual(g.mode, gzip.WRITE)

    def test_read_with_extra(self):
        # Gzip data with an extra field
        gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
                  b'\x05\x00Extra'
                  b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
        with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
            self.assertEqual(f.read(), b'Test')

    def test_fileobj_without_name(self):
        # Issue #33038: GzipFile should not assume that file objects that have
        # a .name attribute use a non-None value.
        with tempfile.SpooledTemporaryFile() as f:
            with gzip.GzipFile(fileobj=f, mode='wb') as archive:
                archive.write(b'data')
                self.assertEqual(archive.name, '')

def test_main(verbose=None):
    test_support.run_unittest(TestGzip)

if __name__ == "__main__":
    test_main(verbose=True)