1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16 17 18from tempfile import TemporaryFile 19from random import randint, random, getrandbits 20 21from test.support import script_helper 22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd, 23 requires_zlib, requires_bz2, requires_lzma, 24 captured_stdout) 25 26TESTFN2 = TESTFN + "2" 27TESTFNDIR = TESTFN + "d" 28FIXEDTEST_SIZE = 1000 29DATAFILES_DIR = 'zipfile_datafiles' 30 31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 32 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 33 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 34 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 35 36def getrandbytes(size): 37 return getrandbits(8 * size).to_bytes(size, 'little') 38 39def get_files(test): 40 yield TESTFN2 41 with TemporaryFile() as f: 42 yield f 43 test.assertFalse(f.closed) 44 with io.BytesIO() as f: 45 yield f 46 test.assertFalse(f.closed) 47 48class AbstractTestsWithSourceFile: 49 @classmethod 50 def setUpClass(cls): 51 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 52 (i, random()), "ascii") 53 for i in range(FIXEDTEST_SIZE)] 54 cls.data = b''.join(cls.line_gen) 55 56 def setUp(self): 57 # Make a source file with some lines 58 with open(TESTFN, "wb") as fp: 59 fp.write(self.data) 60 61 def make_test_archive(self, f, compression, compresslevel=None): 62 kwargs = {'compression': compression, 'compresslevel': compresslevel} 63 # Create the ZIP archive 64 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 65 zipfp.write(TESTFN, "another.name") 66 zipfp.write(TESTFN, TESTFN) 67 zipfp.writestr("strfile", self.data) 68 with zipfp.open('written-open-w', mode='w') as f: 69 for line in self.line_gen: 70 f.write(line) 71 72 def zip_test(self, f, compression, compresslevel=None): 73 self.make_test_archive(f, compression, compresslevel) 74 75 # Read the ZIP archive 76 with zipfile.ZipFile(f, "r", compression) as zipfp: 77 self.assertEqual(zipfp.read(TESTFN), self.data) 78 self.assertEqual(zipfp.read("another.name"), self.data) 79 self.assertEqual(zipfp.read("strfile"), self.data) 80 81 # Print the ZIP directory 82 fp = io.StringIO() 83 zipfp.printdir(file=fp) 84 directory = fp.getvalue() 85 lines = directory.splitlines() 86 self.assertEqual(len(lines), 5) # Number of files + header 87 88 self.assertIn('File Name', lines[0]) 89 self.assertIn('Modified', lines[0]) 90 self.assertIn('Size', lines[0]) 91 92 fn, date, time_, size = lines[1].split() 93 self.assertEqual(fn, 'another.name') 94 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 95 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 96 self.assertEqual(size, str(len(self.data))) 97 98 # Check the namelist 99 names = zipfp.namelist() 100 self.assertEqual(len(names), 4) 101 self.assertIn(TESTFN, names) 102 self.assertIn("another.name", names) 103 self.assertIn("strfile", names) 104 self.assertIn("written-open-w", names) 105 106 # Check infolist 107 infos = zipfp.infolist() 108 names = [i.filename for i in infos] 109 self.assertEqual(len(names), 4) 110 self.assertIn(TESTFN, names) 111 self.assertIn("another.name", names) 112 self.assertIn("strfile", names) 113 self.assertIn("written-open-w", names) 114 for i in infos: 115 self.assertEqual(i.file_size, len(self.data)) 116 117 # check getinfo 118 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 119 info = zipfp.getinfo(nm) 120 self.assertEqual(info.filename, nm) 121 self.assertEqual(info.file_size, len(self.data)) 122 123 # Check that testzip doesn't raise an exception 124 zipfp.testzip() 125 126 def test_basic(self): 127 for f in get_files(self): 128 self.zip_test(f, self.compression) 129 130 def zip_open_test(self, f, compression): 131 self.make_test_archive(f, compression) 132 133 # Read the ZIP archive 134 with zipfile.ZipFile(f, "r", compression) as zipfp: 135 zipdata1 = [] 136 with zipfp.open(TESTFN) as zipopen1: 137 while True: 138 read_data = zipopen1.read(256) 139 if not read_data: 140 break 141 zipdata1.append(read_data) 142 143 zipdata2 = [] 144 with zipfp.open("another.name") as zipopen2: 145 while True: 146 read_data = zipopen2.read(256) 147 if not read_data: 148 break 149 zipdata2.append(read_data) 150 151 self.assertEqual(b''.join(zipdata1), self.data) 152 self.assertEqual(b''.join(zipdata2), self.data) 153 154 def test_open(self): 155 for f in get_files(self): 156 self.zip_open_test(f, self.compression) 157 158 def test_open_with_pathlike(self): 159 path = pathlib.Path(TESTFN2) 160 self.zip_open_test(path, self.compression) 161 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 162 self.assertIsInstance(zipfp.filename, str) 163 164 def zip_random_open_test(self, f, compression): 165 self.make_test_archive(f, compression) 166 167 # Read the ZIP archive 168 with zipfile.ZipFile(f, "r", compression) as zipfp: 169 zipdata1 = [] 170 with zipfp.open(TESTFN) as zipopen1: 171 while True: 172 read_data = zipopen1.read(randint(1, 1024)) 173 if not read_data: 174 break 175 zipdata1.append(read_data) 176 177 self.assertEqual(b''.join(zipdata1), self.data) 178 179 def test_random_open(self): 180 for f in get_files(self): 181 self.zip_random_open_test(f, self.compression) 182 183 def zip_read1_test(self, f, compression): 184 self.make_test_archive(f, compression) 185 186 # Read the ZIP archive 187 with zipfile.ZipFile(f, "r") as zipfp, \ 188 zipfp.open(TESTFN) as zipopen: 189 zipdata = [] 190 while True: 191 read_data = zipopen.read1(-1) 192 if not read_data: 193 break 194 zipdata.append(read_data) 195 196 self.assertEqual(b''.join(zipdata), self.data) 197 198 def test_read1(self): 199 for f in get_files(self): 200 self.zip_read1_test(f, self.compression) 201 202 def zip_read1_10_test(self, f, compression): 203 self.make_test_archive(f, compression) 204 205 # Read the ZIP archive 206 with zipfile.ZipFile(f, "r") as zipfp, \ 207 zipfp.open(TESTFN) as zipopen: 208 zipdata = [] 209 while True: 210 read_data = zipopen.read1(10) 211 self.assertLessEqual(len(read_data), 10) 212 if not read_data: 213 break 214 zipdata.append(read_data) 215 216 self.assertEqual(b''.join(zipdata), self.data) 217 218 def test_read1_10(self): 219 for f in get_files(self): 220 self.zip_read1_10_test(f, self.compression) 221 222 def zip_readline_read_test(self, f, compression): 223 self.make_test_archive(f, compression) 224 225 # Read the ZIP archive 226 with zipfile.ZipFile(f, "r") as zipfp, \ 227 zipfp.open(TESTFN) as zipopen: 228 data = b'' 229 while True: 230 read = zipopen.readline() 231 if not read: 232 break 233 data += read 234 235 read = zipopen.read(100) 236 if not read: 237 break 238 data += read 239 240 self.assertEqual(data, self.data) 241 242 def test_readline_read(self): 243 # Issue #7610: calls to readline() interleaved with calls to read(). 244 for f in get_files(self): 245 self.zip_readline_read_test(f, self.compression) 246 247 def zip_readline_test(self, f, compression): 248 self.make_test_archive(f, compression) 249 250 # Read the ZIP archive 251 with zipfile.ZipFile(f, "r") as zipfp: 252 with zipfp.open(TESTFN) as zipopen: 253 for line in self.line_gen: 254 linedata = zipopen.readline() 255 self.assertEqual(linedata, line) 256 257 def test_readline(self): 258 for f in get_files(self): 259 self.zip_readline_test(f, self.compression) 260 261 def zip_readlines_test(self, f, compression): 262 self.make_test_archive(f, compression) 263 264 # Read the ZIP archive 265 with zipfile.ZipFile(f, "r") as zipfp: 266 with zipfp.open(TESTFN) as zipopen: 267 ziplines = zipopen.readlines() 268 for line, zipline in zip(self.line_gen, ziplines): 269 self.assertEqual(zipline, line) 270 271 def test_readlines(self): 272 for f in get_files(self): 273 self.zip_readlines_test(f, self.compression) 274 275 def zip_iterlines_test(self, f, compression): 276 self.make_test_archive(f, compression) 277 278 # Read the ZIP archive 279 with zipfile.ZipFile(f, "r") as zipfp: 280 with zipfp.open(TESTFN) as zipopen: 281 for line, zipline in zip(self.line_gen, zipopen): 282 self.assertEqual(zipline, line) 283 284 def test_iterlines(self): 285 for f in get_files(self): 286 self.zip_iterlines_test(f, self.compression) 287 288 def test_low_compression(self): 289 """Check for cases where compressed data is larger than original.""" 290 # Create the ZIP archive 291 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 292 zipfp.writestr("strfile", '12') 293 294 # Get an open object for strfile 295 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 296 with zipfp.open("strfile") as openobj: 297 self.assertEqual(openobj.read(1), b'1') 298 self.assertEqual(openobj.read(1), b'2') 299 300 def test_writestr_compression(self): 301 zipfp = zipfile.ZipFile(TESTFN2, "w") 302 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 303 info = zipfp.getinfo('b.txt') 304 self.assertEqual(info.compress_type, self.compression) 305 306 def test_writestr_compresslevel(self): 307 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 308 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 309 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 310 compresslevel=2) 311 312 # Compression level follows the constructor. 313 a_info = zipfp.getinfo('a.txt') 314 self.assertEqual(a_info.compress_type, self.compression) 315 self.assertEqual(a_info._compresslevel, 1) 316 317 # Compression level is overridden. 318 b_info = zipfp.getinfo('b.txt') 319 self.assertEqual(b_info.compress_type, self.compression) 320 self.assertEqual(b_info._compresslevel, 2) 321 322 def test_read_return_size(self): 323 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 324 # than requested. 325 for test_size in (1, 4095, 4096, 4097, 16384): 326 file_size = test_size + 1 327 junk = getrandbytes(file_size) 328 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 329 zipf.writestr('foo', junk) 330 with zipf.open('foo', 'r') as fp: 331 buf = fp.read(test_size) 332 self.assertEqual(len(buf), test_size) 333 334 def test_truncated_zipfile(self): 335 fp = io.BytesIO() 336 with zipfile.ZipFile(fp, mode='w') as zipf: 337 zipf.writestr('strfile', self.data, compress_type=self.compression) 338 end_offset = fp.tell() 339 zipfiledata = fp.getvalue() 340 341 fp = io.BytesIO(zipfiledata) 342 with zipfile.ZipFile(fp) as zipf: 343 with zipf.open('strfile') as zipopen: 344 fp.truncate(end_offset - 20) 345 with self.assertRaises(EOFError): 346 zipopen.read() 347 348 fp = io.BytesIO(zipfiledata) 349 with zipfile.ZipFile(fp) as zipf: 350 with zipf.open('strfile') as zipopen: 351 fp.truncate(end_offset - 20) 352 with self.assertRaises(EOFError): 353 while zipopen.read(100): 354 pass 355 356 fp = io.BytesIO(zipfiledata) 357 with zipfile.ZipFile(fp) as zipf: 358 with zipf.open('strfile') as zipopen: 359 fp.truncate(end_offset - 20) 360 with self.assertRaises(EOFError): 361 while zipopen.read1(100): 362 pass 363 364 def test_repr(self): 365 fname = 'file.name' 366 for f in get_files(self): 367 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 368 zipfp.write(TESTFN, fname) 369 r = repr(zipfp) 370 self.assertIn("mode='w'", r) 371 372 with zipfile.ZipFile(f, 'r') as zipfp: 373 r = repr(zipfp) 374 if isinstance(f, str): 375 self.assertIn('filename=%r' % f, r) 376 else: 377 self.assertIn('file=%r' % f, r) 378 self.assertIn("mode='r'", r) 379 r = repr(zipfp.getinfo(fname)) 380 self.assertIn('filename=%r' % fname, r) 381 self.assertIn('filemode=', r) 382 self.assertIn('file_size=', r) 383 if self.compression != zipfile.ZIP_STORED: 384 self.assertIn('compress_type=', r) 385 self.assertIn('compress_size=', r) 386 with zipfp.open(fname) as zipopen: 387 r = repr(zipopen) 388 self.assertIn('name=%r' % fname, r) 389 self.assertIn("mode='r'", r) 390 if self.compression != zipfile.ZIP_STORED: 391 self.assertIn('compress_type=', r) 392 self.assertIn('[closed]', repr(zipopen)) 393 self.assertIn('[closed]', repr(zipfp)) 394 395 def test_compresslevel_basic(self): 396 for f in get_files(self): 397 self.zip_test(f, self.compression, compresslevel=9) 398 399 def test_per_file_compresslevel(self): 400 """Check that files within a Zip archive can have different 401 compression levels.""" 402 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 403 zipfp.write(TESTFN, 'compress_1') 404 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 405 one_info = zipfp.getinfo('compress_1') 406 nine_info = zipfp.getinfo('compress_9') 407 self.assertEqual(one_info._compresslevel, 1) 408 self.assertEqual(nine_info._compresslevel, 9) 409 410 def test_writing_errors(self): 411 class BrokenFile(io.BytesIO): 412 def write(self, data): 413 nonlocal count 414 if count is not None: 415 if count == stop: 416 raise OSError 417 count += 1 418 super().write(data) 419 420 stop = 0 421 while True: 422 testfile = BrokenFile() 423 count = None 424 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 425 with zipfp.open('file1', 'w') as f: 426 f.write(b'data1') 427 count = 0 428 try: 429 with zipfp.open('file2', 'w') as f: 430 f.write(b'data2') 431 except OSError: 432 stop += 1 433 else: 434 break 435 finally: 436 count = None 437 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 438 self.assertEqual(zipfp.namelist(), ['file1']) 439 self.assertEqual(zipfp.read('file1'), b'data1') 440 441 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 442 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 443 self.assertEqual(zipfp.read('file1'), b'data1') 444 self.assertEqual(zipfp.read('file2'), b'data2') 445 446 447 def tearDown(self): 448 unlink(TESTFN) 449 unlink(TESTFN2) 450 451 452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 453 unittest.TestCase): 454 compression = zipfile.ZIP_STORED 455 test_low_compression = None 456 457 def zip_test_writestr_permissions(self, f, compression): 458 # Make sure that writestr and open(... mode='w') create files with 459 # mode 0600, when they are passed a name rather than a ZipInfo 460 # instance. 461 462 self.make_test_archive(f, compression) 463 with zipfile.ZipFile(f, "r") as zipfp: 464 zinfo = zipfp.getinfo('strfile') 465 self.assertEqual(zinfo.external_attr, 0o600 << 16) 466 467 zinfo2 = zipfp.getinfo('written-open-w') 468 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 469 470 def test_writestr_permissions(self): 471 for f in get_files(self): 472 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 473 474 def test_absolute_arcnames(self): 475 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 476 zipfp.write(TESTFN, "/absolute") 477 478 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 479 self.assertEqual(zipfp.namelist(), ["absolute"]) 480 481 def test_append_to_zip_file(self): 482 """Test appending to an existing zipfile.""" 483 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 484 zipfp.write(TESTFN, TESTFN) 485 486 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 487 zipfp.writestr("strfile", self.data) 488 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 489 490 def test_append_to_non_zip_file(self): 491 """Test appending to an existing file that is not a zipfile.""" 492 # NOTE: this test fails if len(d) < 22 because of the first 493 # line "fpin.seek(-22, 2)" in _EndRecData 494 data = b'I am not a ZipFile!'*10 495 with open(TESTFN2, 'wb') as f: 496 f.write(data) 497 498 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 499 zipfp.write(TESTFN, TESTFN) 500 501 with open(TESTFN2, 'rb') as f: 502 f.seek(len(data)) 503 with zipfile.ZipFile(f, "r") as zipfp: 504 self.assertEqual(zipfp.namelist(), [TESTFN]) 505 self.assertEqual(zipfp.read(TESTFN), self.data) 506 with open(TESTFN2, 'rb') as f: 507 self.assertEqual(f.read(len(data)), data) 508 zipfiledata = f.read() 509 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 510 self.assertEqual(zipfp.namelist(), [TESTFN]) 511 self.assertEqual(zipfp.read(TESTFN), self.data) 512 513 def test_read_concatenated_zip_file(self): 514 with io.BytesIO() as bio: 515 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 516 zipfp.write(TESTFN, TESTFN) 517 zipfiledata = bio.getvalue() 518 data = b'I am not a ZipFile!'*10 519 with open(TESTFN2, 'wb') as f: 520 f.write(data) 521 f.write(zipfiledata) 522 523 with zipfile.ZipFile(TESTFN2) as zipfp: 524 self.assertEqual(zipfp.namelist(), [TESTFN]) 525 self.assertEqual(zipfp.read(TESTFN), self.data) 526 527 def test_append_to_concatenated_zip_file(self): 528 with io.BytesIO() as bio: 529 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 530 zipfp.write(TESTFN, TESTFN) 531 zipfiledata = bio.getvalue() 532 data = b'I am not a ZipFile!'*1000000 533 with open(TESTFN2, 'wb') as f: 534 f.write(data) 535 f.write(zipfiledata) 536 537 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 538 self.assertEqual(zipfp.namelist(), [TESTFN]) 539 zipfp.writestr('strfile', self.data) 540 541 with open(TESTFN2, 'rb') as f: 542 self.assertEqual(f.read(len(data)), data) 543 zipfiledata = f.read() 544 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 545 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 546 self.assertEqual(zipfp.read(TESTFN), self.data) 547 self.assertEqual(zipfp.read('strfile'), self.data) 548 549 def test_ignores_newline_at_end(self): 550 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 551 zipfp.write(TESTFN, TESTFN) 552 with open(TESTFN2, 'a') as f: 553 f.write("\r\n\00\00\00") 554 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 555 self.assertIsInstance(zipfp, zipfile.ZipFile) 556 557 def test_ignores_stuff_appended_past_comments(self): 558 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 559 zipfp.comment = b"this is a comment" 560 zipfp.write(TESTFN, TESTFN) 561 with open(TESTFN2, 'a') as f: 562 f.write("abcdef\r\n") 563 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 564 self.assertIsInstance(zipfp, zipfile.ZipFile) 565 self.assertEqual(zipfp.comment, b"this is a comment") 566 567 def test_write_default_name(self): 568 """Check that calling ZipFile.write without arcname specified 569 produces the expected result.""" 570 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 571 zipfp.write(TESTFN) 572 with open(TESTFN, "rb") as f: 573 self.assertEqual(zipfp.read(TESTFN), f.read()) 574 575 def test_write_to_readonly(self): 576 """Check that trying to call write() on a readonly ZipFile object 577 raises a ValueError.""" 578 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 579 zipfp.writestr("somefile.txt", "bogus") 580 581 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 582 self.assertRaises(ValueError, zipfp.write, TESTFN) 583 584 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 585 with self.assertRaises(ValueError): 586 zipfp.open(TESTFN, mode='w') 587 588 def test_add_file_before_1980(self): 589 # Set atime and mtime to 1970-01-01 590 os.utime(TESTFN, (0, 0)) 591 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 592 self.assertRaises(ValueError, zipfp.write, TESTFN) 593 594 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 595 zipfp.write(TESTFN) 596 zinfo = zipfp.getinfo(TESTFN) 597 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 598 599 def test_add_file_after_2107(self): 600 # Set atime and mtime to 2108-12-30 601 ts = 4386268800 602 try: 603 time.localtime(ts) 604 except OverflowError: 605 self.skipTest(f'time.localtime({ts}) raises OverflowError') 606 try: 607 os.utime(TESTFN, (ts, ts)) 608 except OverflowError: 609 self.skipTest('Host fs cannot set timestamp to required value.') 610 611 mtime_ns = os.stat(TESTFN).st_mtime_ns 612 if mtime_ns != (4386268800 * 10**9): 613 # XFS filesystem is limited to 32-bit timestamp, but the syscall 614 # didn't fail. Moreover, there is a VFS bug which returns 615 # a cached timestamp which is different than the value on disk. 616 # 617 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 618 # 619 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 620 # https://bugs.python.org/issue39460#msg360952 621 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 622 623 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 624 self.assertRaises(struct.error, zipfp.write, TESTFN) 625 626 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 627 zipfp.write(TESTFN) 628 zinfo = zipfp.getinfo(TESTFN) 629 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 630 631 632@requires_zlib 633class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 634 unittest.TestCase): 635 compression = zipfile.ZIP_DEFLATED 636 637 def test_per_file_compression(self): 638 """Check that files within a Zip archive can have different 639 compression options.""" 640 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 641 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 642 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 643 sinfo = zipfp.getinfo('storeme') 644 dinfo = zipfp.getinfo('deflateme') 645 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 646 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 647 648@requires_bz2 649class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 650 unittest.TestCase): 651 compression = zipfile.ZIP_BZIP2 652 653@requires_lzma 654class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 655 unittest.TestCase): 656 compression = zipfile.ZIP_LZMA 657 658 659class AbstractTestZip64InSmallFiles: 660 # These tests test the ZIP64 functionality without using large files, 661 # see test_zipfile64 for proper tests. 662 663 @classmethod 664 def setUpClass(cls): 665 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 666 for i in range(0, FIXEDTEST_SIZE)) 667 cls.data = b'\n'.join(line_gen) 668 669 def setUp(self): 670 self._limit = zipfile.ZIP64_LIMIT 671 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 672 zipfile.ZIP64_LIMIT = 1000 673 zipfile.ZIP_FILECOUNT_LIMIT = 9 674 675 # Make a source file with some lines 676 with open(TESTFN, "wb") as fp: 677 fp.write(self.data) 678 679 def zip_test(self, f, compression): 680 # Create the ZIP archive 681 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 682 zipfp.write(TESTFN, "another.name") 683 zipfp.write(TESTFN, TESTFN) 684 zipfp.writestr("strfile", self.data) 685 686 # Read the ZIP archive 687 with zipfile.ZipFile(f, "r", compression) as zipfp: 688 self.assertEqual(zipfp.read(TESTFN), self.data) 689 self.assertEqual(zipfp.read("another.name"), self.data) 690 self.assertEqual(zipfp.read("strfile"), self.data) 691 692 # Print the ZIP directory 693 fp = io.StringIO() 694 zipfp.printdir(fp) 695 696 directory = fp.getvalue() 697 lines = directory.splitlines() 698 self.assertEqual(len(lines), 4) # Number of files + header 699 700 self.assertIn('File Name', lines[0]) 701 self.assertIn('Modified', lines[0]) 702 self.assertIn('Size', lines[0]) 703 704 fn, date, time_, size = lines[1].split() 705 self.assertEqual(fn, 'another.name') 706 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 707 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 708 self.assertEqual(size, str(len(self.data))) 709 710 # Check the namelist 711 names = zipfp.namelist() 712 self.assertEqual(len(names), 3) 713 self.assertIn(TESTFN, names) 714 self.assertIn("another.name", names) 715 self.assertIn("strfile", names) 716 717 # Check infolist 718 infos = zipfp.infolist() 719 names = [i.filename for i in infos] 720 self.assertEqual(len(names), 3) 721 self.assertIn(TESTFN, names) 722 self.assertIn("another.name", names) 723 self.assertIn("strfile", names) 724 for i in infos: 725 self.assertEqual(i.file_size, len(self.data)) 726 727 # check getinfo 728 for nm in (TESTFN, "another.name", "strfile"): 729 info = zipfp.getinfo(nm) 730 self.assertEqual(info.filename, nm) 731 self.assertEqual(info.file_size, len(self.data)) 732 733 # Check that testzip doesn't raise an exception 734 zipfp.testzip() 735 736 def test_basic(self): 737 for f in get_files(self): 738 self.zip_test(f, self.compression) 739 740 def test_too_many_files(self): 741 # This test checks that more than 64k files can be added to an archive, 742 # and that the resulting archive can be read properly by ZipFile 743 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 744 allowZip64=True) 745 zipf.debug = 100 746 numfiles = 15 747 for i in range(numfiles): 748 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 749 self.assertEqual(len(zipf.namelist()), numfiles) 750 zipf.close() 751 752 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 753 self.assertEqual(len(zipf2.namelist()), numfiles) 754 for i in range(numfiles): 755 content = zipf2.read("foo%08d" % i).decode('ascii') 756 self.assertEqual(content, "%d" % (i**3 % 57)) 757 zipf2.close() 758 759 def test_too_many_files_append(self): 760 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 761 allowZip64=False) 762 zipf.debug = 100 763 numfiles = 9 764 for i in range(numfiles): 765 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 766 self.assertEqual(len(zipf.namelist()), numfiles) 767 with self.assertRaises(zipfile.LargeZipFile): 768 zipf.writestr("foo%08d" % numfiles, b'') 769 self.assertEqual(len(zipf.namelist()), numfiles) 770 zipf.close() 771 772 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 773 allowZip64=False) 774 zipf.debug = 100 775 self.assertEqual(len(zipf.namelist()), numfiles) 776 with self.assertRaises(zipfile.LargeZipFile): 777 zipf.writestr("foo%08d" % numfiles, b'') 778 self.assertEqual(len(zipf.namelist()), numfiles) 779 zipf.close() 780 781 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 782 allowZip64=True) 783 zipf.debug = 100 784 self.assertEqual(len(zipf.namelist()), numfiles) 785 numfiles2 = 15 786 for i in range(numfiles, numfiles2): 787 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 788 self.assertEqual(len(zipf.namelist()), numfiles2) 789 zipf.close() 790 791 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 792 self.assertEqual(len(zipf2.namelist()), numfiles2) 793 for i in range(numfiles2): 794 content = zipf2.read("foo%08d" % i).decode('ascii') 795 self.assertEqual(content, "%d" % (i**3 % 57)) 796 zipf2.close() 797 798 def tearDown(self): 799 zipfile.ZIP64_LIMIT = self._limit 800 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 801 unlink(TESTFN) 802 unlink(TESTFN2) 803 804 805class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 806 unittest.TestCase): 807 compression = zipfile.ZIP_STORED 808 809 def large_file_exception_test(self, f, compression): 810 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 811 self.assertRaises(zipfile.LargeZipFile, 812 zipfp.write, TESTFN, "another.name") 813 814 def large_file_exception_test2(self, f, compression): 815 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 816 self.assertRaises(zipfile.LargeZipFile, 817 zipfp.writestr, "another.name", self.data) 818 819 def test_large_file_exception(self): 820 for f in get_files(self): 821 self.large_file_exception_test(f, zipfile.ZIP_STORED) 822 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 823 824 def test_absolute_arcnames(self): 825 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 826 allowZip64=True) as zipfp: 827 zipfp.write(TESTFN, "/absolute") 828 829 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 830 self.assertEqual(zipfp.namelist(), ["absolute"]) 831 832 def test_append(self): 833 # Test that appending to the Zip64 archive doesn't change 834 # extra fields of existing entries. 835 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 836 zipfp.writestr("strfile", self.data) 837 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 838 zinfo = zipfp.getinfo("strfile") 839 extra = zinfo.extra 840 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 841 zipfp.writestr("strfile2", self.data) 842 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 843 zinfo = zipfp.getinfo("strfile") 844 self.assertEqual(zinfo.extra, extra) 845 846 def make_zip64_file( 847 self, file_size_64_set=False, file_size_extra=False, 848 compress_size_64_set=False, compress_size_extra=False, 849 header_offset_64_set=False, header_offset_extra=False, 850 ): 851 """Generate bytes sequence for a zip with (incomplete) zip64 data. 852 853 The actual values (not the zip 64 0xffffffff values) stored in the file 854 are: 855 file_size: 8 856 compress_size: 8 857 header_offset: 0 858 """ 859 actual_size = 8 860 actual_header_offset = 0 861 local_zip64_fields = [] 862 central_zip64_fields = [] 863 864 file_size = actual_size 865 if file_size_64_set: 866 file_size = 0xffffffff 867 if file_size_extra: 868 local_zip64_fields.append(actual_size) 869 central_zip64_fields.append(actual_size) 870 file_size = struct.pack("<L", file_size) 871 872 compress_size = actual_size 873 if compress_size_64_set: 874 compress_size = 0xffffffff 875 if compress_size_extra: 876 local_zip64_fields.append(actual_size) 877 central_zip64_fields.append(actual_size) 878 compress_size = struct.pack("<L", compress_size) 879 880 header_offset = actual_header_offset 881 if header_offset_64_set: 882 header_offset = 0xffffffff 883 if header_offset_extra: 884 central_zip64_fields.append(actual_header_offset) 885 header_offset = struct.pack("<L", header_offset) 886 887 local_extra = struct.pack( 888 '<HH' + 'Q'*len(local_zip64_fields), 889 0x0001, 890 8*len(local_zip64_fields), 891 *local_zip64_fields 892 ) 893 894 central_extra = struct.pack( 895 '<HH' + 'Q'*len(central_zip64_fields), 896 0x0001, 897 8*len(central_zip64_fields), 898 *central_zip64_fields 899 ) 900 901 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 902 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 903 904 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 905 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 906 907 filename = b"test.txt" 908 content = b"test1234" 909 filename_length = struct.pack("<H", len(filename)) 910 zip64_contents = ( 911 # Local file header 912 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 913 + compress_size 914 + file_size 915 + filename_length 916 + local_extra_length 917 + filename 918 + local_extra 919 + content 920 # Central directory: 921 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 922 + compress_size 923 + file_size 924 + filename_length 925 + central_extra_length 926 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 927 + header_offset 928 + filename 929 + central_extra 930 # Zip64 end of central directory 931 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 932 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 933 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 934 + central_dir_size 935 + offset_to_central_dir 936 # Zip64 end of central directory locator 937 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 938 + b"\x00\x00\x00" 939 # end of central directory 940 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 941 + b"\x00\x00\x00\x00" 942 ) 943 return zip64_contents 944 945 def test_bad_zip64_extra(self): 946 """Missing zip64 extra records raises an exception. 947 948 There are 4 fields that the zip64 format handles (the disk number is 949 not used in this module and so is ignored here). According to the zip 950 spec: 951 The order of the fields in the zip64 extended 952 information record is fixed, but the fields MUST 953 only appear if the corresponding Local or Central 954 directory record field is set to 0xFFFF or 0xFFFFFFFF. 955 956 If the zip64 extra content doesn't contain enough entries for the 957 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 958 This test mismatches the length of the zip64 extra field and the number 959 of fields set to indicate the presence of zip64 data. 960 """ 961 # zip64 file size present, no fields in extra, expecting one, equals 962 # missing file size. 963 missing_file_size_extra = self.make_zip64_file( 964 file_size_64_set=True, 965 ) 966 with self.assertRaises(zipfile.BadZipFile) as e: 967 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 968 self.assertIn('file size', str(e.exception).lower()) 969 970 # zip64 file size present, zip64 compress size present, one field in 971 # extra, expecting two, equals missing compress size. 972 missing_compress_size_extra = self.make_zip64_file( 973 file_size_64_set=True, 974 file_size_extra=True, 975 compress_size_64_set=True, 976 ) 977 with self.assertRaises(zipfile.BadZipFile) as e: 978 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 979 self.assertIn('compress size', str(e.exception).lower()) 980 981 # zip64 compress size present, no fields in extra, expecting one, 982 # equals missing compress size. 983 missing_compress_size_extra = self.make_zip64_file( 984 compress_size_64_set=True, 985 ) 986 with self.assertRaises(zipfile.BadZipFile) as e: 987 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 988 self.assertIn('compress size', str(e.exception).lower()) 989 990 # zip64 file size present, zip64 compress size present, zip64 header 991 # offset present, two fields in extra, expecting three, equals missing 992 # header offset 993 missing_header_offset_extra = self.make_zip64_file( 994 file_size_64_set=True, 995 file_size_extra=True, 996 compress_size_64_set=True, 997 compress_size_extra=True, 998 header_offset_64_set=True, 999 ) 1000 with self.assertRaises(zipfile.BadZipFile) as e: 1001 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1002 self.assertIn('header offset', str(e.exception).lower()) 1003 1004 # zip64 compress size present, zip64 header offset present, one field 1005 # in extra, expecting two, equals missing header offset 1006 missing_header_offset_extra = self.make_zip64_file( 1007 file_size_64_set=False, 1008 compress_size_64_set=True, 1009 compress_size_extra=True, 1010 header_offset_64_set=True, 1011 ) 1012 with self.assertRaises(zipfile.BadZipFile) as e: 1013 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1014 self.assertIn('header offset', str(e.exception).lower()) 1015 1016 # zip64 file size present, zip64 header offset present, one field in 1017 # extra, expecting two, equals missing header offset 1018 missing_header_offset_extra = self.make_zip64_file( 1019 file_size_64_set=True, 1020 file_size_extra=True, 1021 compress_size_64_set=False, 1022 header_offset_64_set=True, 1023 ) 1024 with self.assertRaises(zipfile.BadZipFile) as e: 1025 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1026 self.assertIn('header offset', str(e.exception).lower()) 1027 1028 # zip64 header offset present, no fields in extra, expecting one, 1029 # equals missing header offset 1030 missing_header_offset_extra = self.make_zip64_file( 1031 file_size_64_set=False, 1032 compress_size_64_set=False, 1033 header_offset_64_set=True, 1034 ) 1035 with self.assertRaises(zipfile.BadZipFile) as e: 1036 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1037 self.assertIn('header offset', str(e.exception).lower()) 1038 1039 def test_generated_valid_zip64_extra(self): 1040 # These values are what is set in the make_zip64_file method. 1041 expected_file_size = 8 1042 expected_compress_size = 8 1043 expected_header_offset = 0 1044 expected_content = b"test1234" 1045 1046 # Loop through the various valid combinations of zip64 masks 1047 # present and extra fields present. 1048 params = ( 1049 {"file_size_64_set": True, "file_size_extra": True}, 1050 {"compress_size_64_set": True, "compress_size_extra": True}, 1051 {"header_offset_64_set": True, "header_offset_extra": True}, 1052 ) 1053 1054 for r in range(1, len(params) + 1): 1055 for combo in itertools.combinations(params, r): 1056 kwargs = {} 1057 for c in combo: 1058 kwargs.update(c) 1059 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1060 zinfo = zf.infolist()[0] 1061 self.assertEqual(zinfo.file_size, expected_file_size) 1062 self.assertEqual(zinfo.compress_size, expected_compress_size) 1063 self.assertEqual(zinfo.header_offset, expected_header_offset) 1064 self.assertEqual(zf.read(zinfo), expected_content) 1065 1066 1067@requires_zlib 1068class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1069 unittest.TestCase): 1070 compression = zipfile.ZIP_DEFLATED 1071 1072@requires_bz2 1073class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1074 unittest.TestCase): 1075 compression = zipfile.ZIP_BZIP2 1076 1077@requires_lzma 1078class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1079 unittest.TestCase): 1080 compression = zipfile.ZIP_LZMA 1081 1082 1083class AbstractWriterTests: 1084 1085 def tearDown(self): 1086 unlink(TESTFN2) 1087 1088 def test_close_after_close(self): 1089 data = b'content' 1090 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1091 w = zipf.open('test', 'w') 1092 w.write(data) 1093 w.close() 1094 self.assertTrue(w.closed) 1095 w.close() 1096 self.assertTrue(w.closed) 1097 self.assertEqual(zipf.read('test'), data) 1098 1099 def test_write_after_close(self): 1100 data = b'content' 1101 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1102 w = zipf.open('test', 'w') 1103 w.write(data) 1104 w.close() 1105 self.assertTrue(w.closed) 1106 self.assertRaises(ValueError, w.write, b'') 1107 self.assertEqual(zipf.read('test'), data) 1108 1109class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1110 compression = zipfile.ZIP_STORED 1111 1112@requires_zlib 1113class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1114 compression = zipfile.ZIP_DEFLATED 1115 1116@requires_bz2 1117class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1118 compression = zipfile.ZIP_BZIP2 1119 1120@requires_lzma 1121class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1122 compression = zipfile.ZIP_LZMA 1123 1124 1125class PyZipFileTests(unittest.TestCase): 1126 def assertCompiledIn(self, name, namelist): 1127 if name + 'o' not in namelist: 1128 self.assertIn(name + 'c', namelist) 1129 1130 def requiresWriteAccess(self, path): 1131 # effective_ids unavailable on windows 1132 if not os.access(path, os.W_OK, 1133 effective_ids=os.access in os.supports_effective_ids): 1134 self.skipTest('requires write access to the installed location') 1135 filename = os.path.join(path, 'test_zipfile.try') 1136 try: 1137 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1138 os.close(fd) 1139 except Exception: 1140 self.skipTest('requires write access to the installed location') 1141 unlink(filename) 1142 1143 def test_write_pyfile(self): 1144 self.requiresWriteAccess(os.path.dirname(__file__)) 1145 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1146 fn = __file__ 1147 if fn.endswith('.pyc'): 1148 path_split = fn.split(os.sep) 1149 if os.altsep is not None: 1150 path_split.extend(fn.split(os.altsep)) 1151 if '__pycache__' in path_split: 1152 fn = importlib.util.source_from_cache(fn) 1153 else: 1154 fn = fn[:-1] 1155 1156 zipfp.writepy(fn) 1157 1158 bn = os.path.basename(fn) 1159 self.assertNotIn(bn, zipfp.namelist()) 1160 self.assertCompiledIn(bn, zipfp.namelist()) 1161 1162 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1163 fn = __file__ 1164 if fn.endswith('.pyc'): 1165 fn = fn[:-1] 1166 1167 zipfp.writepy(fn, "testpackage") 1168 1169 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1170 self.assertNotIn(bn, zipfp.namelist()) 1171 self.assertCompiledIn(bn, zipfp.namelist()) 1172 1173 def test_write_python_package(self): 1174 import email 1175 packagedir = os.path.dirname(email.__file__) 1176 self.requiresWriteAccess(packagedir) 1177 1178 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1179 zipfp.writepy(packagedir) 1180 1181 # Check for a couple of modules at different levels of the 1182 # hierarchy 1183 names = zipfp.namelist() 1184 self.assertCompiledIn('email/__init__.py', names) 1185 self.assertCompiledIn('email/mime/text.py', names) 1186 1187 def test_write_filtered_python_package(self): 1188 import test 1189 packagedir = os.path.dirname(test.__file__) 1190 self.requiresWriteAccess(packagedir) 1191 1192 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1193 1194 # first make sure that the test folder gives error messages 1195 # (on the badsyntax_... files) 1196 with captured_stdout() as reportSIO: 1197 zipfp.writepy(packagedir) 1198 reportStr = reportSIO.getvalue() 1199 self.assertTrue('SyntaxError' in reportStr) 1200 1201 # then check that the filter works on the whole package 1202 with captured_stdout() as reportSIO: 1203 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1204 reportStr = reportSIO.getvalue() 1205 self.assertTrue('SyntaxError' not in reportStr) 1206 1207 # then check that the filter works on individual files 1208 def filter(path): 1209 return not os.path.basename(path).startswith("bad") 1210 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1211 zipfp.writepy(packagedir, filterfunc=filter) 1212 reportStr = reportSIO.getvalue() 1213 if reportStr: 1214 print(reportStr) 1215 self.assertTrue('SyntaxError' not in reportStr) 1216 1217 def test_write_with_optimization(self): 1218 import email 1219 packagedir = os.path.dirname(email.__file__) 1220 self.requiresWriteAccess(packagedir) 1221 optlevel = 1 if __debug__ else 0 1222 ext = '.pyc' 1223 1224 with TemporaryFile() as t, \ 1225 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1226 zipfp.writepy(packagedir) 1227 1228 names = zipfp.namelist() 1229 self.assertIn('email/__init__' + ext, names) 1230 self.assertIn('email/mime/text' + ext, names) 1231 1232 def test_write_python_directory(self): 1233 os.mkdir(TESTFN2) 1234 try: 1235 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1236 fp.write("print(42)\n") 1237 1238 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 1239 fp.write("print(42 * 42)\n") 1240 1241 with open(os.path.join(TESTFN2, "mod2.txt"), "w") as fp: 1242 fp.write("bla bla bla\n") 1243 1244 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1245 zipfp.writepy(TESTFN2) 1246 1247 names = zipfp.namelist() 1248 self.assertCompiledIn('mod1.py', names) 1249 self.assertCompiledIn('mod2.py', names) 1250 self.assertNotIn('mod2.txt', names) 1251 1252 finally: 1253 rmtree(TESTFN2) 1254 1255 def test_write_python_directory_filtered(self): 1256 os.mkdir(TESTFN2) 1257 try: 1258 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1259 fp.write("print(42)\n") 1260 1261 with open(os.path.join(TESTFN2, "mod2.py"), "w") as fp: 1262 fp.write("print(42 * 42)\n") 1263 1264 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1265 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1266 not fn.endswith('mod2.py')) 1267 1268 names = zipfp.namelist() 1269 self.assertCompiledIn('mod1.py', names) 1270 self.assertNotIn('mod2.py', names) 1271 1272 finally: 1273 rmtree(TESTFN2) 1274 1275 def test_write_non_pyfile(self): 1276 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1277 with open(TESTFN, 'w') as f: 1278 f.write('most definitely not a python file') 1279 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1280 unlink(TESTFN) 1281 1282 def test_write_pyfile_bad_syntax(self): 1283 os.mkdir(TESTFN2) 1284 try: 1285 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1286 fp.write("Bad syntax in python file\n") 1287 1288 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1289 # syntax errors are printed to stdout 1290 with captured_stdout() as s: 1291 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1292 1293 self.assertIn("SyntaxError", s.getvalue()) 1294 1295 # as it will not have compiled the python file, it will 1296 # include the .py file not .pyc 1297 names = zipfp.namelist() 1298 self.assertIn('mod1.py', names) 1299 self.assertNotIn('mod1.pyc', names) 1300 1301 finally: 1302 rmtree(TESTFN2) 1303 1304 def test_write_pathlike(self): 1305 os.mkdir(TESTFN2) 1306 try: 1307 with open(os.path.join(TESTFN2, "mod1.py"), "w") as fp: 1308 fp.write("print(42)\n") 1309 1310 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1311 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1312 names = zipfp.namelist() 1313 self.assertCompiledIn('mod1.py', names) 1314 finally: 1315 rmtree(TESTFN2) 1316 1317 1318class ExtractTests(unittest.TestCase): 1319 1320 def make_test_file(self): 1321 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1322 for fpath, fdata in SMALL_TEST_DATA: 1323 zipfp.writestr(fpath, fdata) 1324 1325 def test_extract(self): 1326 with temp_cwd(): 1327 self.make_test_file() 1328 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1329 for fpath, fdata in SMALL_TEST_DATA: 1330 writtenfile = zipfp.extract(fpath) 1331 1332 # make sure it was written to the right place 1333 correctfile = os.path.join(os.getcwd(), fpath) 1334 correctfile = os.path.normpath(correctfile) 1335 1336 self.assertEqual(writtenfile, correctfile) 1337 1338 # make sure correct data is in correct file 1339 with open(writtenfile, "rb") as f: 1340 self.assertEqual(fdata.encode(), f.read()) 1341 1342 unlink(writtenfile) 1343 1344 def _test_extract_with_target(self, target): 1345 self.make_test_file() 1346 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1347 for fpath, fdata in SMALL_TEST_DATA: 1348 writtenfile = zipfp.extract(fpath, target) 1349 1350 # make sure it was written to the right place 1351 correctfile = os.path.join(target, fpath) 1352 correctfile = os.path.normpath(correctfile) 1353 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1354 1355 # make sure correct data is in correct file 1356 with open(writtenfile, "rb") as f: 1357 self.assertEqual(fdata.encode(), f.read()) 1358 1359 unlink(writtenfile) 1360 1361 unlink(TESTFN2) 1362 1363 def test_extract_with_target(self): 1364 with temp_dir() as extdir: 1365 self._test_extract_with_target(extdir) 1366 1367 def test_extract_with_target_pathlike(self): 1368 with temp_dir() as extdir: 1369 self._test_extract_with_target(pathlib.Path(extdir)) 1370 1371 def test_extract_all(self): 1372 with temp_cwd(): 1373 self.make_test_file() 1374 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1375 zipfp.extractall() 1376 for fpath, fdata in SMALL_TEST_DATA: 1377 outfile = os.path.join(os.getcwd(), fpath) 1378 1379 with open(outfile, "rb") as f: 1380 self.assertEqual(fdata.encode(), f.read()) 1381 1382 unlink(outfile) 1383 1384 def _test_extract_all_with_target(self, target): 1385 self.make_test_file() 1386 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1387 zipfp.extractall(target) 1388 for fpath, fdata in SMALL_TEST_DATA: 1389 outfile = os.path.join(target, fpath) 1390 1391 with open(outfile, "rb") as f: 1392 self.assertEqual(fdata.encode(), f.read()) 1393 1394 unlink(outfile) 1395 1396 unlink(TESTFN2) 1397 1398 def test_extract_all_with_target(self): 1399 with temp_dir() as extdir: 1400 self._test_extract_all_with_target(extdir) 1401 1402 def test_extract_all_with_target_pathlike(self): 1403 with temp_dir() as extdir: 1404 self._test_extract_all_with_target(pathlib.Path(extdir)) 1405 1406 def check_file(self, filename, content): 1407 self.assertTrue(os.path.isfile(filename)) 1408 with open(filename, 'rb') as f: 1409 self.assertEqual(f.read(), content) 1410 1411 def test_sanitize_windows_name(self): 1412 san = zipfile.ZipFile._sanitize_windows_name 1413 # Passing pathsep in allows this test to work regardless of platform. 1414 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1415 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1416 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1417 1418 def test_extract_hackers_arcnames_common_cases(self): 1419 common_hacknames = [ 1420 ('../foo/bar', 'foo/bar'), 1421 ('foo/../bar', 'foo/bar'), 1422 ('foo/../../bar', 'foo/bar'), 1423 ('foo/bar/..', 'foo/bar'), 1424 ('./../foo/bar', 'foo/bar'), 1425 ('/foo/bar', 'foo/bar'), 1426 ('/foo/../bar', 'foo/bar'), 1427 ('/foo/../../bar', 'foo/bar'), 1428 ] 1429 self._test_extract_hackers_arcnames(common_hacknames) 1430 1431 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1432 def test_extract_hackers_arcnames_windows_only(self): 1433 """Test combination of path fixing and windows name sanitization.""" 1434 windows_hacknames = [ 1435 (r'..\foo\bar', 'foo/bar'), 1436 (r'..\/foo\/bar', 'foo/bar'), 1437 (r'foo/\..\/bar', 'foo/bar'), 1438 (r'foo\/../\bar', 'foo/bar'), 1439 (r'C:foo/bar', 'foo/bar'), 1440 (r'C:/foo/bar', 'foo/bar'), 1441 (r'C://foo/bar', 'foo/bar'), 1442 (r'C:\foo\bar', 'foo/bar'), 1443 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1444 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1445 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1446 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1447 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1448 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1449 (r'//?/C:/foo/bar', 'foo/bar'), 1450 (r'\\?\C:\foo\bar', 'foo/bar'), 1451 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1452 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1453 ('../../foo../../ba..r', 'foo/ba..r'), 1454 ] 1455 self._test_extract_hackers_arcnames(windows_hacknames) 1456 1457 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1458 def test_extract_hackers_arcnames_posix_only(self): 1459 posix_hacknames = [ 1460 ('//foo/bar', 'foo/bar'), 1461 ('../../foo../../ba..r', 'foo../ba..r'), 1462 (r'foo/..\bar', r'foo/..\bar'), 1463 ] 1464 self._test_extract_hackers_arcnames(posix_hacknames) 1465 1466 def _test_extract_hackers_arcnames(self, hacknames): 1467 for arcname, fixedname in hacknames: 1468 content = b'foobar' + arcname.encode() 1469 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1470 zinfo = zipfile.ZipInfo() 1471 # preserve backslashes 1472 zinfo.filename = arcname 1473 zinfo.external_attr = 0o600 << 16 1474 zipfp.writestr(zinfo, content) 1475 1476 arcname = arcname.replace(os.sep, "/") 1477 targetpath = os.path.join('target', 'subdir', 'subsub') 1478 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1479 1480 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1481 writtenfile = zipfp.extract(arcname, targetpath) 1482 self.assertEqual(writtenfile, correctfile, 1483 msg='extract %r: %r != %r' % 1484 (arcname, writtenfile, correctfile)) 1485 self.check_file(correctfile, content) 1486 rmtree('target') 1487 1488 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1489 zipfp.extractall(targetpath) 1490 self.check_file(correctfile, content) 1491 rmtree('target') 1492 1493 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1494 1495 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1496 writtenfile = zipfp.extract(arcname) 1497 self.assertEqual(writtenfile, correctfile, 1498 msg="extract %r" % arcname) 1499 self.check_file(correctfile, content) 1500 rmtree(fixedname.split('/')[0]) 1501 1502 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1503 zipfp.extractall() 1504 self.check_file(correctfile, content) 1505 rmtree(fixedname.split('/')[0]) 1506 1507 unlink(TESTFN2) 1508 1509 1510class OtherTests(unittest.TestCase): 1511 def test_open_via_zip_info(self): 1512 # Create the ZIP archive 1513 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1514 zipfp.writestr("name", "foo") 1515 with self.assertWarns(UserWarning): 1516 zipfp.writestr("name", "bar") 1517 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1518 1519 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1520 infos = zipfp.infolist() 1521 data = b"" 1522 for info in infos: 1523 with zipfp.open(info) as zipopen: 1524 data += zipopen.read() 1525 self.assertIn(data, {b"foobar", b"barfoo"}) 1526 data = b"" 1527 for info in infos: 1528 data += zipfp.read(info) 1529 self.assertIn(data, {b"foobar", b"barfoo"}) 1530 1531 def test_writestr_extended_local_header_issue1202(self): 1532 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1533 for data in 'abcdefghijklmnop': 1534 zinfo = zipfile.ZipInfo(data) 1535 zinfo.flag_bits |= 0x08 # Include an extended local header. 1536 orig_zip.writestr(zinfo, data) 1537 1538 def test_close(self): 1539 """Check that the zipfile is closed after the 'with' block.""" 1540 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1541 for fpath, fdata in SMALL_TEST_DATA: 1542 zipfp.writestr(fpath, fdata) 1543 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1544 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1545 1546 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1547 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1548 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1549 1550 def test_close_on_exception(self): 1551 """Check that the zipfile is closed if an exception is raised in the 1552 'with' block.""" 1553 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1554 for fpath, fdata in SMALL_TEST_DATA: 1555 zipfp.writestr(fpath, fdata) 1556 1557 try: 1558 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1559 raise zipfile.BadZipFile() 1560 except zipfile.BadZipFile: 1561 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1562 1563 def test_unsupported_version(self): 1564 # File has an extract_version of 120 1565 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1566 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1567 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1568 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1569 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1570 1571 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1572 io.BytesIO(data), 'r') 1573 1574 @requires_zlib 1575 def test_read_unicode_filenames(self): 1576 # bug #10801 1577 fname = findfile('zip_cp437_header.zip') 1578 with zipfile.ZipFile(fname) as zipfp: 1579 for name in zipfp.namelist(): 1580 zipfp.open(name).close() 1581 1582 def test_write_unicode_filenames(self): 1583 with zipfile.ZipFile(TESTFN, "w") as zf: 1584 zf.writestr("foo.txt", "Test for unicode filename") 1585 zf.writestr("\xf6.txt", "Test for unicode filename") 1586 self.assertIsInstance(zf.infolist()[0].filename, str) 1587 1588 with zipfile.ZipFile(TESTFN, "r") as zf: 1589 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1590 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1591 1592 def test_read_after_write_unicode_filenames(self): 1593 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1594 zipfp.writestr('приклад', b'sample') 1595 self.assertEqual(zipfp.read('приклад'), b'sample') 1596 1597 def test_exclusive_create_zip_file(self): 1598 """Test exclusive creating a new zipfile.""" 1599 unlink(TESTFN2) 1600 filename = 'testfile.txt' 1601 content = b'hello, world. this is some content.' 1602 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1603 zipfp.writestr(filename, content) 1604 with self.assertRaises(FileExistsError): 1605 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1606 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1607 self.assertEqual(zipfp.namelist(), [filename]) 1608 self.assertEqual(zipfp.read(filename), content) 1609 1610 def test_create_non_existent_file_for_append(self): 1611 if os.path.exists(TESTFN): 1612 os.unlink(TESTFN) 1613 1614 filename = 'testfile.txt' 1615 content = b'hello, world. this is some content.' 1616 1617 try: 1618 with zipfile.ZipFile(TESTFN, 'a') as zf: 1619 zf.writestr(filename, content) 1620 except OSError: 1621 self.fail('Could not append data to a non-existent zip file.') 1622 1623 self.assertTrue(os.path.exists(TESTFN)) 1624 1625 with zipfile.ZipFile(TESTFN, 'r') as zf: 1626 self.assertEqual(zf.read(filename), content) 1627 1628 def test_close_erroneous_file(self): 1629 # This test checks that the ZipFile constructor closes the file object 1630 # it opens if there's an error in the file. If it doesn't, the 1631 # traceback holds a reference to the ZipFile object and, indirectly, 1632 # the file object. 1633 # On Windows, this causes the os.unlink() call to fail because the 1634 # underlying file is still open. This is SF bug #412214. 1635 # 1636 with open(TESTFN, "w") as fp: 1637 fp.write("this is not a legal zip file\n") 1638 try: 1639 zf = zipfile.ZipFile(TESTFN) 1640 except zipfile.BadZipFile: 1641 pass 1642 1643 def test_is_zip_erroneous_file(self): 1644 """Check that is_zipfile() correctly identifies non-zip files.""" 1645 # - passing a filename 1646 with open(TESTFN, "w") as fp: 1647 fp.write("this is not a legal zip file\n") 1648 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1649 # - passing a path-like object 1650 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1651 # - passing a file object 1652 with open(TESTFN, "rb") as fp: 1653 self.assertFalse(zipfile.is_zipfile(fp)) 1654 # - passing a file-like object 1655 fp = io.BytesIO() 1656 fp.write(b"this is not a legal zip file\n") 1657 self.assertFalse(zipfile.is_zipfile(fp)) 1658 fp.seek(0, 0) 1659 self.assertFalse(zipfile.is_zipfile(fp)) 1660 1661 def test_damaged_zipfile(self): 1662 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1663 # - Create a valid zip file 1664 fp = io.BytesIO() 1665 with zipfile.ZipFile(fp, mode="w") as zipf: 1666 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1667 zipfiledata = fp.getvalue() 1668 1669 # - Now create copies of it missing the last N bytes and make sure 1670 # a BadZipFile exception is raised when we try to open it 1671 for N in range(len(zipfiledata)): 1672 fp = io.BytesIO(zipfiledata[:N]) 1673 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1674 1675 def test_is_zip_valid_file(self): 1676 """Check that is_zipfile() correctly identifies zip files.""" 1677 # - passing a filename 1678 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1679 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1680 1681 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1682 # - passing a file object 1683 with open(TESTFN, "rb") as fp: 1684 self.assertTrue(zipfile.is_zipfile(fp)) 1685 fp.seek(0, 0) 1686 zip_contents = fp.read() 1687 # - passing a file-like object 1688 fp = io.BytesIO() 1689 fp.write(zip_contents) 1690 self.assertTrue(zipfile.is_zipfile(fp)) 1691 fp.seek(0, 0) 1692 self.assertTrue(zipfile.is_zipfile(fp)) 1693 1694 def test_non_existent_file_raises_OSError(self): 1695 # make sure we don't raise an AttributeError when a partially-constructed 1696 # ZipFile instance is finalized; this tests for regression on SF tracker 1697 # bug #403871. 1698 1699 # The bug we're testing for caused an AttributeError to be raised 1700 # when a ZipFile instance was created for a file that did not 1701 # exist; the .fp member was not initialized but was needed by the 1702 # __del__() method. Since the AttributeError is in the __del__(), 1703 # it is ignored, but the user should be sufficiently annoyed by 1704 # the message on the output that regression will be noticed 1705 # quickly. 1706 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1707 1708 def test_empty_file_raises_BadZipFile(self): 1709 f = open(TESTFN, 'w') 1710 f.close() 1711 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1712 1713 with open(TESTFN, 'w') as fp: 1714 fp.write("short file") 1715 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1716 1717 def test_closed_zip_raises_ValueError(self): 1718 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1719 data = io.BytesIO() 1720 with zipfile.ZipFile(data, mode="w") as zipf: 1721 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1722 1723 # This is correct; calling .read on a closed ZipFile should raise 1724 # a ValueError, and so should calling .testzip. An earlier 1725 # version of .testzip would swallow this exception (and any other) 1726 # and report that the first file in the archive was corrupt. 1727 self.assertRaises(ValueError, zipf.read, "foo.txt") 1728 self.assertRaises(ValueError, zipf.open, "foo.txt") 1729 self.assertRaises(ValueError, zipf.testzip) 1730 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1731 with open(TESTFN, 'w') as f: 1732 f.write('zipfile test data') 1733 self.assertRaises(ValueError, zipf.write, TESTFN) 1734 1735 def test_bad_constructor_mode(self): 1736 """Check that bad modes passed to ZipFile constructor are caught.""" 1737 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 1738 1739 def test_bad_open_mode(self): 1740 """Check that bad modes passed to ZipFile.open are caught.""" 1741 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1742 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1743 1744 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1745 # read the data to make sure the file is there 1746 zipf.read("foo.txt") 1747 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1748 # universal newlines support is removed 1749 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1750 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1751 1752 def test_read0(self): 1753 """Check that calling read(0) on a ZipExtFile object returns an empty 1754 string and doesn't advance file pointer.""" 1755 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1756 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1757 # read the data to make sure the file is there 1758 with zipf.open("foo.txt") as f: 1759 for i in range(FIXEDTEST_SIZE): 1760 self.assertEqual(f.read(0), b'') 1761 1762 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1763 1764 def test_open_non_existent_item(self): 1765 """Check that attempting to call open() for an item that doesn't 1766 exist in the archive raises a RuntimeError.""" 1767 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1768 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1769 1770 def test_bad_compression_mode(self): 1771 """Check that bad compression methods passed to ZipFile.open are 1772 caught.""" 1773 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 1774 1775 def test_unsupported_compression(self): 1776 # data is declared as shrunk, but actually deflated 1777 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1778 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1779 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1780 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1781 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1782 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1783 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 1784 self.assertRaises(NotImplementedError, zipf.open, 'x') 1785 1786 def test_null_byte_in_filename(self): 1787 """Check that a filename containing a null byte is properly 1788 terminated.""" 1789 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1790 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1791 self.assertEqual(zipf.namelist(), ['foo.txt']) 1792 1793 def test_struct_sizes(self): 1794 """Check that ZIP internal structure sizes are calculated correctly.""" 1795 self.assertEqual(zipfile.sizeEndCentDir, 22) 1796 self.assertEqual(zipfile.sizeCentralDir, 46) 1797 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1798 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1799 1800 def test_comments(self): 1801 """Check that comments on the archive are handled properly.""" 1802 1803 # check default comment is empty 1804 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1805 self.assertEqual(zipf.comment, b'') 1806 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1807 1808 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1809 self.assertEqual(zipfr.comment, b'') 1810 1811 # check a simple short comment 1812 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 1813 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1814 zipf.comment = comment 1815 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1816 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1817 self.assertEqual(zipf.comment, comment) 1818 1819 # check a comment of max length 1820 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 1821 comment2 = comment2.encode("ascii") 1822 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1823 zipf.comment = comment2 1824 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1825 1826 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1827 self.assertEqual(zipfr.comment, comment2) 1828 1829 # check a comment that is too long is truncated 1830 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1831 with self.assertWarns(UserWarning): 1832 zipf.comment = comment2 + b'oops' 1833 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1834 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1835 self.assertEqual(zipfr.comment, comment2) 1836 1837 # check that comments are correctly modified in append mode 1838 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1839 zipf.comment = b"original comment" 1840 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1841 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1842 zipf.comment = b"an updated comment" 1843 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1844 self.assertEqual(zipf.comment, b"an updated comment") 1845 1846 # check that comments are correctly shortened in append mode 1847 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1848 zipf.comment = b"original comment that's longer" 1849 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1850 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1851 zipf.comment = b"shorter comment" 1852 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1853 self.assertEqual(zipf.comment, b"shorter comment") 1854 1855 def test_unicode_comment(self): 1856 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1857 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1858 with self.assertRaises(TypeError): 1859 zipf.comment = "this is an error" 1860 1861 def test_change_comment_in_empty_archive(self): 1862 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1863 self.assertFalse(zipf.filelist) 1864 zipf.comment = b"this is a comment" 1865 with zipfile.ZipFile(TESTFN, "r") as zipf: 1866 self.assertEqual(zipf.comment, b"this is a comment") 1867 1868 def test_change_comment_in_nonempty_archive(self): 1869 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1870 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1871 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1872 self.assertTrue(zipf.filelist) 1873 zipf.comment = b"this is a comment" 1874 with zipfile.ZipFile(TESTFN, "r") as zipf: 1875 self.assertEqual(zipf.comment, b"this is a comment") 1876 1877 def test_empty_zipfile(self): 1878 # Check that creating a file in 'w' or 'a' mode and closing without 1879 # adding any files to the archives creates a valid empty ZIP file 1880 zipf = zipfile.ZipFile(TESTFN, mode="w") 1881 zipf.close() 1882 try: 1883 zipf = zipfile.ZipFile(TESTFN, mode="r") 1884 except zipfile.BadZipFile: 1885 self.fail("Unable to create empty ZIP file in 'w' mode") 1886 1887 zipf = zipfile.ZipFile(TESTFN, mode="a") 1888 zipf.close() 1889 try: 1890 zipf = zipfile.ZipFile(TESTFN, mode="r") 1891 except: 1892 self.fail("Unable to create empty ZIP file in 'a' mode") 1893 1894 def test_open_empty_file(self): 1895 # Issue 1710703: Check that opening a file with less than 22 bytes 1896 # raises a BadZipFile exception (rather than the previously unhelpful 1897 # OSError) 1898 f = open(TESTFN, 'w') 1899 f.close() 1900 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1901 1902 def test_create_zipinfo_before_1980(self): 1903 self.assertRaises(ValueError, 1904 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1905 1906 def test_zipfile_with_short_extra_field(self): 1907 """If an extra field in the header is less than 4 bytes, skip it.""" 1908 zipdata = ( 1909 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1910 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1911 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1912 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1913 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1914 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1915 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1916 ) 1917 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 1918 # testzip returns the name of the first corrupt file, or None 1919 self.assertIsNone(zipf.testzip()) 1920 1921 def test_open_conflicting_handles(self): 1922 # It's only possible to open one writable file handle at a time 1923 msg1 = b"It's fun to charter an accountant!" 1924 msg2 = b"And sail the wide accountant sea" 1925 msg3 = b"To find, explore the funds offshore" 1926 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1927 with zipf.open('foo', mode='w') as w2: 1928 w2.write(msg1) 1929 with zipf.open('bar', mode='w') as w1: 1930 with self.assertRaises(ValueError): 1931 zipf.open('handle', mode='w') 1932 with self.assertRaises(ValueError): 1933 zipf.open('foo', mode='r') 1934 with self.assertRaises(ValueError): 1935 zipf.writestr('str', 'abcde') 1936 with self.assertRaises(ValueError): 1937 zipf.write(__file__, 'file') 1938 with self.assertRaises(ValueError): 1939 zipf.close() 1940 w1.write(msg2) 1941 with zipf.open('baz', mode='w') as w2: 1942 w2.write(msg3) 1943 1944 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1945 self.assertEqual(zipf.read('foo'), msg1) 1946 self.assertEqual(zipf.read('bar'), msg2) 1947 self.assertEqual(zipf.read('baz'), msg3) 1948 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 1949 1950 def test_seek_tell(self): 1951 # Test seek functionality 1952 txt = b"Where's Bruce?" 1953 bloc = txt.find(b"Bruce") 1954 # Check seek on a file 1955 with zipfile.ZipFile(TESTFN, "w") as zipf: 1956 zipf.writestr("foo.txt", txt) 1957 with zipfile.ZipFile(TESTFN, "r") as zipf: 1958 with zipf.open("foo.txt", "r") as fp: 1959 fp.seek(bloc, os.SEEK_SET) 1960 self.assertEqual(fp.tell(), bloc) 1961 fp.seek(-bloc, os.SEEK_CUR) 1962 self.assertEqual(fp.tell(), 0) 1963 fp.seek(bloc, os.SEEK_CUR) 1964 self.assertEqual(fp.tell(), bloc) 1965 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1966 fp.seek(0, os.SEEK_END) 1967 self.assertEqual(fp.tell(), len(txt)) 1968 fp.seek(0, os.SEEK_SET) 1969 self.assertEqual(fp.tell(), 0) 1970 # Check seek on memory file 1971 data = io.BytesIO() 1972 with zipfile.ZipFile(data, mode="w") as zipf: 1973 zipf.writestr("foo.txt", txt) 1974 with zipfile.ZipFile(data, mode="r") as zipf: 1975 with zipf.open("foo.txt", "r") as fp: 1976 fp.seek(bloc, os.SEEK_SET) 1977 self.assertEqual(fp.tell(), bloc) 1978 fp.seek(-bloc, os.SEEK_CUR) 1979 self.assertEqual(fp.tell(), 0) 1980 fp.seek(bloc, os.SEEK_CUR) 1981 self.assertEqual(fp.tell(), bloc) 1982 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 1983 fp.seek(0, os.SEEK_END) 1984 self.assertEqual(fp.tell(), len(txt)) 1985 fp.seek(0, os.SEEK_SET) 1986 self.assertEqual(fp.tell(), 0) 1987 1988 @requires_bz2 1989 def test_decompress_without_3rd_party_library(self): 1990 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1991 zip_file = io.BytesIO(data) 1992 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 1993 zf.writestr('a.txt', b'a') 1994 with mock.patch('zipfile.bz2', None): 1995 with zipfile.ZipFile(zip_file) as zf: 1996 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 1997 1998 def tearDown(self): 1999 unlink(TESTFN) 2000 unlink(TESTFN2) 2001 2002 2003class AbstractBadCrcTests: 2004 def test_testzip_with_bad_crc(self): 2005 """Tests that files with bad CRCs return their name from testzip.""" 2006 zipdata = self.zip_with_bad_crc 2007 2008 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2009 # testzip returns the name of the first corrupt file, or None 2010 self.assertEqual('afile', zipf.testzip()) 2011 2012 def test_read_with_bad_crc(self): 2013 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2014 zipdata = self.zip_with_bad_crc 2015 2016 # Using ZipFile.read() 2017 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2018 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2019 2020 # Using ZipExtFile.read() 2021 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2022 with zipf.open('afile', 'r') as corrupt_file: 2023 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2024 2025 # Same with small reads (in order to exercise the buffering logic) 2026 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2027 with zipf.open('afile', 'r') as corrupt_file: 2028 corrupt_file.MIN_READ_SIZE = 2 2029 with self.assertRaises(zipfile.BadZipFile): 2030 while corrupt_file.read(2): 2031 pass 2032 2033 2034class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2035 compression = zipfile.ZIP_STORED 2036 zip_with_bad_crc = ( 2037 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2038 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2039 b'ilehello,AworldP' 2040 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2041 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2042 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2043 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2044 b'\0\0/\0\0\0\0\0') 2045 2046@requires_zlib 2047class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2048 compression = zipfile.ZIP_DEFLATED 2049 zip_with_bad_crc = ( 2050 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2051 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2052 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2053 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2054 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2055 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2056 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2057 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2058 2059@requires_bz2 2060class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2061 compression = zipfile.ZIP_BZIP2 2062 zip_with_bad_crc = ( 2063 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2064 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2065 b'ileBZh91AY&SY\xd4\xa8\xca' 2066 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2067 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2068 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2069 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2070 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2071 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2072 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2073 b'\x00\x00\x00\x00') 2074 2075@requires_lzma 2076class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2077 compression = zipfile.ZIP_LZMA 2078 zip_with_bad_crc = ( 2079 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2080 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2081 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2082 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2083 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2084 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2085 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2086 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2087 b'\x00>\x00\x00\x00\x00\x00') 2088 2089 2090class DecryptionTests(unittest.TestCase): 2091 """Check that ZIP decryption works. Since the library does not 2092 support encryption at the moment, we use a pre-generated encrypted 2093 ZIP file.""" 2094 2095 data = ( 2096 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2097 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2098 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2099 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2100 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2101 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2102 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2103 data2 = ( 2104 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2105 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2106 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2107 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2108 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2109 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2110 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2111 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2112 2113 plain = b'zipfile.py encryption test' 2114 plain2 = b'\x00'*512 2115 2116 def setUp(self): 2117 with open(TESTFN, "wb") as fp: 2118 fp.write(self.data) 2119 self.zip = zipfile.ZipFile(TESTFN, "r") 2120 with open(TESTFN2, "wb") as fp: 2121 fp.write(self.data2) 2122 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2123 2124 def tearDown(self): 2125 self.zip.close() 2126 os.unlink(TESTFN) 2127 self.zip2.close() 2128 os.unlink(TESTFN2) 2129 2130 def test_no_password(self): 2131 # Reading the encrypted file without password 2132 # must generate a RunTime exception 2133 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2134 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2135 2136 def test_bad_password(self): 2137 self.zip.setpassword(b"perl") 2138 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2139 self.zip2.setpassword(b"perl") 2140 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2141 2142 @requires_zlib 2143 def test_good_password(self): 2144 self.zip.setpassword(b"python") 2145 self.assertEqual(self.zip.read("test.txt"), self.plain) 2146 self.zip2.setpassword(b"12345") 2147 self.assertEqual(self.zip2.read("zero"), self.plain2) 2148 2149 def test_unicode_password(self): 2150 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2151 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2152 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2153 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2154 2155 def test_seek_tell(self): 2156 self.zip.setpassword(b"python") 2157 txt = self.plain 2158 test_word = b'encryption' 2159 bloc = txt.find(test_word) 2160 bloc_len = len(test_word) 2161 with self.zip.open("test.txt", "r") as fp: 2162 fp.seek(bloc, os.SEEK_SET) 2163 self.assertEqual(fp.tell(), bloc) 2164 fp.seek(-bloc, os.SEEK_CUR) 2165 self.assertEqual(fp.tell(), 0) 2166 fp.seek(bloc, os.SEEK_CUR) 2167 self.assertEqual(fp.tell(), bloc) 2168 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2169 2170 # Make sure that the second read after seeking back beyond 2171 # _readbuffer returns the same content (ie. rewind to the start of 2172 # the file to read forward to the required position). 2173 old_read_size = fp.MIN_READ_SIZE 2174 fp.MIN_READ_SIZE = 1 2175 fp._readbuffer = b'' 2176 fp._offset = 0 2177 fp.seek(0, os.SEEK_SET) 2178 self.assertEqual(fp.tell(), 0) 2179 fp.seek(bloc, os.SEEK_CUR) 2180 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2181 fp.MIN_READ_SIZE = old_read_size 2182 2183 fp.seek(0, os.SEEK_END) 2184 self.assertEqual(fp.tell(), len(txt)) 2185 fp.seek(0, os.SEEK_SET) 2186 self.assertEqual(fp.tell(), 0) 2187 2188 # Read the file completely to definitely call any eof integrity 2189 # checks (crc) and make sure they still pass. 2190 fp.read() 2191 2192 2193class AbstractTestsWithRandomBinaryFiles: 2194 @classmethod 2195 def setUpClass(cls): 2196 datacount = randint(16, 64)*1024 + randint(1, 1024) 2197 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2198 for i in range(datacount)) 2199 2200 def setUp(self): 2201 # Make a source file with some lines 2202 with open(TESTFN, "wb") as fp: 2203 fp.write(self.data) 2204 2205 def tearDown(self): 2206 unlink(TESTFN) 2207 unlink(TESTFN2) 2208 2209 def make_test_archive(self, f, compression): 2210 # Create the ZIP archive 2211 with zipfile.ZipFile(f, "w", compression) as zipfp: 2212 zipfp.write(TESTFN, "another.name") 2213 zipfp.write(TESTFN, TESTFN) 2214 2215 def zip_test(self, f, compression): 2216 self.make_test_archive(f, compression) 2217 2218 # Read the ZIP archive 2219 with zipfile.ZipFile(f, "r", compression) as zipfp: 2220 testdata = zipfp.read(TESTFN) 2221 self.assertEqual(len(testdata), len(self.data)) 2222 self.assertEqual(testdata, self.data) 2223 self.assertEqual(zipfp.read("another.name"), self.data) 2224 2225 def test_read(self): 2226 for f in get_files(self): 2227 self.zip_test(f, self.compression) 2228 2229 def zip_open_test(self, f, compression): 2230 self.make_test_archive(f, compression) 2231 2232 # Read the ZIP archive 2233 with zipfile.ZipFile(f, "r", compression) as zipfp: 2234 zipdata1 = [] 2235 with zipfp.open(TESTFN) as zipopen1: 2236 while True: 2237 read_data = zipopen1.read(256) 2238 if not read_data: 2239 break 2240 zipdata1.append(read_data) 2241 2242 zipdata2 = [] 2243 with zipfp.open("another.name") as zipopen2: 2244 while True: 2245 read_data = zipopen2.read(256) 2246 if not read_data: 2247 break 2248 zipdata2.append(read_data) 2249 2250 testdata1 = b''.join(zipdata1) 2251 self.assertEqual(len(testdata1), len(self.data)) 2252 self.assertEqual(testdata1, self.data) 2253 2254 testdata2 = b''.join(zipdata2) 2255 self.assertEqual(len(testdata2), len(self.data)) 2256 self.assertEqual(testdata2, self.data) 2257 2258 def test_open(self): 2259 for f in get_files(self): 2260 self.zip_open_test(f, self.compression) 2261 2262 def zip_random_open_test(self, f, compression): 2263 self.make_test_archive(f, compression) 2264 2265 # Read the ZIP archive 2266 with zipfile.ZipFile(f, "r", compression) as zipfp: 2267 zipdata1 = [] 2268 with zipfp.open(TESTFN) as zipopen1: 2269 while True: 2270 read_data = zipopen1.read(randint(1, 1024)) 2271 if not read_data: 2272 break 2273 zipdata1.append(read_data) 2274 2275 testdata = b''.join(zipdata1) 2276 self.assertEqual(len(testdata), len(self.data)) 2277 self.assertEqual(testdata, self.data) 2278 2279 def test_random_open(self): 2280 for f in get_files(self): 2281 self.zip_random_open_test(f, self.compression) 2282 2283 2284class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2285 unittest.TestCase): 2286 compression = zipfile.ZIP_STORED 2287 2288@requires_zlib 2289class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2290 unittest.TestCase): 2291 compression = zipfile.ZIP_DEFLATED 2292 2293@requires_bz2 2294class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2295 unittest.TestCase): 2296 compression = zipfile.ZIP_BZIP2 2297 2298@requires_lzma 2299class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2300 unittest.TestCase): 2301 compression = zipfile.ZIP_LZMA 2302 2303 2304# Provide the tell() method but not seek() 2305class Tellable: 2306 def __init__(self, fp): 2307 self.fp = fp 2308 self.offset = 0 2309 2310 def write(self, data): 2311 n = self.fp.write(data) 2312 self.offset += n 2313 return n 2314 2315 def tell(self): 2316 return self.offset 2317 2318 def flush(self): 2319 self.fp.flush() 2320 2321class Unseekable: 2322 def __init__(self, fp): 2323 self.fp = fp 2324 2325 def write(self, data): 2326 return self.fp.write(data) 2327 2328 def flush(self): 2329 self.fp.flush() 2330 2331class UnseekableTests(unittest.TestCase): 2332 def test_writestr(self): 2333 for wrapper in (lambda f: f), Tellable, Unseekable: 2334 with self.subTest(wrapper=wrapper): 2335 f = io.BytesIO() 2336 f.write(b'abc') 2337 bf = io.BufferedWriter(f) 2338 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2339 zipfp.writestr('ones', b'111') 2340 zipfp.writestr('twos', b'222') 2341 self.assertEqual(f.getvalue()[:5], b'abcPK') 2342 with zipfile.ZipFile(f, mode='r') as zipf: 2343 with zipf.open('ones') as zopen: 2344 self.assertEqual(zopen.read(), b'111') 2345 with zipf.open('twos') as zopen: 2346 self.assertEqual(zopen.read(), b'222') 2347 2348 def test_write(self): 2349 for wrapper in (lambda f: f), Tellable, Unseekable: 2350 with self.subTest(wrapper=wrapper): 2351 f = io.BytesIO() 2352 f.write(b'abc') 2353 bf = io.BufferedWriter(f) 2354 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2355 self.addCleanup(unlink, TESTFN) 2356 with open(TESTFN, 'wb') as f2: 2357 f2.write(b'111') 2358 zipfp.write(TESTFN, 'ones') 2359 with open(TESTFN, 'wb') as f2: 2360 f2.write(b'222') 2361 zipfp.write(TESTFN, 'twos') 2362 self.assertEqual(f.getvalue()[:5], b'abcPK') 2363 with zipfile.ZipFile(f, mode='r') as zipf: 2364 with zipf.open('ones') as zopen: 2365 self.assertEqual(zopen.read(), b'111') 2366 with zipf.open('twos') as zopen: 2367 self.assertEqual(zopen.read(), b'222') 2368 2369 def test_open_write(self): 2370 for wrapper in (lambda f: f), Tellable, Unseekable: 2371 with self.subTest(wrapper=wrapper): 2372 f = io.BytesIO() 2373 f.write(b'abc') 2374 bf = io.BufferedWriter(f) 2375 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2376 with zipf.open('ones', 'w') as zopen: 2377 zopen.write(b'111') 2378 with zipf.open('twos', 'w') as zopen: 2379 zopen.write(b'222') 2380 self.assertEqual(f.getvalue()[:5], b'abcPK') 2381 with zipfile.ZipFile(f) as zipf: 2382 self.assertEqual(zipf.read('ones'), b'111') 2383 self.assertEqual(zipf.read('twos'), b'222') 2384 2385 2386@requires_zlib 2387class TestsWithMultipleOpens(unittest.TestCase): 2388 @classmethod 2389 def setUpClass(cls): 2390 cls.data1 = b'111' + getrandbytes(10000) 2391 cls.data2 = b'222' + getrandbytes(10000) 2392 2393 def make_test_archive(self, f): 2394 # Create the ZIP archive 2395 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2396 zipfp.writestr('ones', self.data1) 2397 zipfp.writestr('twos', self.data2) 2398 2399 def test_same_file(self): 2400 # Verify that (when the ZipFile is in control of creating file objects) 2401 # multiple open() calls can be made without interfering with each other. 2402 for f in get_files(self): 2403 self.make_test_archive(f) 2404 with zipfile.ZipFile(f, mode="r") as zipf: 2405 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2406 data1 = zopen1.read(500) 2407 data2 = zopen2.read(500) 2408 data1 += zopen1.read() 2409 data2 += zopen2.read() 2410 self.assertEqual(data1, data2) 2411 self.assertEqual(data1, self.data1) 2412 2413 def test_different_file(self): 2414 # Verify that (when the ZipFile is in control of creating file objects) 2415 # multiple open() calls can be made without interfering with each other. 2416 for f in get_files(self): 2417 self.make_test_archive(f) 2418 with zipfile.ZipFile(f, mode="r") as zipf: 2419 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2420 data1 = zopen1.read(500) 2421 data2 = zopen2.read(500) 2422 data1 += zopen1.read() 2423 data2 += zopen2.read() 2424 self.assertEqual(data1, self.data1) 2425 self.assertEqual(data2, self.data2) 2426 2427 def test_interleaved(self): 2428 # Verify that (when the ZipFile is in control of creating file objects) 2429 # multiple open() calls can be made without interfering with each other. 2430 for f in get_files(self): 2431 self.make_test_archive(f) 2432 with zipfile.ZipFile(f, mode="r") as zipf: 2433 with zipf.open('ones') as zopen1: 2434 data1 = zopen1.read(500) 2435 with zipf.open('twos') as zopen2: 2436 data2 = zopen2.read(500) 2437 data1 += zopen1.read() 2438 data2 += zopen2.read() 2439 self.assertEqual(data1, self.data1) 2440 self.assertEqual(data2, self.data2) 2441 2442 def test_read_after_close(self): 2443 for f in get_files(self): 2444 self.make_test_archive(f) 2445 with contextlib.ExitStack() as stack: 2446 with zipfile.ZipFile(f, 'r') as zipf: 2447 zopen1 = stack.enter_context(zipf.open('ones')) 2448 zopen2 = stack.enter_context(zipf.open('twos')) 2449 data1 = zopen1.read(500) 2450 data2 = zopen2.read(500) 2451 data1 += zopen1.read() 2452 data2 += zopen2.read() 2453 self.assertEqual(data1, self.data1) 2454 self.assertEqual(data2, self.data2) 2455 2456 def test_read_after_write(self): 2457 for f in get_files(self): 2458 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2459 zipf.writestr('ones', self.data1) 2460 zipf.writestr('twos', self.data2) 2461 with zipf.open('ones') as zopen1: 2462 data1 = zopen1.read(500) 2463 self.assertEqual(data1, self.data1[:500]) 2464 with zipfile.ZipFile(f, 'r') as zipf: 2465 data1 = zipf.read('ones') 2466 data2 = zipf.read('twos') 2467 self.assertEqual(data1, self.data1) 2468 self.assertEqual(data2, self.data2) 2469 2470 def test_write_after_read(self): 2471 for f in get_files(self): 2472 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2473 zipf.writestr('ones', self.data1) 2474 with zipf.open('ones') as zopen1: 2475 zopen1.read(500) 2476 zipf.writestr('twos', self.data2) 2477 with zipfile.ZipFile(f, 'r') as zipf: 2478 data1 = zipf.read('ones') 2479 data2 = zipf.read('twos') 2480 self.assertEqual(data1, self.data1) 2481 self.assertEqual(data2, self.data2) 2482 2483 def test_many_opens(self): 2484 # Verify that read() and open() promptly close the file descriptor, 2485 # and don't rely on the garbage collector to free resources. 2486 self.make_test_archive(TESTFN2) 2487 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2488 for x in range(100): 2489 zipf.read('ones') 2490 with zipf.open('ones') as zopen1: 2491 pass 2492 with open(os.devnull) as f: 2493 self.assertLess(f.fileno(), 100) 2494 2495 def test_write_while_reading(self): 2496 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2497 zipf.writestr('ones', self.data1) 2498 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2499 with zipf.open('ones', 'r') as r1: 2500 data1 = r1.read(500) 2501 with zipf.open('twos', 'w') as w1: 2502 w1.write(self.data2) 2503 data1 += r1.read() 2504 self.assertEqual(data1, self.data1) 2505 with zipfile.ZipFile(TESTFN2) as zipf: 2506 self.assertEqual(zipf.read('twos'), self.data2) 2507 2508 def tearDown(self): 2509 unlink(TESTFN2) 2510 2511 2512class TestWithDirectory(unittest.TestCase): 2513 def setUp(self): 2514 os.mkdir(TESTFN2) 2515 2516 def test_extract_dir(self): 2517 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 2518 zipf.extractall(TESTFN2) 2519 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2520 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2521 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2522 2523 def test_bug_6050(self): 2524 # Extraction should succeed if directories already exist 2525 os.mkdir(os.path.join(TESTFN2, "a")) 2526 self.test_extract_dir() 2527 2528 def test_write_dir(self): 2529 dirpath = os.path.join(TESTFN2, "x") 2530 os.mkdir(dirpath) 2531 mode = os.stat(dirpath).st_mode & 0xFFFF 2532 with zipfile.ZipFile(TESTFN, "w") as zipf: 2533 zipf.write(dirpath) 2534 zinfo = zipf.filelist[0] 2535 self.assertTrue(zinfo.filename.endswith("/x/")) 2536 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2537 zipf.write(dirpath, "y") 2538 zinfo = zipf.filelist[1] 2539 self.assertTrue(zinfo.filename, "y/") 2540 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2541 with zipfile.ZipFile(TESTFN, "r") as zipf: 2542 zinfo = zipf.filelist[0] 2543 self.assertTrue(zinfo.filename.endswith("/x/")) 2544 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2545 zinfo = zipf.filelist[1] 2546 self.assertTrue(zinfo.filename, "y/") 2547 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2548 target = os.path.join(TESTFN2, "target") 2549 os.mkdir(target) 2550 zipf.extractall(target) 2551 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2552 self.assertEqual(len(os.listdir(target)), 2) 2553 2554 def test_writestr_dir(self): 2555 os.mkdir(os.path.join(TESTFN2, "x")) 2556 with zipfile.ZipFile(TESTFN, "w") as zipf: 2557 zipf.writestr("x/", b'') 2558 zinfo = zipf.filelist[0] 2559 self.assertEqual(zinfo.filename, "x/") 2560 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2561 with zipfile.ZipFile(TESTFN, "r") as zipf: 2562 zinfo = zipf.filelist[0] 2563 self.assertTrue(zinfo.filename.endswith("x/")) 2564 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2565 target = os.path.join(TESTFN2, "target") 2566 os.mkdir(target) 2567 zipf.extractall(target) 2568 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2569 self.assertEqual(os.listdir(target), ["x"]) 2570 2571 def tearDown(self): 2572 rmtree(TESTFN2) 2573 if os.path.exists(TESTFN): 2574 unlink(TESTFN) 2575 2576 2577class ZipInfoTests(unittest.TestCase): 2578 def test_from_file(self): 2579 zi = zipfile.ZipInfo.from_file(__file__) 2580 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2581 self.assertFalse(zi.is_dir()) 2582 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2583 2584 def test_from_file_pathlike(self): 2585 zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__)) 2586 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2587 self.assertFalse(zi.is_dir()) 2588 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2589 2590 def test_from_file_bytes(self): 2591 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 2592 self.assertEqual(posixpath.basename(zi.filename), 'test') 2593 self.assertFalse(zi.is_dir()) 2594 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2595 2596 def test_from_file_fileno(self): 2597 with open(__file__, 'rb') as f: 2598 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 2599 self.assertEqual(posixpath.basename(zi.filename), 'test') 2600 self.assertFalse(zi.is_dir()) 2601 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2602 2603 def test_from_dir(self): 2604 dirpath = os.path.dirname(os.path.abspath(__file__)) 2605 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 2606 self.assertEqual(zi.filename, 'stdlib_tests/') 2607 self.assertTrue(zi.is_dir()) 2608 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2609 self.assertEqual(zi.file_size, 0) 2610 2611 2612class CommandLineTest(unittest.TestCase): 2613 2614 def zipfilecmd(self, *args, **kwargs): 2615 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 2616 **kwargs) 2617 return out.replace(os.linesep.encode(), b'\n') 2618 2619 def zipfilecmd_failure(self, *args): 2620 return script_helper.assert_python_failure('-m', 'zipfile', *args) 2621 2622 def test_bad_use(self): 2623 rc, out, err = self.zipfilecmd_failure() 2624 self.assertEqual(out, b'') 2625 self.assertIn(b'usage', err.lower()) 2626 self.assertIn(b'error', err.lower()) 2627 self.assertIn(b'required', err.lower()) 2628 rc, out, err = self.zipfilecmd_failure('-l', '') 2629 self.assertEqual(out, b'') 2630 self.assertNotEqual(err.strip(), b'') 2631 2632 def test_test_command(self): 2633 zip_name = findfile('zipdir.zip') 2634 for opt in '-t', '--test': 2635 out = self.zipfilecmd(opt, zip_name) 2636 self.assertEqual(out.rstrip(), b'Done testing') 2637 zip_name = findfile('testtar.tar') 2638 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2639 self.assertEqual(out, b'') 2640 2641 def test_list_command(self): 2642 zip_name = findfile('zipdir.zip') 2643 t = io.StringIO() 2644 with zipfile.ZipFile(zip_name, 'r') as tf: 2645 tf.printdir(t) 2646 expected = t.getvalue().encode('ascii', 'backslashreplace') 2647 for opt in '-l', '--list': 2648 out = self.zipfilecmd(opt, zip_name, 2649 PYTHONIOENCODING='ascii:backslashreplace') 2650 self.assertEqual(out, expected) 2651 2652 @requires_zlib 2653 def test_create_command(self): 2654 self.addCleanup(unlink, TESTFN) 2655 with open(TESTFN, 'w') as f: 2656 f.write('test 1') 2657 os.mkdir(TESTFNDIR) 2658 self.addCleanup(rmtree, TESTFNDIR) 2659 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w') as f: 2660 f.write('test 2') 2661 files = [TESTFN, TESTFNDIR] 2662 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2663 for opt in '-c', '--create': 2664 try: 2665 out = self.zipfilecmd(opt, TESTFN2, *files) 2666 self.assertEqual(out, b'') 2667 with zipfile.ZipFile(TESTFN2) as zf: 2668 self.assertEqual(zf.namelist(), namelist) 2669 self.assertEqual(zf.read(namelist[0]), b'test 1') 2670 self.assertEqual(zf.read(namelist[2]), b'test 2') 2671 finally: 2672 unlink(TESTFN2) 2673 2674 def test_extract_command(self): 2675 zip_name = findfile('zipdir.zip') 2676 for opt in '-e', '--extract': 2677 with temp_dir() as extdir: 2678 out = self.zipfilecmd(opt, zip_name, extdir) 2679 self.assertEqual(out, b'') 2680 with zipfile.ZipFile(zip_name) as zf: 2681 for zi in zf.infolist(): 2682 path = os.path.join(extdir, 2683 zi.filename.replace('/', os.sep)) 2684 if zi.is_dir(): 2685 self.assertTrue(os.path.isdir(path)) 2686 else: 2687 self.assertTrue(os.path.isfile(path)) 2688 with open(path, 'rb') as f: 2689 self.assertEqual(f.read(), zf.read(zi)) 2690 2691 2692class TestExecutablePrependedZip(unittest.TestCase): 2693 """Test our ability to open zip files with an executable prepended.""" 2694 2695 def setUp(self): 2696 self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata') 2697 self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata') 2698 2699 def _test_zip_works(self, name): 2700 # bpo28494 sanity check: ensure is_zipfile works on these. 2701 self.assertTrue(zipfile.is_zipfile(name), 2702 f'is_zipfile failed on {name}') 2703 # Ensure we can operate on these via ZipFile. 2704 with zipfile.ZipFile(name) as zipfp: 2705 for n in zipfp.namelist(): 2706 data = zipfp.read(n) 2707 self.assertIn(b'FAVORITE_NUMBER', data) 2708 2709 def test_read_zip_with_exe_prepended(self): 2710 self._test_zip_works(self.exe_zip) 2711 2712 def test_read_zip64_with_exe_prepended(self): 2713 self._test_zip_works(self.exe_zip64) 2714 2715 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2716 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2717 'Test relies on #!/bin/bash working.') 2718 def test_execute_zip2(self): 2719 output = subprocess.check_output([self.exe_zip, sys.executable]) 2720 self.assertIn(b'number in executable: 5', output) 2721 2722 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2723 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2724 'Test relies on #!/bin/bash working.') 2725 def test_execute_zip64(self): 2726 output = subprocess.check_output([self.exe_zip64, sys.executable]) 2727 self.assertIn(b'number in executable: 5', output) 2728 2729 2730# Poor man's technique to consume a (smallish) iterable. 2731consume = tuple 2732 2733 2734# from jaraco.itertools 5.0 2735class jaraco: 2736 class itertools: 2737 class Counter: 2738 def __init__(self, i): 2739 self.count = 0 2740 self._orig_iter = iter(i) 2741 2742 def __iter__(self): 2743 return self 2744 2745 def __next__(self): 2746 result = next(self._orig_iter) 2747 self.count += 1 2748 return result 2749 2750 2751def add_dirs(zf): 2752 """ 2753 Given a writable zip file zf, inject directory entries for 2754 any directories implied by the presence of children. 2755 """ 2756 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 2757 zf.writestr(name, b"") 2758 return zf 2759 2760 2761def build_alpharep_fixture(): 2762 """ 2763 Create a zip file with this structure: 2764 2765 . 2766 ├── a.txt 2767 ├── b 2768 │ ├── c.txt 2769 │ ├── d 2770 │ │ └── e.txt 2771 │ └── f.txt 2772 └── g 2773 └── h 2774 └── i.txt 2775 2776 This fixture has the following key characteristics: 2777 2778 - a file at the root (a) 2779 - a file two levels deep (b/d/e) 2780 - multiple files in a directory (b/c, b/f) 2781 - a directory containing only a directory (g/h) 2782 2783 "alpha" because it uses alphabet 2784 "rep" because it's a representative example 2785 """ 2786 data = io.BytesIO() 2787 zf = zipfile.ZipFile(data, "w") 2788 zf.writestr("a.txt", b"content of a") 2789 zf.writestr("b/c.txt", b"content of c") 2790 zf.writestr("b/d/e.txt", b"content of e") 2791 zf.writestr("b/f.txt", b"content of f") 2792 zf.writestr("g/h/i.txt", b"content of i") 2793 zf.filename = "alpharep.zip" 2794 return zf 2795 2796 2797class TestPath(unittest.TestCase): 2798 def setUp(self): 2799 self.fixtures = contextlib.ExitStack() 2800 self.addCleanup(self.fixtures.close) 2801 2802 def zipfile_alpharep(self): 2803 with self.subTest(): 2804 yield build_alpharep_fixture() 2805 with self.subTest(): 2806 yield add_dirs(build_alpharep_fixture()) 2807 2808 def zipfile_ondisk(self): 2809 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 2810 for alpharep in self.zipfile_alpharep(): 2811 buffer = alpharep.fp 2812 alpharep.close() 2813 path = tmpdir / alpharep.filename 2814 with path.open("wb") as strm: 2815 strm.write(buffer.getvalue()) 2816 yield path 2817 2818 def test_iterdir_and_types(self): 2819 for alpharep in self.zipfile_alpharep(): 2820 root = zipfile.Path(alpharep) 2821 assert root.is_dir() 2822 a, b, g = root.iterdir() 2823 assert a.is_file() 2824 assert b.is_dir() 2825 assert g.is_dir() 2826 c, f, d = b.iterdir() 2827 assert c.is_file() and f.is_file() 2828 e, = d.iterdir() 2829 assert e.is_file() 2830 h, = g.iterdir() 2831 i, = h.iterdir() 2832 assert i.is_file() 2833 2834 def test_subdir_is_dir(self): 2835 for alpharep in self.zipfile_alpharep(): 2836 root = zipfile.Path(alpharep) 2837 assert (root / 'b').is_dir() 2838 assert (root / 'b/').is_dir() 2839 assert (root / 'g').is_dir() 2840 assert (root / 'g/').is_dir() 2841 2842 def test_open(self): 2843 for alpharep in self.zipfile_alpharep(): 2844 root = zipfile.Path(alpharep) 2845 a, b, g = root.iterdir() 2846 with a.open() as strm: 2847 data = strm.read() 2848 assert data == b"content of a" 2849 2850 def test_read(self): 2851 for alpharep in self.zipfile_alpharep(): 2852 root = zipfile.Path(alpharep) 2853 a, b, g = root.iterdir() 2854 assert a.read_text() == "content of a" 2855 assert a.read_bytes() == b"content of a" 2856 2857 def test_joinpath(self): 2858 for alpharep in self.zipfile_alpharep(): 2859 root = zipfile.Path(alpharep) 2860 a = root.joinpath("a") 2861 assert a.is_file() 2862 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 2863 assert e.read_text() == "content of e" 2864 2865 def test_traverse_truediv(self): 2866 for alpharep in self.zipfile_alpharep(): 2867 root = zipfile.Path(alpharep) 2868 a = root / "a" 2869 assert a.is_file() 2870 e = root / "b" / "d" / "e.txt" 2871 assert e.read_text() == "content of e" 2872 2873 def test_pathlike_construction(self): 2874 """ 2875 zipfile.Path should be constructable from a path-like object 2876 """ 2877 for zipfile_ondisk in self.zipfile_ondisk(): 2878 pathlike = pathlib.Path(str(zipfile_ondisk)) 2879 zipfile.Path(pathlike) 2880 2881 def test_traverse_pathlike(self): 2882 for alpharep in self.zipfile_alpharep(): 2883 root = zipfile.Path(alpharep) 2884 root / pathlib.Path("a") 2885 2886 def test_parent(self): 2887 for alpharep in self.zipfile_alpharep(): 2888 root = zipfile.Path(alpharep) 2889 assert (root / 'a').parent.at == '' 2890 assert (root / 'a' / 'b').parent.at == 'a/' 2891 2892 def test_dir_parent(self): 2893 for alpharep in self.zipfile_alpharep(): 2894 root = zipfile.Path(alpharep) 2895 assert (root / 'b').parent.at == '' 2896 assert (root / 'b/').parent.at == '' 2897 2898 def test_missing_dir_parent(self): 2899 for alpharep in self.zipfile_alpharep(): 2900 root = zipfile.Path(alpharep) 2901 assert (root / 'missing dir/').parent.at == '' 2902 2903 def test_mutability(self): 2904 """ 2905 If the underlying zipfile is changed, the Path object should 2906 reflect that change. 2907 """ 2908 for alpharep in self.zipfile_alpharep(): 2909 root = zipfile.Path(alpharep) 2910 a, b, g = root.iterdir() 2911 alpharep.writestr('foo.txt', 'foo') 2912 alpharep.writestr('bar/baz.txt', 'baz') 2913 assert any( 2914 child.name == 'foo.txt' 2915 for child in root.iterdir()) 2916 assert (root / 'foo.txt').read_text() == 'foo' 2917 baz, = (root / 'bar').iterdir() 2918 assert baz.read_text() == 'baz' 2919 2920 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 2921 2922 def huge_zipfile(self): 2923 """Create a read-only zipfile with a huge number of entries entries.""" 2924 strm = io.BytesIO() 2925 zf = zipfile.ZipFile(strm, "w") 2926 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 2927 zf.writestr(entry, entry) 2928 zf.mode = 'r' 2929 return zf 2930 2931 def test_joinpath_constant_time(self): 2932 """ 2933 Ensure joinpath on items in zipfile is linear time. 2934 """ 2935 root = zipfile.Path(self.huge_zipfile()) 2936 entries = jaraco.itertools.Counter(root.iterdir()) 2937 for entry in entries: 2938 entry.joinpath('suffix') 2939 # Check the file iterated all items 2940 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 2941 2942 # @func_timeout.func_set_timeout(3) 2943 def test_implied_dirs_performance(self): 2944 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 2945 zipfile.CompleteDirs._implied_dirs(data) 2946 2947 2948if __name__ == "__main__": 2949 unittest.main() 2950