1import array 2import contextlib 3import importlib.util 4import io 5import itertools 6import os 7import posixpath 8import struct 9import subprocess 10import sys 11import time 12import unittest 13import unittest.mock as mock 14import zipfile 15 16 17from tempfile import TemporaryFile 18from random import randint, random, randbytes 19 20from test import archiver_tests 21from test.support import script_helper 22from test.support import ( 23 findfile, requires_zlib, requires_bz2, requires_lzma, 24 captured_stdout, captured_stderr, requires_subprocess 25) 26from test.support.os_helper import ( 27 TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count, FakePath 28) 29 30 31TESTFN2 = TESTFN + "2" 32TESTFNDIR = TESTFN + "d" 33FIXEDTEST_SIZE = 1000 34DATAFILES_DIR = 'zipfile_datafiles' 35 36SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 37 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 38 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 39 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 40 41def get_files(test): 42 yield TESTFN2 43 with TemporaryFile() as f: 44 yield f 45 test.assertFalse(f.closed) 46 with io.BytesIO() as f: 47 yield f 48 test.assertFalse(f.closed) 49 50class AbstractTestsWithSourceFile: 51 @classmethod 52 def setUpClass(cls): 53 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 54 (i, random()), "ascii") 55 for i in range(FIXEDTEST_SIZE)] 56 cls.data = b''.join(cls.line_gen) 57 58 def setUp(self): 59 # Make a source file with some lines 60 with open(TESTFN, "wb") as fp: 61 fp.write(self.data) 62 63 def make_test_archive(self, f, compression, compresslevel=None): 64 kwargs = {'compression': compression, 'compresslevel': compresslevel} 65 # Create the ZIP archive 66 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 67 zipfp.write(TESTFN, "another.name") 68 zipfp.write(TESTFN, TESTFN) 69 zipfp.writestr("strfile", self.data) 70 with zipfp.open('written-open-w', mode='w') as f: 71 for line in self.line_gen: 72 f.write(line) 73 74 def zip_test(self, f, compression, compresslevel=None): 75 self.make_test_archive(f, compression, compresslevel) 76 77 # Read the ZIP archive 78 with zipfile.ZipFile(f, "r", compression) as zipfp: 79 self.assertEqual(zipfp.read(TESTFN), self.data) 80 self.assertEqual(zipfp.read("another.name"), self.data) 81 self.assertEqual(zipfp.read("strfile"), self.data) 82 83 # Print the ZIP directory 84 fp = io.StringIO() 85 zipfp.printdir(file=fp) 86 directory = fp.getvalue() 87 lines = directory.splitlines() 88 self.assertEqual(len(lines), 5) # Number of files + header 89 90 self.assertIn('File Name', lines[0]) 91 self.assertIn('Modified', lines[0]) 92 self.assertIn('Size', lines[0]) 93 94 fn, date, time_, size = lines[1].split() 95 self.assertEqual(fn, 'another.name') 96 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 97 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 98 self.assertEqual(size, str(len(self.data))) 99 100 # Check the namelist 101 names = zipfp.namelist() 102 self.assertEqual(len(names), 4) 103 self.assertIn(TESTFN, names) 104 self.assertIn("another.name", names) 105 self.assertIn("strfile", names) 106 self.assertIn("written-open-w", names) 107 108 # Check infolist 109 infos = zipfp.infolist() 110 names = [i.filename for i in infos] 111 self.assertEqual(len(names), 4) 112 self.assertIn(TESTFN, names) 113 self.assertIn("another.name", names) 114 self.assertIn("strfile", names) 115 self.assertIn("written-open-w", names) 116 for i in infos: 117 self.assertEqual(i.file_size, len(self.data)) 118 119 # check getinfo 120 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 121 info = zipfp.getinfo(nm) 122 self.assertEqual(info.filename, nm) 123 self.assertEqual(info.file_size, len(self.data)) 124 125 # Check that testzip thinks the archive is ok 126 # (it returns None if all contents could be read properly) 127 self.assertIsNone(zipfp.testzip()) 128 129 def test_basic(self): 130 for f in get_files(self): 131 self.zip_test(f, self.compression) 132 133 def zip_open_test(self, f, compression): 134 self.make_test_archive(f, compression) 135 136 # Read the ZIP archive 137 with zipfile.ZipFile(f, "r", compression) as zipfp: 138 zipdata1 = [] 139 with zipfp.open(TESTFN) as zipopen1: 140 while True: 141 read_data = zipopen1.read(256) 142 if not read_data: 143 break 144 zipdata1.append(read_data) 145 146 zipdata2 = [] 147 with zipfp.open("another.name") as zipopen2: 148 while True: 149 read_data = zipopen2.read(256) 150 if not read_data: 151 break 152 zipdata2.append(read_data) 153 154 self.assertEqual(b''.join(zipdata1), self.data) 155 self.assertEqual(b''.join(zipdata2), self.data) 156 157 def test_open(self): 158 for f in get_files(self): 159 self.zip_open_test(f, self.compression) 160 161 def test_open_with_pathlike(self): 162 path = FakePath(TESTFN2) 163 self.zip_open_test(path, self.compression) 164 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 165 self.assertIsInstance(zipfp.filename, str) 166 167 def zip_random_open_test(self, f, compression): 168 self.make_test_archive(f, compression) 169 170 # Read the ZIP archive 171 with zipfile.ZipFile(f, "r", compression) as zipfp: 172 zipdata1 = [] 173 with zipfp.open(TESTFN) as zipopen1: 174 while True: 175 read_data = zipopen1.read(randint(1, 1024)) 176 if not read_data: 177 break 178 zipdata1.append(read_data) 179 180 self.assertEqual(b''.join(zipdata1), self.data) 181 182 def test_random_open(self): 183 for f in get_files(self): 184 self.zip_random_open_test(f, self.compression) 185 186 def zip_read1_test(self, f, compression): 187 self.make_test_archive(f, compression) 188 189 # Read the ZIP archive 190 with zipfile.ZipFile(f, "r") as zipfp, \ 191 zipfp.open(TESTFN) as zipopen: 192 zipdata = [] 193 while True: 194 read_data = zipopen.read1(-1) 195 if not read_data: 196 break 197 zipdata.append(read_data) 198 199 self.assertEqual(b''.join(zipdata), self.data) 200 201 def test_read1(self): 202 for f in get_files(self): 203 self.zip_read1_test(f, self.compression) 204 205 def zip_read1_10_test(self, f, compression): 206 self.make_test_archive(f, compression) 207 208 # Read the ZIP archive 209 with zipfile.ZipFile(f, "r") as zipfp, \ 210 zipfp.open(TESTFN) as zipopen: 211 zipdata = [] 212 while True: 213 read_data = zipopen.read1(10) 214 self.assertLessEqual(len(read_data), 10) 215 if not read_data: 216 break 217 zipdata.append(read_data) 218 219 self.assertEqual(b''.join(zipdata), self.data) 220 221 def test_read1_10(self): 222 for f in get_files(self): 223 self.zip_read1_10_test(f, self.compression) 224 225 def zip_readline_read_test(self, f, compression): 226 self.make_test_archive(f, compression) 227 228 # Read the ZIP archive 229 with zipfile.ZipFile(f, "r") as zipfp, \ 230 zipfp.open(TESTFN) as zipopen: 231 data = b'' 232 while True: 233 read = zipopen.readline() 234 if not read: 235 break 236 data += read 237 238 read = zipopen.read(100) 239 if not read: 240 break 241 data += read 242 243 self.assertEqual(data, self.data) 244 245 def test_readline_read(self): 246 # Issue #7610: calls to readline() interleaved with calls to read(). 247 for f in get_files(self): 248 self.zip_readline_read_test(f, self.compression) 249 250 def zip_readline_test(self, f, compression): 251 self.make_test_archive(f, compression) 252 253 # Read the ZIP archive 254 with zipfile.ZipFile(f, "r") as zipfp: 255 with zipfp.open(TESTFN) as zipopen: 256 for line in self.line_gen: 257 linedata = zipopen.readline() 258 self.assertEqual(linedata, line) 259 260 def test_readline(self): 261 for f in get_files(self): 262 self.zip_readline_test(f, self.compression) 263 264 def zip_readlines_test(self, f, compression): 265 self.make_test_archive(f, compression) 266 267 # Read the ZIP archive 268 with zipfile.ZipFile(f, "r") as zipfp: 269 with zipfp.open(TESTFN) as zipopen: 270 ziplines = zipopen.readlines() 271 for line, zipline in zip(self.line_gen, ziplines): 272 self.assertEqual(zipline, line) 273 274 def test_readlines(self): 275 for f in get_files(self): 276 self.zip_readlines_test(f, self.compression) 277 278 def zip_iterlines_test(self, f, compression): 279 self.make_test_archive(f, compression) 280 281 # Read the ZIP archive 282 with zipfile.ZipFile(f, "r") as zipfp: 283 with zipfp.open(TESTFN) as zipopen: 284 for line, zipline in zip(self.line_gen, zipopen): 285 self.assertEqual(zipline, line) 286 287 def test_iterlines(self): 288 for f in get_files(self): 289 self.zip_iterlines_test(f, self.compression) 290 291 def test_low_compression(self): 292 """Check for cases where compressed data is larger than original.""" 293 # Create the ZIP archive 294 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 295 zipfp.writestr("strfile", '12') 296 297 # Get an open object for strfile 298 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 299 with zipfp.open("strfile") as openobj: 300 self.assertEqual(openobj.read(1), b'1') 301 self.assertEqual(openobj.read(1), b'2') 302 303 def test_writestr_compression(self): 304 zipfp = zipfile.ZipFile(TESTFN2, "w") 305 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 306 info = zipfp.getinfo('b.txt') 307 self.assertEqual(info.compress_type, self.compression) 308 309 def test_writestr_compresslevel(self): 310 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 311 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 312 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 313 compresslevel=2) 314 315 # Compression level follows the constructor. 316 a_info = zipfp.getinfo('a.txt') 317 self.assertEqual(a_info.compress_type, self.compression) 318 self.assertEqual(a_info.compress_level, 1) 319 320 # Compression level is overridden. 321 b_info = zipfp.getinfo('b.txt') 322 self.assertEqual(b_info.compress_type, self.compression) 323 self.assertEqual(b_info._compresslevel, 2) 324 325 def test_read_return_size(self): 326 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 327 # than requested. 328 for test_size in (1, 4095, 4096, 4097, 16384): 329 file_size = test_size + 1 330 junk = randbytes(file_size) 331 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 332 zipf.writestr('foo', junk) 333 with zipf.open('foo', 'r') as fp: 334 buf = fp.read(test_size) 335 self.assertEqual(len(buf), test_size) 336 337 def test_truncated_zipfile(self): 338 fp = io.BytesIO() 339 with zipfile.ZipFile(fp, mode='w') as zipf: 340 zipf.writestr('strfile', self.data, compress_type=self.compression) 341 end_offset = fp.tell() 342 zipfiledata = fp.getvalue() 343 344 fp = io.BytesIO(zipfiledata) 345 with zipfile.ZipFile(fp) as zipf: 346 with zipf.open('strfile') as zipopen: 347 fp.truncate(end_offset - 20) 348 with self.assertRaises(EOFError): 349 zipopen.read() 350 351 fp = io.BytesIO(zipfiledata) 352 with zipfile.ZipFile(fp) as zipf: 353 with zipf.open('strfile') as zipopen: 354 fp.truncate(end_offset - 20) 355 with self.assertRaises(EOFError): 356 while zipopen.read(100): 357 pass 358 359 fp = io.BytesIO(zipfiledata) 360 with zipfile.ZipFile(fp) as zipf: 361 with zipf.open('strfile') as zipopen: 362 fp.truncate(end_offset - 20) 363 with self.assertRaises(EOFError): 364 while zipopen.read1(100): 365 pass 366 367 def test_repr(self): 368 fname = 'file.name' 369 for f in get_files(self): 370 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 371 zipfp.write(TESTFN, fname) 372 r = repr(zipfp) 373 self.assertIn("mode='w'", r) 374 375 with zipfile.ZipFile(f, 'r') as zipfp: 376 r = repr(zipfp) 377 if isinstance(f, str): 378 self.assertIn('filename=%r' % f, r) 379 else: 380 self.assertIn('file=%r' % f, r) 381 self.assertIn("mode='r'", r) 382 r = repr(zipfp.getinfo(fname)) 383 self.assertIn('filename=%r' % fname, r) 384 self.assertIn('filemode=', r) 385 self.assertIn('file_size=', r) 386 if self.compression != zipfile.ZIP_STORED: 387 self.assertIn('compress_type=', r) 388 self.assertIn('compress_size=', r) 389 with zipfp.open(fname) as zipopen: 390 r = repr(zipopen) 391 self.assertIn('name=%r' % fname, r) 392 if self.compression != zipfile.ZIP_STORED: 393 self.assertIn('compress_type=', r) 394 self.assertIn('[closed]', repr(zipopen)) 395 self.assertIn('[closed]', repr(zipfp)) 396 397 def test_compresslevel_basic(self): 398 for f in get_files(self): 399 self.zip_test(f, self.compression, compresslevel=9) 400 401 def test_per_file_compresslevel(self): 402 """Check that files within a Zip archive can have different 403 compression levels.""" 404 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 405 zipfp.write(TESTFN, 'compress_1') 406 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 407 one_info = zipfp.getinfo('compress_1') 408 nine_info = zipfp.getinfo('compress_9') 409 self.assertEqual(one_info._compresslevel, 1) 410 self.assertEqual(nine_info.compress_level, 9) 411 412 def test_writing_errors(self): 413 class BrokenFile(io.BytesIO): 414 def write(self, data): 415 nonlocal count 416 if count is not None: 417 if count == stop: 418 raise OSError 419 count += 1 420 super().write(data) 421 422 stop = 0 423 while True: 424 testfile = BrokenFile() 425 count = None 426 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 427 with zipfp.open('file1', 'w') as f: 428 f.write(b'data1') 429 count = 0 430 try: 431 with zipfp.open('file2', 'w') as f: 432 f.write(b'data2') 433 except OSError: 434 stop += 1 435 else: 436 break 437 finally: 438 count = None 439 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 440 self.assertEqual(zipfp.namelist(), ['file1']) 441 self.assertEqual(zipfp.read('file1'), b'data1') 442 443 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 444 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 445 self.assertEqual(zipfp.read('file1'), b'data1') 446 self.assertEqual(zipfp.read('file2'), b'data2') 447 448 def test_zipextfile_attrs(self): 449 fname = "somefile.txt" 450 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 451 zipfp.writestr(fname, "bogus") 452 453 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 454 with zipfp.open(fname) as fid: 455 self.assertEqual(fid.name, fname) 456 self.assertRaises(io.UnsupportedOperation, fid.fileno) 457 self.assertEqual(fid.mode, 'rb') 458 self.assertIs(fid.readable(), True) 459 self.assertIs(fid.writable(), False) 460 self.assertIs(fid.seekable(), True) 461 self.assertIs(fid.closed, False) 462 self.assertIs(fid.closed, True) 463 self.assertEqual(fid.name, fname) 464 self.assertEqual(fid.mode, 'rb') 465 self.assertRaises(io.UnsupportedOperation, fid.fileno) 466 self.assertRaises(ValueError, fid.readable) 467 self.assertIs(fid.writable(), False) 468 self.assertRaises(ValueError, fid.seekable) 469 470 def tearDown(self): 471 unlink(TESTFN) 472 unlink(TESTFN2) 473 474 475class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 476 unittest.TestCase): 477 compression = zipfile.ZIP_STORED 478 test_low_compression = None 479 480 def zip_test_writestr_permissions(self, f, compression): 481 # Make sure that writestr and open(... mode='w') create files with 482 # mode 0600, when they are passed a name rather than a ZipInfo 483 # instance. 484 485 self.make_test_archive(f, compression) 486 with zipfile.ZipFile(f, "r") as zipfp: 487 zinfo = zipfp.getinfo('strfile') 488 self.assertEqual(zinfo.external_attr, 0o600 << 16) 489 490 zinfo2 = zipfp.getinfo('written-open-w') 491 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 492 493 def test_writestr_permissions(self): 494 for f in get_files(self): 495 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 496 497 def test_absolute_arcnames(self): 498 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 499 zipfp.write(TESTFN, "/absolute") 500 501 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 502 self.assertEqual(zipfp.namelist(), ["absolute"]) 503 504 def test_append_to_zip_file(self): 505 """Test appending to an existing zipfile.""" 506 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 507 zipfp.write(TESTFN, TESTFN) 508 509 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 510 zipfp.writestr("strfile", self.data) 511 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 512 513 def test_append_to_non_zip_file(self): 514 """Test appending to an existing file that is not a zipfile.""" 515 # NOTE: this test fails if len(d) < 22 because of the first 516 # line "fpin.seek(-22, 2)" in _EndRecData 517 data = b'I am not a ZipFile!'*10 518 with open(TESTFN2, 'wb') as f: 519 f.write(data) 520 521 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 522 zipfp.write(TESTFN, TESTFN) 523 524 with open(TESTFN2, 'rb') as f: 525 f.seek(len(data)) 526 with zipfile.ZipFile(f, "r") as zipfp: 527 self.assertEqual(zipfp.namelist(), [TESTFN]) 528 self.assertEqual(zipfp.read(TESTFN), self.data) 529 with open(TESTFN2, 'rb') as f: 530 self.assertEqual(f.read(len(data)), data) 531 zipfiledata = f.read() 532 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 533 self.assertEqual(zipfp.namelist(), [TESTFN]) 534 self.assertEqual(zipfp.read(TESTFN), self.data) 535 536 def test_read_concatenated_zip_file(self): 537 with io.BytesIO() as bio: 538 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 539 zipfp.write(TESTFN, TESTFN) 540 zipfiledata = bio.getvalue() 541 data = b'I am not a ZipFile!'*10 542 with open(TESTFN2, 'wb') as f: 543 f.write(data) 544 f.write(zipfiledata) 545 546 with zipfile.ZipFile(TESTFN2) as zipfp: 547 self.assertEqual(zipfp.namelist(), [TESTFN]) 548 self.assertEqual(zipfp.read(TESTFN), self.data) 549 550 def test_append_to_concatenated_zip_file(self): 551 with io.BytesIO() as bio: 552 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 553 zipfp.write(TESTFN, TESTFN) 554 zipfiledata = bio.getvalue() 555 data = b'I am not a ZipFile!'*1000000 556 with open(TESTFN2, 'wb') as f: 557 f.write(data) 558 f.write(zipfiledata) 559 560 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 561 self.assertEqual(zipfp.namelist(), [TESTFN]) 562 zipfp.writestr('strfile', self.data) 563 564 with open(TESTFN2, 'rb') as f: 565 self.assertEqual(f.read(len(data)), data) 566 zipfiledata = f.read() 567 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 568 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 569 self.assertEqual(zipfp.read(TESTFN), self.data) 570 self.assertEqual(zipfp.read('strfile'), self.data) 571 572 def test_ignores_newline_at_end(self): 573 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 574 zipfp.write(TESTFN, TESTFN) 575 with open(TESTFN2, 'a', encoding='utf-8') as f: 576 f.write("\r\n\00\00\00") 577 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 578 self.assertIsInstance(zipfp, zipfile.ZipFile) 579 580 def test_ignores_stuff_appended_past_comments(self): 581 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 582 zipfp.comment = b"this is a comment" 583 zipfp.write(TESTFN, TESTFN) 584 with open(TESTFN2, 'a', encoding='utf-8') as f: 585 f.write("abcdef\r\n") 586 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 587 self.assertIsInstance(zipfp, zipfile.ZipFile) 588 self.assertEqual(zipfp.comment, b"this is a comment") 589 590 def test_write_default_name(self): 591 """Check that calling ZipFile.write without arcname specified 592 produces the expected result.""" 593 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 594 zipfp.write(TESTFN) 595 with open(TESTFN, "rb") as f: 596 self.assertEqual(zipfp.read(TESTFN), f.read()) 597 598 def test_io_on_closed_zipextfile(self): 599 fname = "somefile.txt" 600 with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp: 601 zipfp.writestr(fname, "bogus") 602 603 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 604 with zipfp.open(fname) as fid: 605 fid.close() 606 self.assertIs(fid.closed, True) 607 self.assertRaises(ValueError, fid.read) 608 self.assertRaises(ValueError, fid.seek, 0) 609 self.assertRaises(ValueError, fid.tell) 610 611 def test_write_to_readonly(self): 612 """Check that trying to call write() on a readonly ZipFile object 613 raises a ValueError.""" 614 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 615 zipfp.writestr("somefile.txt", "bogus") 616 617 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 618 self.assertRaises(ValueError, zipfp.write, TESTFN) 619 620 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 621 with self.assertRaises(ValueError): 622 zipfp.open(TESTFN, mode='w') 623 624 def test_add_file_before_1980(self): 625 # Set atime and mtime to 1970-01-01 626 os.utime(TESTFN, (0, 0)) 627 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 628 self.assertRaises(ValueError, zipfp.write, TESTFN) 629 630 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 631 zipfp.write(TESTFN) 632 zinfo = zipfp.getinfo(TESTFN) 633 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 634 635 def test_add_file_after_2107(self): 636 # Set atime and mtime to 2108-12-30 637 ts = 4386268800 638 try: 639 time.localtime(ts) 640 except OverflowError: 641 self.skipTest(f'time.localtime({ts}) raises OverflowError') 642 try: 643 os.utime(TESTFN, (ts, ts)) 644 except OverflowError: 645 self.skipTest('Host fs cannot set timestamp to required value.') 646 647 mtime_ns = os.stat(TESTFN).st_mtime_ns 648 if mtime_ns != (4386268800 * 10**9): 649 # XFS filesystem is limited to 32-bit timestamp, but the syscall 650 # didn't fail. Moreover, there is a VFS bug which returns 651 # a cached timestamp which is different than the value on disk. 652 # 653 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 654 # 655 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 656 # https://bugs.python.org/issue39460#msg360952 657 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 658 659 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 660 self.assertRaises(struct.error, zipfp.write, TESTFN) 661 662 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 663 zipfp.write(TESTFN) 664 zinfo = zipfp.getinfo(TESTFN) 665 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 666 667 668@requires_zlib() 669class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 670 unittest.TestCase): 671 compression = zipfile.ZIP_DEFLATED 672 673 def test_per_file_compression(self): 674 """Check that files within a Zip archive can have different 675 compression options.""" 676 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 677 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 678 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 679 sinfo = zipfp.getinfo('storeme') 680 dinfo = zipfp.getinfo('deflateme') 681 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 682 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 683 684@requires_bz2() 685class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 686 unittest.TestCase): 687 compression = zipfile.ZIP_BZIP2 688 689@requires_lzma() 690class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 691 unittest.TestCase): 692 compression = zipfile.ZIP_LZMA 693 694 695class AbstractTestZip64InSmallFiles: 696 # These tests test the ZIP64 functionality without using large files, 697 # see test_zipfile64 for proper tests. 698 699 @classmethod 700 def setUpClass(cls): 701 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 702 for i in range(0, FIXEDTEST_SIZE)) 703 cls.data = b'\n'.join(line_gen) 704 705 def setUp(self): 706 self._limit = zipfile.ZIP64_LIMIT 707 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 708 zipfile.ZIP64_LIMIT = 1000 709 zipfile.ZIP_FILECOUNT_LIMIT = 9 710 711 # Make a source file with some lines 712 with open(TESTFN, "wb") as fp: 713 fp.write(self.data) 714 715 def zip_test(self, f, compression): 716 # Create the ZIP archive 717 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 718 zipfp.write(TESTFN, "another.name") 719 zipfp.write(TESTFN, TESTFN) 720 zipfp.writestr("strfile", self.data) 721 722 # Read the ZIP archive 723 with zipfile.ZipFile(f, "r", compression) as zipfp: 724 self.assertEqual(zipfp.read(TESTFN), self.data) 725 self.assertEqual(zipfp.read("another.name"), self.data) 726 self.assertEqual(zipfp.read("strfile"), self.data) 727 728 # Print the ZIP directory 729 fp = io.StringIO() 730 zipfp.printdir(fp) 731 732 directory = fp.getvalue() 733 lines = directory.splitlines() 734 self.assertEqual(len(lines), 4) # Number of files + header 735 736 self.assertIn('File Name', lines[0]) 737 self.assertIn('Modified', lines[0]) 738 self.assertIn('Size', lines[0]) 739 740 fn, date, time_, size = lines[1].split() 741 self.assertEqual(fn, 'another.name') 742 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 743 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 744 self.assertEqual(size, str(len(self.data))) 745 746 # Check the namelist 747 names = zipfp.namelist() 748 self.assertEqual(len(names), 3) 749 self.assertIn(TESTFN, names) 750 self.assertIn("another.name", names) 751 self.assertIn("strfile", names) 752 753 # Check infolist 754 infos = zipfp.infolist() 755 names = [i.filename for i in infos] 756 self.assertEqual(len(names), 3) 757 self.assertIn(TESTFN, names) 758 self.assertIn("another.name", names) 759 self.assertIn("strfile", names) 760 for i in infos: 761 self.assertEqual(i.file_size, len(self.data)) 762 763 # check getinfo 764 for nm in (TESTFN, "another.name", "strfile"): 765 info = zipfp.getinfo(nm) 766 self.assertEqual(info.filename, nm) 767 self.assertEqual(info.file_size, len(self.data)) 768 769 # Check that testzip thinks the archive is valid 770 self.assertIsNone(zipfp.testzip()) 771 772 def test_basic(self): 773 for f in get_files(self): 774 self.zip_test(f, self.compression) 775 776 def test_too_many_files(self): 777 # This test checks that more than 64k files can be added to an archive, 778 # and that the resulting archive can be read properly by ZipFile 779 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 780 allowZip64=True) 781 zipf.debug = 100 782 numfiles = 15 783 for i in range(numfiles): 784 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 785 self.assertEqual(len(zipf.namelist()), numfiles) 786 zipf.close() 787 788 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 789 self.assertEqual(len(zipf2.namelist()), numfiles) 790 for i in range(numfiles): 791 content = zipf2.read("foo%08d" % i).decode('ascii') 792 self.assertEqual(content, "%d" % (i**3 % 57)) 793 zipf2.close() 794 795 def test_too_many_files_append(self): 796 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 797 allowZip64=False) 798 zipf.debug = 100 799 numfiles = 9 800 for i in range(numfiles): 801 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 802 self.assertEqual(len(zipf.namelist()), numfiles) 803 with self.assertRaises(zipfile.LargeZipFile): 804 zipf.writestr("foo%08d" % numfiles, b'') 805 self.assertEqual(len(zipf.namelist()), numfiles) 806 zipf.close() 807 808 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 809 allowZip64=False) 810 zipf.debug = 100 811 self.assertEqual(len(zipf.namelist()), numfiles) 812 with self.assertRaises(zipfile.LargeZipFile): 813 zipf.writestr("foo%08d" % numfiles, b'') 814 self.assertEqual(len(zipf.namelist()), numfiles) 815 zipf.close() 816 817 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 818 allowZip64=True) 819 zipf.debug = 100 820 self.assertEqual(len(zipf.namelist()), numfiles) 821 numfiles2 = 15 822 for i in range(numfiles, numfiles2): 823 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 824 self.assertEqual(len(zipf.namelist()), numfiles2) 825 zipf.close() 826 827 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 828 self.assertEqual(len(zipf2.namelist()), numfiles2) 829 for i in range(numfiles2): 830 content = zipf2.read("foo%08d" % i).decode('ascii') 831 self.assertEqual(content, "%d" % (i**3 % 57)) 832 zipf2.close() 833 834 def tearDown(self): 835 zipfile.ZIP64_LIMIT = self._limit 836 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 837 unlink(TESTFN) 838 unlink(TESTFN2) 839 840 841class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 842 unittest.TestCase): 843 compression = zipfile.ZIP_STORED 844 845 def large_file_exception_test(self, f, compression): 846 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 847 self.assertRaises(zipfile.LargeZipFile, 848 zipfp.write, TESTFN, "another.name") 849 850 def large_file_exception_test2(self, f, compression): 851 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 852 self.assertRaises(zipfile.LargeZipFile, 853 zipfp.writestr, "another.name", self.data) 854 855 def test_large_file_exception(self): 856 for f in get_files(self): 857 self.large_file_exception_test(f, zipfile.ZIP_STORED) 858 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 859 860 def test_absolute_arcnames(self): 861 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 862 allowZip64=True) as zipfp: 863 zipfp.write(TESTFN, "/absolute") 864 865 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 866 self.assertEqual(zipfp.namelist(), ["absolute"]) 867 868 def test_append(self): 869 # Test that appending to the Zip64 archive doesn't change 870 # extra fields of existing entries. 871 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 872 zipfp.writestr("strfile", self.data) 873 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 874 zinfo = zipfp.getinfo("strfile") 875 extra = zinfo.extra 876 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 877 zipfp.writestr("strfile2", self.data) 878 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 879 zinfo = zipfp.getinfo("strfile") 880 self.assertEqual(zinfo.extra, extra) 881 882 def make_zip64_file( 883 self, file_size_64_set=False, file_size_extra=False, 884 compress_size_64_set=False, compress_size_extra=False, 885 header_offset_64_set=False, header_offset_extra=False, 886 ): 887 """Generate bytes sequence for a zip with (incomplete) zip64 data. 888 889 The actual values (not the zip 64 0xffffffff values) stored in the file 890 are: 891 file_size: 8 892 compress_size: 8 893 header_offset: 0 894 """ 895 actual_size = 8 896 actual_header_offset = 0 897 local_zip64_fields = [] 898 central_zip64_fields = [] 899 900 file_size = actual_size 901 if file_size_64_set: 902 file_size = 0xffffffff 903 if file_size_extra: 904 local_zip64_fields.append(actual_size) 905 central_zip64_fields.append(actual_size) 906 file_size = struct.pack("<L", file_size) 907 908 compress_size = actual_size 909 if compress_size_64_set: 910 compress_size = 0xffffffff 911 if compress_size_extra: 912 local_zip64_fields.append(actual_size) 913 central_zip64_fields.append(actual_size) 914 compress_size = struct.pack("<L", compress_size) 915 916 header_offset = actual_header_offset 917 if header_offset_64_set: 918 header_offset = 0xffffffff 919 if header_offset_extra: 920 central_zip64_fields.append(actual_header_offset) 921 header_offset = struct.pack("<L", header_offset) 922 923 local_extra = struct.pack( 924 '<HH' + 'Q'*len(local_zip64_fields), 925 0x0001, 926 8*len(local_zip64_fields), 927 *local_zip64_fields 928 ) 929 930 central_extra = struct.pack( 931 '<HH' + 'Q'*len(central_zip64_fields), 932 0x0001, 933 8*len(central_zip64_fields), 934 *central_zip64_fields 935 ) 936 937 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 938 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 939 940 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 941 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 942 943 filename = b"test.txt" 944 content = b"test1234" 945 filename_length = struct.pack("<H", len(filename)) 946 zip64_contents = ( 947 # Local file header 948 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 949 + compress_size 950 + file_size 951 + filename_length 952 + local_extra_length 953 + filename 954 + local_extra 955 + content 956 # Central directory: 957 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 958 + compress_size 959 + file_size 960 + filename_length 961 + central_extra_length 962 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 963 + header_offset 964 + filename 965 + central_extra 966 # Zip64 end of central directory 967 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 968 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 969 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 970 + central_dir_size 971 + offset_to_central_dir 972 # Zip64 end of central directory locator 973 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 974 + b"\x00\x00\x00" 975 # end of central directory 976 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 977 + b"\x00\x00\x00\x00" 978 ) 979 return zip64_contents 980 981 def test_bad_zip64_extra(self): 982 """Missing zip64 extra records raises an exception. 983 984 There are 4 fields that the zip64 format handles (the disk number is 985 not used in this module and so is ignored here). According to the zip 986 spec: 987 The order of the fields in the zip64 extended 988 information record is fixed, but the fields MUST 989 only appear if the corresponding Local or Central 990 directory record field is set to 0xFFFF or 0xFFFFFFFF. 991 992 If the zip64 extra content doesn't contain enough entries for the 993 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 994 This test mismatches the length of the zip64 extra field and the number 995 of fields set to indicate the presence of zip64 data. 996 """ 997 # zip64 file size present, no fields in extra, expecting one, equals 998 # missing file size. 999 missing_file_size_extra = self.make_zip64_file( 1000 file_size_64_set=True, 1001 ) 1002 with self.assertRaises(zipfile.BadZipFile) as e: 1003 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 1004 self.assertIn('file size', str(e.exception).lower()) 1005 1006 # zip64 file size present, zip64 compress size present, one field in 1007 # extra, expecting two, equals missing compress size. 1008 missing_compress_size_extra = self.make_zip64_file( 1009 file_size_64_set=True, 1010 file_size_extra=True, 1011 compress_size_64_set=True, 1012 ) 1013 with self.assertRaises(zipfile.BadZipFile) as e: 1014 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1015 self.assertIn('compress size', str(e.exception).lower()) 1016 1017 # zip64 compress size present, no fields in extra, expecting one, 1018 # equals missing compress size. 1019 missing_compress_size_extra = self.make_zip64_file( 1020 compress_size_64_set=True, 1021 ) 1022 with self.assertRaises(zipfile.BadZipFile) as e: 1023 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1024 self.assertIn('compress size', str(e.exception).lower()) 1025 1026 # zip64 file size present, zip64 compress size present, zip64 header 1027 # offset present, two fields in extra, expecting three, equals missing 1028 # header offset 1029 missing_header_offset_extra = self.make_zip64_file( 1030 file_size_64_set=True, 1031 file_size_extra=True, 1032 compress_size_64_set=True, 1033 compress_size_extra=True, 1034 header_offset_64_set=True, 1035 ) 1036 with self.assertRaises(zipfile.BadZipFile) as e: 1037 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1038 self.assertIn('header offset', str(e.exception).lower()) 1039 1040 # zip64 compress size present, zip64 header offset present, one field 1041 # in extra, expecting two, equals missing header offset 1042 missing_header_offset_extra = self.make_zip64_file( 1043 file_size_64_set=False, 1044 compress_size_64_set=True, 1045 compress_size_extra=True, 1046 header_offset_64_set=True, 1047 ) 1048 with self.assertRaises(zipfile.BadZipFile) as e: 1049 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1050 self.assertIn('header offset', str(e.exception).lower()) 1051 1052 # zip64 file size present, zip64 header offset present, one field in 1053 # extra, expecting two, equals missing header offset 1054 missing_header_offset_extra = self.make_zip64_file( 1055 file_size_64_set=True, 1056 file_size_extra=True, 1057 compress_size_64_set=False, 1058 header_offset_64_set=True, 1059 ) 1060 with self.assertRaises(zipfile.BadZipFile) as e: 1061 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1062 self.assertIn('header offset', str(e.exception).lower()) 1063 1064 # zip64 header offset present, no fields in extra, expecting one, 1065 # equals missing header offset 1066 missing_header_offset_extra = self.make_zip64_file( 1067 file_size_64_set=False, 1068 compress_size_64_set=False, 1069 header_offset_64_set=True, 1070 ) 1071 with self.assertRaises(zipfile.BadZipFile) as e: 1072 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1073 self.assertIn('header offset', str(e.exception).lower()) 1074 1075 def test_generated_valid_zip64_extra(self): 1076 # These values are what is set in the make_zip64_file method. 1077 expected_file_size = 8 1078 expected_compress_size = 8 1079 expected_header_offset = 0 1080 expected_content = b"test1234" 1081 1082 # Loop through the various valid combinations of zip64 masks 1083 # present and extra fields present. 1084 params = ( 1085 {"file_size_64_set": True, "file_size_extra": True}, 1086 {"compress_size_64_set": True, "compress_size_extra": True}, 1087 {"header_offset_64_set": True, "header_offset_extra": True}, 1088 ) 1089 1090 for r in range(1, len(params) + 1): 1091 for combo in itertools.combinations(params, r): 1092 kwargs = {} 1093 for c in combo: 1094 kwargs.update(c) 1095 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1096 zinfo = zf.infolist()[0] 1097 self.assertEqual(zinfo.file_size, expected_file_size) 1098 self.assertEqual(zinfo.compress_size, expected_compress_size) 1099 self.assertEqual(zinfo.header_offset, expected_header_offset) 1100 self.assertEqual(zf.read(zinfo), expected_content) 1101 1102 def test_force_zip64(self): 1103 """Test that forcing zip64 extensions correctly notes this in the zip file""" 1104 1105 # GH-103861 describes an issue where forcing a small file to use zip64 1106 # extensions would add a zip64 extra record, but not change the data 1107 # sizes to 0xFFFFFFFF to indicate to the extractor that the zip64 1108 # record should be read. Additionally, it would not set the required 1109 # version to indicate that zip64 extensions are required to extract it. 1110 # This test replicates the situation and reads the raw data to specifically ensure: 1111 # - The required extract version is always >= ZIP64_VERSION 1112 # - The compressed and uncompressed size in the file headers are both 1113 # 0xFFFFFFFF (ie. point to zip64 record) 1114 # - The zip64 record is provided and has the correct sizes in it 1115 # Other aspects of the zip are checked as well, but verifying the above is the main goal. 1116 # Because this is hard to verify by parsing the data as a zip, the raw 1117 # bytes are checked to ensure that they line up with the zip spec. 1118 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1119 # The relevant sections for this test are: 1120 # - 4.3.7 for local file header 1121 # - 4.5.3 for zip64 extra field 1122 1123 data = io.BytesIO() 1124 with zipfile.ZipFile(data, mode="w", allowZip64=True) as zf: 1125 with zf.open("text.txt", mode="w", force_zip64=True) as zi: 1126 zi.write(b"_") 1127 1128 zipdata = data.getvalue() 1129 1130 # pull out and check zip information 1131 ( 1132 header, vers, os, flags, comp, csize, usize, fn_len, 1133 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, cd_sig 1134 ) = struct.unpack("<4sBBHH8xIIHH8shhQQx4s", zipdata[:63]) 1135 1136 self.assertEqual(header, b"PK\x03\x04") # local file header 1137 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1138 self.assertEqual(os, 0) # compatible with MS-DOS 1139 self.assertEqual(flags, 0) # no flags 1140 self.assertEqual(comp, 0) # compression method = stored 1141 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1142 self.assertEqual(usize, 0xFFFFFFFF) 1143 self.assertEqual(fn_len, 8) # filename len 1144 self.assertEqual(ex_total_len, 20) # size of extra records 1145 self.assertEqual(ex_id, 1) # Zip64 extra record 1146 self.assertEqual(ex_len, 16) # 16 bytes of data 1147 self.assertEqual(ex_usize, 1) # uncompressed size 1148 self.assertEqual(ex_csize, 1) # compressed size 1149 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1150 1151 z = zipfile.ZipFile(io.BytesIO(zipdata)) 1152 zinfos = z.infolist() 1153 self.assertEqual(len(zinfos), 1) 1154 self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION) # requires zip64 to extract 1155 1156 def test_unseekable_zip_unknown_filesize(self): 1157 """Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used""" 1158 1159 def make_zip(fp): 1160 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1161 with zf.open("text.txt", mode="w", force_zip64=False) as zi: 1162 zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1)) 1163 1164 self.assertRaises(RuntimeError, make_zip, io.BytesIO()) 1165 self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO())) 1166 1167 def test_zip64_required_not_allowed_fail(self): 1168 """Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add""" 1169 def make_zip(fp): 1170 with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf: 1171 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1172 info = zipfile.ZipInfo("text.txt") 1173 info.file_size = zipfile.ZIP64_LIMIT + 1 1174 zf.open(info, mode="w") 1175 1176 self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO()) 1177 self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO())) 1178 1179 def test_unseekable_zip_known_filesize(self): 1180 """Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front""" 1181 1182 # This test ensures that the zip will use a zip64 data descriptor (same 1183 # as a regular data descriptor except the sizes are 8 bytes instead of 1184 # 4) record to communicate the size of a file if the zip is being 1185 # written to an unseekable stream. 1186 # Because this sort of thing is hard to verify by parsing the data back 1187 # in as a zip, this test looks at the raw bytes created to ensure that 1188 # the correct data has been generated. 1189 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1190 # The relevant sections for this test are: 1191 # - 4.3.7 for local file header 1192 # - 4.3.9 for the data descriptor 1193 # - 4.5.3 for zip64 extra field 1194 1195 file_size = zipfile.ZIP64_LIMIT + 1 1196 1197 def make_zip(fp): 1198 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1199 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1200 info = zipfile.ZipInfo("text.txt") 1201 info.file_size = file_size 1202 with zf.open(info, mode="w", force_zip64=False) as zi: 1203 zi.write(b"_" * file_size) 1204 return fp 1205 1206 # check seekable file information 1207 seekable_data = make_zip(io.BytesIO()).getvalue() 1208 ( 1209 header, vers, os, flags, comp, csize, usize, fn_len, 1210 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1211 cd_sig 1212 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size]) 1213 1214 self.assertEqual(header, b"PK\x03\x04") # local file header 1215 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1216 self.assertEqual(os, 0) # compatible with MS-DOS 1217 self.assertEqual(flags, 0) # no flags set 1218 self.assertEqual(comp, 0) # compression method = stored 1219 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1220 self.assertEqual(usize, 0xFFFFFFFF) 1221 self.assertEqual(fn_len, 8) # filename len 1222 self.assertEqual(ex_total_len, 20) # size of extra records 1223 self.assertEqual(ex_id, 1) # Zip64 extra record 1224 self.assertEqual(ex_len, 16) # 16 bytes of data 1225 self.assertEqual(ex_usize, file_size) # uncompressed size 1226 self.assertEqual(ex_csize, file_size) # compressed size 1227 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1228 1229 # check unseekable file information 1230 unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue() 1231 ( 1232 header, vers, os, flags, comp, csize, usize, fn_len, 1233 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1234 dd_header, dd_usize, dd_csize, cd_sig 1235 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size]) 1236 1237 self.assertEqual(header, b"PK\x03\x04") # local file header 1238 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1239 self.assertEqual(os, 0) # compatible with MS-DOS 1240 self.assertEqual("{:b}".format(flags), "1000") # streaming flag set 1241 self.assertEqual(comp, 0) # compression method = stored 1242 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1243 self.assertEqual(usize, 0xFFFFFFFF) 1244 self.assertEqual(fn_len, 8) # filename len 1245 self.assertEqual(ex_total_len, 20) # size of extra records 1246 self.assertEqual(ex_id, 1) # Zip64 extra record 1247 self.assertEqual(ex_len, 16) # 16 bytes of data 1248 self.assertEqual(ex_usize, 0) # uncompressed size - 0 to defer to data descriptor 1249 self.assertEqual(ex_csize, 0) # compressed size - 0 to defer to data descriptor 1250 self.assertEqual(dd_header, b"PK\07\x08") # data descriptor 1251 self.assertEqual(dd_usize, file_size) # file size (8 bytes because zip64) 1252 self.assertEqual(dd_csize, file_size) # compressed size (8 bytes because zip64) 1253 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1254 1255 1256@requires_zlib() 1257class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1258 unittest.TestCase): 1259 compression = zipfile.ZIP_DEFLATED 1260 1261@requires_bz2() 1262class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1263 unittest.TestCase): 1264 compression = zipfile.ZIP_BZIP2 1265 1266@requires_lzma() 1267class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1268 unittest.TestCase): 1269 compression = zipfile.ZIP_LZMA 1270 1271 1272class AbstractWriterTests: 1273 1274 def tearDown(self): 1275 unlink(TESTFN2) 1276 1277 def test_close_after_close(self): 1278 data = b'content' 1279 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1280 w = zipf.open('test', 'w') 1281 w.write(data) 1282 w.close() 1283 self.assertTrue(w.closed) 1284 w.close() 1285 self.assertTrue(w.closed) 1286 self.assertEqual(zipf.read('test'), data) 1287 1288 def test_write_after_close(self): 1289 data = b'content' 1290 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1291 w = zipf.open('test', 'w') 1292 w.write(data) 1293 w.close() 1294 self.assertTrue(w.closed) 1295 self.assertRaises(ValueError, w.write, b'') 1296 self.assertEqual(zipf.read('test'), data) 1297 1298 def test_issue44439(self): 1299 q = array.array('Q', [1, 2, 3, 4, 5]) 1300 LENGTH = len(q) * q.itemsize 1301 with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip: 1302 with zip.open('data', 'w') as data: 1303 self.assertEqual(data.write(q), LENGTH) 1304 self.assertEqual(zip.getinfo('data').file_size, LENGTH) 1305 1306 def test_zipwritefile_attrs(self): 1307 fname = "somefile.txt" 1308 with zipfile.ZipFile(TESTFN2, mode="w", compression=self.compression) as zipfp: 1309 with zipfp.open(fname, 'w') as fid: 1310 self.assertEqual(fid.name, fname) 1311 self.assertRaises(io.UnsupportedOperation, fid.fileno) 1312 self.assertEqual(fid.mode, 'wb') 1313 self.assertIs(fid.readable(), False) 1314 self.assertIs(fid.writable(), True) 1315 self.assertIs(fid.seekable(), False) 1316 self.assertIs(fid.closed, False) 1317 self.assertIs(fid.closed, True) 1318 self.assertEqual(fid.name, fname) 1319 self.assertEqual(fid.mode, 'wb') 1320 self.assertRaises(io.UnsupportedOperation, fid.fileno) 1321 self.assertIs(fid.readable(), False) 1322 self.assertIs(fid.writable(), True) 1323 self.assertIs(fid.seekable(), False) 1324 1325class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1326 compression = zipfile.ZIP_STORED 1327 1328@requires_zlib() 1329class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1330 compression = zipfile.ZIP_DEFLATED 1331 1332@requires_bz2() 1333class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1334 compression = zipfile.ZIP_BZIP2 1335 1336@requires_lzma() 1337class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1338 compression = zipfile.ZIP_LZMA 1339 1340 1341class PyZipFileTests(unittest.TestCase): 1342 def assertCompiledIn(self, name, namelist): 1343 if name + 'o' not in namelist: 1344 self.assertIn(name + 'c', namelist) 1345 1346 def requiresWriteAccess(self, path): 1347 # effective_ids unavailable on windows 1348 if not os.access(path, os.W_OK, 1349 effective_ids=os.access in os.supports_effective_ids): 1350 self.skipTest('requires write access to the installed location') 1351 filename = os.path.join(path, 'test_zipfile.try') 1352 try: 1353 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1354 os.close(fd) 1355 except Exception: 1356 self.skipTest('requires write access to the installed location') 1357 unlink(filename) 1358 1359 def test_write_pyfile(self): 1360 self.requiresWriteAccess(os.path.dirname(__file__)) 1361 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1362 fn = __file__ 1363 if fn.endswith('.pyc'): 1364 path_split = fn.split(os.sep) 1365 if os.altsep is not None: 1366 path_split.extend(fn.split(os.altsep)) 1367 if '__pycache__' in path_split: 1368 fn = importlib.util.source_from_cache(fn) 1369 else: 1370 fn = fn[:-1] 1371 1372 zipfp.writepy(fn) 1373 1374 bn = os.path.basename(fn) 1375 self.assertNotIn(bn, zipfp.namelist()) 1376 self.assertCompiledIn(bn, zipfp.namelist()) 1377 1378 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1379 fn = __file__ 1380 if fn.endswith('.pyc'): 1381 fn = fn[:-1] 1382 1383 zipfp.writepy(fn, "testpackage") 1384 1385 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1386 self.assertNotIn(bn, zipfp.namelist()) 1387 self.assertCompiledIn(bn, zipfp.namelist()) 1388 1389 def test_write_python_package(self): 1390 import email 1391 packagedir = os.path.dirname(email.__file__) 1392 self.requiresWriteAccess(packagedir) 1393 1394 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1395 zipfp.writepy(packagedir) 1396 1397 # Check for a couple of modules at different levels of the 1398 # hierarchy 1399 names = zipfp.namelist() 1400 self.assertCompiledIn('email/__init__.py', names) 1401 self.assertCompiledIn('email/mime/text.py', names) 1402 1403 def test_write_filtered_python_package(self): 1404 import test 1405 packagedir = os.path.dirname(test.__file__) 1406 self.requiresWriteAccess(packagedir) 1407 1408 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1409 1410 # first make sure that the test folder gives error messages 1411 # (on the badsyntax_... files) 1412 with captured_stdout() as reportSIO: 1413 zipfp.writepy(packagedir) 1414 reportStr = reportSIO.getvalue() 1415 self.assertTrue('SyntaxError' in reportStr) 1416 1417 # then check that the filter works on the whole package 1418 with captured_stdout() as reportSIO: 1419 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1420 reportStr = reportSIO.getvalue() 1421 self.assertTrue('SyntaxError' not in reportStr) 1422 1423 # then check that the filter works on individual files 1424 def filter(path): 1425 return not os.path.basename(path).startswith("bad") 1426 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1427 zipfp.writepy(packagedir, filterfunc=filter) 1428 reportStr = reportSIO.getvalue() 1429 if reportStr: 1430 print(reportStr) 1431 self.assertTrue('SyntaxError' not in reportStr) 1432 1433 def test_write_with_optimization(self): 1434 import email 1435 packagedir = os.path.dirname(email.__file__) 1436 self.requiresWriteAccess(packagedir) 1437 optlevel = 1 if __debug__ else 0 1438 ext = '.pyc' 1439 1440 with TemporaryFile() as t, \ 1441 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1442 zipfp.writepy(packagedir) 1443 1444 names = zipfp.namelist() 1445 self.assertIn('email/__init__' + ext, names) 1446 self.assertIn('email/mime/text' + ext, names) 1447 1448 def test_write_python_directory(self): 1449 os.mkdir(TESTFN2) 1450 try: 1451 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1452 fp.write("print(42)\n") 1453 1454 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1455 fp.write("print(42 * 42)\n") 1456 1457 with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp: 1458 fp.write("bla bla bla\n") 1459 1460 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1461 zipfp.writepy(TESTFN2) 1462 1463 names = zipfp.namelist() 1464 self.assertCompiledIn('mod1.py', names) 1465 self.assertCompiledIn('mod2.py', names) 1466 self.assertNotIn('mod2.txt', names) 1467 1468 finally: 1469 rmtree(TESTFN2) 1470 1471 def test_write_python_directory_filtered(self): 1472 os.mkdir(TESTFN2) 1473 try: 1474 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1475 fp.write("print(42)\n") 1476 1477 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1478 fp.write("print(42 * 42)\n") 1479 1480 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1481 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1482 not fn.endswith('mod2.py')) 1483 1484 names = zipfp.namelist() 1485 self.assertCompiledIn('mod1.py', names) 1486 self.assertNotIn('mod2.py', names) 1487 1488 finally: 1489 rmtree(TESTFN2) 1490 1491 def test_write_non_pyfile(self): 1492 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1493 with open(TESTFN, 'w', encoding='utf-8') as f: 1494 f.write('most definitely not a python file') 1495 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1496 unlink(TESTFN) 1497 1498 def test_write_pyfile_bad_syntax(self): 1499 os.mkdir(TESTFN2) 1500 try: 1501 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1502 fp.write("Bad syntax in python file\n") 1503 1504 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1505 # syntax errors are printed to stdout 1506 with captured_stdout() as s: 1507 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1508 1509 self.assertIn("SyntaxError", s.getvalue()) 1510 1511 # as it will not have compiled the python file, it will 1512 # include the .py file not .pyc 1513 names = zipfp.namelist() 1514 self.assertIn('mod1.py', names) 1515 self.assertNotIn('mod1.pyc', names) 1516 1517 finally: 1518 rmtree(TESTFN2) 1519 1520 def test_write_pathlike(self): 1521 os.mkdir(TESTFN2) 1522 try: 1523 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1524 fp.write("print(42)\n") 1525 1526 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1527 zipfp.writepy(FakePath(os.path.join(TESTFN2, "mod1.py"))) 1528 names = zipfp.namelist() 1529 self.assertCompiledIn('mod1.py', names) 1530 finally: 1531 rmtree(TESTFN2) 1532 1533 1534class ExtractTests(unittest.TestCase): 1535 1536 def make_test_file(self): 1537 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1538 for fpath, fdata in SMALL_TEST_DATA: 1539 zipfp.writestr(fpath, fdata) 1540 1541 def test_extract(self): 1542 with temp_cwd(): 1543 self.make_test_file() 1544 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1545 for fpath, fdata in SMALL_TEST_DATA: 1546 writtenfile = zipfp.extract(fpath) 1547 1548 # make sure it was written to the right place 1549 correctfile = os.path.join(os.getcwd(), fpath) 1550 correctfile = os.path.normpath(correctfile) 1551 1552 self.assertEqual(writtenfile, correctfile) 1553 1554 # make sure correct data is in correct file 1555 with open(writtenfile, "rb") as f: 1556 self.assertEqual(fdata.encode(), f.read()) 1557 1558 unlink(writtenfile) 1559 1560 def _test_extract_with_target(self, target): 1561 self.make_test_file() 1562 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1563 for fpath, fdata in SMALL_TEST_DATA: 1564 writtenfile = zipfp.extract(fpath, target) 1565 1566 # make sure it was written to the right place 1567 correctfile = os.path.join(target, fpath) 1568 correctfile = os.path.normpath(correctfile) 1569 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1570 1571 # make sure correct data is in correct file 1572 with open(writtenfile, "rb") as f: 1573 self.assertEqual(fdata.encode(), f.read()) 1574 1575 unlink(writtenfile) 1576 1577 unlink(TESTFN2) 1578 1579 def test_extract_with_target(self): 1580 with temp_dir() as extdir: 1581 self._test_extract_with_target(extdir) 1582 1583 def test_extract_with_target_pathlike(self): 1584 with temp_dir() as extdir: 1585 self._test_extract_with_target(FakePath(extdir)) 1586 1587 def test_extract_all(self): 1588 with temp_cwd(): 1589 self.make_test_file() 1590 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1591 zipfp.extractall() 1592 for fpath, fdata in SMALL_TEST_DATA: 1593 outfile = os.path.join(os.getcwd(), fpath) 1594 1595 with open(outfile, "rb") as f: 1596 self.assertEqual(fdata.encode(), f.read()) 1597 1598 unlink(outfile) 1599 1600 def _test_extract_all_with_target(self, target): 1601 self.make_test_file() 1602 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1603 zipfp.extractall(target) 1604 for fpath, fdata in SMALL_TEST_DATA: 1605 outfile = os.path.join(target, fpath) 1606 1607 with open(outfile, "rb") as f: 1608 self.assertEqual(fdata.encode(), f.read()) 1609 1610 unlink(outfile) 1611 1612 unlink(TESTFN2) 1613 1614 def test_extract_all_with_target(self): 1615 with temp_dir() as extdir: 1616 self._test_extract_all_with_target(extdir) 1617 1618 def test_extract_all_with_target_pathlike(self): 1619 with temp_dir() as extdir: 1620 self._test_extract_all_with_target(FakePath(extdir)) 1621 1622 def check_file(self, filename, content): 1623 self.assertTrue(os.path.isfile(filename)) 1624 with open(filename, 'rb') as f: 1625 self.assertEqual(f.read(), content) 1626 1627 def test_sanitize_windows_name(self): 1628 san = zipfile.ZipFile._sanitize_windows_name 1629 # Passing pathsep in allows this test to work regardless of platform. 1630 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1631 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1632 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1633 self.assertEqual(san(' / /foo / /ba r', '/'), r'foo/ba r') 1634 self.assertEqual(san(' . /. /foo ./ . /. ./ba .r', '/'), r'foo/ba .r') 1635 1636 def test_extract_hackers_arcnames_common_cases(self): 1637 common_hacknames = [ 1638 ('../foo/bar', 'foo/bar'), 1639 ('foo/../bar', 'foo/bar'), 1640 ('foo/../../bar', 'foo/bar'), 1641 ('foo/bar/..', 'foo/bar'), 1642 ('./../foo/bar', 'foo/bar'), 1643 ('/foo/bar', 'foo/bar'), 1644 ('/foo/../bar', 'foo/bar'), 1645 ('/foo/../../bar', 'foo/bar'), 1646 ] 1647 self._test_extract_hackers_arcnames(common_hacknames) 1648 1649 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1650 def test_extract_hackers_arcnames_windows_only(self): 1651 """Test combination of path fixing and windows name sanitization.""" 1652 windows_hacknames = [ 1653 (r'..\foo\bar', 'foo/bar'), 1654 (r'..\/foo\/bar', 'foo/bar'), 1655 (r'foo/\..\/bar', 'foo/bar'), 1656 (r'foo\/../\bar', 'foo/bar'), 1657 (r'C:foo/bar', 'foo/bar'), 1658 (r'C:/foo/bar', 'foo/bar'), 1659 (r'C://foo/bar', 'foo/bar'), 1660 (r'C:\foo\bar', 'foo/bar'), 1661 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1662 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1663 (r'///conky/mountpoint/foo/bar', 'mountpoint/foo/bar'), 1664 (r'\\\conky\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1665 (r'//conky//mountpoint/foo/bar', 'mountpoint/foo/bar'), 1666 (r'\\conky\\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1667 (r'//?/C:/foo/bar', 'foo/bar'), 1668 (r'\\?\C:\foo\bar', 'foo/bar'), 1669 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1670 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1671 ('../../foo../../ba..r', 'foo/ba..r'), 1672 ] 1673 self._test_extract_hackers_arcnames(windows_hacknames) 1674 1675 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1676 def test_extract_hackers_arcnames_posix_only(self): 1677 posix_hacknames = [ 1678 ('//foo/bar', 'foo/bar'), 1679 ('../../foo../../ba..r', 'foo../ba..r'), 1680 (r'foo/..\bar', r'foo/..\bar'), 1681 ] 1682 self._test_extract_hackers_arcnames(posix_hacknames) 1683 1684 def _test_extract_hackers_arcnames(self, hacknames): 1685 for arcname, fixedname in hacknames: 1686 content = b'foobar' + arcname.encode() 1687 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1688 zinfo = zipfile.ZipInfo() 1689 # preserve backslashes 1690 zinfo.filename = arcname 1691 zinfo.external_attr = 0o600 << 16 1692 zipfp.writestr(zinfo, content) 1693 1694 arcname = arcname.replace(os.sep, "/") 1695 targetpath = os.path.join('target', 'subdir', 'subsub') 1696 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1697 1698 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1699 writtenfile = zipfp.extract(arcname, targetpath) 1700 self.assertEqual(writtenfile, correctfile, 1701 msg='extract %r: %r != %r' % 1702 (arcname, writtenfile, correctfile)) 1703 self.check_file(correctfile, content) 1704 rmtree('target') 1705 1706 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1707 zipfp.extractall(targetpath) 1708 self.check_file(correctfile, content) 1709 rmtree('target') 1710 1711 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1712 1713 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1714 writtenfile = zipfp.extract(arcname) 1715 self.assertEqual(writtenfile, correctfile, 1716 msg="extract %r" % arcname) 1717 self.check_file(correctfile, content) 1718 rmtree(fixedname.split('/')[0]) 1719 1720 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1721 zipfp.extractall() 1722 self.check_file(correctfile, content) 1723 rmtree(fixedname.split('/')[0]) 1724 1725 unlink(TESTFN2) 1726 1727 1728class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase): 1729 testdir = TESTFN 1730 1731 @classmethod 1732 def setUpClass(cls): 1733 p = cls.ar_with_file = TESTFN + '-with-file.zip' 1734 cls.addClassCleanup(unlink, p) 1735 with zipfile.ZipFile(p, 'w') as zipfp: 1736 zipfp.writestr('test', b'newcontent') 1737 1738 p = cls.ar_with_dir = TESTFN + '-with-dir.zip' 1739 cls.addClassCleanup(unlink, p) 1740 with zipfile.ZipFile(p, 'w') as zipfp: 1741 zipfp.mkdir('test') 1742 1743 p = cls.ar_with_implicit_dir = TESTFN + '-with-implicit-dir.zip' 1744 cls.addClassCleanup(unlink, p) 1745 with zipfile.ZipFile(p, 'w') as zipfp: 1746 zipfp.writestr('test/file', b'newcontent') 1747 1748 def open(self, path): 1749 return zipfile.ZipFile(path, 'r') 1750 1751 def extractall(self, ar): 1752 ar.extractall(self.testdir) 1753 1754 1755class OtherTests(unittest.TestCase): 1756 def test_open_via_zip_info(self): 1757 # Create the ZIP archive 1758 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1759 zipfp.writestr("name", "foo") 1760 with self.assertWarns(UserWarning): 1761 zipfp.writestr("name", "bar") 1762 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1763 1764 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1765 infos = zipfp.infolist() 1766 data = b"" 1767 for info in infos: 1768 with zipfp.open(info) as zipopen: 1769 data += zipopen.read() 1770 self.assertIn(data, {b"foobar", b"barfoo"}) 1771 data = b"" 1772 for info in infos: 1773 data += zipfp.read(info) 1774 self.assertIn(data, {b"foobar", b"barfoo"}) 1775 1776 def test_writestr_extended_local_header_issue1202(self): 1777 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1778 for data in 'abcdefghijklmnop': 1779 zinfo = zipfile.ZipInfo(data) 1780 zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR # Include an extended local header. 1781 orig_zip.writestr(zinfo, data) 1782 1783 def test_close(self): 1784 """Check that the zipfile is closed after the 'with' block.""" 1785 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1786 for fpath, fdata in SMALL_TEST_DATA: 1787 zipfp.writestr(fpath, fdata) 1788 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1789 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1790 1791 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1792 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1793 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1794 1795 def test_close_on_exception(self): 1796 """Check that the zipfile is closed if an exception is raised in the 1797 'with' block.""" 1798 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1799 for fpath, fdata in SMALL_TEST_DATA: 1800 zipfp.writestr(fpath, fdata) 1801 1802 try: 1803 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1804 raise zipfile.BadZipFile() 1805 except zipfile.BadZipFile: 1806 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1807 1808 def test_unsupported_version(self): 1809 # File has an extract_version of 120 1810 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1811 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1812 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1813 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1814 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1815 1816 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1817 io.BytesIO(data), 'r') 1818 1819 @requires_zlib() 1820 def test_read_unicode_filenames(self): 1821 # bug #10801 1822 fname = findfile('zip_cp437_header.zip', subdir='archivetestdata') 1823 with zipfile.ZipFile(fname) as zipfp: 1824 for name in zipfp.namelist(): 1825 zipfp.open(name).close() 1826 1827 def test_write_unicode_filenames(self): 1828 with zipfile.ZipFile(TESTFN, "w") as zf: 1829 zf.writestr("foo.txt", "Test for unicode filename") 1830 zf.writestr("\xf6.txt", "Test for unicode filename") 1831 self.assertIsInstance(zf.infolist()[0].filename, str) 1832 1833 with zipfile.ZipFile(TESTFN, "r") as zf: 1834 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1835 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1836 1837 def create_zipfile_with_extra_data(self, filename, extra_data_name): 1838 with zipfile.ZipFile(TESTFN, mode='w') as zf: 1839 filename_encoded = filename.encode("utf-8") 1840 # create a ZipInfo object with Unicode path extra field 1841 zip_info = zipfile.ZipInfo(filename) 1842 1843 tag_for_unicode_path = b'\x75\x70' 1844 version_of_unicode_path = b'\x01' 1845 1846 import zlib 1847 filename_crc = struct.pack('<L', zlib.crc32(filename_encoded)) 1848 1849 extra_data = version_of_unicode_path + filename_crc + extra_data_name 1850 tsize = len(extra_data).to_bytes(2, 'little') 1851 1852 zip_info.extra = tag_for_unicode_path + tsize + extra_data 1853 1854 # add the file to the ZIP archive 1855 zf.writestr(zip_info, b'Hello World!') 1856 1857 @requires_zlib() 1858 def test_read_zipfile_containing_unicode_path_extra_field(self): 1859 self.create_zipfile_with_extra_data("이름.txt", "이름.txt".encode("utf-8")) 1860 with zipfile.ZipFile(TESTFN, "r") as zf: 1861 self.assertEqual(zf.filelist[0].filename, "이름.txt") 1862 1863 @requires_zlib() 1864 def test_read_zipfile_warning(self): 1865 self.create_zipfile_with_extra_data("이름.txt", b"") 1866 with self.assertWarns(UserWarning): 1867 zipfile.ZipFile(TESTFN, "r").close() 1868 1869 @requires_zlib() 1870 def test_read_zipfile_error(self): 1871 self.create_zipfile_with_extra_data("이름.txt", b"\xff") 1872 with self.assertRaises(zipfile.BadZipfile): 1873 zipfile.ZipFile(TESTFN, "r").close() 1874 1875 def test_read_after_write_unicode_filenames(self): 1876 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1877 zipfp.writestr('приклад', b'sample') 1878 self.assertEqual(zipfp.read('приклад'), b'sample') 1879 1880 def test_exclusive_create_zip_file(self): 1881 """Test exclusive creating a new zipfile.""" 1882 unlink(TESTFN2) 1883 filename = 'testfile.txt' 1884 content = b'hello, world. this is some content.' 1885 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1886 zipfp.writestr(filename, content) 1887 with self.assertRaises(FileExistsError): 1888 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1889 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1890 self.assertEqual(zipfp.namelist(), [filename]) 1891 self.assertEqual(zipfp.read(filename), content) 1892 1893 def test_create_non_existent_file_for_append(self): 1894 if os.path.exists(TESTFN): 1895 os.unlink(TESTFN) 1896 1897 filename = 'testfile.txt' 1898 content = b'hello, world. this is some content.' 1899 1900 try: 1901 with zipfile.ZipFile(TESTFN, 'a') as zf: 1902 zf.writestr(filename, content) 1903 except OSError: 1904 self.fail('Could not append data to a non-existent zip file.') 1905 1906 self.assertTrue(os.path.exists(TESTFN)) 1907 1908 with zipfile.ZipFile(TESTFN, 'r') as zf: 1909 self.assertEqual(zf.read(filename), content) 1910 1911 def test_close_erroneous_file(self): 1912 # This test checks that the ZipFile constructor closes the file object 1913 # it opens if there's an error in the file. If it doesn't, the 1914 # traceback holds a reference to the ZipFile object and, indirectly, 1915 # the file object. 1916 # On Windows, this causes the os.unlink() call to fail because the 1917 # underlying file is still open. This is SF bug #412214. 1918 # 1919 with open(TESTFN, "w", encoding="utf-8") as fp: 1920 fp.write("this is not a legal zip file\n") 1921 try: 1922 zf = zipfile.ZipFile(TESTFN) 1923 except zipfile.BadZipFile: 1924 pass 1925 1926 def test_is_zip_erroneous_file(self): 1927 """Check that is_zipfile() correctly identifies non-zip files.""" 1928 # - passing a filename 1929 with open(TESTFN, "w", encoding='utf-8') as fp: 1930 fp.write("this is not a legal zip file\n") 1931 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1932 # - passing a path-like object 1933 self.assertFalse(zipfile.is_zipfile(FakePath(TESTFN))) 1934 # - passing a file object 1935 with open(TESTFN, "rb") as fp: 1936 self.assertFalse(zipfile.is_zipfile(fp)) 1937 # - passing a file-like object 1938 fp = io.BytesIO() 1939 fp.write(b"this is not a legal zip file\n") 1940 self.assertFalse(zipfile.is_zipfile(fp)) 1941 fp.seek(0, 0) 1942 self.assertFalse(zipfile.is_zipfile(fp)) 1943 1944 def test_damaged_zipfile(self): 1945 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1946 # - Create a valid zip file 1947 fp = io.BytesIO() 1948 with zipfile.ZipFile(fp, mode="w") as zipf: 1949 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1950 zipfiledata = fp.getvalue() 1951 1952 # - Now create copies of it missing the last N bytes and make sure 1953 # a BadZipFile exception is raised when we try to open it 1954 for N in range(len(zipfiledata)): 1955 fp = io.BytesIO(zipfiledata[:N]) 1956 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1957 1958 def test_is_zip_valid_file(self): 1959 """Check that is_zipfile() correctly identifies zip files.""" 1960 # - passing a filename 1961 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1962 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1963 1964 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1965 # - passing a file object 1966 with open(TESTFN, "rb") as fp: 1967 self.assertTrue(zipfile.is_zipfile(fp)) 1968 fp.seek(0, 0) 1969 zip_contents = fp.read() 1970 # - passing a file-like object 1971 fp = io.BytesIO() 1972 fp.write(zip_contents) 1973 self.assertTrue(zipfile.is_zipfile(fp)) 1974 fp.seek(0, 0) 1975 self.assertTrue(zipfile.is_zipfile(fp)) 1976 1977 def test_non_existent_file_raises_OSError(self): 1978 # make sure we don't raise an AttributeError when a partially-constructed 1979 # ZipFile instance is finalized; this tests for regression on SF tracker 1980 # bug #403871. 1981 1982 # The bug we're testing for caused an AttributeError to be raised 1983 # when a ZipFile instance was created for a file that did not 1984 # exist; the .fp member was not initialized but was needed by the 1985 # __del__() method. Since the AttributeError is in the __del__(), 1986 # it is ignored, but the user should be sufficiently annoyed by 1987 # the message on the output that regression will be noticed 1988 # quickly. 1989 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1990 1991 def test_empty_file_raises_BadZipFile(self): 1992 f = open(TESTFN, 'w', encoding='utf-8') 1993 f.close() 1994 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1995 1996 with open(TESTFN, 'w', encoding='utf-8') as fp: 1997 fp.write("short file") 1998 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1999 2000 def test_negative_central_directory_offset_raises_BadZipFile(self): 2001 # Zip file containing an empty EOCD record 2002 buffer = bytearray(b'PK\x05\x06' + b'\0'*18) 2003 2004 # Set the size of the central directory bytes to become 1, 2005 # causing the central directory offset to become negative 2006 for dirsize in 1, 2**32-1: 2007 buffer[12:16] = struct.pack('<L', dirsize) 2008 f = io.BytesIO(buffer) 2009 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, f) 2010 2011 def test_closed_zip_raises_ValueError(self): 2012 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 2013 data = io.BytesIO() 2014 with zipfile.ZipFile(data, mode="w") as zipf: 2015 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2016 2017 # This is correct; calling .read on a closed ZipFile should raise 2018 # a ValueError, and so should calling .testzip. An earlier 2019 # version of .testzip would swallow this exception (and any other) 2020 # and report that the first file in the archive was corrupt. 2021 self.assertRaises(ValueError, zipf.read, "foo.txt") 2022 self.assertRaises(ValueError, zipf.open, "foo.txt") 2023 self.assertRaises(ValueError, zipf.testzip) 2024 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 2025 with open(TESTFN, 'w', encoding='utf-8') as f: 2026 f.write('zipfile test data') 2027 self.assertRaises(ValueError, zipf.write, TESTFN) 2028 2029 def test_bad_constructor_mode(self): 2030 """Check that bad modes passed to ZipFile constructor are caught.""" 2031 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 2032 2033 def test_bad_open_mode(self): 2034 """Check that bad modes passed to ZipFile.open are caught.""" 2035 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2036 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2037 2038 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 2039 # read the data to make sure the file is there 2040 zipf.read("foo.txt") 2041 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 2042 # universal newlines support is removed 2043 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 2044 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 2045 2046 def test_read0(self): 2047 """Check that calling read(0) on a ZipExtFile object returns an empty 2048 string and doesn't advance file pointer.""" 2049 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2050 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2051 # read the data to make sure the file is there 2052 with zipf.open("foo.txt") as f: 2053 for i in range(FIXEDTEST_SIZE): 2054 self.assertEqual(f.read(0), b'') 2055 2056 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 2057 2058 def test_open_non_existent_item(self): 2059 """Check that attempting to call open() for an item that doesn't 2060 exist in the archive raises a RuntimeError.""" 2061 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2062 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 2063 2064 def test_bad_compression_mode(self): 2065 """Check that bad compression methods passed to ZipFile.open are 2066 caught.""" 2067 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 2068 2069 def test_unsupported_compression(self): 2070 # data is declared as shrunk, but actually deflated 2071 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 2072 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 2073 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 2074 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2075 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 2076 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 2077 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 2078 self.assertRaises(NotImplementedError, zipf.open, 'x') 2079 2080 def test_null_byte_in_filename(self): 2081 """Check that a filename containing a null byte is properly 2082 terminated.""" 2083 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2084 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 2085 self.assertEqual(zipf.namelist(), ['foo.txt']) 2086 2087 def test_struct_sizes(self): 2088 """Check that ZIP internal structure sizes are calculated correctly.""" 2089 self.assertEqual(zipfile.sizeEndCentDir, 22) 2090 self.assertEqual(zipfile.sizeCentralDir, 46) 2091 self.assertEqual(zipfile.sizeEndCentDir64, 56) 2092 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 2093 2094 def test_comments(self): 2095 """Check that comments on the archive are handled properly.""" 2096 2097 # check default comment is empty 2098 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2099 self.assertEqual(zipf.comment, b'') 2100 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2101 2102 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2103 self.assertEqual(zipfr.comment, b'') 2104 2105 # check a simple short comment 2106 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 2107 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2108 zipf.comment = comment 2109 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2110 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2111 self.assertEqual(zipf.comment, comment) 2112 2113 # check a comment of max length 2114 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 2115 comment2 = comment2.encode("ascii") 2116 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2117 zipf.comment = comment2 2118 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2119 2120 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2121 self.assertEqual(zipfr.comment, comment2) 2122 2123 # check a comment that is too long is truncated 2124 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 2125 with self.assertWarns(UserWarning): 2126 zipf.comment = comment2 + b'oops' 2127 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2128 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 2129 self.assertEqual(zipfr.comment, comment2) 2130 2131 # check that comments are correctly modified in append mode 2132 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 2133 zipf.comment = b"original comment" 2134 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2135 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 2136 zipf.comment = b"an updated comment" 2137 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 2138 self.assertEqual(zipf.comment, b"an updated comment") 2139 2140 # check that comments are correctly shortened in append mode 2141 # and the file is indeed truncated 2142 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 2143 zipf.comment = b"original comment that's longer" 2144 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2145 original_zip_size = os.path.getsize(TESTFN) 2146 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 2147 zipf.comment = b"shorter comment" 2148 self.assertTrue(original_zip_size > os.path.getsize(TESTFN)) 2149 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 2150 self.assertEqual(zipf.comment, b"shorter comment") 2151 2152 def test_unicode_comment(self): 2153 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 2154 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2155 with self.assertRaises(TypeError): 2156 zipf.comment = "this is an error" 2157 2158 def test_change_comment_in_empty_archive(self): 2159 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 2160 self.assertFalse(zipf.filelist) 2161 zipf.comment = b"this is a comment" 2162 with zipfile.ZipFile(TESTFN, "r") as zipf: 2163 self.assertEqual(zipf.comment, b"this is a comment") 2164 2165 def test_change_comment_in_nonempty_archive(self): 2166 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 2167 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 2168 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 2169 self.assertTrue(zipf.filelist) 2170 zipf.comment = b"this is a comment" 2171 with zipfile.ZipFile(TESTFN, "r") as zipf: 2172 self.assertEqual(zipf.comment, b"this is a comment") 2173 2174 def test_empty_zipfile(self): 2175 # Check that creating a file in 'w' or 'a' mode and closing without 2176 # adding any files to the archives creates a valid empty ZIP file 2177 zipf = zipfile.ZipFile(TESTFN, mode="w") 2178 zipf.close() 2179 try: 2180 zipf = zipfile.ZipFile(TESTFN, mode="r") 2181 except zipfile.BadZipFile: 2182 self.fail("Unable to create empty ZIP file in 'w' mode") 2183 2184 zipf = zipfile.ZipFile(TESTFN, mode="a") 2185 zipf.close() 2186 try: 2187 zipf = zipfile.ZipFile(TESTFN, mode="r") 2188 except: 2189 self.fail("Unable to create empty ZIP file in 'a' mode") 2190 2191 def test_open_empty_file(self): 2192 # Issue 1710703: Check that opening a file with less than 22 bytes 2193 # raises a BadZipFile exception (rather than the previously unhelpful 2194 # OSError) 2195 f = open(TESTFN, 'w', encoding='utf-8') 2196 f.close() 2197 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 2198 2199 def test_create_zipinfo_before_1980(self): 2200 self.assertRaises(ValueError, 2201 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 2202 2203 def test_create_empty_zipinfo_repr(self): 2204 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 2205 zi = zipfile.ZipInfo(filename="empty") 2206 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 2207 2208 def test_create_empty_zipinfo_default_attributes(self): 2209 """Ensure all required attributes are set.""" 2210 zi = zipfile.ZipInfo() 2211 self.assertEqual(zi.orig_filename, "NoName") 2212 self.assertEqual(zi.filename, "NoName") 2213 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 2214 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2215 self.assertEqual(zi.comment, b"") 2216 self.assertEqual(zi.extra, b"") 2217 self.assertIn(zi.create_system, (0, 3)) 2218 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 2219 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 2220 self.assertEqual(zi.reserved, 0) 2221 self.assertEqual(zi.flag_bits, 0) 2222 self.assertEqual(zi.volume, 0) 2223 self.assertEqual(zi.internal_attr, 0) 2224 self.assertEqual(zi.external_attr, 0) 2225 2226 # Before bpo-26185, both were missing 2227 self.assertEqual(zi.file_size, 0) 2228 self.assertEqual(zi.compress_size, 0) 2229 2230 def test_zipfile_with_short_extra_field(self): 2231 """If an extra field in the header is less than 4 bytes, skip it.""" 2232 zipdata = ( 2233 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 2234 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 2235 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 2236 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 2237 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 2238 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 2239 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 2240 ) 2241 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 2242 # testzip returns the name of the first corrupt file, or None 2243 self.assertIsNone(zipf.testzip()) 2244 2245 def test_open_conflicting_handles(self): 2246 # It's only possible to open one writable file handle at a time 2247 msg1 = b"It's fun to charter an accountant!" 2248 msg2 = b"And sail the wide accountant sea" 2249 msg3 = b"To find, explore the funds offshore" 2250 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 2251 with zipf.open('foo', mode='w') as w2: 2252 w2.write(msg1) 2253 with zipf.open('bar', mode='w') as w1: 2254 with self.assertRaises(ValueError): 2255 zipf.open('handle', mode='w') 2256 with self.assertRaises(ValueError): 2257 zipf.open('foo', mode='r') 2258 with self.assertRaises(ValueError): 2259 zipf.writestr('str', 'abcde') 2260 with self.assertRaises(ValueError): 2261 zipf.write(__file__, 'file') 2262 with self.assertRaises(ValueError): 2263 zipf.close() 2264 w1.write(msg2) 2265 with zipf.open('baz', mode='w') as w2: 2266 w2.write(msg3) 2267 2268 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 2269 self.assertEqual(zipf.read('foo'), msg1) 2270 self.assertEqual(zipf.read('bar'), msg2) 2271 self.assertEqual(zipf.read('baz'), msg3) 2272 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 2273 2274 def test_seek_tell(self): 2275 # Test seek functionality 2276 txt = b"Where's Bruce?" 2277 bloc = txt.find(b"Bruce") 2278 # Check seek on a file 2279 with zipfile.ZipFile(TESTFN, "w") as zipf: 2280 zipf.writestr("foo.txt", txt) 2281 with zipfile.ZipFile(TESTFN, "r") as zipf: 2282 with zipf.open("foo.txt", "r") as fp: 2283 fp.seek(bloc, os.SEEK_SET) 2284 self.assertEqual(fp.tell(), bloc) 2285 fp.seek(-bloc, os.SEEK_CUR) 2286 self.assertEqual(fp.tell(), 0) 2287 fp.seek(bloc, os.SEEK_CUR) 2288 self.assertEqual(fp.tell(), bloc) 2289 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2290 self.assertEqual(fp.tell(), bloc + 5) 2291 fp.seek(0, os.SEEK_END) 2292 self.assertEqual(fp.tell(), len(txt)) 2293 fp.seek(0, os.SEEK_SET) 2294 self.assertEqual(fp.tell(), 0) 2295 # Check seek on memory file 2296 data = io.BytesIO() 2297 with zipfile.ZipFile(data, mode="w") as zipf: 2298 zipf.writestr("foo.txt", txt) 2299 with zipfile.ZipFile(data, mode="r") as zipf: 2300 with zipf.open("foo.txt", "r") as fp: 2301 fp.seek(bloc, os.SEEK_SET) 2302 self.assertEqual(fp.tell(), bloc) 2303 fp.seek(-bloc, os.SEEK_CUR) 2304 self.assertEqual(fp.tell(), 0) 2305 fp.seek(bloc, os.SEEK_CUR) 2306 self.assertEqual(fp.tell(), bloc) 2307 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2308 self.assertEqual(fp.tell(), bloc + 5) 2309 fp.seek(0, os.SEEK_END) 2310 self.assertEqual(fp.tell(), len(txt)) 2311 fp.seek(0, os.SEEK_SET) 2312 self.assertEqual(fp.tell(), 0) 2313 2314 def test_read_after_seek(self): 2315 # Issue 102956: Make sure seek(x, os.SEEK_CUR) doesn't break read() 2316 txt = b"Charge men!" 2317 bloc = txt.find(b"men") 2318 with zipfile.ZipFile(TESTFN, "w") as zipf: 2319 zipf.writestr("foo.txt", txt) 2320 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 2321 with zipf.open("foo.txt", "r") as fp: 2322 fp.seek(bloc, os.SEEK_CUR) 2323 self.assertEqual(fp.read(-1), b'men!') 2324 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 2325 with zipf.open("foo.txt", "r") as fp: 2326 fp.read(6) 2327 fp.seek(1, os.SEEK_CUR) 2328 self.assertEqual(fp.read(-1), b'men!') 2329 2330 @requires_bz2() 2331 def test_decompress_without_3rd_party_library(self): 2332 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2333 zip_file = io.BytesIO(data) 2334 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 2335 zf.writestr('a.txt', b'a') 2336 with mock.patch('zipfile.bz2', None): 2337 with zipfile.ZipFile(zip_file) as zf: 2338 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2339 2340 @requires_zlib() 2341 def test_full_overlap(self): 2342 data = ( 2343 b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e' 2344 b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00a\xed' 2345 b'\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\d\x0b`P' 2346 b'K\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2' 2347 b'\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00' 2348 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00aPK' 2349 b'\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e' 2350 b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00\x00' 2351 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bPK\x05' 2352 b'\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00\x00/\x00\x00' 2353 b'\x00\x00\x00' 2354 ) 2355 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 2356 self.assertEqual(zipf.namelist(), ['a', 'b']) 2357 zi = zipf.getinfo('a') 2358 self.assertEqual(zi.header_offset, 0) 2359 self.assertEqual(zi.compress_size, 16) 2360 self.assertEqual(zi.file_size, 1033) 2361 zi = zipf.getinfo('b') 2362 self.assertEqual(zi.header_offset, 0) 2363 self.assertEqual(zi.compress_size, 16) 2364 self.assertEqual(zi.file_size, 1033) 2365 self.assertEqual(len(zipf.read('a')), 1033) 2366 with self.assertRaisesRegex(zipfile.BadZipFile, 'File name.*differ'): 2367 zipf.read('b') 2368 2369 @requires_zlib() 2370 def test_quoted_overlap(self): 2371 data = ( 2372 b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05Y\xfc' 2373 b'8\x044\x00\x00\x00(\x04\x00\x00\x01\x00\x00\x00a\x00' 2374 b'\x1f\x00\xe0\xffPK\x03\x04\x14\x00\x00\x00\x08\x00\xa0l' 2375 b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00' 2376 b'\x00\x00b\xed\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\' 2377 b'd\x0b`PK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0' 2378 b'lH\x05Y\xfc8\x044\x00\x00\x00(\x04\x00\x00\x01' 2379 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2380 b'\x00aPK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0l' 2381 b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00' 2382 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00$\x00\x00\x00' 2383 b'bPK\x05\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00' 2384 b'\x00S\x00\x00\x00\x00\x00' 2385 ) 2386 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 2387 self.assertEqual(zipf.namelist(), ['a', 'b']) 2388 zi = zipf.getinfo('a') 2389 self.assertEqual(zi.header_offset, 0) 2390 self.assertEqual(zi.compress_size, 52) 2391 self.assertEqual(zi.file_size, 1064) 2392 zi = zipf.getinfo('b') 2393 self.assertEqual(zi.header_offset, 36) 2394 self.assertEqual(zi.compress_size, 16) 2395 self.assertEqual(zi.file_size, 1033) 2396 with self.assertRaisesRegex(zipfile.BadZipFile, 'Overlapped entries'): 2397 zipf.read('a') 2398 self.assertEqual(len(zipf.read('b')), 1033) 2399 2400 def tearDown(self): 2401 unlink(TESTFN) 2402 unlink(TESTFN2) 2403 2404 2405class AbstractBadCrcTests: 2406 def test_testzip_with_bad_crc(self): 2407 """Tests that files with bad CRCs return their name from testzip.""" 2408 zipdata = self.zip_with_bad_crc 2409 2410 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2411 # testzip returns the name of the first corrupt file, or None 2412 self.assertEqual('afile', zipf.testzip()) 2413 2414 def test_read_with_bad_crc(self): 2415 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2416 zipdata = self.zip_with_bad_crc 2417 2418 # Using ZipFile.read() 2419 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2420 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2421 2422 # Using ZipExtFile.read() 2423 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2424 with zipf.open('afile', 'r') as corrupt_file: 2425 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2426 2427 # Same with small reads (in order to exercise the buffering logic) 2428 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2429 with zipf.open('afile', 'r') as corrupt_file: 2430 corrupt_file.MIN_READ_SIZE = 2 2431 with self.assertRaises(zipfile.BadZipFile): 2432 while corrupt_file.read(2): 2433 pass 2434 2435 2436class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2437 compression = zipfile.ZIP_STORED 2438 zip_with_bad_crc = ( 2439 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2440 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2441 b'ilehello,AworldP' 2442 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2443 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2444 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2445 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2446 b'\0\0/\0\0\0\0\0') 2447 2448@requires_zlib() 2449class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2450 compression = zipfile.ZIP_DEFLATED 2451 zip_with_bad_crc = ( 2452 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2453 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2454 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2455 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2456 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2457 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2458 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2459 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2460 2461@requires_bz2() 2462class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2463 compression = zipfile.ZIP_BZIP2 2464 zip_with_bad_crc = ( 2465 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2466 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2467 b'ileBZh91AY&SY\xd4\xa8\xca' 2468 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2469 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2470 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2471 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2472 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2473 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2474 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2475 b'\x00\x00\x00\x00') 2476 2477@requires_lzma() 2478class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2479 compression = zipfile.ZIP_LZMA 2480 zip_with_bad_crc = ( 2481 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2482 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2483 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2484 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2485 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2486 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2487 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2488 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2489 b'\x00>\x00\x00\x00\x00\x00') 2490 2491 2492class DecryptionTests(unittest.TestCase): 2493 """Check that ZIP decryption works. Since the library does not 2494 support encryption at the moment, we use a pre-generated encrypted 2495 ZIP file.""" 2496 2497 data = ( 2498 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2499 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2500 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2501 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2502 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2503 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2504 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2505 data2 = ( 2506 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2507 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2508 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2509 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2510 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2511 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2512 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2513 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2514 2515 plain = b'zipfile.py encryption test' 2516 plain2 = b'\x00'*512 2517 2518 def setUp(self): 2519 with open(TESTFN, "wb") as fp: 2520 fp.write(self.data) 2521 self.zip = zipfile.ZipFile(TESTFN, "r") 2522 with open(TESTFN2, "wb") as fp: 2523 fp.write(self.data2) 2524 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2525 2526 def tearDown(self): 2527 self.zip.close() 2528 os.unlink(TESTFN) 2529 self.zip2.close() 2530 os.unlink(TESTFN2) 2531 2532 def test_no_password(self): 2533 # Reading the encrypted file without password 2534 # must generate a RunTime exception 2535 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2536 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2537 2538 def test_bad_password(self): 2539 self.zip.setpassword(b"perl") 2540 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2541 self.zip2.setpassword(b"perl") 2542 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2543 2544 @requires_zlib() 2545 def test_good_password(self): 2546 self.zip.setpassword(b"python") 2547 self.assertEqual(self.zip.read("test.txt"), self.plain) 2548 self.zip2.setpassword(b"12345") 2549 self.assertEqual(self.zip2.read("zero"), self.plain2) 2550 2551 def test_unicode_password(self): 2552 expected_msg = "pwd: expected bytes, got str" 2553 2554 with self.assertRaisesRegex(TypeError, expected_msg): 2555 self.zip.setpassword("unicode") 2556 2557 with self.assertRaisesRegex(TypeError, expected_msg): 2558 self.zip.read("test.txt", "python") 2559 2560 with self.assertRaisesRegex(TypeError, expected_msg): 2561 self.zip.open("test.txt", pwd="python") 2562 2563 with self.assertRaisesRegex(TypeError, expected_msg): 2564 self.zip.extract("test.txt", pwd="python") 2565 2566 with self.assertRaisesRegex(TypeError, expected_msg): 2567 self.zip.pwd = "python" 2568 self.zip.open("test.txt") 2569 2570 def test_seek_tell(self): 2571 self.zip.setpassword(b"python") 2572 txt = self.plain 2573 test_word = b'encryption' 2574 bloc = txt.find(test_word) 2575 bloc_len = len(test_word) 2576 with self.zip.open("test.txt", "r") as fp: 2577 fp.seek(bloc, os.SEEK_SET) 2578 self.assertEqual(fp.tell(), bloc) 2579 fp.seek(-bloc, os.SEEK_CUR) 2580 self.assertEqual(fp.tell(), 0) 2581 fp.seek(bloc, os.SEEK_CUR) 2582 self.assertEqual(fp.tell(), bloc) 2583 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2584 2585 # Make sure that the second read after seeking back beyond 2586 # _readbuffer returns the same content (ie. rewind to the start of 2587 # the file to read forward to the required position). 2588 old_read_size = fp.MIN_READ_SIZE 2589 fp.MIN_READ_SIZE = 1 2590 fp._readbuffer = b'' 2591 fp._offset = 0 2592 fp.seek(0, os.SEEK_SET) 2593 self.assertEqual(fp.tell(), 0) 2594 fp.seek(bloc, os.SEEK_CUR) 2595 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2596 fp.MIN_READ_SIZE = old_read_size 2597 2598 fp.seek(0, os.SEEK_END) 2599 self.assertEqual(fp.tell(), len(txt)) 2600 fp.seek(0, os.SEEK_SET) 2601 self.assertEqual(fp.tell(), 0) 2602 2603 # Read the file completely to definitely call any eof integrity 2604 # checks (crc) and make sure they still pass. 2605 fp.read() 2606 2607 2608class AbstractTestsWithRandomBinaryFiles: 2609 @classmethod 2610 def setUpClass(cls): 2611 datacount = randint(16, 64)*1024 + randint(1, 1024) 2612 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2613 for i in range(datacount)) 2614 2615 def setUp(self): 2616 # Make a source file with some lines 2617 with open(TESTFN, "wb") as fp: 2618 fp.write(self.data) 2619 2620 def tearDown(self): 2621 unlink(TESTFN) 2622 unlink(TESTFN2) 2623 2624 def make_test_archive(self, f, compression): 2625 # Create the ZIP archive 2626 with zipfile.ZipFile(f, "w", compression) as zipfp: 2627 zipfp.write(TESTFN, "another.name") 2628 zipfp.write(TESTFN, TESTFN) 2629 2630 def zip_test(self, f, compression): 2631 self.make_test_archive(f, compression) 2632 2633 # Read the ZIP archive 2634 with zipfile.ZipFile(f, "r", compression) as zipfp: 2635 testdata = zipfp.read(TESTFN) 2636 self.assertEqual(len(testdata), len(self.data)) 2637 self.assertEqual(testdata, self.data) 2638 self.assertEqual(zipfp.read("another.name"), self.data) 2639 2640 def test_read(self): 2641 for f in get_files(self): 2642 self.zip_test(f, self.compression) 2643 2644 def zip_open_test(self, f, compression): 2645 self.make_test_archive(f, compression) 2646 2647 # Read the ZIP archive 2648 with zipfile.ZipFile(f, "r", compression) as zipfp: 2649 zipdata1 = [] 2650 with zipfp.open(TESTFN) as zipopen1: 2651 while True: 2652 read_data = zipopen1.read(256) 2653 if not read_data: 2654 break 2655 zipdata1.append(read_data) 2656 2657 zipdata2 = [] 2658 with zipfp.open("another.name") as zipopen2: 2659 while True: 2660 read_data = zipopen2.read(256) 2661 if not read_data: 2662 break 2663 zipdata2.append(read_data) 2664 2665 testdata1 = b''.join(zipdata1) 2666 self.assertEqual(len(testdata1), len(self.data)) 2667 self.assertEqual(testdata1, self.data) 2668 2669 testdata2 = b''.join(zipdata2) 2670 self.assertEqual(len(testdata2), len(self.data)) 2671 self.assertEqual(testdata2, self.data) 2672 2673 def test_open(self): 2674 for f in get_files(self): 2675 self.zip_open_test(f, self.compression) 2676 2677 def zip_random_open_test(self, f, compression): 2678 self.make_test_archive(f, compression) 2679 2680 # Read the ZIP archive 2681 with zipfile.ZipFile(f, "r", compression) as zipfp: 2682 zipdata1 = [] 2683 with zipfp.open(TESTFN) as zipopen1: 2684 while True: 2685 read_data = zipopen1.read(randint(1, 1024)) 2686 if not read_data: 2687 break 2688 zipdata1.append(read_data) 2689 2690 testdata = b''.join(zipdata1) 2691 self.assertEqual(len(testdata), len(self.data)) 2692 self.assertEqual(testdata, self.data) 2693 2694 def test_random_open(self): 2695 for f in get_files(self): 2696 self.zip_random_open_test(f, self.compression) 2697 2698 2699class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2700 unittest.TestCase): 2701 compression = zipfile.ZIP_STORED 2702 2703@requires_zlib() 2704class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2705 unittest.TestCase): 2706 compression = zipfile.ZIP_DEFLATED 2707 2708@requires_bz2() 2709class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2710 unittest.TestCase): 2711 compression = zipfile.ZIP_BZIP2 2712 2713@requires_lzma() 2714class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2715 unittest.TestCase): 2716 compression = zipfile.ZIP_LZMA 2717 2718 2719# Provide the tell() method but not seek() 2720class Tellable: 2721 def __init__(self, fp): 2722 self.fp = fp 2723 self.offset = 0 2724 2725 def write(self, data): 2726 n = self.fp.write(data) 2727 self.offset += n 2728 return n 2729 2730 def tell(self): 2731 return self.offset 2732 2733 def flush(self): 2734 self.fp.flush() 2735 2736class Unseekable: 2737 def __init__(self, fp): 2738 self.fp = fp 2739 2740 def write(self, data): 2741 return self.fp.write(data) 2742 2743 def flush(self): 2744 self.fp.flush() 2745 2746class UnseekableTests(unittest.TestCase): 2747 def test_writestr(self): 2748 for wrapper in (lambda f: f), Tellable, Unseekable: 2749 with self.subTest(wrapper=wrapper): 2750 f = io.BytesIO() 2751 f.write(b'abc') 2752 bf = io.BufferedWriter(f) 2753 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2754 zipfp.writestr('ones', b'111') 2755 zipfp.writestr('twos', b'222') 2756 self.assertEqual(f.getvalue()[:5], b'abcPK') 2757 with zipfile.ZipFile(f, mode='r') as zipf: 2758 with zipf.open('ones') as zopen: 2759 self.assertEqual(zopen.read(), b'111') 2760 with zipf.open('twos') as zopen: 2761 self.assertEqual(zopen.read(), b'222') 2762 2763 def test_write(self): 2764 for wrapper in (lambda f: f), Tellable, Unseekable: 2765 with self.subTest(wrapper=wrapper): 2766 f = io.BytesIO() 2767 f.write(b'abc') 2768 bf = io.BufferedWriter(f) 2769 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2770 self.addCleanup(unlink, TESTFN) 2771 with open(TESTFN, 'wb') as f2: 2772 f2.write(b'111') 2773 zipfp.write(TESTFN, 'ones') 2774 with open(TESTFN, 'wb') as f2: 2775 f2.write(b'222') 2776 zipfp.write(TESTFN, 'twos') 2777 self.assertEqual(f.getvalue()[:5], b'abcPK') 2778 with zipfile.ZipFile(f, mode='r') as zipf: 2779 with zipf.open('ones') as zopen: 2780 self.assertEqual(zopen.read(), b'111') 2781 with zipf.open('twos') as zopen: 2782 self.assertEqual(zopen.read(), b'222') 2783 2784 def test_open_write(self): 2785 for wrapper in (lambda f: f), Tellable, Unseekable: 2786 with self.subTest(wrapper=wrapper): 2787 f = io.BytesIO() 2788 f.write(b'abc') 2789 bf = io.BufferedWriter(f) 2790 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2791 with zipf.open('ones', 'w') as zopen: 2792 zopen.write(b'111') 2793 with zipf.open('twos', 'w') as zopen: 2794 zopen.write(b'222') 2795 self.assertEqual(f.getvalue()[:5], b'abcPK') 2796 with zipfile.ZipFile(f) as zipf: 2797 self.assertEqual(zipf.read('ones'), b'111') 2798 self.assertEqual(zipf.read('twos'), b'222') 2799 2800 2801@requires_zlib() 2802class TestsWithMultipleOpens(unittest.TestCase): 2803 @classmethod 2804 def setUpClass(cls): 2805 cls.data1 = b'111' + randbytes(10000) 2806 cls.data2 = b'222' + randbytes(10000) 2807 2808 def make_test_archive(self, f): 2809 # Create the ZIP archive 2810 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2811 zipfp.writestr('ones', self.data1) 2812 zipfp.writestr('twos', self.data2) 2813 2814 def test_same_file(self): 2815 # Verify that (when the ZipFile is in control of creating file objects) 2816 # multiple open() calls can be made without interfering with each other. 2817 for f in get_files(self): 2818 self.make_test_archive(f) 2819 with zipfile.ZipFile(f, mode="r") as zipf: 2820 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2821 data1 = zopen1.read(500) 2822 data2 = zopen2.read(500) 2823 data1 += zopen1.read() 2824 data2 += zopen2.read() 2825 self.assertEqual(data1, data2) 2826 self.assertEqual(data1, self.data1) 2827 2828 def test_different_file(self): 2829 # Verify that (when the ZipFile is in control of creating file objects) 2830 # multiple open() calls can be made without interfering with each other. 2831 for f in get_files(self): 2832 self.make_test_archive(f) 2833 with zipfile.ZipFile(f, mode="r") as zipf: 2834 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2835 data1 = zopen1.read(500) 2836 data2 = zopen2.read(500) 2837 data1 += zopen1.read() 2838 data2 += zopen2.read() 2839 self.assertEqual(data1, self.data1) 2840 self.assertEqual(data2, self.data2) 2841 2842 def test_interleaved(self): 2843 # Verify that (when the ZipFile is in control of creating file objects) 2844 # multiple open() calls can be made without interfering with each other. 2845 for f in get_files(self): 2846 self.make_test_archive(f) 2847 with zipfile.ZipFile(f, mode="r") as zipf: 2848 with zipf.open('ones') as zopen1: 2849 data1 = zopen1.read(500) 2850 with zipf.open('twos') as zopen2: 2851 data2 = zopen2.read(500) 2852 data1 += zopen1.read() 2853 data2 += zopen2.read() 2854 self.assertEqual(data1, self.data1) 2855 self.assertEqual(data2, self.data2) 2856 2857 def test_read_after_close(self): 2858 for f in get_files(self): 2859 self.make_test_archive(f) 2860 with contextlib.ExitStack() as stack: 2861 with zipfile.ZipFile(f, 'r') as zipf: 2862 zopen1 = stack.enter_context(zipf.open('ones')) 2863 zopen2 = stack.enter_context(zipf.open('twos')) 2864 data1 = zopen1.read(500) 2865 data2 = zopen2.read(500) 2866 data1 += zopen1.read() 2867 data2 += zopen2.read() 2868 self.assertEqual(data1, self.data1) 2869 self.assertEqual(data2, self.data2) 2870 2871 def test_read_after_write(self): 2872 for f in get_files(self): 2873 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2874 zipf.writestr('ones', self.data1) 2875 zipf.writestr('twos', self.data2) 2876 with zipf.open('ones') as zopen1: 2877 data1 = zopen1.read(500) 2878 self.assertEqual(data1, self.data1[:500]) 2879 with zipfile.ZipFile(f, 'r') as zipf: 2880 data1 = zipf.read('ones') 2881 data2 = zipf.read('twos') 2882 self.assertEqual(data1, self.data1) 2883 self.assertEqual(data2, self.data2) 2884 2885 def test_write_after_read(self): 2886 for f in get_files(self): 2887 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2888 zipf.writestr('ones', self.data1) 2889 with zipf.open('ones') as zopen1: 2890 zopen1.read(500) 2891 zipf.writestr('twos', self.data2) 2892 with zipfile.ZipFile(f, 'r') as zipf: 2893 data1 = zipf.read('ones') 2894 data2 = zipf.read('twos') 2895 self.assertEqual(data1, self.data1) 2896 self.assertEqual(data2, self.data2) 2897 2898 def test_many_opens(self): 2899 # Verify that read() and open() promptly close the file descriptor, 2900 # and don't rely on the garbage collector to free resources. 2901 startcount = fd_count() 2902 self.make_test_archive(TESTFN2) 2903 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2904 for x in range(100): 2905 zipf.read('ones') 2906 with zipf.open('ones') as zopen1: 2907 pass 2908 self.assertEqual(startcount, fd_count()) 2909 2910 def test_write_while_reading(self): 2911 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2912 zipf.writestr('ones', self.data1) 2913 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2914 with zipf.open('ones', 'r') as r1: 2915 data1 = r1.read(500) 2916 with zipf.open('twos', 'w') as w1: 2917 w1.write(self.data2) 2918 data1 += r1.read() 2919 self.assertEqual(data1, self.data1) 2920 with zipfile.ZipFile(TESTFN2) as zipf: 2921 self.assertEqual(zipf.read('twos'), self.data2) 2922 2923 def tearDown(self): 2924 unlink(TESTFN2) 2925 2926 2927class TestWithDirectory(unittest.TestCase): 2928 def setUp(self): 2929 os.mkdir(TESTFN2) 2930 2931 def test_extract_dir(self): 2932 with zipfile.ZipFile(findfile("zipdir.zip", subdir="archivetestdata")) as zipf: 2933 zipf.extractall(TESTFN2) 2934 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2935 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2936 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2937 2938 def test_bug_6050(self): 2939 # Extraction should succeed if directories already exist 2940 os.mkdir(os.path.join(TESTFN2, "a")) 2941 self.test_extract_dir() 2942 2943 def test_extract_dir_backslash(self): 2944 zfname = findfile("zipdir_backslash.zip", subdir="archivetestdata") 2945 with zipfile.ZipFile(zfname) as zipf: 2946 zipf.extractall(TESTFN2) 2947 if os.name == 'nt': 2948 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2949 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2950 self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "a", "b", "c"))) 2951 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "d"))) 2952 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "d", "e"))) 2953 else: 2954 self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "a\\b\\c"))) 2955 self.assertTrue(os.path.isfile(os.path.join(TESTFN2, "d\\e\\"))) 2956 self.assertFalse(os.path.exists(os.path.join(TESTFN2, "a"))) 2957 self.assertFalse(os.path.exists(os.path.join(TESTFN2, "d"))) 2958 2959 def test_write_dir(self): 2960 dirpath = os.path.join(TESTFN2, "x") 2961 os.mkdir(dirpath) 2962 mode = os.stat(dirpath).st_mode & 0xFFFF 2963 with zipfile.ZipFile(TESTFN, "w") as zipf: 2964 zipf.write(dirpath) 2965 zinfo = zipf.filelist[0] 2966 self.assertTrue(zinfo.filename.endswith("/x/")) 2967 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2968 zipf.write(dirpath, "y") 2969 zinfo = zipf.filelist[1] 2970 self.assertTrue(zinfo.filename, "y/") 2971 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2972 with zipfile.ZipFile(TESTFN, "r") as zipf: 2973 zinfo = zipf.filelist[0] 2974 self.assertTrue(zinfo.filename.endswith("/x/")) 2975 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2976 zinfo = zipf.filelist[1] 2977 self.assertTrue(zinfo.filename, "y/") 2978 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2979 target = os.path.join(TESTFN2, "target") 2980 os.mkdir(target) 2981 zipf.extractall(target) 2982 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2983 self.assertEqual(len(os.listdir(target)), 2) 2984 2985 def test_writestr_dir(self): 2986 os.mkdir(os.path.join(TESTFN2, "x")) 2987 with zipfile.ZipFile(TESTFN, "w") as zipf: 2988 zipf.writestr("x/", b'') 2989 zinfo = zipf.filelist[0] 2990 self.assertEqual(zinfo.filename, "x/") 2991 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2992 with zipfile.ZipFile(TESTFN, "r") as zipf: 2993 zinfo = zipf.filelist[0] 2994 self.assertTrue(zinfo.filename.endswith("x/")) 2995 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2996 target = os.path.join(TESTFN2, "target") 2997 os.mkdir(target) 2998 zipf.extractall(target) 2999 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 3000 self.assertEqual(os.listdir(target), ["x"]) 3001 3002 def test_mkdir(self): 3003 with zipfile.ZipFile(TESTFN, "w") as zf: 3004 zf.mkdir("directory") 3005 zinfo = zf.filelist[0] 3006 self.assertEqual(zinfo.filename, "directory/") 3007 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 3008 3009 zf.mkdir("directory2/") 3010 zinfo = zf.filelist[1] 3011 self.assertEqual(zinfo.filename, "directory2/") 3012 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 3013 3014 zf.mkdir("directory3", mode=0o777) 3015 zinfo = zf.filelist[2] 3016 self.assertEqual(zinfo.filename, "directory3/") 3017 self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10) 3018 3019 old_zinfo = zipfile.ZipInfo("directory4/") 3020 old_zinfo.external_attr = (0o40777 << 16) | 0x10 3021 old_zinfo.CRC = 0 3022 old_zinfo.file_size = 0 3023 old_zinfo.compress_size = 0 3024 zf.mkdir(old_zinfo) 3025 new_zinfo = zf.filelist[3] 3026 self.assertEqual(old_zinfo.filename, "directory4/") 3027 self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr) 3028 3029 target = os.path.join(TESTFN2, "target") 3030 os.mkdir(target) 3031 zf.extractall(target) 3032 self.assertEqual(set(os.listdir(target)), {"directory", "directory2", "directory3", "directory4"}) 3033 3034 def test_create_directory_with_write(self): 3035 with zipfile.ZipFile(TESTFN, "w") as zf: 3036 zf.writestr(zipfile.ZipInfo('directory/'), '') 3037 3038 zinfo = zf.filelist[0] 3039 self.assertEqual(zinfo.filename, "directory/") 3040 3041 directory = os.path.join(TESTFN2, "directory2") 3042 os.mkdir(directory) 3043 mode = os.stat(directory).st_mode & 0xFFFF 3044 zf.write(directory, arcname="directory2/") 3045 zinfo = zf.filelist[1] 3046 self.assertEqual(zinfo.filename, "directory2/") 3047 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 3048 3049 target = os.path.join(TESTFN2, "target") 3050 os.mkdir(target) 3051 zf.extractall(target) 3052 3053 self.assertEqual(set(os.listdir(target)), {"directory", "directory2"}) 3054 3055 def test_root_folder_in_zipfile(self): 3056 """ 3057 gh-112795: Some tools or self constructed codes will add '/' folder to 3058 the zip file, this is a strange behavior, but we should support it. 3059 """ 3060 in_memory_file = io.BytesIO() 3061 zf = zipfile.ZipFile(in_memory_file, "w") 3062 zf.mkdir('/') 3063 zf.writestr('./a.txt', 'aaa') 3064 zf.extractall(TESTFN2) 3065 3066 def tearDown(self): 3067 rmtree(TESTFN2) 3068 if os.path.exists(TESTFN): 3069 unlink(TESTFN) 3070 3071 3072class ZipInfoTests(unittest.TestCase): 3073 def test_from_file(self): 3074 zi = zipfile.ZipInfo.from_file(__file__) 3075 self.assertEqual(posixpath.basename(zi.filename), 'test_core.py') 3076 self.assertFalse(zi.is_dir()) 3077 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 3078 3079 def test_from_file_pathlike(self): 3080 zi = zipfile.ZipInfo.from_file(FakePath(__file__)) 3081 self.assertEqual(posixpath.basename(zi.filename), 'test_core.py') 3082 self.assertFalse(zi.is_dir()) 3083 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 3084 3085 def test_from_file_bytes(self): 3086 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 3087 self.assertEqual(posixpath.basename(zi.filename), 'test') 3088 self.assertFalse(zi.is_dir()) 3089 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 3090 3091 def test_from_file_fileno(self): 3092 with open(__file__, 'rb') as f: 3093 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 3094 self.assertEqual(posixpath.basename(zi.filename), 'test') 3095 self.assertFalse(zi.is_dir()) 3096 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 3097 3098 def test_from_dir(self): 3099 dirpath = os.path.dirname(os.path.abspath(__file__)) 3100 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 3101 self.assertEqual(zi.filename, 'stdlib_tests/') 3102 self.assertTrue(zi.is_dir()) 3103 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 3104 self.assertEqual(zi.file_size, 0) 3105 3106 def test_compresslevel_property(self): 3107 zinfo = zipfile.ZipInfo("xxx") 3108 self.assertFalse(zinfo._compresslevel) 3109 self.assertFalse(zinfo.compress_level) 3110 zinfo._compresslevel = 99 # test the legacy @property.setter 3111 self.assertEqual(zinfo.compress_level, 99) 3112 self.assertEqual(zinfo._compresslevel, 99) 3113 zinfo.compress_level = 8 3114 self.assertEqual(zinfo.compress_level, 8) 3115 self.assertEqual(zinfo._compresslevel, 8) 3116 3117 3118class CommandLineTest(unittest.TestCase): 3119 3120 def zipfilecmd(self, *args, **kwargs): 3121 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 3122 **kwargs) 3123 return out.replace(os.linesep.encode(), b'\n') 3124 3125 def zipfilecmd_failure(self, *args): 3126 return script_helper.assert_python_failure('-m', 'zipfile', *args) 3127 3128 def test_bad_use(self): 3129 rc, out, err = self.zipfilecmd_failure() 3130 self.assertEqual(out, b'') 3131 self.assertIn(b'usage', err.lower()) 3132 self.assertIn(b'error', err.lower()) 3133 self.assertIn(b'required', err.lower()) 3134 rc, out, err = self.zipfilecmd_failure('-l', '') 3135 self.assertEqual(out, b'') 3136 self.assertNotEqual(err.strip(), b'') 3137 3138 def test_test_command(self): 3139 zip_name = findfile('zipdir.zip', subdir='archivetestdata') 3140 for opt in '-t', '--test': 3141 out = self.zipfilecmd(opt, zip_name) 3142 self.assertEqual(out.rstrip(), b'Done testing') 3143 zip_name = findfile('testtar.tar') 3144 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 3145 self.assertEqual(out, b'') 3146 3147 def test_list_command(self): 3148 zip_name = findfile('zipdir.zip', subdir='archivetestdata') 3149 t = io.StringIO() 3150 with zipfile.ZipFile(zip_name, 'r') as tf: 3151 tf.printdir(t) 3152 expected = t.getvalue().encode('ascii', 'backslashreplace') 3153 for opt in '-l', '--list': 3154 out = self.zipfilecmd(opt, zip_name, 3155 PYTHONIOENCODING='ascii:backslashreplace') 3156 self.assertEqual(out, expected) 3157 3158 @requires_zlib() 3159 def test_create_command(self): 3160 self.addCleanup(unlink, TESTFN) 3161 with open(TESTFN, 'w', encoding='utf-8') as f: 3162 f.write('test 1') 3163 os.mkdir(TESTFNDIR) 3164 self.addCleanup(rmtree, TESTFNDIR) 3165 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f: 3166 f.write('test 2') 3167 files = [TESTFN, TESTFNDIR] 3168 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 3169 for opt in '-c', '--create': 3170 try: 3171 out = self.zipfilecmd(opt, TESTFN2, *files) 3172 self.assertEqual(out, b'') 3173 with zipfile.ZipFile(TESTFN2) as zf: 3174 self.assertEqual(zf.namelist(), namelist) 3175 self.assertEqual(zf.read(namelist[0]), b'test 1') 3176 self.assertEqual(zf.read(namelist[2]), b'test 2') 3177 finally: 3178 unlink(TESTFN2) 3179 3180 def test_extract_command(self): 3181 zip_name = findfile('zipdir.zip', subdir='archivetestdata') 3182 for opt in '-e', '--extract': 3183 with temp_dir() as extdir: 3184 out = self.zipfilecmd(opt, zip_name, extdir) 3185 self.assertEqual(out, b'') 3186 with zipfile.ZipFile(zip_name) as zf: 3187 for zi in zf.infolist(): 3188 path = os.path.join(extdir, 3189 zi.filename.replace('/', os.sep)) 3190 if zi.is_dir(): 3191 self.assertTrue(os.path.isdir(path)) 3192 else: 3193 self.assertTrue(os.path.isfile(path)) 3194 with open(path, 'rb') as f: 3195 self.assertEqual(f.read(), zf.read(zi)) 3196 3197 3198class TestExecutablePrependedZip(unittest.TestCase): 3199 """Test our ability to open zip files with an executable prepended.""" 3200 3201 def setUp(self): 3202 self.exe_zip = findfile('exe_with_zip', subdir='archivetestdata') 3203 self.exe_zip64 = findfile('exe_with_z64', subdir='archivetestdata') 3204 3205 def _test_zip_works(self, name): 3206 # bpo28494 sanity check: ensure is_zipfile works on these. 3207 self.assertTrue(zipfile.is_zipfile(name), 3208 f'is_zipfile failed on {name}') 3209 # Ensure we can operate on these via ZipFile. 3210 with zipfile.ZipFile(name) as zipfp: 3211 for n in zipfp.namelist(): 3212 data = zipfp.read(n) 3213 self.assertIn(b'FAVORITE_NUMBER', data) 3214 3215 def test_read_zip_with_exe_prepended(self): 3216 self._test_zip_works(self.exe_zip) 3217 3218 def test_read_zip64_with_exe_prepended(self): 3219 self._test_zip_works(self.exe_zip64) 3220 3221 @unittest.skipUnless(sys.executable, 'sys.executable required.') 3222 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 3223 'Test relies on #!/bin/bash working.') 3224 @requires_subprocess() 3225 def test_execute_zip2(self): 3226 output = subprocess.check_output([self.exe_zip, sys.executable]) 3227 self.assertIn(b'number in executable: 5', output) 3228 3229 @unittest.skipUnless(sys.executable, 'sys.executable required.') 3230 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 3231 'Test relies on #!/bin/bash working.') 3232 @requires_subprocess() 3233 def test_execute_zip64(self): 3234 output = subprocess.check_output([self.exe_zip64, sys.executable]) 3235 self.assertIn(b'number in executable: 5', output) 3236 3237 3238class EncodedMetadataTests(unittest.TestCase): 3239 file_names = ['\u4e00', '\u4e8c', '\u4e09'] # Han 'one', 'two', 'three' 3240 file_content = [ 3241 "This is pure ASCII.\n".encode('ascii'), 3242 # This is modern Japanese. (UTF-8) 3243 "\u3053\u308c\u306f\u73fe\u4ee3\u7684\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('utf-8'), 3244 # This is obsolete Japanese. (Shift JIS) 3245 "\u3053\u308c\u306f\u53e4\u3044\u65e5\u672c\u8a9e\u3067\u3059\u3002\n".encode('shift_jis'), 3246 ] 3247 3248 def setUp(self): 3249 self.addCleanup(unlink, TESTFN) 3250 # Create .zip of 3 members with Han names encoded in Shift JIS. 3251 # Each name is 1 Han character encoding to 2 bytes in Shift JIS. 3252 # The ASCII names are arbitrary as long as they are length 2 and 3253 # not otherwise contained in the zip file. 3254 # Data elements are encoded bytes (ascii, utf-8, shift_jis). 3255 placeholders = ["n1", "n2"] + self.file_names[2:] 3256 with zipfile.ZipFile(TESTFN, mode="w") as tf: 3257 for temp, content in zip(placeholders, self.file_content): 3258 tf.writestr(temp, content, zipfile.ZIP_STORED) 3259 # Hack in the Shift JIS names with flag bit 11 (UTF-8) unset. 3260 with open(TESTFN, "rb") as tf: 3261 data = tf.read() 3262 for name, temp in zip(self.file_names, placeholders[:2]): 3263 data = data.replace(temp.encode('ascii'), 3264 name.encode('shift_jis')) 3265 with open(TESTFN, "wb") as tf: 3266 tf.write(data) 3267 3268 def _test_read(self, zipfp, expected_names, expected_content): 3269 # Check the namelist 3270 names = zipfp.namelist() 3271 self.assertEqual(sorted(names), sorted(expected_names)) 3272 3273 # Check infolist 3274 infos = zipfp.infolist() 3275 names = [zi.filename for zi in infos] 3276 self.assertEqual(sorted(names), sorted(expected_names)) 3277 3278 # check getinfo 3279 for name, content in zip(expected_names, expected_content): 3280 info = zipfp.getinfo(name) 3281 self.assertEqual(info.filename, name) 3282 self.assertEqual(info.file_size, len(content)) 3283 self.assertEqual(zipfp.read(name), content) 3284 3285 def test_read_with_metadata_encoding(self): 3286 # Read the ZIP archive with correct metadata_encoding 3287 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3288 self._test_read(zipfp, self.file_names, self.file_content) 3289 3290 def test_read_without_metadata_encoding(self): 3291 # Read the ZIP archive without metadata_encoding 3292 expected_names = [name.encode('shift_jis').decode('cp437') 3293 for name in self.file_names[:2]] + self.file_names[2:] 3294 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3295 self._test_read(zipfp, expected_names, self.file_content) 3296 3297 def test_read_with_incorrect_metadata_encoding(self): 3298 # Read the ZIP archive with incorrect metadata_encoding 3299 expected_names = [name.encode('shift_jis').decode('koi8-u') 3300 for name in self.file_names[:2]] + self.file_names[2:] 3301 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp: 3302 self._test_read(zipfp, expected_names, self.file_content) 3303 3304 def test_read_with_unsuitable_metadata_encoding(self): 3305 # Read the ZIP archive with metadata_encoding unsuitable for 3306 # decoding metadata 3307 with self.assertRaises(UnicodeDecodeError): 3308 zipfile.ZipFile(TESTFN, "r", metadata_encoding='ascii') 3309 with self.assertRaises(UnicodeDecodeError): 3310 zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8') 3311 3312 def test_read_after_append(self): 3313 newname = '\u56db' # Han 'four' 3314 expected_names = [name.encode('shift_jis').decode('cp437') 3315 for name in self.file_names[:2]] + self.file_names[2:] 3316 expected_names.append(newname) 3317 expected_content = (*self.file_content, b"newcontent") 3318 3319 with zipfile.ZipFile(TESTFN, "a") as zipfp: 3320 zipfp.writestr(newname, "newcontent") 3321 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3322 3323 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3324 self._test_read(zipfp, expected_names, expected_content) 3325 3326 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3327 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3328 for i, (name, content) in enumerate(zip(expected_names, expected_content)): 3329 info = zipfp.getinfo(name) 3330 self.assertEqual(info.filename, name) 3331 self.assertEqual(info.file_size, len(content)) 3332 if i < 2: 3333 with self.assertRaises(zipfile.BadZipFile): 3334 zipfp.read(name) 3335 else: 3336 self.assertEqual(zipfp.read(name), content) 3337 3338 def test_write_with_metadata_encoding(self): 3339 ZF = zipfile.ZipFile 3340 for mode in ("w", "x", "a"): 3341 with self.assertRaisesRegex(ValueError, 3342 "^metadata_encoding is only"): 3343 ZF("nonesuch.zip", mode, metadata_encoding="shift_jis") 3344 3345 def test_cli_with_metadata_encoding(self): 3346 errmsg = "Non-conforming encodings not supported with -c." 3347 args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"] 3348 with captured_stdout() as stdout: 3349 with captured_stderr() as stderr: 3350 self.assertRaises(SystemExit, zipfile.main, args) 3351 self.assertEqual(stdout.getvalue(), "") 3352 self.assertIn(errmsg, stderr.getvalue()) 3353 3354 with captured_stdout() as stdout: 3355 zipfile.main(["--metadata-encoding=shift_jis", "-t", TESTFN]) 3356 listing = stdout.getvalue() 3357 3358 with captured_stdout() as stdout: 3359 zipfile.main(["--metadata-encoding=shift_jis", "-l", TESTFN]) 3360 listing = stdout.getvalue() 3361 for name in self.file_names: 3362 self.assertIn(name, listing) 3363 3364 def test_cli_with_metadata_encoding_extract(self): 3365 os.mkdir(TESTFN2) 3366 self.addCleanup(rmtree, TESTFN2) 3367 # Depending on locale, extracted file names can be not encodable 3368 # with the filesystem encoding. 3369 for fn in self.file_names: 3370 try: 3371 os.stat(os.path.join(TESTFN2, fn)) 3372 except OSError: 3373 pass 3374 except UnicodeEncodeError: 3375 self.skipTest(f'cannot encode file name {fn!r}') 3376 3377 zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2]) 3378 listing = os.listdir(TESTFN2) 3379 for name in self.file_names: 3380 self.assertIn(name, listing) 3381 3382 3383class StripExtraTests(unittest.TestCase): 3384 # Note: all of the "z" characters are technically invalid, but up 3385 # to 3 bytes at the end of the extra will be passed through as they 3386 # are too short to encode a valid extra. 3387 3388 ZIP64_EXTRA = 1 3389 3390 def test_no_data(self): 3391 s = struct.Struct("<HH") 3392 a = s.pack(self.ZIP64_EXTRA, 0) 3393 b = s.pack(2, 0) 3394 c = s.pack(3, 0) 3395 3396 self.assertEqual(b'', zipfile._Extra.strip(a, (self.ZIP64_EXTRA,))) 3397 self.assertEqual(b, zipfile._Extra.strip(b, (self.ZIP64_EXTRA,))) 3398 self.assertEqual( 3399 b+b"z", zipfile._Extra.strip(b+b"z", (self.ZIP64_EXTRA,))) 3400 3401 self.assertEqual(b+c, zipfile._Extra.strip(a+b+c, (self.ZIP64_EXTRA,))) 3402 self.assertEqual(b+c, zipfile._Extra.strip(b+a+c, (self.ZIP64_EXTRA,))) 3403 self.assertEqual(b+c, zipfile._Extra.strip(b+c+a, (self.ZIP64_EXTRA,))) 3404 3405 def test_with_data(self): 3406 s = struct.Struct("<HH") 3407 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3408 b = s.pack(2, 2) + b"bb" 3409 c = s.pack(3, 3) + b"ccc" 3410 3411 self.assertEqual(b"", zipfile._Extra.strip(a, (self.ZIP64_EXTRA,))) 3412 self.assertEqual(b, zipfile._Extra.strip(b, (self.ZIP64_EXTRA,))) 3413 self.assertEqual( 3414 b+b"z", zipfile._Extra.strip(b+b"z", (self.ZIP64_EXTRA,))) 3415 3416 self.assertEqual(b+c, zipfile._Extra.strip(a+b+c, (self.ZIP64_EXTRA,))) 3417 self.assertEqual(b+c, zipfile._Extra.strip(b+a+c, (self.ZIP64_EXTRA,))) 3418 self.assertEqual(b+c, zipfile._Extra.strip(b+c+a, (self.ZIP64_EXTRA,))) 3419 3420 def test_multiples(self): 3421 s = struct.Struct("<HH") 3422 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3423 b = s.pack(2, 2) + b"bb" 3424 3425 self.assertEqual(b"", zipfile._Extra.strip(a+a, (self.ZIP64_EXTRA,))) 3426 self.assertEqual(b"", zipfile._Extra.strip(a+a+a, (self.ZIP64_EXTRA,))) 3427 self.assertEqual( 3428 b"z", zipfile._Extra.strip(a+a+b"z", (self.ZIP64_EXTRA,))) 3429 self.assertEqual( 3430 b+b"z", zipfile._Extra.strip(a+a+b+b"z", (self.ZIP64_EXTRA,))) 3431 3432 self.assertEqual(b, zipfile._Extra.strip(a+a+b, (self.ZIP64_EXTRA,))) 3433 self.assertEqual(b, zipfile._Extra.strip(a+b+a, (self.ZIP64_EXTRA,))) 3434 self.assertEqual(b, zipfile._Extra.strip(b+a+a, (self.ZIP64_EXTRA,))) 3435 3436 def test_too_short(self): 3437 self.assertEqual(b"", zipfile._Extra.strip(b"", (self.ZIP64_EXTRA,))) 3438 self.assertEqual(b"z", zipfile._Extra.strip(b"z", (self.ZIP64_EXTRA,))) 3439 self.assertEqual( 3440 b"zz", zipfile._Extra.strip(b"zz", (self.ZIP64_EXTRA,))) 3441 self.assertEqual( 3442 b"zzz", zipfile._Extra.strip(b"zzz", (self.ZIP64_EXTRA,))) 3443 3444 3445if __name__ == "__main__": 3446 unittest.main() 3447