1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16 17 18from tempfile import TemporaryFile 19from random import randint, random, randbytes 20 21from test.support import script_helper 22from test.support import (TESTFN, findfile, unlink, rmtree, temp_dir, temp_cwd, 23 requires_zlib, requires_bz2, requires_lzma, 24 captured_stdout) 25 26TESTFN2 = TESTFN + "2" 27TESTFNDIR = TESTFN + "d" 28FIXEDTEST_SIZE = 1000 29DATAFILES_DIR = 'zipfile_datafiles' 30 31SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 32 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 33 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 34 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 35 36def get_files(test): 37 yield TESTFN2 38 with TemporaryFile() as f: 39 yield f 40 test.assertFalse(f.closed) 41 with io.BytesIO() as f: 42 yield f 43 test.assertFalse(f.closed) 44 45class AbstractTestsWithSourceFile: 46 @classmethod 47 def setUpClass(cls): 48 cls.line_gen = [bytes("Zipfile test line %d. 
random float: %f\n" % 49 (i, random()), "ascii") 50 for i in range(FIXEDTEST_SIZE)] 51 cls.data = b''.join(cls.line_gen) 52 53 def setUp(self): 54 # Make a source file with some lines 55 with open(TESTFN, "wb") as fp: 56 fp.write(self.data) 57 58 def make_test_archive(self, f, compression, compresslevel=None): 59 kwargs = {'compression': compression, 'compresslevel': compresslevel} 60 # Create the ZIP archive 61 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 62 zipfp.write(TESTFN, "another.name") 63 zipfp.write(TESTFN, TESTFN) 64 zipfp.writestr("strfile", self.data) 65 with zipfp.open('written-open-w', mode='w') as f: 66 for line in self.line_gen: 67 f.write(line) 68 69 def zip_test(self, f, compression, compresslevel=None): 70 self.make_test_archive(f, compression, compresslevel) 71 72 # Read the ZIP archive 73 with zipfile.ZipFile(f, "r", compression) as zipfp: 74 self.assertEqual(zipfp.read(TESTFN), self.data) 75 self.assertEqual(zipfp.read("another.name"), self.data) 76 self.assertEqual(zipfp.read("strfile"), self.data) 77 78 # Print the ZIP directory 79 fp = io.StringIO() 80 zipfp.printdir(file=fp) 81 directory = fp.getvalue() 82 lines = directory.splitlines() 83 self.assertEqual(len(lines), 5) # Number of files + header 84 85 self.assertIn('File Name', lines[0]) 86 self.assertIn('Modified', lines[0]) 87 self.assertIn('Size', lines[0]) 88 89 fn, date, time_, size = lines[1].split() 90 self.assertEqual(fn, 'another.name') 91 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 92 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 93 self.assertEqual(size, str(len(self.data))) 94 95 # Check the namelist 96 names = zipfp.namelist() 97 self.assertEqual(len(names), 4) 98 self.assertIn(TESTFN, names) 99 self.assertIn("another.name", names) 100 self.assertIn("strfile", names) 101 self.assertIn("written-open-w", names) 102 103 # Check infolist 104 infos = zipfp.infolist() 105 names = [i.filename for i in infos] 106 self.assertEqual(len(names), 4) 107 
self.assertIn(TESTFN, names) 108 self.assertIn("another.name", names) 109 self.assertIn("strfile", names) 110 self.assertIn("written-open-w", names) 111 for i in infos: 112 self.assertEqual(i.file_size, len(self.data)) 113 114 # check getinfo 115 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 116 info = zipfp.getinfo(nm) 117 self.assertEqual(info.filename, nm) 118 self.assertEqual(info.file_size, len(self.data)) 119 120 # Check that testzip doesn't raise an exception 121 zipfp.testzip() 122 123 def test_basic(self): 124 for f in get_files(self): 125 self.zip_test(f, self.compression) 126 127 def zip_open_test(self, f, compression): 128 self.make_test_archive(f, compression) 129 130 # Read the ZIP archive 131 with zipfile.ZipFile(f, "r", compression) as zipfp: 132 zipdata1 = [] 133 with zipfp.open(TESTFN) as zipopen1: 134 while True: 135 read_data = zipopen1.read(256) 136 if not read_data: 137 break 138 zipdata1.append(read_data) 139 140 zipdata2 = [] 141 with zipfp.open("another.name") as zipopen2: 142 while True: 143 read_data = zipopen2.read(256) 144 if not read_data: 145 break 146 zipdata2.append(read_data) 147 148 self.assertEqual(b''.join(zipdata1), self.data) 149 self.assertEqual(b''.join(zipdata2), self.data) 150 151 def test_open(self): 152 for f in get_files(self): 153 self.zip_open_test(f, self.compression) 154 155 def test_open_with_pathlike(self): 156 path = pathlib.Path(TESTFN2) 157 self.zip_open_test(path, self.compression) 158 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 159 self.assertIsInstance(zipfp.filename, str) 160 161 def zip_random_open_test(self, f, compression): 162 self.make_test_archive(f, compression) 163 164 # Read the ZIP archive 165 with zipfile.ZipFile(f, "r", compression) as zipfp: 166 zipdata1 = [] 167 with zipfp.open(TESTFN) as zipopen1: 168 while True: 169 read_data = zipopen1.read(randint(1, 1024)) 170 if not read_data: 171 break 172 zipdata1.append(read_data) 173 174 
self.assertEqual(b''.join(zipdata1), self.data) 175 176 def test_random_open(self): 177 for f in get_files(self): 178 self.zip_random_open_test(f, self.compression) 179 180 def zip_read1_test(self, f, compression): 181 self.make_test_archive(f, compression) 182 183 # Read the ZIP archive 184 with zipfile.ZipFile(f, "r") as zipfp, \ 185 zipfp.open(TESTFN) as zipopen: 186 zipdata = [] 187 while True: 188 read_data = zipopen.read1(-1) 189 if not read_data: 190 break 191 zipdata.append(read_data) 192 193 self.assertEqual(b''.join(zipdata), self.data) 194 195 def test_read1(self): 196 for f in get_files(self): 197 self.zip_read1_test(f, self.compression) 198 199 def zip_read1_10_test(self, f, compression): 200 self.make_test_archive(f, compression) 201 202 # Read the ZIP archive 203 with zipfile.ZipFile(f, "r") as zipfp, \ 204 zipfp.open(TESTFN) as zipopen: 205 zipdata = [] 206 while True: 207 read_data = zipopen.read1(10) 208 self.assertLessEqual(len(read_data), 10) 209 if not read_data: 210 break 211 zipdata.append(read_data) 212 213 self.assertEqual(b''.join(zipdata), self.data) 214 215 def test_read1_10(self): 216 for f in get_files(self): 217 self.zip_read1_10_test(f, self.compression) 218 219 def zip_readline_read_test(self, f, compression): 220 self.make_test_archive(f, compression) 221 222 # Read the ZIP archive 223 with zipfile.ZipFile(f, "r") as zipfp, \ 224 zipfp.open(TESTFN) as zipopen: 225 data = b'' 226 while True: 227 read = zipopen.readline() 228 if not read: 229 break 230 data += read 231 232 read = zipopen.read(100) 233 if not read: 234 break 235 data += read 236 237 self.assertEqual(data, self.data) 238 239 def test_readline_read(self): 240 # Issue #7610: calls to readline() interleaved with calls to read(). 
241 for f in get_files(self): 242 self.zip_readline_read_test(f, self.compression) 243 244 def zip_readline_test(self, f, compression): 245 self.make_test_archive(f, compression) 246 247 # Read the ZIP archive 248 with zipfile.ZipFile(f, "r") as zipfp: 249 with zipfp.open(TESTFN) as zipopen: 250 for line in self.line_gen: 251 linedata = zipopen.readline() 252 self.assertEqual(linedata, line) 253 254 def test_readline(self): 255 for f in get_files(self): 256 self.zip_readline_test(f, self.compression) 257 258 def zip_readlines_test(self, f, compression): 259 self.make_test_archive(f, compression) 260 261 # Read the ZIP archive 262 with zipfile.ZipFile(f, "r") as zipfp: 263 with zipfp.open(TESTFN) as zipopen: 264 ziplines = zipopen.readlines() 265 for line, zipline in zip(self.line_gen, ziplines): 266 self.assertEqual(zipline, line) 267 268 def test_readlines(self): 269 for f in get_files(self): 270 self.zip_readlines_test(f, self.compression) 271 272 def zip_iterlines_test(self, f, compression): 273 self.make_test_archive(f, compression) 274 275 # Read the ZIP archive 276 with zipfile.ZipFile(f, "r") as zipfp: 277 with zipfp.open(TESTFN) as zipopen: 278 for line, zipline in zip(self.line_gen, zipopen): 279 self.assertEqual(zipline, line) 280 281 def test_iterlines(self): 282 for f in get_files(self): 283 self.zip_iterlines_test(f, self.compression) 284 285 def test_low_compression(self): 286 """Check for cases where compressed data is larger than original.""" 287 # Create the ZIP archive 288 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 289 zipfp.writestr("strfile", '12') 290 291 # Get an open object for strfile 292 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 293 with zipfp.open("strfile") as openobj: 294 self.assertEqual(openobj.read(1), b'1') 295 self.assertEqual(openobj.read(1), b'2') 296 297 def test_writestr_compression(self): 298 zipfp = zipfile.ZipFile(TESTFN2, "w") 299 zipfp.writestr("b.txt", "hello world", 
compress_type=self.compression) 300 info = zipfp.getinfo('b.txt') 301 self.assertEqual(info.compress_type, self.compression) 302 303 def test_writestr_compresslevel(self): 304 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 305 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 306 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 307 compresslevel=2) 308 309 # Compression level follows the constructor. 310 a_info = zipfp.getinfo('a.txt') 311 self.assertEqual(a_info.compress_type, self.compression) 312 self.assertEqual(a_info._compresslevel, 1) 313 314 # Compression level is overridden. 315 b_info = zipfp.getinfo('b.txt') 316 self.assertEqual(b_info.compress_type, self.compression) 317 self.assertEqual(b_info._compresslevel, 2) 318 319 def test_read_return_size(self): 320 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 321 # than requested. 322 for test_size in (1, 4095, 4096, 4097, 16384): 323 file_size = test_size + 1 324 junk = randbytes(file_size) 325 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 326 zipf.writestr('foo', junk) 327 with zipf.open('foo', 'r') as fp: 328 buf = fp.read(test_size) 329 self.assertEqual(len(buf), test_size) 330 331 def test_truncated_zipfile(self): 332 fp = io.BytesIO() 333 with zipfile.ZipFile(fp, mode='w') as zipf: 334 zipf.writestr('strfile', self.data, compress_type=self.compression) 335 end_offset = fp.tell() 336 zipfiledata = fp.getvalue() 337 338 fp = io.BytesIO(zipfiledata) 339 with zipfile.ZipFile(fp) as zipf: 340 with zipf.open('strfile') as zipopen: 341 fp.truncate(end_offset - 20) 342 with self.assertRaises(EOFError): 343 zipopen.read() 344 345 fp = io.BytesIO(zipfiledata) 346 with zipfile.ZipFile(fp) as zipf: 347 with zipf.open('strfile') as zipopen: 348 fp.truncate(end_offset - 20) 349 with self.assertRaises(EOFError): 350 while zipopen.read(100): 351 pass 352 353 fp = io.BytesIO(zipfiledata) 354 with zipfile.ZipFile(fp) as zipf: 355 with 
zipf.open('strfile') as zipopen: 356 fp.truncate(end_offset - 20) 357 with self.assertRaises(EOFError): 358 while zipopen.read1(100): 359 pass 360 361 def test_repr(self): 362 fname = 'file.name' 363 for f in get_files(self): 364 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 365 zipfp.write(TESTFN, fname) 366 r = repr(zipfp) 367 self.assertIn("mode='w'", r) 368 369 with zipfile.ZipFile(f, 'r') as zipfp: 370 r = repr(zipfp) 371 if isinstance(f, str): 372 self.assertIn('filename=%r' % f, r) 373 else: 374 self.assertIn('file=%r' % f, r) 375 self.assertIn("mode='r'", r) 376 r = repr(zipfp.getinfo(fname)) 377 self.assertIn('filename=%r' % fname, r) 378 self.assertIn('filemode=', r) 379 self.assertIn('file_size=', r) 380 if self.compression != zipfile.ZIP_STORED: 381 self.assertIn('compress_type=', r) 382 self.assertIn('compress_size=', r) 383 with zipfp.open(fname) as zipopen: 384 r = repr(zipopen) 385 self.assertIn('name=%r' % fname, r) 386 self.assertIn("mode='r'", r) 387 if self.compression != zipfile.ZIP_STORED: 388 self.assertIn('compress_type=', r) 389 self.assertIn('[closed]', repr(zipopen)) 390 self.assertIn('[closed]', repr(zipfp)) 391 392 def test_compresslevel_basic(self): 393 for f in get_files(self): 394 self.zip_test(f, self.compression, compresslevel=9) 395 396 def test_per_file_compresslevel(self): 397 """Check that files within a Zip archive can have different 398 compression levels.""" 399 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 400 zipfp.write(TESTFN, 'compress_1') 401 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 402 one_info = zipfp.getinfo('compress_1') 403 nine_info = zipfp.getinfo('compress_9') 404 self.assertEqual(one_info._compresslevel, 1) 405 self.assertEqual(nine_info._compresslevel, 9) 406 407 def test_writing_errors(self): 408 class BrokenFile(io.BytesIO): 409 def write(self, data): 410 nonlocal count 411 if count is not None: 412 if count == stop: 413 raise OSError 414 count += 1 415 
super().write(data) 416 417 stop = 0 418 while True: 419 testfile = BrokenFile() 420 count = None 421 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 422 with zipfp.open('file1', 'w') as f: 423 f.write(b'data1') 424 count = 0 425 try: 426 with zipfp.open('file2', 'w') as f: 427 f.write(b'data2') 428 except OSError: 429 stop += 1 430 else: 431 break 432 finally: 433 count = None 434 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 435 self.assertEqual(zipfp.namelist(), ['file1']) 436 self.assertEqual(zipfp.read('file1'), b'data1') 437 438 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 439 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 440 self.assertEqual(zipfp.read('file1'), b'data1') 441 self.assertEqual(zipfp.read('file2'), b'data2') 442 443 444 def tearDown(self): 445 unlink(TESTFN) 446 unlink(TESTFN2) 447 448 449class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 450 unittest.TestCase): 451 compression = zipfile.ZIP_STORED 452 test_low_compression = None 453 454 def zip_test_writestr_permissions(self, f, compression): 455 # Make sure that writestr and open(... mode='w') create files with 456 # mode 0600, when they are passed a name rather than a ZipInfo 457 # instance. 
458 459 self.make_test_archive(f, compression) 460 with zipfile.ZipFile(f, "r") as zipfp: 461 zinfo = zipfp.getinfo('strfile') 462 self.assertEqual(zinfo.external_attr, 0o600 << 16) 463 464 zinfo2 = zipfp.getinfo('written-open-w') 465 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 466 467 def test_writestr_permissions(self): 468 for f in get_files(self): 469 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 470 471 def test_absolute_arcnames(self): 472 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 473 zipfp.write(TESTFN, "/absolute") 474 475 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 476 self.assertEqual(zipfp.namelist(), ["absolute"]) 477 478 def test_append_to_zip_file(self): 479 """Test appending to an existing zipfile.""" 480 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 481 zipfp.write(TESTFN, TESTFN) 482 483 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 484 zipfp.writestr("strfile", self.data) 485 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 486 487 def test_append_to_non_zip_file(self): 488 """Test appending to an existing file that is not a zipfile.""" 489 # NOTE: this test fails if len(d) < 22 because of the first 490 # line "fpin.seek(-22, 2)" in _EndRecData 491 data = b'I am not a ZipFile!'*10 492 with open(TESTFN2, 'wb') as f: 493 f.write(data) 494 495 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 496 zipfp.write(TESTFN, TESTFN) 497 498 with open(TESTFN2, 'rb') as f: 499 f.seek(len(data)) 500 with zipfile.ZipFile(f, "r") as zipfp: 501 self.assertEqual(zipfp.namelist(), [TESTFN]) 502 self.assertEqual(zipfp.read(TESTFN), self.data) 503 with open(TESTFN2, 'rb') as f: 504 self.assertEqual(f.read(len(data)), data) 505 zipfiledata = f.read() 506 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 507 self.assertEqual(zipfp.namelist(), [TESTFN]) 508 self.assertEqual(zipfp.read(TESTFN), self.data) 509 510 def 
test_read_concatenated_zip_file(self): 511 with io.BytesIO() as bio: 512 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 513 zipfp.write(TESTFN, TESTFN) 514 zipfiledata = bio.getvalue() 515 data = b'I am not a ZipFile!'*10 516 with open(TESTFN2, 'wb') as f: 517 f.write(data) 518 f.write(zipfiledata) 519 520 with zipfile.ZipFile(TESTFN2) as zipfp: 521 self.assertEqual(zipfp.namelist(), [TESTFN]) 522 self.assertEqual(zipfp.read(TESTFN), self.data) 523 524 def test_append_to_concatenated_zip_file(self): 525 with io.BytesIO() as bio: 526 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 527 zipfp.write(TESTFN, TESTFN) 528 zipfiledata = bio.getvalue() 529 data = b'I am not a ZipFile!'*1000000 530 with open(TESTFN2, 'wb') as f: 531 f.write(data) 532 f.write(zipfiledata) 533 534 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 535 self.assertEqual(zipfp.namelist(), [TESTFN]) 536 zipfp.writestr('strfile', self.data) 537 538 with open(TESTFN2, 'rb') as f: 539 self.assertEqual(f.read(len(data)), data) 540 zipfiledata = f.read() 541 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 542 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 543 self.assertEqual(zipfp.read(TESTFN), self.data) 544 self.assertEqual(zipfp.read('strfile'), self.data) 545 546 def test_ignores_newline_at_end(self): 547 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 548 zipfp.write(TESTFN, TESTFN) 549 with open(TESTFN2, 'a') as f: 550 f.write("\r\n\00\00\00") 551 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 552 self.assertIsInstance(zipfp, zipfile.ZipFile) 553 554 def test_ignores_stuff_appended_past_comments(self): 555 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 556 zipfp.comment = b"this is a comment" 557 zipfp.write(TESTFN, TESTFN) 558 with open(TESTFN2, 'a') as f: 559 f.write("abcdef\r\n") 560 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 561 self.assertIsInstance(zipfp, zipfile.ZipFile) 562 
self.assertEqual(zipfp.comment, b"this is a comment") 563 564 def test_write_default_name(self): 565 """Check that calling ZipFile.write without arcname specified 566 produces the expected result.""" 567 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 568 zipfp.write(TESTFN) 569 with open(TESTFN, "rb") as f: 570 self.assertEqual(zipfp.read(TESTFN), f.read()) 571 572 def test_io_on_closed_zipextfile(self): 573 fname = "somefile.txt" 574 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 575 zipfp.writestr(fname, "bogus") 576 577 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 578 with zipfp.open(fname) as fid: 579 fid.close() 580 self.assertRaises(ValueError, fid.read) 581 self.assertRaises(ValueError, fid.seek, 0) 582 self.assertRaises(ValueError, fid.tell) 583 self.assertRaises(ValueError, fid.readable) 584 self.assertRaises(ValueError, fid.seekable) 585 586 def test_write_to_readonly(self): 587 """Check that trying to call write() on a readonly ZipFile object 588 raises a ValueError.""" 589 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 590 zipfp.writestr("somefile.txt", "bogus") 591 592 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 593 self.assertRaises(ValueError, zipfp.write, TESTFN) 594 595 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 596 with self.assertRaises(ValueError): 597 zipfp.open(TESTFN, mode='w') 598 599 def test_add_file_before_1980(self): 600 # Set atime and mtime to 1970-01-01 601 os.utime(TESTFN, (0, 0)) 602 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 603 self.assertRaises(ValueError, zipfp.write, TESTFN) 604 605 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 606 zipfp.write(TESTFN) 607 zinfo = zipfp.getinfo(TESTFN) 608 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 609 610 def test_add_file_after_2107(self): 611 # Set atime and mtime to 2108-12-30 612 ts = 4386268800 613 try: 614 time.localtime(ts) 615 except OverflowError: 616 self.skipTest(f'time.localtime({ts}) raises OverflowError') 617 try: 618 
os.utime(TESTFN, (ts, ts)) 619 except OverflowError: 620 self.skipTest('Host fs cannot set timestamp to required value.') 621 622 mtime_ns = os.stat(TESTFN).st_mtime_ns 623 if mtime_ns != (4386268800 * 10**9): 624 # XFS filesystem is limited to 32-bit timestamp, but the syscall 625 # didn't fail. Moreover, there is a VFS bug which returns 626 # a cached timestamp which is different than the value on disk. 627 # 628 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 629 # 630 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 631 # https://bugs.python.org/issue39460#msg360952 632 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 633 634 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 635 self.assertRaises(struct.error, zipfp.write, TESTFN) 636 637 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 638 zipfp.write(TESTFN) 639 zinfo = zipfp.getinfo(TESTFN) 640 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 641 642 643@requires_zlib() 644class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 645 unittest.TestCase): 646 compression = zipfile.ZIP_DEFLATED 647 648 def test_per_file_compression(self): 649 """Check that files within a Zip archive can have different 650 compression options.""" 651 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 652 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 653 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 654 sinfo = zipfp.getinfo('storeme') 655 dinfo = zipfp.getinfo('deflateme') 656 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 657 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 658 659@requires_bz2() 660class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 661 unittest.TestCase): 662 compression = zipfile.ZIP_BZIP2 663 664@requires_lzma() 665class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 666 unittest.TestCase): 667 compression = zipfile.ZIP_LZMA 668 669 670class AbstractTestZip64InSmallFiles: 671 # These tests 
test the ZIP64 functionality without using large files, 672 # see test_zipfile64 for proper tests. 673 674 @classmethod 675 def setUpClass(cls): 676 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 677 for i in range(0, FIXEDTEST_SIZE)) 678 cls.data = b'\n'.join(line_gen) 679 680 def setUp(self): 681 self._limit = zipfile.ZIP64_LIMIT 682 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 683 zipfile.ZIP64_LIMIT = 1000 684 zipfile.ZIP_FILECOUNT_LIMIT = 9 685 686 # Make a source file with some lines 687 with open(TESTFN, "wb") as fp: 688 fp.write(self.data) 689 690 def zip_test(self, f, compression): 691 # Create the ZIP archive 692 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 693 zipfp.write(TESTFN, "another.name") 694 zipfp.write(TESTFN, TESTFN) 695 zipfp.writestr("strfile", self.data) 696 697 # Read the ZIP archive 698 with zipfile.ZipFile(f, "r", compression) as zipfp: 699 self.assertEqual(zipfp.read(TESTFN), self.data) 700 self.assertEqual(zipfp.read("another.name"), self.data) 701 self.assertEqual(zipfp.read("strfile"), self.data) 702 703 # Print the ZIP directory 704 fp = io.StringIO() 705 zipfp.printdir(fp) 706 707 directory = fp.getvalue() 708 lines = directory.splitlines() 709 self.assertEqual(len(lines), 4) # Number of files + header 710 711 self.assertIn('File Name', lines[0]) 712 self.assertIn('Modified', lines[0]) 713 self.assertIn('Size', lines[0]) 714 715 fn, date, time_, size = lines[1].split() 716 self.assertEqual(fn, 'another.name') 717 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 718 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 719 self.assertEqual(size, str(len(self.data))) 720 721 # Check the namelist 722 names = zipfp.namelist() 723 self.assertEqual(len(names), 3) 724 self.assertIn(TESTFN, names) 725 self.assertIn("another.name", names) 726 self.assertIn("strfile", names) 727 728 # Check infolist 729 infos = zipfp.infolist() 730 names = [i.filename for i in infos] 731 self.assertEqual(len(names), 3) 
732 self.assertIn(TESTFN, names) 733 self.assertIn("another.name", names) 734 self.assertIn("strfile", names) 735 for i in infos: 736 self.assertEqual(i.file_size, len(self.data)) 737 738 # check getinfo 739 for nm in (TESTFN, "another.name", "strfile"): 740 info = zipfp.getinfo(nm) 741 self.assertEqual(info.filename, nm) 742 self.assertEqual(info.file_size, len(self.data)) 743 744 # Check that testzip doesn't raise an exception 745 zipfp.testzip() 746 747 def test_basic(self): 748 for f in get_files(self): 749 self.zip_test(f, self.compression) 750 751 def test_too_many_files(self): 752 # This test checks that more than 64k files can be added to an archive, 753 # and that the resulting archive can be read properly by ZipFile 754 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 755 allowZip64=True) 756 zipf.debug = 100 757 numfiles = 15 758 for i in range(numfiles): 759 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 760 self.assertEqual(len(zipf.namelist()), numfiles) 761 zipf.close() 762 763 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 764 self.assertEqual(len(zipf2.namelist()), numfiles) 765 for i in range(numfiles): 766 content = zipf2.read("foo%08d" % i).decode('ascii') 767 self.assertEqual(content, "%d" % (i**3 % 57)) 768 zipf2.close() 769 770 def test_too_many_files_append(self): 771 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 772 allowZip64=False) 773 zipf.debug = 100 774 numfiles = 9 775 for i in range(numfiles): 776 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 777 self.assertEqual(len(zipf.namelist()), numfiles) 778 with self.assertRaises(zipfile.LargeZipFile): 779 zipf.writestr("foo%08d" % numfiles, b'') 780 self.assertEqual(len(zipf.namelist()), numfiles) 781 zipf.close() 782 783 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 784 allowZip64=False) 785 zipf.debug = 100 786 self.assertEqual(len(zipf.namelist()), numfiles) 787 with self.assertRaises(zipfile.LargeZipFile): 788 zipf.writestr("foo%08d" % numfiles, b'') 789 
self.assertEqual(len(zipf.namelist()), numfiles) 790 zipf.close() 791 792 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 793 allowZip64=True) 794 zipf.debug = 100 795 self.assertEqual(len(zipf.namelist()), numfiles) 796 numfiles2 = 15 797 for i in range(numfiles, numfiles2): 798 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 799 self.assertEqual(len(zipf.namelist()), numfiles2) 800 zipf.close() 801 802 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 803 self.assertEqual(len(zipf2.namelist()), numfiles2) 804 for i in range(numfiles2): 805 content = zipf2.read("foo%08d" % i).decode('ascii') 806 self.assertEqual(content, "%d" % (i**3 % 57)) 807 zipf2.close() 808 809 def tearDown(self): 810 zipfile.ZIP64_LIMIT = self._limit 811 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 812 unlink(TESTFN) 813 unlink(TESTFN2) 814 815 816class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 817 unittest.TestCase): 818 compression = zipfile.ZIP_STORED 819 820 def large_file_exception_test(self, f, compression): 821 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 822 self.assertRaises(zipfile.LargeZipFile, 823 zipfp.write, TESTFN, "another.name") 824 825 def large_file_exception_test2(self, f, compression): 826 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 827 self.assertRaises(zipfile.LargeZipFile, 828 zipfp.writestr, "another.name", self.data) 829 830 def test_large_file_exception(self): 831 for f in get_files(self): 832 self.large_file_exception_test(f, zipfile.ZIP_STORED) 833 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 834 835 def test_absolute_arcnames(self): 836 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 837 allowZip64=True) as zipfp: 838 zipfp.write(TESTFN, "/absolute") 839 840 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 841 self.assertEqual(zipfp.namelist(), ["absolute"]) 842 843 def test_append(self): 844 # Test that appending to the Zip64 archive doesn't 
change 845 # extra fields of existing entries. 846 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 847 zipfp.writestr("strfile", self.data) 848 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 849 zinfo = zipfp.getinfo("strfile") 850 extra = zinfo.extra 851 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 852 zipfp.writestr("strfile2", self.data) 853 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 854 zinfo = zipfp.getinfo("strfile") 855 self.assertEqual(zinfo.extra, extra) 856 857 def make_zip64_file( 858 self, file_size_64_set=False, file_size_extra=False, 859 compress_size_64_set=False, compress_size_extra=False, 860 header_offset_64_set=False, header_offset_extra=False, 861 ): 862 """Generate bytes sequence for a zip with (incomplete) zip64 data. 863 864 The actual values (not the zip 64 0xffffffff values) stored in the file 865 are: 866 file_size: 8 867 compress_size: 8 868 header_offset: 0 869 """ 870 actual_size = 8 871 actual_header_offset = 0 872 local_zip64_fields = [] 873 central_zip64_fields = [] 874 875 file_size = actual_size 876 if file_size_64_set: 877 file_size = 0xffffffff 878 if file_size_extra: 879 local_zip64_fields.append(actual_size) 880 central_zip64_fields.append(actual_size) 881 file_size = struct.pack("<L", file_size) 882 883 compress_size = actual_size 884 if compress_size_64_set: 885 compress_size = 0xffffffff 886 if compress_size_extra: 887 local_zip64_fields.append(actual_size) 888 central_zip64_fields.append(actual_size) 889 compress_size = struct.pack("<L", compress_size) 890 891 header_offset = actual_header_offset 892 if header_offset_64_set: 893 header_offset = 0xffffffff 894 if header_offset_extra: 895 central_zip64_fields.append(actual_header_offset) 896 header_offset = struct.pack("<L", header_offset) 897 898 local_extra = struct.pack( 899 '<HH' + 'Q'*len(local_zip64_fields), 900 0x0001, 901 8*len(local_zip64_fields), 902 *local_zip64_fields 903 ) 904 905 central_extra = 
def test_bad_zip64_extra(self):
    """Missing zip64 extra records raises an exception.

    There are 4 fields that the zip64 format handles (the disk number is
    not used in this module and so is ignored here). According to the zip
    spec:
          The order of the fields in the zip64 extended
          information record is fixed, but the fields MUST
          only appear if the corresponding Local or Central
          directory record field is set to 0xFFFF or 0xFFFFFFFF.

    If the zip64 extra content doesn't contain enough entries for the
    number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error.
    This test mismatches the length of the zip64 extra field and the number
    of fields set to indicate the presence of zip64 data.

    Each case runs in its own subTest, so a failure reports which
    combination broke and the remaining combinations are still exercised.
    """
    # (description, make_zip64_file keyword arguments, text that must
    # appear in the lower-cased BadZipFile message)
    cases = (
        ("file size flagged, no fields in extra, expecting one",
         dict(file_size_64_set=True),
         'file size'),
        ("file size and compress size flagged, one field in extra, "
         "expecting two",
         dict(file_size_64_set=True,
              file_size_extra=True,
              compress_size_64_set=True),
         'compress size'),
        ("compress size flagged, no fields in extra, expecting one",
         dict(compress_size_64_set=True),
         'compress size'),
        ("file size, compress size and header offset flagged, two fields "
         "in extra, expecting three",
         dict(file_size_64_set=True,
              file_size_extra=True,
              compress_size_64_set=True,
              compress_size_extra=True,
              header_offset_64_set=True),
         'header offset'),
        ("compress size and header offset flagged, one field in extra, "
         "expecting two",
         dict(file_size_64_set=False,
              compress_size_64_set=True,
              compress_size_extra=True,
              header_offset_64_set=True),
         'header offset'),
        ("file size and header offset flagged, one field in extra, "
         "expecting two",
         dict(file_size_64_set=True,
              file_size_extra=True,
              compress_size_64_set=False,
              header_offset_64_set=True),
         'header offset'),
        ("header offset flagged, no fields in extra, expecting one",
         dict(file_size_64_set=False,
              compress_size_64_set=False,
              header_offset_64_set=True),
         'header offset'),
    )

    for description, kwargs, expected_fragment in cases:
        with self.subTest(description):
            archive = self.make_zip64_file(**kwargs)
            with self.assertRaises(zipfile.BadZipFile) as e:
                zipfile.ZipFile(io.BytesIO(archive))
            self.assertIn(expected_fragment, str(e.exception).lower())
def assertCompiledIn(self, name, namelist):
    """Assert that a compiled form of *name* is listed in *namelist*.

    The check passes when ``name + 'o'`` is present; otherwise
    ``name + 'c'`` must be present.
    """
    optimized_name = name + 'o'
    if optimized_name in namelist:
        return
    self.assertIn(name + 'c', namelist)
def test_write_filtered_python_package(self):
    """Check that writepy()'s filterfunc controls which files are compiled.

    The ``test`` package is written three times: unfiltered, with a
    filter rejecting everything, and with a filter rejecting only files
    whose basename starts with "bad".
    """
    import test
    packagedir = os.path.dirname(test.__file__)
    self.requiresWriteAccess(packagedir)

    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:

        # first make sure that the test folder gives error messages
        # (on the badsyntax_... files)
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir)
        reportStr = reportSIO.getvalue()
        self.assertIn('SyntaxError', reportStr)

        # then check that the filter works on the whole package
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
        reportStr = reportSIO.getvalue()
        self.assertNotIn('SyntaxError', reportStr)

        # then check that the filter works on individual files
        # (named so that it does not shadow the builtin filter())
        def skip_bad_files(path):
            return not os.path.basename(path).startswith("bad")
        with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
            zipfp.writepy(packagedir, filterfunc=skip_bad_files)
        reportStr = reportSIO.getvalue()
        if reportStr:
            # Surface whatever was reported to help diagnose a failure.
            print(reportStr)
        self.assertNotIn('SyntaxError', reportStr)
def test_write_non_pyfile(self):
    """Check that writepy() raises RuntimeError for a non-.py file."""
    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
        try:
            with open(TESTFN, 'w') as f:
                f.write('most definitely not a python file')
            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
        finally:
            # Remove the scratch file even when the assertion fails, so
            # a failure here does not leave TESTFN behind for other tests.
            unlink(TESTFN)
def _test_extract_with_target(self, target):
    """Extract every SMALL_TEST_DATA member into *target* and verify it.

    *target* is passed straight to ZipFile.extract(); each extracted
    file is checked for location and content and then removed.
    """
    self.make_test_file()
    try:
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath, target)

                # make sure it was written to the right place
                correctfile = os.path.join(target, fpath)
                correctfile = os.path.normpath(correctfile)
                self.assertTrue(os.path.samefile(writtenfile, correctfile),
                                (writtenfile, target))

                # make sure correct data is in correct file
                with open(writtenfile, "rb") as f:
                    self.assertEqual(fdata.encode(), f.read())

                unlink(writtenfile)
    finally:
        # Remove the archive even if one of the assertions above failed.
        unlink(TESTFN2)
def check_file(self, filename, content):
    """Assert that *filename* is an existing file whose bytes equal *content*."""
    exists_as_file = os.path.isfile(filename)
    self.assertTrue(exists_as_file)
    with open(filename, 'rb') as stream:
        actual = stream.read()
        self.assertEqual(actual, content)
def test_extract_hackers_arcnames_common_cases(self):
    """Run the hostile-arcname extraction check on the shared cases.

    Every archive name listed here is expected to be extracted as
    'foo/bar'.
    """
    hostile_names = (
        '../foo/bar',
        'foo/../bar',
        'foo/../../bar',
        'foo/bar/..',
        './../foo/bar',
        '/foo/bar',
        '/foo/../bar',
        '/foo/../../bar',
    )
    common_hacknames = [(hostile, 'foo/bar') for hostile in hostile_names]
    self._test_extract_hackers_arcnames(common_hacknames)
def _test_extract_hackers_arcnames(self, hacknames):
    """Check extraction of each (arcname, fixedname) pair in *hacknames*.

    For every pair, a one-member archive is written to TESTFN2 and then
    extracted four ways: extract() and extractall() into a target
    directory, then extract() and extractall() into the current
    directory. Each time the file must appear at the path built from
    *fixedname* with the expected content.
    """
    extract_root = os.path.join('target', 'subdir', 'subsub')

    for stored_name, fixedname in hacknames:
        payload = b'foobar' + stored_name.encode()
        fixed_parts = fixedname.split('/')

        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as writer:
            entry = zipfile.ZipInfo()
            # preserve backslashes
            entry.filename = stored_name
            entry.external_attr = 0o600 << 16
            writer.writestr(entry, payload)

        member = stored_name.replace(os.sep, "/")

        # Extraction into an explicit target directory.
        expected_path = os.path.join(extract_root, *fixed_parts)

        with zipfile.ZipFile(TESTFN2, 'r') as reader:
            produced = reader.extract(member, extract_root)
            self.assertEqual(produced, expected_path,
                             msg='extract %r: %r != %r' %
                                 (member, produced, expected_path))
        self.check_file(expected_path, payload)
        rmtree('target')

        with zipfile.ZipFile(TESTFN2, 'r') as reader:
            reader.extractall(extract_root)
        self.check_file(expected_path, payload)
        rmtree('target')

        # Extraction into the current working directory.
        expected_path = os.path.join(os.getcwd(), *fixed_parts)
        top_level = fixed_parts[0]

        with zipfile.ZipFile(TESTFN2, 'r') as reader:
            produced = reader.extract(member)
            self.assertEqual(produced, expected_path,
                             msg="extract %r" % member)
        self.check_file(expected_path, payload)
        rmtree(top_level)

        with zipfile.ZipFile(TESTFN2, 'r') as reader:
            reader.extractall()
        self.check_file(expected_path, payload)
        rmtree(top_level)

        unlink(TESTFN2)
def test_close_on_exception(self):
    """Check that leaving the 'with' block through an exception still
    closes the zipfile."""
    with zipfile.ZipFile(TESTFN2, "w") as writer:
        for member_name, member_data in SMALL_TEST_DATA:
            writer.writestr(member_name, member_data)

    with self.assertRaises(zipfile.BadZipFile):
        with zipfile.ZipFile(TESTFN2, "r") as reader:
            raise zipfile.BadZipFile()
    self.assertIsNone(reader.fp, 'zipfp is not closed')
def test_exclusive_create_zip_file(self):
    """Test exclusive creating a new zipfile."""
    unlink(TESTFN2)
    member = 'testfile.txt'
    payload = b'hello, world. this is some content.'

    # First exclusive creation succeeds because the file is absent.
    with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as archive:
        archive.writestr(member, payload)

    # Second exclusive creation must refuse to overwrite it.
    self.assertRaises(FileExistsError,
                      zipfile.ZipFile, TESTFN2, "x", zipfile.ZIP_STORED)

    # The archive written the first time is intact.
    with zipfile.ZipFile(TESTFN2, "r") as archive:
        listed = archive.namelist()
        self.assertEqual(listed, [member])
        stored = archive.read(member)
        self.assertEqual(stored, payload)
def test_close_erroneous_file(self):
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w") as fp:
        fp.write("this is not a legal zip file\n")
    try:
        zf = zipfile.ZipFile(TESTFN)
    except zipfile.BadZipFile:
        pass
    else:
        # The constructor unexpectedly accepted the file: do not leave
        # the handle open, or the file stays locked just as in the bug
        # this test guards against.
        zf.close()
def test_is_zip_valid_file(self):
    """Check that is_zipfile() accepts a real archive in every input form."""
    with zipfile.ZipFile(TESTFN, mode="w") as archive:
        archive.writestr("foo.txt", b"O, for a Muse of Fire!")

    # Given a filename.
    recognised = zipfile.is_zipfile(TESTFN)
    self.assertTrue(recognised)

    # Given an open binary file object; keep its bytes for the next step.
    with open(TESTFN, "rb") as disk_file:
        self.assertTrue(zipfile.is_zipfile(disk_file))
        disk_file.seek(0, 0)
        raw_archive = disk_file.read()

    # Given an in-memory file-like object, first with the position left
    # at the end by write(), then rewound to the start.
    memory_file = io.BytesIO()
    memory_file.write(raw_archive)
    self.assertTrue(zipfile.is_zipfile(memory_file))
    memory_file.seek(0, 0)
    self.assertTrue(zipfile.is_zipfile(memory_file))
def test_empty_file_raises_BadZipFile(self):
    """Check that an empty file and a short non-zip file both raise
    BadZipFile when opened as an archive."""
    for text in ("", "short file"):
        with open(TESTFN, 'w') as scratch:
            scratch.write(text)
        with self.assertRaises(zipfile.BadZipFile):
            zipfile.ZipFile(TESTFN)
def test_open_non_existent_item(self):
    """Check that attempting to call open() for an item that doesn't
    exist in the archive raises a KeyError."""
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
def test_comments(self):
    """Check that comments on the archive are handled properly.

    Covers the empty default, a short comment, a maximum-length comment,
    truncation of an over-long comment (with a UserWarning), and
    replacing a comment in append mode with a different or shorter one.
    """

    # check default comment is empty
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertEqual(zipf.comment, b'')
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b'')

    # check a simple short comment
    comment = b'Bravely taking to his feet, he beat a very brave retreat.'
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        # Assert on the reader: checking the already-closed writer would
        # only compare the comment with itself and never verify that it
        # was stored in the file.
        self.assertEqual(zipfr.comment, comment)

    # check a comment of max length
    comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
    comment2 = comment2.encode("ascii")
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment2
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check a comment that is too long is truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertWarns(UserWarning):
            zipf.comment = comment2 + b'oops'
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check that comments are correctly modified in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"an updated comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"an updated comment")

    # check that comments are correctly shortened in append mode
    # and the file is indeed truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment that's longer"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    original_zip_size = os.path.getsize(TESTFN)
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"shorter comment"
    self.assertGreater(original_zip_size, os.path.getsize(TESTFN))
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"shorter comment")
def test_open_empty_file(self):
    # Issue 1710703: a file shorter than 22 bytes must be rejected with
    # BadZipFile (rather than the previously unhelpful OSError).
    # Opening for writing and closing straight away leaves a zero-length
    # file behind.
    with open(TESTFN, 'w'):
        pass
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN, 'r')
def test_open_conflicting_handles(self):
    # It's only possible to open one writable file handle at a time
    first = b"It's fun to charter an accountant!"
    second = b"And sail the wide accountant sea"
    third = b"To find, explore the funds offshore"
    with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as archive:
        with archive.open('foo', mode='w') as writer:
            writer.write(first)
        with archive.open('bar', mode='w') as writer:
            # While 'bar' is still open for writing, each of these
            # operations on the archive is expected to raise ValueError.
            rejected = (
                lambda: archive.open('handle', mode='w'),
                lambda: archive.open('foo', mode='r'),
                lambda: archive.writestr('str', 'abcde'),
                lambda: archive.write(__file__, 'file'),
                archive.close,
            )
            for attempt in rejected:
                self.assertRaises(ValueError, attempt)
            # The open writer must still be usable afterwards.
            writer.write(second)
        with archive.open('baz', mode='w') as writer:
            writer.write(third)

    with zipfile.ZipFile(TESTFN2, 'r') as archive:
        for member, payload in (('foo', first),
                                ('bar', second),
                                ('baz', third)):
            self.assertEqual(archive.read(member), payload)
        self.assertEqual(archive.namelist(), ['foo', 'bar', 'baz'])
class AbstractBadCrcTests:
    """Mixin with CRC-failure checks shared by the per-compression classes.

    The tests read ``self.zip_with_bad_crc`` (the raw bytes of an archive)
    and expect it to hold a member named 'afile' whose CRC does not match.
    """

    def _bad_archive(self):
        # Every check starts from a fresh in-memory copy of the fixture.
        return zipfile.ZipFile(io.BytesIO(self.zip_with_bad_crc), mode="r")

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        with self._bad_archive() as archive:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', archive.testzip())

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        # Using ZipFile.read()
        with self._bad_archive() as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with self._bad_archive() as archive:
            with archive.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipFile):
                    member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with self._bad_archive() as archive:
            with archive.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    piece = member.read(2)
                    while piece:
                        piece = member.read(2)
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # Pre-generated encrypted archive; the tests read its member "test.txt"
    # with password b"python" and expect the contents to equal ``plain``.
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    # Second pre-generated archive; the tests read its member "zero" with
    # password b"12345" and expect the contents to equal ``plain2``.
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    # Expected decrypted payloads for "test.txt" and "zero" respectively.
    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def setUp(self):
        # Write both fixtures to disk and keep a read-mode ZipFile open on
        # each for the duration of the test.
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)
        self.zip = zipfile.ZipFile(TESTFN, "r")
        with open(TESTFN2, "wb") as fp:
            fp.write(self.data2)
        self.zip2 = zipfile.ZipFile(TESTFN2, "r")

    def tearDown(self):
        # Each archive is closed before its backing file is removed.
        self.zip.close()
        os.unlink(TESTFN)
        self.zip2.close()
        os.unlink(TESTFN2)

    def test_no_password(self):
        # Reading the encrypted file without password
        # must generate a RunTime exception
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    def test_bad_password(self):
        # b"perl" is not the password of either archive, so both reads
        # are expected to fail with RuntimeError.
        self.zip.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.zip2.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    @requires_zlib()
    def test_good_password(self):
        # With the right password each member decrypts to its known payload.
        self.zip.setpassword(b"python")
        self.assertEqual(self.zip.read("test.txt"), self.plain)
        self.zip2.setpassword(b"12345")
        self.assertEqual(self.zip2.read("zero"), self.plain2)

    def test_unicode_password(self):
        # Passwords given as str instead of bytes are expected to raise
        # TypeError from every entry point that accepts one.
        self.assertRaises(TypeError, self.zip.setpassword, "unicode")
        self.assertRaises(TypeError, self.zip.read, "test.txt", "python")
        self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python")
        self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python")

    def test_seek_tell(self):
        self.zip.setpassword(b"python")
        txt = self.plain
        test_word = b'encryption'
        # Offset and length of the probe word inside the decrypted text.
        bloc = txt.find(test_word)
        bloc_len = len(test_word)
        with self.zip.open("test.txt", "r") as fp:
            # Absolute seek, relative seek back to 0, relative seek forward.
            fp.seek(bloc, os.SEEK_SET)
            self.assertEqual(fp.tell(), bloc)
            fp.seek(-bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), bloc)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])

            # Make sure that the second read after seeking back beyond
            # _readbuffer returns the same content (ie. rewind to the start of
            # the file to read forward to the required position).
            # The buffer and offset are cleared by hand; MIN_READ_SIZE is
            # saved here and put back once the check is done.
            old_read_size = fp.MIN_READ_SIZE
            fp.MIN_READ_SIZE = 1
            fp._readbuffer = b''
            fp._offset = 0
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
            fp.MIN_READ_SIZE = old_read_size

            # Seeking to the end must report the full decrypted length.
            fp.seek(0, os.SEEK_END)
            self.assertEqual(fp.tell(), len(txt))
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)

            # Read the file completely to definitely call any eof integrity
            # checks (crc) and make sure they still pass.
            fp.read()
class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles,
                                       unittest.TestCase):
    # Concrete test case: the inherited test_read/test_open/test_random_open
    # methods pass self.compression to their helpers, so this class runs
    # them with ZIP_STORED.
    compression = zipfile.ZIP_STORED
# Provide the tell() method but not seek()
class Tellable:
    """Write-side wrapper that tracks its own position.

    Only write(), tell() and flush() are offered.  The position reported
    by tell() is the running total of what the wrapped object's write()
    returned; it starts at 0 regardless of what the wrapped object
    already contains.
    """

    def __init__(self, fp):
        self.fp, self.offset = fp, 0

    def write(self, data):
        # Forward the write and advance by the count the target reports.
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        return self.offset

    def flush(self):
        self.fp.flush()
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    # Tests that open several members (or the same member several times)
    # of one archive and interleave reads and writes between the handles.

    @classmethod
    def setUpClass(cls):
        # Two payloads with distinct 3-byte prefixes followed by random
        # bytes; each is longer than the 500-byte partial reads used below.
        cls.data1 = b'111' + randbytes(10000)
        cls.data2 = b'222' + randbytes(10000)

    def make_test_archive(self, f):
        # Create the ZIP archive
        # Members: 'ones' -> data1, 'twos' -> data2, both deflated.
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            zipfp.writestr('ones', self.data1)
            zipfp.writestr('twos', self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                # Two independent handles on the same member, read in
                # alternation: a partial read on each, then the remainder.
                with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, data2)
                self.assertEqual(data1, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                # Same alternation as above, but on two different members.
                with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2:
                    data1 = zopen1.read(500)
                    data2 = zopen2.read(500)
                    data1 += zopen1.read()
                    data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                # The second member is opened only after the first one has
                # already been partially read.
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
                    with zipf.open('twos') as zopen2:
                        data2 = zopen2.read(500)
                        data1 += zopen1.read()
                        data2 += zopen2.read()
                self.assertEqual(data1, self.data1)
                self.assertEqual(data2, self.data2)

    def test_read_after_close(self):
        for f in get_files(self):
            self.make_test_archive(f)
            # The ExitStack owns the member handles, so they stay open
            # after the ZipFile context below has exited; the reads then
            # happen on handles whose parent archive is already closed.
            with contextlib.ExitStack() as stack:
                with zipfile.ZipFile(f, 'r') as zipf:
                    zopen1 = stack.enter_context(zipf.open('ones'))
                    zopen2 = stack.enter_context(zipf.open('twos'))
                data1 = zopen1.read(500)
                data2 = zopen2.read(500)
                data1 += zopen1.read()
                data2 += zopen2.read()
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_read_after_write(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                # Read a member back through the archive that is still
                # open in write mode.
                with zipf.open('ones') as zopen1:
                    data1 = zopen1.read(500)
            self.assertEqual(data1, self.data1[:500])
            # Re-open for reading and check both members in full.
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_write_after_read(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                # A second member is written while a read handle on the
                # first one is still open and only partially consumed.
                with zipf.open('ones') as zopen1:
                    zopen1.read(500)
                    zipf.writestr('twos', self.data2)
            with zipfile.ZipFile(f, 'r') as zipf:
                data1 = zipf.read('ones')
                data2 = zipf.read('twos')
            self.assertEqual(data1, self.data1)
            self.assertEqual(data2, self.data2)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for x in range(100):
                zipf.read('ones')
                with zipf.open('ones') as zopen1:
                    pass
        # After 100 read/open rounds, a newly opened file must still get a
        # descriptor number below 100.
        with open(os.devnull) as f:
            self.assertLess(f.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            # Start reading 'ones', write a complete new member 'twos' in
            # the middle, then finish reading 'ones'.
            with zipf.open('ones', 'r') as r1:
                data1 = r1.read(500)
                with zipf.open('twos', 'w') as w1:
                    w1.write(self.data2)
                data1 += r1.read()
        self.assertEqual(data1, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        # Remove the on-disk archive used by the tests above.
        unlink(TESTFN2)
def test_write_dir(self):
    # Write the same directory twice (once under its own path, once under
    # the explicit archive name "y") and check the stored entries both on
    # the writer and after re-reading the archive.
    dirpath = os.path.join(TESTFN2, "x")
    os.mkdir(dirpath)
    mode = os.stat(dirpath).st_mode & 0xFFFF
    # Expected attributes: file mode in the upper 16 bits, plus 0x10 in
    # the low bits.
    expected_attr = (mode << 16) | 0x10
    with zipfile.ZipFile(TESTFN, "w") as zipf:
        zipf.write(dirpath)
        zinfo = zipf.filelist[0]
        self.assertTrue(zinfo.filename.endswith("/x/"))
        self.assertEqual(zinfo.external_attr, expected_attr)
        zipf.write(dirpath, "y")
        zinfo = zipf.filelist[1]
        # assertEqual, not assertTrue: assertTrue(name, "y/") would treat
        # "y/" as the failure message and pass for any non-empty name.
        self.assertEqual(zinfo.filename, "y/")
        self.assertEqual(zinfo.external_attr, expected_attr)
    with zipfile.ZipFile(TESTFN, "r") as zipf:
        zinfo = zipf.filelist[0]
        self.assertTrue(zinfo.filename.endswith("/x/"))
        self.assertEqual(zinfo.external_attr, expected_attr)
        zinfo = zipf.filelist[1]
        self.assertEqual(zinfo.filename, "y/")
        self.assertEqual(zinfo.external_attr, expected_attr)
        target = os.path.join(TESTFN2, "target")
        os.mkdir(target)
        zipf.extractall(target)
        self.assertTrue(os.path.isdir(os.path.join(target, "y")))
        self.assertEqual(len(os.listdir(target)), 2)
class CommandLineTest(unittest.TestCase):
    """Tests that run ``python -m zipfile`` through script_helper."""

    def zipfilecmd(self, *args, **kwargs):
        # Run the CLI expecting success; return stdout with the platform
        # line separator normalised to b'\n'.
        result = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                **kwargs)
        _, stdout, _ = result
        native_eol = os.linesep.encode()
        return stdout.replace(native_eol, b'\n')

    def zipfilecmd_failure(self, *args):
        # Run the CLI expecting failure; return whatever
        # assert_python_failure returns (unpacked by callers as
        # rc, out, err).
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: nothing on stdout, a usage error on stderr.
        _, stdout, stderr = self.zipfilecmd_failure()
        self.assertEqual(stdout, b'')
        lowered = stderr.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)
        # Listing an empty file name: nothing on stdout, something on stderr.
        _, stdout, stderr = self.zipfilecmd_failure('-l', '')
        self.assertEqual(stdout, b'')
        self.assertNotEqual(stderr.strip(), b'')

    def test_test_command(self):
        archive = findfile('zipdir.zip')
        for flag in ('-t', '--test'):
            stdout = self.zipfilecmd(flag, archive)
            self.assertEqual(stdout.rstrip(), b'Done testing')
        # The same command on 'testtar.tar' must fail without stdout output.
        not_a_zip = findfile('testtar.tar')
        _, stdout, _ = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(stdout, b'')

    def test_list_command(self):
        archive = findfile('zipdir.zip')
        # The expected listing is what printdir() produces in-process,
        # encoded the same way the child process is told to encode.
        listing = io.StringIO()
        with zipfile.ZipFile(archive, 'r') as zf:
            zf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for flag in ('-l', '--list'):
            stdout = self.zipfilecmd(
                flag, archive, PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(stdout, expected)

    @requires_zlib()
    def test_create_command(self):
        # Inputs: one plain file plus a directory holding one file.
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w') as src:
            src.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested, 'w') as src:
            src.write('test 2')
        inputs = [TESTFN, TESTFNDIR]
        dir_entry = TESTFNDIR + '/'
        nested_entry = TESTFNDIR + '/file.txt'
        expected_names = [TESTFN, dir_entry, nested_entry]
        for flag in ('-c', '--create'):
            try:
                stdout = self.zipfilecmd(flag, TESTFN2, *inputs)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), expected_names)
                    self.assertEqual(zf.read(TESTFN), b'test 1')
                    self.assertEqual(zf.read(nested_entry), b'test 2')
            finally:
                # Remove the created archive before the next spelling.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive = findfile('zipdir.zip')
        for flag in ('-e', '--extract'):
            with temp_dir() as extdir:
                stdout = self.zipfilecmd(flag, archive, extdir)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(archive) as zf:
                    for info in zf.infolist():
                        # Archive names use '/'; map to the local separator.
                        local_name = info.filename.replace('/', os.sep)
                        extracted = os.path.join(extdir, local_name)
                        if info.is_dir():
                            self.assertTrue(os.path.isdir(extracted))
                            continue
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as fh:
                            self.assertEqual(fh.read(), zf.read(info))
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
    """Namespace holding the helper taken from ``jaraco.itertools``."""

    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has yielded."""

            def __init__(self, i):
                self.count = 0
                self._orig_iter = iter(i)

            def __iter__(self):
                return self

            def __next__(self):
                # StopIteration from the wrapped iterator propagates before
                # the count is touched, so only real items are counted.
                result = next(self._orig_iter)
                self.count += 1
                return result


def add_dirs(zf):
    """
    Given a writable zip file zf, inject directory entries for
    any directories implied by the presence of children.

    Returns zf itself.
    """
    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
        zf.writestr(name, b"")
    return zf


def build_alpharep_fixture():
    """
    Create a zip file with this structure:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    This fixture has the following key characteristics:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    "alpha" because it uses alphabet
    "rep" because it's a representative example

    The archive is backed by an in-memory buffer and is returned still
    open for writing.
    """
    data = io.BytesIO()
    zf = zipfile.ZipFile(data, "w")
    zf.writestr("a.txt", b"content of a")
    zf.writestr("b/c.txt", b"content of c")
    zf.writestr("b/d/e.txt", b"content of e")
    zf.writestr("b/f.txt", b"content of f")
    zf.writestr("g/h/i.txt", b"content of i")
    zf.filename = "alpharep.zip"
    return zf


class TestPath(unittest.TestCase):
    """Tests for ``zipfile.Path``."""

    def setUp(self):
        # Context managers entered through self.fixtures are closed when
        # the test finishes.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """Yield the alpharep fixture without, then with, directory entries.

        Each fixture is yielded from inside its own ``subTest`` block.
        """
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self):
        """Yield the path of each alpharep fixture written to a temp dir."""
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        for alpharep in self.zipfile_alpharep():
            # Grab the underlying buffer before closing the archive.
            buffer = alpharep.fp
            alpharep.close()
            path = tmpdir / alpharep.filename
            with path.open("wb") as strm:
                strm.write(buffer.getvalue())
            yield path

    def test_iterdir_and_types(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertTrue(root.is_dir())
            a, b, g = root.iterdir()
            self.assertTrue(a.is_file())
            self.assertTrue(b.is_dir())
            self.assertTrue(g.is_dir())
            c, f, d = b.iterdir()
            self.assertTrue(c.is_file())
            self.assertTrue(f.is_file())
            e, = d.iterdir()
            self.assertTrue(e.is_file())
            h, = g.iterdir()
            i, = h.iterdir()
            self.assertTrue(i.is_file())

    def test_subdir_is_dir(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertTrue((root / 'b').is_dir())
            self.assertTrue((root / 'b/').is_dir())
            self.assertTrue((root / 'g').is_dir())
            self.assertTrue((root / 'g/').is_dir())

    def test_open(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a, b, g = root.iterdir()
            with a.open() as strm:
                data = strm.read()
            self.assertEqual(data, "content of a")

    def test_read(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a, b, g = root.iterdir()
            self.assertEqual(a.read_text(), "content of a")
            self.assertEqual(a.read_bytes(), b"content of a")

    def test_joinpath(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a = root.joinpath("a")
            self.assertTrue(a.is_file())
            e = root.joinpath("b").joinpath("d").joinpath("e.txt")
            self.assertEqual(e.read_text(), "content of e")

    def test_traverse_truediv(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            a = root / "a"
            self.assertTrue(a.is_file())
            e = root / "b" / "d" / "e.txt"
            self.assertEqual(e.read_text(), "content of e")

    def test_pathlike_construction(self):
        """
        zipfile.Path should be constructable from a path-like object
        """
        for zipfile_ondisk in self.zipfile_ondisk():
            pathlike = pathlib.Path(str(zipfile_ondisk))
            # Passes as long as construction does not raise.
            zipfile.Path(pathlike)

    def test_traverse_pathlike(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            # Passes as long as joining with a path-like does not raise.
            root / pathlib.Path("a")

    def test_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'a').parent.at, '')
            self.assertEqual((root / 'a' / 'b').parent.at, 'a/')

    def test_dir_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'b').parent.at, '')
            self.assertEqual((root / 'b/').parent.at, '')

    def test_missing_dir_parent(self):
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            self.assertEqual((root / 'missing dir/').parent.at, '')

    def test_mutability(self):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        for alpharep in self.zipfile_alpharep():
            root = zipfile.Path(alpharep)
            # List the root once before the archive is modified.
            a, b, g = root.iterdir()
            alpharep.writestr('foo.txt', 'foo')
            alpharep.writestr('bar/baz.txt', 'baz')
            self.assertIn(
                'foo.txt', [child.name for child in root.iterdir()])
            self.assertEqual((root / 'foo.txt').read_text(), 'foo')
            baz, = (root / 'bar').iterdir()
            self.assertEqual(baz.read_text(), 'baz')

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # The archive is written in "w" mode and then relabelled, rather
        # than being closed and reopened for reading.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Ensure joinpath on every item of a huge zipfile completes, i.e.
        each call is meant to be constant time so that the whole loop is
        linear in the number of entries.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        self.assertEqual(entries.count, self.HUGE_ZIPFILE_NUM_ENTRIES)

    # No time limit is enforced here; the disabled decorator below only
    # records the intended bound.
    # @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        zipfile.CompleteDirs._implied_dirs(data)


if __name__ == "__main__":
    unittest.main()