1import contextlib 2import importlib.util 3import io 4import itertools 5import os 6import pathlib 7import posixpath 8import string 9import struct 10import subprocess 11import sys 12import time 13import unittest 14import unittest.mock as mock 15import zipfile 16import functools 17 18 19from tempfile import TemporaryFile 20from random import randint, random, randbytes 21 22from test.support import script_helper 23from test.support import (findfile, requires_zlib, requires_bz2, 24 requires_lzma, captured_stdout) 25from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd 26 27 28TESTFN2 = TESTFN + "2" 29TESTFNDIR = TESTFN + "d" 30FIXEDTEST_SIZE = 1000 31DATAFILES_DIR = 'zipfile_datafiles' 32 33SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 34 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 35 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 36 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 37 38def get_files(test): 39 yield TESTFN2 40 with TemporaryFile() as f: 41 yield f 42 test.assertFalse(f.closed) 43 with io.BytesIO() as f: 44 yield f 45 test.assertFalse(f.closed) 46 47class AbstractTestsWithSourceFile: 48 @classmethod 49 def setUpClass(cls): 50 cls.line_gen = [bytes("Zipfile test line %d. 
random float: %f\n" % 51 (i, random()), "ascii") 52 for i in range(FIXEDTEST_SIZE)] 53 cls.data = b''.join(cls.line_gen) 54 55 def setUp(self): 56 # Make a source file with some lines 57 with open(TESTFN, "wb") as fp: 58 fp.write(self.data) 59 60 def make_test_archive(self, f, compression, compresslevel=None): 61 kwargs = {'compression': compression, 'compresslevel': compresslevel} 62 # Create the ZIP archive 63 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 64 zipfp.write(TESTFN, "another.name") 65 zipfp.write(TESTFN, TESTFN) 66 zipfp.writestr("strfile", self.data) 67 with zipfp.open('written-open-w', mode='w') as f: 68 for line in self.line_gen: 69 f.write(line) 70 71 def zip_test(self, f, compression, compresslevel=None): 72 self.make_test_archive(f, compression, compresslevel) 73 74 # Read the ZIP archive 75 with zipfile.ZipFile(f, "r", compression) as zipfp: 76 self.assertEqual(zipfp.read(TESTFN), self.data) 77 self.assertEqual(zipfp.read("another.name"), self.data) 78 self.assertEqual(zipfp.read("strfile"), self.data) 79 80 # Print the ZIP directory 81 fp = io.StringIO() 82 zipfp.printdir(file=fp) 83 directory = fp.getvalue() 84 lines = directory.splitlines() 85 self.assertEqual(len(lines), 5) # Number of files + header 86 87 self.assertIn('File Name', lines[0]) 88 self.assertIn('Modified', lines[0]) 89 self.assertIn('Size', lines[0]) 90 91 fn, date, time_, size = lines[1].split() 92 self.assertEqual(fn, 'another.name') 93 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 94 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 95 self.assertEqual(size, str(len(self.data))) 96 97 # Check the namelist 98 names = zipfp.namelist() 99 self.assertEqual(len(names), 4) 100 self.assertIn(TESTFN, names) 101 self.assertIn("another.name", names) 102 self.assertIn("strfile", names) 103 self.assertIn("written-open-w", names) 104 105 # Check infolist 106 infos = zipfp.infolist() 107 names = [i.filename for i in infos] 108 self.assertEqual(len(names), 4) 109 
self.assertIn(TESTFN, names) 110 self.assertIn("another.name", names) 111 self.assertIn("strfile", names) 112 self.assertIn("written-open-w", names) 113 for i in infos: 114 self.assertEqual(i.file_size, len(self.data)) 115 116 # check getinfo 117 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 118 info = zipfp.getinfo(nm) 119 self.assertEqual(info.filename, nm) 120 self.assertEqual(info.file_size, len(self.data)) 121 122 # Check that testzip doesn't raise an exception 123 zipfp.testzip() 124 125 def test_basic(self): 126 for f in get_files(self): 127 self.zip_test(f, self.compression) 128 129 def zip_open_test(self, f, compression): 130 self.make_test_archive(f, compression) 131 132 # Read the ZIP archive 133 with zipfile.ZipFile(f, "r", compression) as zipfp: 134 zipdata1 = [] 135 with zipfp.open(TESTFN) as zipopen1: 136 while True: 137 read_data = zipopen1.read(256) 138 if not read_data: 139 break 140 zipdata1.append(read_data) 141 142 zipdata2 = [] 143 with zipfp.open("another.name") as zipopen2: 144 while True: 145 read_data = zipopen2.read(256) 146 if not read_data: 147 break 148 zipdata2.append(read_data) 149 150 self.assertEqual(b''.join(zipdata1), self.data) 151 self.assertEqual(b''.join(zipdata2), self.data) 152 153 def test_open(self): 154 for f in get_files(self): 155 self.zip_open_test(f, self.compression) 156 157 def test_open_with_pathlike(self): 158 path = pathlib.Path(TESTFN2) 159 self.zip_open_test(path, self.compression) 160 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 161 self.assertIsInstance(zipfp.filename, str) 162 163 def zip_random_open_test(self, f, compression): 164 self.make_test_archive(f, compression) 165 166 # Read the ZIP archive 167 with zipfile.ZipFile(f, "r", compression) as zipfp: 168 zipdata1 = [] 169 with zipfp.open(TESTFN) as zipopen1: 170 while True: 171 read_data = zipopen1.read(randint(1, 1024)) 172 if not read_data: 173 break 174 zipdata1.append(read_data) 175 176 
self.assertEqual(b''.join(zipdata1), self.data) 177 178 def test_random_open(self): 179 for f in get_files(self): 180 self.zip_random_open_test(f, self.compression) 181 182 def zip_read1_test(self, f, compression): 183 self.make_test_archive(f, compression) 184 185 # Read the ZIP archive 186 with zipfile.ZipFile(f, "r") as zipfp, \ 187 zipfp.open(TESTFN) as zipopen: 188 zipdata = [] 189 while True: 190 read_data = zipopen.read1(-1) 191 if not read_data: 192 break 193 zipdata.append(read_data) 194 195 self.assertEqual(b''.join(zipdata), self.data) 196 197 def test_read1(self): 198 for f in get_files(self): 199 self.zip_read1_test(f, self.compression) 200 201 def zip_read1_10_test(self, f, compression): 202 self.make_test_archive(f, compression) 203 204 # Read the ZIP archive 205 with zipfile.ZipFile(f, "r") as zipfp, \ 206 zipfp.open(TESTFN) as zipopen: 207 zipdata = [] 208 while True: 209 read_data = zipopen.read1(10) 210 self.assertLessEqual(len(read_data), 10) 211 if not read_data: 212 break 213 zipdata.append(read_data) 214 215 self.assertEqual(b''.join(zipdata), self.data) 216 217 def test_read1_10(self): 218 for f in get_files(self): 219 self.zip_read1_10_test(f, self.compression) 220 221 def zip_readline_read_test(self, f, compression): 222 self.make_test_archive(f, compression) 223 224 # Read the ZIP archive 225 with zipfile.ZipFile(f, "r") as zipfp, \ 226 zipfp.open(TESTFN) as zipopen: 227 data = b'' 228 while True: 229 read = zipopen.readline() 230 if not read: 231 break 232 data += read 233 234 read = zipopen.read(100) 235 if not read: 236 break 237 data += read 238 239 self.assertEqual(data, self.data) 240 241 def test_readline_read(self): 242 # Issue #7610: calls to readline() interleaved with calls to read(). 
243 for f in get_files(self): 244 self.zip_readline_read_test(f, self.compression) 245 246 def zip_readline_test(self, f, compression): 247 self.make_test_archive(f, compression) 248 249 # Read the ZIP archive 250 with zipfile.ZipFile(f, "r") as zipfp: 251 with zipfp.open(TESTFN) as zipopen: 252 for line in self.line_gen: 253 linedata = zipopen.readline() 254 self.assertEqual(linedata, line) 255 256 def test_readline(self): 257 for f in get_files(self): 258 self.zip_readline_test(f, self.compression) 259 260 def zip_readlines_test(self, f, compression): 261 self.make_test_archive(f, compression) 262 263 # Read the ZIP archive 264 with zipfile.ZipFile(f, "r") as zipfp: 265 with zipfp.open(TESTFN) as zipopen: 266 ziplines = zipopen.readlines() 267 for line, zipline in zip(self.line_gen, ziplines): 268 self.assertEqual(zipline, line) 269 270 def test_readlines(self): 271 for f in get_files(self): 272 self.zip_readlines_test(f, self.compression) 273 274 def zip_iterlines_test(self, f, compression): 275 self.make_test_archive(f, compression) 276 277 # Read the ZIP archive 278 with zipfile.ZipFile(f, "r") as zipfp: 279 with zipfp.open(TESTFN) as zipopen: 280 for line, zipline in zip(self.line_gen, zipopen): 281 self.assertEqual(zipline, line) 282 283 def test_iterlines(self): 284 for f in get_files(self): 285 self.zip_iterlines_test(f, self.compression) 286 287 def test_low_compression(self): 288 """Check for cases where compressed data is larger than original.""" 289 # Create the ZIP archive 290 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 291 zipfp.writestr("strfile", '12') 292 293 # Get an open object for strfile 294 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 295 with zipfp.open("strfile") as openobj: 296 self.assertEqual(openobj.read(1), b'1') 297 self.assertEqual(openobj.read(1), b'2') 298 299 def test_writestr_compression(self): 300 zipfp = zipfile.ZipFile(TESTFN2, "w") 301 zipfp.writestr("b.txt", "hello world", 
compress_type=self.compression) 302 info = zipfp.getinfo('b.txt') 303 self.assertEqual(info.compress_type, self.compression) 304 305 def test_writestr_compresslevel(self): 306 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 307 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 308 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 309 compresslevel=2) 310 311 # Compression level follows the constructor. 312 a_info = zipfp.getinfo('a.txt') 313 self.assertEqual(a_info.compress_type, self.compression) 314 self.assertEqual(a_info._compresslevel, 1) 315 316 # Compression level is overridden. 317 b_info = zipfp.getinfo('b.txt') 318 self.assertEqual(b_info.compress_type, self.compression) 319 self.assertEqual(b_info._compresslevel, 2) 320 321 def test_read_return_size(self): 322 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 323 # than requested. 324 for test_size in (1, 4095, 4096, 4097, 16384): 325 file_size = test_size + 1 326 junk = randbytes(file_size) 327 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 328 zipf.writestr('foo', junk) 329 with zipf.open('foo', 'r') as fp: 330 buf = fp.read(test_size) 331 self.assertEqual(len(buf), test_size) 332 333 def test_truncated_zipfile(self): 334 fp = io.BytesIO() 335 with zipfile.ZipFile(fp, mode='w') as zipf: 336 zipf.writestr('strfile', self.data, compress_type=self.compression) 337 end_offset = fp.tell() 338 zipfiledata = fp.getvalue() 339 340 fp = io.BytesIO(zipfiledata) 341 with zipfile.ZipFile(fp) as zipf: 342 with zipf.open('strfile') as zipopen: 343 fp.truncate(end_offset - 20) 344 with self.assertRaises(EOFError): 345 zipopen.read() 346 347 fp = io.BytesIO(zipfiledata) 348 with zipfile.ZipFile(fp) as zipf: 349 with zipf.open('strfile') as zipopen: 350 fp.truncate(end_offset - 20) 351 with self.assertRaises(EOFError): 352 while zipopen.read(100): 353 pass 354 355 fp = io.BytesIO(zipfiledata) 356 with zipfile.ZipFile(fp) as zipf: 357 with 
zipf.open('strfile') as zipopen: 358 fp.truncate(end_offset - 20) 359 with self.assertRaises(EOFError): 360 while zipopen.read1(100): 361 pass 362 363 def test_repr(self): 364 fname = 'file.name' 365 for f in get_files(self): 366 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 367 zipfp.write(TESTFN, fname) 368 r = repr(zipfp) 369 self.assertIn("mode='w'", r) 370 371 with zipfile.ZipFile(f, 'r') as zipfp: 372 r = repr(zipfp) 373 if isinstance(f, str): 374 self.assertIn('filename=%r' % f, r) 375 else: 376 self.assertIn('file=%r' % f, r) 377 self.assertIn("mode='r'", r) 378 r = repr(zipfp.getinfo(fname)) 379 self.assertIn('filename=%r' % fname, r) 380 self.assertIn('filemode=', r) 381 self.assertIn('file_size=', r) 382 if self.compression != zipfile.ZIP_STORED: 383 self.assertIn('compress_type=', r) 384 self.assertIn('compress_size=', r) 385 with zipfp.open(fname) as zipopen: 386 r = repr(zipopen) 387 self.assertIn('name=%r' % fname, r) 388 self.assertIn("mode='r'", r) 389 if self.compression != zipfile.ZIP_STORED: 390 self.assertIn('compress_type=', r) 391 self.assertIn('[closed]', repr(zipopen)) 392 self.assertIn('[closed]', repr(zipfp)) 393 394 def test_compresslevel_basic(self): 395 for f in get_files(self): 396 self.zip_test(f, self.compression, compresslevel=9) 397 398 def test_per_file_compresslevel(self): 399 """Check that files within a Zip archive can have different 400 compression levels.""" 401 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 402 zipfp.write(TESTFN, 'compress_1') 403 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 404 one_info = zipfp.getinfo('compress_1') 405 nine_info = zipfp.getinfo('compress_9') 406 self.assertEqual(one_info._compresslevel, 1) 407 self.assertEqual(nine_info._compresslevel, 9) 408 409 def test_writing_errors(self): 410 class BrokenFile(io.BytesIO): 411 def write(self, data): 412 nonlocal count 413 if count is not None: 414 if count == stop: 415 raise OSError 416 count += 1 417 
super().write(data) 418 419 stop = 0 420 while True: 421 testfile = BrokenFile() 422 count = None 423 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 424 with zipfp.open('file1', 'w') as f: 425 f.write(b'data1') 426 count = 0 427 try: 428 with zipfp.open('file2', 'w') as f: 429 f.write(b'data2') 430 except OSError: 431 stop += 1 432 else: 433 break 434 finally: 435 count = None 436 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 437 self.assertEqual(zipfp.namelist(), ['file1']) 438 self.assertEqual(zipfp.read('file1'), b'data1') 439 440 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 441 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 442 self.assertEqual(zipfp.read('file1'), b'data1') 443 self.assertEqual(zipfp.read('file2'), b'data2') 444 445 446 def tearDown(self): 447 unlink(TESTFN) 448 unlink(TESTFN2) 449 450 451class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 452 unittest.TestCase): 453 compression = zipfile.ZIP_STORED 454 test_low_compression = None 455 456 def zip_test_writestr_permissions(self, f, compression): 457 # Make sure that writestr and open(... mode='w') create files with 458 # mode 0600, when they are passed a name rather than a ZipInfo 459 # instance. 
460 461 self.make_test_archive(f, compression) 462 with zipfile.ZipFile(f, "r") as zipfp: 463 zinfo = zipfp.getinfo('strfile') 464 self.assertEqual(zinfo.external_attr, 0o600 << 16) 465 466 zinfo2 = zipfp.getinfo('written-open-w') 467 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 468 469 def test_writestr_permissions(self): 470 for f in get_files(self): 471 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 472 473 def test_absolute_arcnames(self): 474 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 475 zipfp.write(TESTFN, "/absolute") 476 477 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 478 self.assertEqual(zipfp.namelist(), ["absolute"]) 479 480 def test_append_to_zip_file(self): 481 """Test appending to an existing zipfile.""" 482 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 483 zipfp.write(TESTFN, TESTFN) 484 485 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 486 zipfp.writestr("strfile", self.data) 487 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 488 489 def test_append_to_non_zip_file(self): 490 """Test appending to an existing file that is not a zipfile.""" 491 # NOTE: this test fails if len(d) < 22 because of the first 492 # line "fpin.seek(-22, 2)" in _EndRecData 493 data = b'I am not a ZipFile!'*10 494 with open(TESTFN2, 'wb') as f: 495 f.write(data) 496 497 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 498 zipfp.write(TESTFN, TESTFN) 499 500 with open(TESTFN2, 'rb') as f: 501 f.seek(len(data)) 502 with zipfile.ZipFile(f, "r") as zipfp: 503 self.assertEqual(zipfp.namelist(), [TESTFN]) 504 self.assertEqual(zipfp.read(TESTFN), self.data) 505 with open(TESTFN2, 'rb') as f: 506 self.assertEqual(f.read(len(data)), data) 507 zipfiledata = f.read() 508 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 509 self.assertEqual(zipfp.namelist(), [TESTFN]) 510 self.assertEqual(zipfp.read(TESTFN), self.data) 511 512 def 
test_read_concatenated_zip_file(self): 513 with io.BytesIO() as bio: 514 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 515 zipfp.write(TESTFN, TESTFN) 516 zipfiledata = bio.getvalue() 517 data = b'I am not a ZipFile!'*10 518 with open(TESTFN2, 'wb') as f: 519 f.write(data) 520 f.write(zipfiledata) 521 522 with zipfile.ZipFile(TESTFN2) as zipfp: 523 self.assertEqual(zipfp.namelist(), [TESTFN]) 524 self.assertEqual(zipfp.read(TESTFN), self.data) 525 526 def test_append_to_concatenated_zip_file(self): 527 with io.BytesIO() as bio: 528 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 529 zipfp.write(TESTFN, TESTFN) 530 zipfiledata = bio.getvalue() 531 data = b'I am not a ZipFile!'*1000000 532 with open(TESTFN2, 'wb') as f: 533 f.write(data) 534 f.write(zipfiledata) 535 536 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 537 self.assertEqual(zipfp.namelist(), [TESTFN]) 538 zipfp.writestr('strfile', self.data) 539 540 with open(TESTFN2, 'rb') as f: 541 self.assertEqual(f.read(len(data)), data) 542 zipfiledata = f.read() 543 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 544 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 545 self.assertEqual(zipfp.read(TESTFN), self.data) 546 self.assertEqual(zipfp.read('strfile'), self.data) 547 548 def test_ignores_newline_at_end(self): 549 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 550 zipfp.write(TESTFN, TESTFN) 551 with open(TESTFN2, 'a', encoding='utf-8') as f: 552 f.write("\r\n\00\00\00") 553 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 554 self.assertIsInstance(zipfp, zipfile.ZipFile) 555 556 def test_ignores_stuff_appended_past_comments(self): 557 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 558 zipfp.comment = b"this is a comment" 559 zipfp.write(TESTFN, TESTFN) 560 with open(TESTFN2, 'a', encoding='utf-8') as f: 561 f.write("abcdef\r\n") 562 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 563 self.assertIsInstance(zipfp, 
zipfile.ZipFile) 564 self.assertEqual(zipfp.comment, b"this is a comment") 565 566 def test_write_default_name(self): 567 """Check that calling ZipFile.write without arcname specified 568 produces the expected result.""" 569 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 570 zipfp.write(TESTFN) 571 with open(TESTFN, "rb") as f: 572 self.assertEqual(zipfp.read(TESTFN), f.read()) 573 574 def test_io_on_closed_zipextfile(self): 575 fname = "somefile.txt" 576 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 577 zipfp.writestr(fname, "bogus") 578 579 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 580 with zipfp.open(fname) as fid: 581 fid.close() 582 self.assertRaises(ValueError, fid.read) 583 self.assertRaises(ValueError, fid.seek, 0) 584 self.assertRaises(ValueError, fid.tell) 585 self.assertRaises(ValueError, fid.readable) 586 self.assertRaises(ValueError, fid.seekable) 587 588 def test_write_to_readonly(self): 589 """Check that trying to call write() on a readonly ZipFile object 590 raises a ValueError.""" 591 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 592 zipfp.writestr("somefile.txt", "bogus") 593 594 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 595 self.assertRaises(ValueError, zipfp.write, TESTFN) 596 597 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 598 with self.assertRaises(ValueError): 599 zipfp.open(TESTFN, mode='w') 600 601 def test_add_file_before_1980(self): 602 # Set atime and mtime to 1970-01-01 603 os.utime(TESTFN, (0, 0)) 604 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 605 self.assertRaises(ValueError, zipfp.write, TESTFN) 606 607 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 608 zipfp.write(TESTFN) 609 zinfo = zipfp.getinfo(TESTFN) 610 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 611 612 def test_add_file_after_2107(self): 613 # Set atime and mtime to 2108-12-30 614 ts = 4386268800 615 try: 616 time.localtime(ts) 617 except OverflowError: 618 self.skipTest(f'time.localtime({ts}) raises 
OverflowError') 619 try: 620 os.utime(TESTFN, (ts, ts)) 621 except OverflowError: 622 self.skipTest('Host fs cannot set timestamp to required value.') 623 624 mtime_ns = os.stat(TESTFN).st_mtime_ns 625 if mtime_ns != (4386268800 * 10**9): 626 # XFS filesystem is limited to 32-bit timestamp, but the syscall 627 # didn't fail. Moreover, there is a VFS bug which returns 628 # a cached timestamp which is different than the value on disk. 629 # 630 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 631 # 632 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 633 # https://bugs.python.org/issue39460#msg360952 634 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 635 636 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 637 self.assertRaises(struct.error, zipfp.write, TESTFN) 638 639 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 640 zipfp.write(TESTFN) 641 zinfo = zipfp.getinfo(TESTFN) 642 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 643 644 645@requires_zlib() 646class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 647 unittest.TestCase): 648 compression = zipfile.ZIP_DEFLATED 649 650 def test_per_file_compression(self): 651 """Check that files within a Zip archive can have different 652 compression options.""" 653 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 654 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 655 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 656 sinfo = zipfp.getinfo('storeme') 657 dinfo = zipfp.getinfo('deflateme') 658 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 659 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 660 661@requires_bz2() 662class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 663 unittest.TestCase): 664 compression = zipfile.ZIP_BZIP2 665 666@requires_lzma() 667class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 668 unittest.TestCase): 669 compression = zipfile.ZIP_LZMA 670 671 672class 
AbstractTestZip64InSmallFiles: 673 # These tests test the ZIP64 functionality without using large files, 674 # see test_zipfile64 for proper tests. 675 676 @classmethod 677 def setUpClass(cls): 678 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 679 for i in range(0, FIXEDTEST_SIZE)) 680 cls.data = b'\n'.join(line_gen) 681 682 def setUp(self): 683 self._limit = zipfile.ZIP64_LIMIT 684 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 685 zipfile.ZIP64_LIMIT = 1000 686 zipfile.ZIP_FILECOUNT_LIMIT = 9 687 688 # Make a source file with some lines 689 with open(TESTFN, "wb") as fp: 690 fp.write(self.data) 691 692 def zip_test(self, f, compression): 693 # Create the ZIP archive 694 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 695 zipfp.write(TESTFN, "another.name") 696 zipfp.write(TESTFN, TESTFN) 697 zipfp.writestr("strfile", self.data) 698 699 # Read the ZIP archive 700 with zipfile.ZipFile(f, "r", compression) as zipfp: 701 self.assertEqual(zipfp.read(TESTFN), self.data) 702 self.assertEqual(zipfp.read("another.name"), self.data) 703 self.assertEqual(zipfp.read("strfile"), self.data) 704 705 # Print the ZIP directory 706 fp = io.StringIO() 707 zipfp.printdir(fp) 708 709 directory = fp.getvalue() 710 lines = directory.splitlines() 711 self.assertEqual(len(lines), 4) # Number of files + header 712 713 self.assertIn('File Name', lines[0]) 714 self.assertIn('Modified', lines[0]) 715 self.assertIn('Size', lines[0]) 716 717 fn, date, time_, size = lines[1].split() 718 self.assertEqual(fn, 'another.name') 719 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 720 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 721 self.assertEqual(size, str(len(self.data))) 722 723 # Check the namelist 724 names = zipfp.namelist() 725 self.assertEqual(len(names), 3) 726 self.assertIn(TESTFN, names) 727 self.assertIn("another.name", names) 728 self.assertIn("strfile", names) 729 730 # Check infolist 731 infos = zipfp.infolist() 732 names = [i.filename for 
i in infos] 733 self.assertEqual(len(names), 3) 734 self.assertIn(TESTFN, names) 735 self.assertIn("another.name", names) 736 self.assertIn("strfile", names) 737 for i in infos: 738 self.assertEqual(i.file_size, len(self.data)) 739 740 # check getinfo 741 for nm in (TESTFN, "another.name", "strfile"): 742 info = zipfp.getinfo(nm) 743 self.assertEqual(info.filename, nm) 744 self.assertEqual(info.file_size, len(self.data)) 745 746 # Check that testzip doesn't raise an exception 747 zipfp.testzip() 748 749 def test_basic(self): 750 for f in get_files(self): 751 self.zip_test(f, self.compression) 752 753 def test_too_many_files(self): 754 # This test checks that more than 64k files can be added to an archive, 755 # and that the resulting archive can be read properly by ZipFile 756 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 757 allowZip64=True) 758 zipf.debug = 100 759 numfiles = 15 760 for i in range(numfiles): 761 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 762 self.assertEqual(len(zipf.namelist()), numfiles) 763 zipf.close() 764 765 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 766 self.assertEqual(len(zipf2.namelist()), numfiles) 767 for i in range(numfiles): 768 content = zipf2.read("foo%08d" % i).decode('ascii') 769 self.assertEqual(content, "%d" % (i**3 % 57)) 770 zipf2.close() 771 772 def test_too_many_files_append(self): 773 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 774 allowZip64=False) 775 zipf.debug = 100 776 numfiles = 9 777 for i in range(numfiles): 778 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 779 self.assertEqual(len(zipf.namelist()), numfiles) 780 with self.assertRaises(zipfile.LargeZipFile): 781 zipf.writestr("foo%08d" % numfiles, b'') 782 self.assertEqual(len(zipf.namelist()), numfiles) 783 zipf.close() 784 785 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 786 allowZip64=False) 787 zipf.debug = 100 788 self.assertEqual(len(zipf.namelist()), numfiles) 789 with self.assertRaises(zipfile.LargeZipFile): 
790 zipf.writestr("foo%08d" % numfiles, b'') 791 self.assertEqual(len(zipf.namelist()), numfiles) 792 zipf.close() 793 794 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 795 allowZip64=True) 796 zipf.debug = 100 797 self.assertEqual(len(zipf.namelist()), numfiles) 798 numfiles2 = 15 799 for i in range(numfiles, numfiles2): 800 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 801 self.assertEqual(len(zipf.namelist()), numfiles2) 802 zipf.close() 803 804 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 805 self.assertEqual(len(zipf2.namelist()), numfiles2) 806 for i in range(numfiles2): 807 content = zipf2.read("foo%08d" % i).decode('ascii') 808 self.assertEqual(content, "%d" % (i**3 % 57)) 809 zipf2.close() 810 811 def tearDown(self): 812 zipfile.ZIP64_LIMIT = self._limit 813 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 814 unlink(TESTFN) 815 unlink(TESTFN2) 816 817 818class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 819 unittest.TestCase): 820 compression = zipfile.ZIP_STORED 821 822 def large_file_exception_test(self, f, compression): 823 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 824 self.assertRaises(zipfile.LargeZipFile, 825 zipfp.write, TESTFN, "another.name") 826 827 def large_file_exception_test2(self, f, compression): 828 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 829 self.assertRaises(zipfile.LargeZipFile, 830 zipfp.writestr, "another.name", self.data) 831 832 def test_large_file_exception(self): 833 for f in get_files(self): 834 self.large_file_exception_test(f, zipfile.ZIP_STORED) 835 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 836 837 def test_absolute_arcnames(self): 838 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 839 allowZip64=True) as zipfp: 840 zipfp.write(TESTFN, "/absolute") 841 842 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 843 self.assertEqual(zipfp.namelist(), ["absolute"]) 844 845 def test_append(self): 846 # 
Test that appending to the Zip64 archive doesn't change 847 # extra fields of existing entries. 848 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 849 zipfp.writestr("strfile", self.data) 850 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 851 zinfo = zipfp.getinfo("strfile") 852 extra = zinfo.extra 853 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 854 zipfp.writestr("strfile2", self.data) 855 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 856 zinfo = zipfp.getinfo("strfile") 857 self.assertEqual(zinfo.extra, extra) 858 859 def make_zip64_file( 860 self, file_size_64_set=False, file_size_extra=False, 861 compress_size_64_set=False, compress_size_extra=False, 862 header_offset_64_set=False, header_offset_extra=False, 863 ): 864 """Generate bytes sequence for a zip with (incomplete) zip64 data. 865 866 The actual values (not the zip 64 0xffffffff values) stored in the file 867 are: 868 file_size: 8 869 compress_size: 8 870 header_offset: 0 871 """ 872 actual_size = 8 873 actual_header_offset = 0 874 local_zip64_fields = [] 875 central_zip64_fields = [] 876 877 file_size = actual_size 878 if file_size_64_set: 879 file_size = 0xffffffff 880 if file_size_extra: 881 local_zip64_fields.append(actual_size) 882 central_zip64_fields.append(actual_size) 883 file_size = struct.pack("<L", file_size) 884 885 compress_size = actual_size 886 if compress_size_64_set: 887 compress_size = 0xffffffff 888 if compress_size_extra: 889 local_zip64_fields.append(actual_size) 890 central_zip64_fields.append(actual_size) 891 compress_size = struct.pack("<L", compress_size) 892 893 header_offset = actual_header_offset 894 if header_offset_64_set: 895 header_offset = 0xffffffff 896 if header_offset_extra: 897 central_zip64_fields.append(actual_header_offset) 898 header_offset = struct.pack("<L", header_offset) 899 900 local_extra = struct.pack( 901 '<HH' + 'Q'*len(local_zip64_fields), 902 0x0001, 903 8*len(local_zip64_fields), 904 
*local_zip64_fields 905 ) 906 907 central_extra = struct.pack( 908 '<HH' + 'Q'*len(central_zip64_fields), 909 0x0001, 910 8*len(central_zip64_fields), 911 *central_zip64_fields 912 ) 913 914 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 915 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 916 917 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 918 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 919 920 filename = b"test.txt" 921 content = b"test1234" 922 filename_length = struct.pack("<H", len(filename)) 923 zip64_contents = ( 924 # Local file header 925 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 926 + compress_size 927 + file_size 928 + filename_length 929 + local_extra_length 930 + filename 931 + local_extra 932 + content 933 # Central directory: 934 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 935 + compress_size 936 + file_size 937 + filename_length 938 + central_extra_length 939 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 940 + header_offset 941 + filename 942 + central_extra 943 # Zip64 end of central directory 944 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 945 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 946 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 947 + central_dir_size 948 + offset_to_central_dir 949 # Zip64 end of central directory locator 950 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 951 + b"\x00\x00\x00" 952 # end of central directory 953 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 954 + b"\x00\x00\x00\x00" 955 ) 956 return zip64_contents 957 958 def test_bad_zip64_extra(self): 959 """Missing zip64 extra records raises an exception. 960 961 There are 4 fields that the zip64 format handles (the disk number is 962 not used in this module and so is ignored here). 
According to the zip 963 spec: 964 The order of the fields in the zip64 extended 965 information record is fixed, but the fields MUST 966 only appear if the corresponding Local or Central 967 directory record field is set to 0xFFFF or 0xFFFFFFFF. 968 969 If the zip64 extra content doesn't contain enough entries for the 970 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 971 This test mismatches the length of the zip64 extra field and the number 972 of fields set to indicate the presence of zip64 data. 973 """ 974 # zip64 file size present, no fields in extra, expecting one, equals 975 # missing file size. 976 missing_file_size_extra = self.make_zip64_file( 977 file_size_64_set=True, 978 ) 979 with self.assertRaises(zipfile.BadZipFile) as e: 980 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 981 self.assertIn('file size', str(e.exception).lower()) 982 983 # zip64 file size present, zip64 compress size present, one field in 984 # extra, expecting two, equals missing compress size. 985 missing_compress_size_extra = self.make_zip64_file( 986 file_size_64_set=True, 987 file_size_extra=True, 988 compress_size_64_set=True, 989 ) 990 with self.assertRaises(zipfile.BadZipFile) as e: 991 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 992 self.assertIn('compress size', str(e.exception).lower()) 993 994 # zip64 compress size present, no fields in extra, expecting one, 995 # equals missing compress size. 
996 missing_compress_size_extra = self.make_zip64_file( 997 compress_size_64_set=True, 998 ) 999 with self.assertRaises(zipfile.BadZipFile) as e: 1000 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1001 self.assertIn('compress size', str(e.exception).lower()) 1002 1003 # zip64 file size present, zip64 compress size present, zip64 header 1004 # offset present, two fields in extra, expecting three, equals missing 1005 # header offset 1006 missing_header_offset_extra = self.make_zip64_file( 1007 file_size_64_set=True, 1008 file_size_extra=True, 1009 compress_size_64_set=True, 1010 compress_size_extra=True, 1011 header_offset_64_set=True, 1012 ) 1013 with self.assertRaises(zipfile.BadZipFile) as e: 1014 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1015 self.assertIn('header offset', str(e.exception).lower()) 1016 1017 # zip64 compress size present, zip64 header offset present, one field 1018 # in extra, expecting two, equals missing header offset 1019 missing_header_offset_extra = self.make_zip64_file( 1020 file_size_64_set=False, 1021 compress_size_64_set=True, 1022 compress_size_extra=True, 1023 header_offset_64_set=True, 1024 ) 1025 with self.assertRaises(zipfile.BadZipFile) as e: 1026 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1027 self.assertIn('header offset', str(e.exception).lower()) 1028 1029 # zip64 file size present, zip64 header offset present, one field in 1030 # extra, expecting two, equals missing header offset 1031 missing_header_offset_extra = self.make_zip64_file( 1032 file_size_64_set=True, 1033 file_size_extra=True, 1034 compress_size_64_set=False, 1035 header_offset_64_set=True, 1036 ) 1037 with self.assertRaises(zipfile.BadZipFile) as e: 1038 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1039 self.assertIn('header offset', str(e.exception).lower()) 1040 1041 # zip64 header offset present, no fields in extra, expecting one, 1042 # equals missing header offset 1043 missing_header_offset_extra = 
def test_generated_valid_zip64_extra(self):
    """Every consistent zip64 marker/extra combination must parse cleanly.

    Each option group sets one 0xffffffff marker together with its
    matching extra value; all non-empty combinations of the three groups
    are generated and read back.
    """
    # These values are what is set in the make_zip64_file method.
    want_file_size, want_compress_size, want_header_offset = 8, 8, 0
    want_payload = b"test1234"

    option_groups = (
        {"file_size_64_set": True, "file_size_extra": True},
        {"compress_size_64_set": True, "compress_size_extra": True},
        {"header_offset_64_set": True, "header_offset_extra": True},
    )

    for group_count in range(1, len(option_groups) + 1):
        for chosen in itertools.combinations(option_groups, group_count):
            # Merge the selected groups into one keyword-argument dict.
            kwargs = {
                key: value
                for group in chosen
                for key, value in group.items()
            }
            archive = io.BytesIO(self.make_zip64_file(**kwargs))
            with zipfile.ZipFile(archive) as zf:
                zinfo = zf.infolist()[0]
                self.assertEqual(zinfo.file_size, want_file_size)
                self.assertEqual(zinfo.compress_size, want_compress_size)
                self.assertEqual(zinfo.header_offset, want_header_offset)
                self.assertEqual(zf.read(zinfo), want_payload)
def assertCompiledIn(self, name, namelist):
    """Assert that a compiled form of source file *name* is in *namelist*.

    Either ``name + 'o'`` or ``name + 'c'`` is accepted; the assertion
    failure, if any, is reported against the ``'c'`` variant.
    """
    if name + 'o' in namelist:
        return
    self.assertIn(name + 'c', namelist)
def test_write_filtered_python_package(self):
    """Check that ``filterfunc`` controls what ``writepy`` tries to compile.

    The ``test`` package is written three times: unfiltered (compile
    errors are expected on stdout), with a filter rejecting everything,
    and with a filter rejecting only files whose basename starts with
    "bad".  The last two runs must not report any SyntaxError.
    """
    import test
    packagedir = os.path.dirname(test.__file__)
    self.requiresWriteAccess(packagedir)

    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:

        # first make sure that the test folder gives error messages
        # (on the badsyntax_... files)
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir)
        reportStr = reportSIO.getvalue()
        # assertIn/assertNotIn show the offending output on failure,
        # unlike assertTrue(x in y) which only reports "False is not true".
        self.assertIn('SyntaxError', reportStr)

        # then check that the filter works on the whole package
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
        reportStr = reportSIO.getvalue()
        self.assertNotIn('SyntaxError', reportStr)

        # then check that the filter works on individual files
        # (named so that it does not shadow the builtin ``filter``)
        def skip_bad_files(path):
            return not os.path.basename(path).startswith("bad")
        with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
            zipfp.writepy(packagedir, filterfunc=skip_bad_files)
        reportStr = reportSIO.getvalue()
        if reportStr:
            # Surface any unexpected compiler output in the test log.
            print(reportStr)
        self.assertNotIn('SyntaxError', reportStr)
def test_write_non_pyfile(self):
    """Check that ``writepy`` rejects a file that is not a ``.py`` file.

    TESTFN (which has no ``.py`` suffix) is filled with plain text and
    ``PyZipFile.writepy`` must raise RuntimeError for it.
    """
    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
        try:
            with open(TESTFN, 'w', encoding='utf-8') as f:
                f.write('most definitely not a python file')
            with self.assertRaises(RuntimeError):
                zipfp.writepy(TESTFN)
        finally:
            # Remove the scratch file even when the assertion fails, so
            # a failure here cannot leak TESTFN into later tests.
            unlink(TESTFN)
def _test_extract_with_target(self, target):
    """Extract every SMALL_TEST_DATA member into *target* and verify it.

    *target* is the destination directory handed to ``ZipFile.extract``
    (the callers pass both a ``str`` and a ``pathlib.Path``).  For each
    member the returned path must refer to the same file as
    ``target/<member>`` and hold the expected bytes.  The extracted file
    is removed after each check.
    """
    try:
        self.make_test_file()
        with zipfile.ZipFile(TESTFN2, "r") as zipfp:
            for fpath, fdata in SMALL_TEST_DATA:
                writtenfile = zipfp.extract(fpath, target)

                # make sure it was written to the right place
                correctfile = os.path.normpath(os.path.join(target, fpath))
                self.assertTrue(os.path.samefile(writtenfile, correctfile),
                                (writtenfile, target))

                # make sure correct data is in correct file
                with open(writtenfile, "rb") as f:
                    self.assertEqual(fdata.encode(), f.read())

                unlink(writtenfile)
    finally:
        # This class has no tearDown, so the archive must be removed here
        # even when an assertion above fails.
        unlink(TESTFN2)
def check_file(self, filename, content):
    """Assert that *filename* is a regular file containing exactly *content*.

    *content* is compared against the file's bytes (the file is opened
    in binary mode).
    """
    is_regular_file = os.path.isfile(filename)
    self.assertTrue(is_regular_file)
    with open(filename, 'rb') as stream:
        actual = stream.read()
    self.assertEqual(actual, content)
1427 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1428 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1429 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1430 1431 def test_extract_hackers_arcnames_common_cases(self): 1432 common_hacknames = [ 1433 ('../foo/bar', 'foo/bar'), 1434 ('foo/../bar', 'foo/bar'), 1435 ('foo/../../bar', 'foo/bar'), 1436 ('foo/bar/..', 'foo/bar'), 1437 ('./../foo/bar', 'foo/bar'), 1438 ('/foo/bar', 'foo/bar'), 1439 ('/foo/../bar', 'foo/bar'), 1440 ('/foo/../../bar', 'foo/bar'), 1441 ] 1442 self._test_extract_hackers_arcnames(common_hacknames) 1443 1444 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1445 def test_extract_hackers_arcnames_windows_only(self): 1446 """Test combination of path fixing and windows name sanitization.""" 1447 windows_hacknames = [ 1448 (r'..\foo\bar', 'foo/bar'), 1449 (r'..\/foo\/bar', 'foo/bar'), 1450 (r'foo/\..\/bar', 'foo/bar'), 1451 (r'foo\/../\bar', 'foo/bar'), 1452 (r'C:foo/bar', 'foo/bar'), 1453 (r'C:/foo/bar', 'foo/bar'), 1454 (r'C://foo/bar', 'foo/bar'), 1455 (r'C:\foo\bar', 'foo/bar'), 1456 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1457 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1458 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1459 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1460 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1461 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1462 (r'//?/C:/foo/bar', 'foo/bar'), 1463 (r'\\?\C:\foo\bar', 'foo/bar'), 1464 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1465 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1466 ('../../foo../../ba..r', 'foo/ba..r'), 1467 ] 1468 self._test_extract_hackers_arcnames(windows_hacknames) 1469 1470 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1471 def test_extract_hackers_arcnames_posix_only(self): 1472 posix_hacknames = [ 1473 ('//foo/bar', 'foo/bar'), 1474 
def _test_extract_hackers_arcnames(self, hacknames):
    """Check where members with hostile archive names end up on disk.

    *hacknames* is an iterable of ``(arcname, fixedname)`` pairs.  For
    each pair a one-member archive is written to TESTFN2 with the member
    named *arcname*.  The member is then extracted four ways:
    ``extract`` and ``extractall`` into a target directory, then
    ``extract`` and ``extractall`` into the current directory.  Each
    time the file must appear at *fixedname* (a '/'-separated relative
    path) below the extraction root and contain the written bytes.
    Extracted trees and the archive are removed as the loop proceeds.
    """
    for arcname, fixedname in hacknames:
        # Content is derived from the name, so each pair has its own payload.
        content = b'foobar' + arcname.encode()
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
            zinfo = zipfile.ZipInfo()
            # preserve backslashes
            # NOTE(review): the name is assigned after construction,
            # presumably because passing it to ZipInfo() would normalise
            # it -- confirm against zipfile.ZipInfo.
            zinfo.filename = arcname
            # NOTE(review): presumably Unix mode 0o600 stored in the high
            # 16 bits of external_attr -- confirm.
            zinfo.external_attr = 0o600 << 16
            zipfp.writestr(zinfo, content)

        # From here on the member is looked up with '/' in place of os.sep.
        arcname = arcname.replace(os.sep, "/")
        targetpath = os.path.join('target', 'subdir', 'subsub')
        correctfile = os.path.join(targetpath, *fixedname.split('/'))

        # 1. extract() of the single member into an explicit target path.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname, targetpath)
            self.assertEqual(writtenfile, correctfile,
                             msg='extract %r: %r != %r' %
                             (arcname, writtenfile, correctfile))
        self.check_file(correctfile, content)
        rmtree('target')

        # 2. extractall() into the same explicit target path.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall(targetpath)
        self.check_file(correctfile, content)
        rmtree('target')

        # With no target given, the expected location is below the cwd.
        correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

        # 3. extract() with the default target.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname)
            self.assertEqual(writtenfile, correctfile,
                             msg="extract %r" % arcname)
        self.check_file(correctfile, content)
        # Remove the top-level directory the member was extracted under.
        rmtree(fixedname.split('/')[0])

        # 4. extractall() with the default target.
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall()
        self.check_file(correctfile, content)
        rmtree(fixedname.split('/')[0])

        unlink(TESTFN2)
self.assertEqual(zipfp.namelist(), ["name"] * 2) 1531 1532 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1533 infos = zipfp.infolist() 1534 data = b"" 1535 for info in infos: 1536 with zipfp.open(info) as zipopen: 1537 data += zipopen.read() 1538 self.assertIn(data, {b"foobar", b"barfoo"}) 1539 data = b"" 1540 for info in infos: 1541 data += zipfp.read(info) 1542 self.assertIn(data, {b"foobar", b"barfoo"}) 1543 1544 def test_writestr_extended_local_header_issue1202(self): 1545 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1546 for data in 'abcdefghijklmnop': 1547 zinfo = zipfile.ZipInfo(data) 1548 zinfo.flag_bits |= 0x08 # Include an extended local header. 1549 orig_zip.writestr(zinfo, data) 1550 1551 def test_close(self): 1552 """Check that the zipfile is closed after the 'with' block.""" 1553 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1554 for fpath, fdata in SMALL_TEST_DATA: 1555 zipfp.writestr(fpath, fdata) 1556 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1557 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1558 1559 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1560 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1561 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1562 1563 def test_close_on_exception(self): 1564 """Check that the zipfile is closed if an exception is raised in the 1565 'with' block.""" 1566 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1567 for fpath, fdata in SMALL_TEST_DATA: 1568 zipfp.writestr(fpath, fdata) 1569 1570 try: 1571 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1572 raise zipfile.BadZipFile() 1573 except zipfile.BadZipFile: 1574 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1575 1576 def test_unsupported_version(self): 1577 # File has an extract_version of 120 1578 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1579 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1580 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1581 
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1582 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1583 1584 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1585 io.BytesIO(data), 'r') 1586 1587 @requires_zlib() 1588 def test_read_unicode_filenames(self): 1589 # bug #10801 1590 fname = findfile('zip_cp437_header.zip') 1591 with zipfile.ZipFile(fname) as zipfp: 1592 for name in zipfp.namelist(): 1593 zipfp.open(name).close() 1594 1595 def test_write_unicode_filenames(self): 1596 with zipfile.ZipFile(TESTFN, "w") as zf: 1597 zf.writestr("foo.txt", "Test for unicode filename") 1598 zf.writestr("\xf6.txt", "Test for unicode filename") 1599 self.assertIsInstance(zf.infolist()[0].filename, str) 1600 1601 with zipfile.ZipFile(TESTFN, "r") as zf: 1602 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1603 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1604 1605 def test_read_after_write_unicode_filenames(self): 1606 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1607 zipfp.writestr('приклад', b'sample') 1608 self.assertEqual(zipfp.read('приклад'), b'sample') 1609 1610 def test_exclusive_create_zip_file(self): 1611 """Test exclusive creating a new zipfile.""" 1612 unlink(TESTFN2) 1613 filename = 'testfile.txt' 1614 content = b'hello, world. this is some content.' 1615 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1616 zipfp.writestr(filename, content) 1617 with self.assertRaises(FileExistsError): 1618 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1619 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1620 self.assertEqual(zipfp.namelist(), [filename]) 1621 self.assertEqual(zipfp.read(filename), content) 1622 1623 def test_create_non_existent_file_for_append(self): 1624 if os.path.exists(TESTFN): 1625 os.unlink(TESTFN) 1626 1627 filename = 'testfile.txt' 1628 content = b'hello, world. this is some content.' 
1629 1630 try: 1631 with zipfile.ZipFile(TESTFN, 'a') as zf: 1632 zf.writestr(filename, content) 1633 except OSError: 1634 self.fail('Could not append data to a non-existent zip file.') 1635 1636 self.assertTrue(os.path.exists(TESTFN)) 1637 1638 with zipfile.ZipFile(TESTFN, 'r') as zf: 1639 self.assertEqual(zf.read(filename), content) 1640 1641 def test_close_erroneous_file(self): 1642 # This test checks that the ZipFile constructor closes the file object 1643 # it opens if there's an error in the file. If it doesn't, the 1644 # traceback holds a reference to the ZipFile object and, indirectly, 1645 # the file object. 1646 # On Windows, this causes the os.unlink() call to fail because the 1647 # underlying file is still open. This is SF bug #412214. 1648 # 1649 with open(TESTFN, "w", encoding="utf-8") as fp: 1650 fp.write("this is not a legal zip file\n") 1651 try: 1652 zf = zipfile.ZipFile(TESTFN) 1653 except zipfile.BadZipFile: 1654 pass 1655 1656 def test_is_zip_erroneous_file(self): 1657 """Check that is_zipfile() correctly identifies non-zip files.""" 1658 # - passing a filename 1659 with open(TESTFN, "w", encoding='utf-8') as fp: 1660 fp.write("this is not a legal zip file\n") 1661 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1662 # - passing a path-like object 1663 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1664 # - passing a file object 1665 with open(TESTFN, "rb") as fp: 1666 self.assertFalse(zipfile.is_zipfile(fp)) 1667 # - passing a file-like object 1668 fp = io.BytesIO() 1669 fp.write(b"this is not a legal zip file\n") 1670 self.assertFalse(zipfile.is_zipfile(fp)) 1671 fp.seek(0, 0) 1672 self.assertFalse(zipfile.is_zipfile(fp)) 1673 1674 def test_damaged_zipfile(self): 1675 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1676 # - Create a valid zip file 1677 fp = io.BytesIO() 1678 with zipfile.ZipFile(fp, mode="w") as zipf: 1679 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1680 zipfiledata = fp.getvalue() 
1681 1682 # - Now create copies of it missing the last N bytes and make sure 1683 # a BadZipFile exception is raised when we try to open it 1684 for N in range(len(zipfiledata)): 1685 fp = io.BytesIO(zipfiledata[:N]) 1686 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1687 1688 def test_is_zip_valid_file(self): 1689 """Check that is_zipfile() correctly identifies zip files.""" 1690 # - passing a filename 1691 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1692 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1693 1694 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1695 # - passing a file object 1696 with open(TESTFN, "rb") as fp: 1697 self.assertTrue(zipfile.is_zipfile(fp)) 1698 fp.seek(0, 0) 1699 zip_contents = fp.read() 1700 # - passing a file-like object 1701 fp = io.BytesIO() 1702 fp.write(zip_contents) 1703 self.assertTrue(zipfile.is_zipfile(fp)) 1704 fp.seek(0, 0) 1705 self.assertTrue(zipfile.is_zipfile(fp)) 1706 1707 def test_non_existent_file_raises_OSError(self): 1708 # make sure we don't raise an AttributeError when a partially-constructed 1709 # ZipFile instance is finalized; this tests for regression on SF tracker 1710 # bug #403871. 1711 1712 # The bug we're testing for caused an AttributeError to be raised 1713 # when a ZipFile instance was created for a file that did not 1714 # exist; the .fp member was not initialized but was needed by the 1715 # __del__() method. Since the AttributeError is in the __del__(), 1716 # it is ignored, but the user should be sufficiently annoyed by 1717 # the message on the output that regression will be noticed 1718 # quickly. 
def test_empty_file_raises_BadZipFile(self):
    """Opening an empty or very short non-zip file must raise BadZipFile.

    TESTFN is rewritten first as an empty file and then with a few bytes
    of text; ZipFile must reject both.
    """
    for text in ("", "short file"):
        with open(TESTFN, 'w', encoding='utf-8') as fp:
            fp.write(text)
        with self.assertRaises(zipfile.BadZipFile):
            zipfile.ZipFile(TESTFN)
def test_open_non_existent_item(self):
    """Check that attempting to call open() for an item that doesn't
    exist in the archive raises a KeyError."""
    # The archive is freshly created in "w" mode and has no members, so
    # any name passed to open() is missing.
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertRaises(KeyError):
            zipf.open("foo.txt", "r")
def test_comments(self):
    """Check that comments on the archive are handled properly.

    Covers the default (empty) comment, a short comment, a comment of
    the maximum length, truncation of an over-long comment (with a
    UserWarning), and replacing or shortening the comment in append mode.
    """

    # check default comment is empty
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertEqual(zipf.comment, b'')
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b'')

    # check a simple short comment
    comment = b'Bravely taking to his feet, he beat a very brave retreat.'
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        # Check the comment read back from disk (zipfr), not the
        # attribute still held by the already-closed writer (zipf).
        self.assertEqual(zipfr.comment, comment)

    # check a comment of max length
    comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
    comment2 = comment2.encode("ascii")
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment2
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check a comment that is too long is truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertWarns(UserWarning):
            zipf.comment = comment2 + b'oops'
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check that comments are correctly modified in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"an updated comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"an updated comment")

    # check that comments are correctly shortened in append mode
    # and the file is indeed truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment that's longer"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    original_zip_size = os.path.getsize(TESTFN)
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"shorter comment"
    # assertGreater reports both sizes on failure.
    self.assertGreater(original_zip_size, os.path.getsize(TESTFN))
    with zipfile.ZipFile(TESTFN, mode="r") as zipf:
        self.assertEqual(zipf.comment, b"shorter comment")
adding any files to the archives creates a valid empty ZIP file 1896 zipf = zipfile.ZipFile(TESTFN, mode="w") 1897 zipf.close() 1898 try: 1899 zipf = zipfile.ZipFile(TESTFN, mode="r") 1900 except zipfile.BadZipFile: 1901 self.fail("Unable to create empty ZIP file in 'w' mode") 1902 1903 zipf = zipfile.ZipFile(TESTFN, mode="a") 1904 zipf.close() 1905 try: 1906 zipf = zipfile.ZipFile(TESTFN, mode="r") 1907 except: 1908 self.fail("Unable to create empty ZIP file in 'a' mode") 1909 1910 def test_open_empty_file(self): 1911 # Issue 1710703: Check that opening a file with less than 22 bytes 1912 # raises a BadZipFile exception (rather than the previously unhelpful 1913 # OSError) 1914 f = open(TESTFN, 'w', encoding='utf-8') 1915 f.close() 1916 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1917 1918 def test_create_zipinfo_before_1980(self): 1919 self.assertRaises(ValueError, 1920 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1921 1922 def test_create_empty_zipinfo_repr(self): 1923 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 1924 zi = zipfile.ZipInfo(filename="empty") 1925 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 1926 1927 def test_create_empty_zipinfo_default_attributes(self): 1928 """Ensure all required attributes are set.""" 1929 zi = zipfile.ZipInfo() 1930 self.assertEqual(zi.orig_filename, "NoName") 1931 self.assertEqual(zi.filename, "NoName") 1932 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 1933 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 1934 self.assertEqual(zi.comment, b"") 1935 self.assertEqual(zi.extra, b"") 1936 self.assertIn(zi.create_system, (0, 3)) 1937 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 1938 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 1939 self.assertEqual(zi.reserved, 0) 1940 self.assertEqual(zi.flag_bits, 0) 1941 self.assertEqual(zi.volume, 0) 1942 self.assertEqual(zi.internal_attr, 0) 1943 
def test_open_conflicting_handles(self):
    """Only one writable member handle may be open at a time."""
    first = b"It's fun to charter an accountant!"
    second = b"And sail the wide accountant sea"
    third = b"To find, explore the funds offshore"
    with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as archive:
        with archive.open('foo', mode='w') as writer:
            writer.write(first)
        with archive.open('bar', mode='w') as writer:
            # While 'bar' is still open for writing, every one of these
            # operations on the archive has to be refused.
            refused = (
                lambda: archive.open('handle', mode='w'),
                lambda: archive.open('foo', mode='r'),
                lambda: archive.writestr('str', 'abcde'),
                lambda: archive.write(__file__, 'file'),
                lambda: archive.close(),
            )
            for attempt in refused:
                with self.assertRaises(ValueError):
                    attempt()
            writer.write(second)
        with archive.open('baz', mode='w') as writer:
            writer.write(third)

    with zipfile.ZipFile(TESTFN2, 'r') as archive:
        for name, payload in (('foo', first), ('bar', second), ('baz', third)):
            self.assertEqual(archive.read(name), payload)
        self.assertEqual(archive.namelist(), ['foo', 'bar', 'baz'])
@requires_zlib()
def test_full_overlap(self):
    """Two directory entries share one local header; only 'a' is readable."""
    data = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
        b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00a\xed'
        b'\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\d\x0b`P'
        b'K\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2'
        b'\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00aPK'
        b'\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e'
        b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bPK\x05'
        b'\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00\x00/\x00\x00'
        b'\x00\x00\x00'
    )
    with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf:
        self.assertEqual(zipf.namelist(), ['a', 'b'])
        # Both directory records carry identical offsets and sizes.
        for member in ('a', 'b'):
            info = zipf.getinfo(member)
            self.assertEqual(info.header_offset, 0)
            self.assertEqual(info.compress_size, 16)
            self.assertEqual(info.file_size, 1033)
        payload = zipf.read('a')
        self.assertEqual(len(payload), 1033)
        with self.assertRaisesRegex(zipfile.BadZipFile, 'File name.*differ'):
            zipf.read('b')
class AbstractBadCrcTests:
    """Shared checks for an archive whose member 'afile' has a bad CRC.

    Concrete subclasses provide the raw archive bytes as the class
    attribute ``zip_with_bad_crc``.
    """

    def _open_bad_archive(self):
        """Return a read-mode ZipFile over a fresh in-memory copy of the data."""
        return zipfile.ZipFile(io.BytesIO(self.zip_with_bad_crc), mode="r")

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        with self._open_bad_archive() as zipf:
            # testzip returns the name of the first corrupt file, or None
            self.assertEqual('afile', zipf.testzip())

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        # Using ZipFile.read()
        with self._open_bad_archive() as zipf:
            with self.assertRaises(zipfile.BadZipFile):
                zipf.read('afile')

        # Using ZipExtFile.read()
        with self._open_bad_archive() as zipf:
            with zipf.open('afile', 'r') as member:
                with self.assertRaises(zipfile.BadZipFile):
                    member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with self._open_bad_archive() as zipf:
            with zipf.open('afile', 'r') as member:
                member.MIN_READ_SIZE = 2
                with self.assertRaises(zipfile.BadZipFile):
                    while member.read(2):
                        pass
@requires_lzma()
class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the shared bad-CRC checks against a ZIP_LZMA archive fixture."""
    compression = zipfile.ZIP_LZMA
    # Hand-built archive bytes containing the member name 'afile'.
    # NOTE(review): the four bytes following the time/date fields spell
    # b'FAKE' (split across two literals below), which looks like a
    # deliberately wrong CRC-32 -- confirm against the ZIP header layout.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I'
        b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK'
        b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA'
        b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil'
        b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00'
        b'\x00>\x00\x00\x00\x00\x00')
2228 self.zip.close() 2229 os.unlink(TESTFN) 2230 self.zip2.close() 2231 os.unlink(TESTFN2) 2232 2233 def test_no_password(self): 2234 # Reading the encrypted file without password 2235 # must generate a RunTime exception 2236 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2237 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2238 2239 def test_bad_password(self): 2240 self.zip.setpassword(b"perl") 2241 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2242 self.zip2.setpassword(b"perl") 2243 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2244 2245 @requires_zlib() 2246 def test_good_password(self): 2247 self.zip.setpassword(b"python") 2248 self.assertEqual(self.zip.read("test.txt"), self.plain) 2249 self.zip2.setpassword(b"12345") 2250 self.assertEqual(self.zip2.read("zero"), self.plain2) 2251 2252 def test_unicode_password(self): 2253 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2254 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2255 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2256 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2257 2258 def test_seek_tell(self): 2259 self.zip.setpassword(b"python") 2260 txt = self.plain 2261 test_word = b'encryption' 2262 bloc = txt.find(test_word) 2263 bloc_len = len(test_word) 2264 with self.zip.open("test.txt", "r") as fp: 2265 fp.seek(bloc, os.SEEK_SET) 2266 self.assertEqual(fp.tell(), bloc) 2267 fp.seek(-bloc, os.SEEK_CUR) 2268 self.assertEqual(fp.tell(), 0) 2269 fp.seek(bloc, os.SEEK_CUR) 2270 self.assertEqual(fp.tell(), bloc) 2271 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2272 2273 # Make sure that the second read after seeking back beyond 2274 # _readbuffer returns the same content (ie. rewind to the start of 2275 # the file to read forward to the required position). 
class AbstractTestsWithRandomBinaryFiles:
    """Round-trip a randomly generated binary payload through an archive.

    Subclasses set ``compression`` to the compression method under test.
    """

    @classmethod
    def setUpClass(cls):
        # Build the payload once per class from packed random floats.
        count = randint(16, 64) * 1024 + randint(1, 1024)
        packed = []
        for _ in range(count):
            packed.append(struct.pack('<f', random() * randint(-1000, 1000)))
        cls.data = b''.join(packed)

    def setUp(self):
        # Make a source file holding the payload
        with open(TESTFN, "wb") as src:
            src.write(self.data)

    def tearDown(self):
        for path in (TESTFN, TESTFN2):
            unlink(path)

    @staticmethod
    def _drain(member, next_size):
        """Read *member* to EOF, asking next_size() for each read length."""
        pieces = []
        while chunk := member.read(next_size()):
            pieces.append(chunk)
        return b''.join(pieces)

    def make_test_archive(self, f, compression):
        # Create the ZIP archive: the same source file under two names
        with zipfile.ZipFile(f, "w", compression) as zipfp:
            for arcname in ("another.name", TESTFN):
                zipfp.write(TESTFN, arcname)

    def zip_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            restored = zipfp.read(TESTFN)
            self.assertEqual(len(restored), len(self.data))
            self.assertEqual(restored, self.data)
            self.assertEqual(zipfp.read("another.name"), self.data)

    def test_read(self):
        for f in get_files(self):
            self.zip_test(f, self.compression)

    def zip_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive, one member after the other, 256 bytes a time
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as member:
                testdata1 = self._drain(member, lambda: 256)

            with zipfp.open("another.name") as member:
                testdata2 = self._drain(member, lambda: 256)

            self.assertEqual(len(testdata1), len(self.data))
            self.assertEqual(testdata1, self.data)

            self.assertEqual(len(testdata2), len(self.data))
            self.assertEqual(testdata2, self.data)

    def test_open(self):
        for f in get_files(self):
            self.zip_open_test(f, self.compression)

    def zip_random_open_test(self, f, compression):
        self.make_test_archive(f, compression)

        # Read the ZIP archive using a fresh random read size every time
        with zipfile.ZipFile(f, "r", compression) as zipfp:
            with zipfp.open(TESTFN) as member:
                testdata = self._drain(member, lambda: randint(1, 1024))

            self.assertEqual(len(testdata), len(self.data))
            self.assertEqual(testdata, self.data)

    def test_random_open(self):
        for f in get_files(self):
            self.zip_random_open_test(f, self.compression)
# Provide the tell() method but not seek()
class Tellable:
    """Write-only wrapper that tracks its own offset and has no seek()."""

    def __init__(self, fp):
        self.fp = fp
        self.offset = 0

    def write(self, data):
        """Forward *data* to the wrapped stream and advance the offset."""
        written = self.fp.write(data)
        self.offset = self.offset + written
        return written

    def tell(self):
        """Return the sum of the counts returned by the wrapped write()."""
        return self.offset

    def flush(self):
        """Flush the wrapped stream."""
        self.fp.flush()
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    """Several member handles on one ZipFile must not disturb each other."""

    @classmethod
    def setUpClass(cls):
        cls.data1 = b'111' + randbytes(10000)
        cls.data2 = b'222' + randbytes(10000)

    def make_test_archive(self, f):
        # Create the ZIP archive with the two fixture members
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp:
            for name, payload in (('ones', self.data1), ('twos', self.data2)):
                zipfp.writestr(name, payload)

    def _check_stored_members(self, f):
        """Re-open *f* for reading and compare both members to the fixtures."""
        with zipfile.ZipFile(f, 'r') as zipf:
            ones = zipf.read('ones')
            twos = zipf.read('twos')
        self.assertEqual(ones, self.data1)
        self.assertEqual(twos, self.data2)

    def test_same_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as left, zipf.open('ones') as right:
                    left_head = left.read(500)
                    right_head = right.read(500)
                    left_all = left_head + left.read()
                    right_all = right_head + right.read()
                self.assertEqual(left_all, right_all)
                self.assertEqual(left_all, self.data1)

    def test_different_file(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as ones, zipf.open('twos') as twos:
                    ones_head = ones.read(500)
                    twos_head = twos.read(500)
                    ones_all = ones_head + ones.read()
                    twos_all = twos_head + twos.read()
                self.assertEqual(ones_all, self.data1)
                self.assertEqual(twos_all, self.data2)

    def test_interleaved(self):
        # Verify that (when the ZipFile is in control of creating file objects)
        # multiple open() calls can be made without interfering with each other.
        for f in get_files(self):
            self.make_test_archive(f)
            with zipfile.ZipFile(f, mode="r") as zipf:
                with zipf.open('ones') as ones:
                    ones_head = ones.read(500)
                    with zipf.open('twos') as twos:
                        twos_head = twos.read(500)
                        ones_all = ones_head + ones.read()
                        twos_all = twos_head + twos.read()
                self.assertEqual(ones_all, self.data1)
                self.assertEqual(twos_all, self.data2)

    def test_read_after_close(self):
        for f in get_files(self):
            self.make_test_archive(f)
            with contextlib.ExitStack() as stack:
                # The member handles outlive the ZipFile's own with-block.
                with zipfile.ZipFile(f, 'r') as zipf:
                    ones = stack.enter_context(zipf.open('ones'))
                    twos = stack.enter_context(zipf.open('twos'))
                ones_head = ones.read(500)
                twos_head = twos.read(500)
                ones_all = ones_head + ones.read()
                twos_all = twos_head + twos.read()
            self.assertEqual(ones_all, self.data1)
            self.assertEqual(twos_all, self.data2)

    def test_read_after_write(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                zipf.writestr('twos', self.data2)
                with zipf.open('ones') as ones:
                    head = ones.read(500)
            self.assertEqual(head, self.data1[:500])
            self._check_stored_members(f)

    def test_write_after_read(self):
        for f in get_files(self):
            with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.writestr('ones', self.data1)
                with zipf.open('ones') as ones:
                    ones.read(500)
                    zipf.writestr('twos', self.data2)
            self._check_stored_members(f)

    def test_many_opens(self):
        # Verify that read() and open() promptly close the file descriptor,
        # and don't rely on the garbage collector to free resources.
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
            for _ in range(100):
                zipf.read('ones')
                with zipf.open('ones'):
                    pass
        with open(os.devnull, "rb") as probe:
            self.assertLess(probe.fileno(), 100)

    def test_write_while_reading(self):
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf:
            with zipf.open('ones', 'r') as reader:
                head = reader.read(500)
                with zipf.open('twos', 'w') as writer:
                    writer.write(self.data2)
                everything = head + reader.read()
        self.assertEqual(everything, self.data1)
        with zipfile.ZipFile(TESTFN2) as zipf:
            self.assertEqual(zipf.read('twos'), self.data2)

    def tearDown(self):
        unlink(TESTFN2)
def test_write_dir(self):
    """Directories written with ZipFile.write() round-trip as directories.

    The directory is added twice, once under its own path and once under
    the explicit arcname "y".  Both entries are checked right after
    writing and again after re-opening the archive, then extracted.
    """
    dirpath = os.path.join(TESTFN2, "x")
    os.mkdir(dirpath)
    mode = os.stat(dirpath).st_mode & 0xFFFF
    # File mode in the upper 16 bits, OR-ed with 0x10 in the low bits.
    expected_attr = (mode << 16) | 0x10
    with zipfile.ZipFile(TESTFN, "w") as zipf:
        zipf.write(dirpath)
        zinfo = zipf.filelist[0]
        self.assertTrue(zinfo.filename.endswith("/x/"))
        self.assertEqual(zinfo.external_attr, expected_attr)
        zipf.write(dirpath, "y")
        zinfo = zipf.filelist[1]
        # assertEqual, not assertTrue: with assertTrue the "y/" argument
        # is only the failure message and any non-empty name would pass.
        self.assertEqual(zinfo.filename, "y/")
        self.assertEqual(zinfo.external_attr, expected_attr)
    with zipfile.ZipFile(TESTFN, "r") as zipf:
        zinfo = zipf.filelist[0]
        self.assertTrue(zinfo.filename.endswith("/x/"))
        self.assertEqual(zinfo.external_attr, expected_attr)
        zinfo = zipf.filelist[1]
        self.assertEqual(zinfo.filename, "y/")
        self.assertEqual(zinfo.external_attr, expected_attr)
        target = os.path.join(TESTFN2, "target")
        os.mkdir(target)
        zipf.extractall(target)
        self.assertTrue(os.path.isdir(os.path.join(target, "y")))
        self.assertEqual(len(os.listdir(target)), 2)
class CommandLineTest(unittest.TestCase):
    """Exercise the ``python -m zipfile`` command-line interface."""

    def zipfilecmd(self, *args, **kwargs):
        """Run ``-m zipfile`` expecting success; return newline-normalised stdout."""
        result = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                                **kwargs)
        stdout = result[1]
        return stdout.replace(os.linesep.encode(), b'\n')

    def zipfilecmd_failure(self, *args):
        """Run ``-m zipfile`` expecting failure; return (rc, out, err)."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        rc, out, err = self.zipfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        for keyword in (b'usage', b'error', b'required'):
            self.assertIn(keyword, lowered)
        rc, out, err = self.zipfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        archive = findfile('zipdir.zip')
        for opt in ('-t', '--test'):
            out = self.zipfilecmd(opt, archive)
            self.assertEqual(out.rstrip(), b'Done testing')
        # A tar file is not a ZIP archive, so testing it has to fail.
        not_a_zip = findfile('testtar.tar')
        rc, out, err = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(out, b'')

    def test_list_command(self):
        archive = findfile('zipdir.zip')
        listing = io.StringIO()
        with zipfile.ZipFile(archive, 'r') as tf:
            tf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for opt in ('-l', '--list'):
            out = self.zipfilecmd(opt, archive,
                                  PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(out, expected)

    @requires_zlib()
    def test_create_command(self):
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding='utf-8') as f:
            f.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested, 'w', encoding='utf-8') as f:
            f.write('test 2')
        sources = [TESTFN, TESTFNDIR]
        namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        for opt in ('-c', '--create'):
            try:
                out = self.zipfilecmd(opt, TESTFN2, *sources)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), namelist)
                    self.assertEqual(zf.read(namelist[0]), b'test 1')
                    self.assertEqual(zf.read(namelist[2]), b'test 2')
            finally:
                # Remove the archive so the next option starts from scratch.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive = findfile('zipdir.zip')
        for opt in ('-e', '--extract'):
            with temp_dir() as extdir:
                out = self.zipfilecmd(opt, archive, extdir)
                self.assertEqual(out, b'')
                with zipfile.ZipFile(archive) as zf:
                    for entry in zf.infolist():
                        extracted = os.path.join(
                            extdir, entry.filename.replace('/', os.sep))
                        if entry.is_dir():
                            self.assertTrue(os.path.isdir(extracted))
                            continue
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as fh:
                            self.assertEqual(fh.read(), zf.read(entry))
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
    class itertools:
        class Counter:
            """
            Iterator wrapper that counts the items it has produced.

            ``count`` starts at 0 and is incremented each time an item is
            successfully pulled from the wrapped iterable.
            """

            def __init__(self, i):
                self.count = 0
                self._orig_iter = iter(i)

            def __iter__(self):
                return self

            def __next__(self):
                # If the wrapped iterator raises (e.g. StopIteration), the
                # exception propagates before the count is incremented.
                result = next(self._orig_iter)
                self.count += 1
                return result


def add_dirs(zf):
    """
    Given a writable zip file zf, inject directory entries for
    any directories implied by the presence of children.

    Each implied directory is written as an empty entry; zf itself
    is returned.
    """
    for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()):
        zf.writestr(name, b"")
    return zf


def build_alpharep_fixture():
    """
    Create a zip file with this structure:

    .
    ├── a.txt
    ├── b
    │  ├── c.txt
    │  ├── d
    │  │  └── e.txt
    │  └── f.txt
    └── g
       └── h
          └── i.txt

    This fixture has the following key characteristics:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)

    "alpha" because it uses alphabet
    "rep" because it's a representative example

    The ZipFile is backed by an in-memory buffer, is returned without
    being closed (still in "w" mode), and has its ``filename`` set to
    "alpharep.zip".
    """
    data = io.BytesIO()
    zf = zipfile.ZipFile(data, "w")
    zf.writestr("a.txt", b"content of a")
    zf.writestr("b/c.txt", b"content of c")
    zf.writestr("b/d/e.txt", b"content of e")
    zf.writestr("b/f.txt", b"content of f")
    zf.writestr("g/h/i.txt", b"content of i")
    zf.filename = "alpharep.zip"
    return zf


def pass_alpharep(meth):
    """
    Given a method, wrap it in a for loop that invokes method
    with each subtest.

    The wrapper iterates over ``self.zipfile_alpharep()`` and passes each
    yielded fixture to the method as the ``alpharep`` keyword argument.
    """

    @functools.wraps(meth)
    def wrapper(self):
        for alpharep in self.zipfile_alpharep():
            meth(self, alpharep=alpharep)

    return wrapper


class TestPath(unittest.TestCase):
    def setUp(self):
        # Context managers entered through self.fixtures are closed when
        # the test finishes.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """
        Yield the alpharep fixture twice, each inside its own subTest:
        first as built, then with implied directory entries added.
        """
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self, alpharep):
        """
        Close alpharep, write its contents to a file named after
        ``alpharep.filename`` in a fresh temporary directory, and return
        the resulting pathlib.Path.
        """
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        # Keep a reference to the underlying buffer before closing.
        buffer = alpharep.fp
        alpharep.close()
        path = tmpdir / alpharep.filename
        with path.open("wb") as strm:
            strm.write(buffer.getvalue())
        return path

    @pass_alpharep
    def test_iterdir_and_types(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.is_dir()
        a, b, g = root.iterdir()
        assert a.is_file()
        assert b.is_dir()
        assert g.is_dir()
        c, f, d = b.iterdir()
        assert c.is_file() and f.is_file()
        (e,) = d.iterdir()
        assert e.is_file()
        (h,) = g.iterdir()
        (i,) = h.iterdir()
        assert i.is_file()

    @pass_alpharep
    def test_is_file_missing(self, alpharep):
        root = zipfile.Path(alpharep)
        assert not root.joinpath('missing.txt').is_file()

    @pass_alpharep
    def test_iterdir_on_file(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with self.assertRaises(ValueError):
            a.iterdir()

    @pass_alpharep
    def test_subdir_is_dir(self, alpharep):
        # Directories are recognized with and without a trailing slash.
        root = zipfile.Path(alpharep)
        assert (root / 'b').is_dir()
        assert (root / 'b/').is_dir()
        assert (root / 'g').is_dir()
        assert (root / 'g/').is_dir()

    @pass_alpharep
    def test_open(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with a.open(encoding="utf-8") as strm:
            data = strm.read()
        assert data == "content of a"

    def test_open_write(self):
        """
        If the zipfile is open for write, it should be possible to
        write bytes or text to it.
        """
        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
        with zf.joinpath('file.bin').open('wb') as strm:
            strm.write(b'binary contents')
        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
            strm.write('text file')

    def test_open_extant_directory(self):
        """
        Attempting to open a directory raises IsADirectoryError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(IsADirectoryError):
            zf.joinpath('b').open()

    @pass_alpharep
    def test_open_binary_invalid_args(self, alpharep):
        # An encoding, whether keyword or positional, is rejected in
        # binary mode.
        root = zipfile.Path(alpharep)
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', encoding='utf-8')
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', 'utf-8')

    def test_open_missing_directory(self):
        """
        Attempting to open a missing directory raises FileNotFoundError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(FileNotFoundError):
            zf.joinpath('z').open()

    @pass_alpharep
    def test_read(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        assert a.read_text(encoding="utf-8") == "content of a"
        assert a.read_bytes() == b"content of a"

    @pass_alpharep
    def test_joinpath(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root.joinpath("a.txt")
        assert a.is_file()
        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_joinpath_multiple(self, alpharep):
        root = zipfile.Path(alpharep)
        e = root.joinpath("b", "d", "e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_truediv(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root / "a.txt"
        assert a.is_file()
        e = root / "b" / "d" / "e.txt"
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_simplediv(self, alpharep):
        """
        Disable the __future__.division when testing traversal.
        """
        # dont_inherit=True compiles the expression without the future
        # statements in effect in this module.
        code = compile(
            source="zipfile.Path(alpharep) / 'a'",
            filename="(test)",
            mode="eval",
            dont_inherit=True,
        )
        eval(code)

    @pass_alpharep
    def test_pathlike_construction(self, alpharep):
        """
        zipfile.Path should be constructable from a path-like object
        """
        zipfile_ondisk = self.zipfile_ondisk(alpharep)
        pathlike = pathlib.Path(str(zipfile_ondisk))
        zipfile.Path(pathlike)

    @pass_alpharep
    def test_traverse_pathlike(self, alpharep):
        root = zipfile.Path(alpharep)
        root / pathlib.Path("a")

    @pass_alpharep
    def test_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'a').parent.at == ''
        assert (root / 'a' / 'b').parent.at == 'a/'

    @pass_alpharep
    def test_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'b').parent.at == ''
        assert (root / 'b/').parent.at == ''

    @pass_alpharep
    def test_missing_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert (root / 'missing dir/').parent.at == ''

    @pass_alpharep
    def test_mutability(self, alpharep):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        alpharep.writestr('foo.txt', 'foo')
        alpharep.writestr('bar/baz.txt', 'baz')
        assert any(child.name == 'foo.txt' for child in root.iterdir())
        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
        (baz,) = (root / 'bar').iterdir()
        assert baz.read_text(encoding="utf-8") == 'baz'

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute directly instead of closing and
        # reopening the archive.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Ensure joinpath on items in zipfile is linear time.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

    # @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        # 10000 deeply nested names; the test only checks that the call
        # completes.
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        zipfile.CompleteDirs._implied_dirs(data)

    @pass_alpharep
    def test_read_does_not_close(self, alpharep):
        # Reading through a Path twice must work on the same open ZipFile.
        alpharep = self.zipfile_ondisk(alpharep)
        with zipfile.ZipFile(alpharep) as file:
            for rep in range(2):
                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")

    @pass_alpharep
    def test_subclass(self, alpharep):
        class Subclass(zipfile.Path):
            pass

        root = Subclass(alpharep)
        assert isinstance(root / 'b', Subclass)

    @pass_alpharep
    def test_filename(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.filename == pathlib.Path('alpharep.zip')

    @pass_alpharep
    def test_root_name(self, alpharep):
        """
        The name of the root should be the name of the zipfile
        """
        root = zipfile.Path(alpharep)
        assert root.name == 'alpharep.zip' == root.filename.name

    @pass_alpharep
    def test_root_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        assert root.parent == pathlib.Path('.')
        root.root.filename = 'foo/bar.zip'
        assert root.parent == pathlib.Path('foo')

    @pass_alpharep
    def test_root_unnamed(self, alpharep):
        """
        It is an error to attempt to get the name
        or parent of an unnamed zipfile.
        """
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        with self.assertRaises(TypeError):
            root.name
        with self.assertRaises(TypeError):
            root.parent

        # .name and .parent should still work on subs
        sub = root / "b"
        assert sub.name == "b"
        assert sub.parent

    @pass_alpharep
    def test_inheritance(self, alpharep):
        """
        A path derived from a subclass instance keeps the subclass type.
        """
        cls = type('PathChild', (zipfile.Path,), {})
        # pass_alpharep already supplies each fixture in turn, so use the
        # argument directly rather than looping over the fixtures again.
        file = cls(alpharep).joinpath('some dir').parent
        assert isinstance(file, cls)


if __name__ == "__main__":
    unittest.main()