1import array 2import contextlib 3import importlib.util 4import io 5import itertools 6import os 7import pathlib 8import posixpath 9import string 10import struct 11import subprocess 12import sys 13import time 14import unittest 15import unittest.mock as mock 16import zipfile 17import functools 18 19 20from tempfile import TemporaryFile 21from random import randint, random, randbytes 22 23from test.support import script_helper 24from test.support import (findfile, requires_zlib, requires_bz2, 25 requires_lzma, captured_stdout) 26from test.support.os_helper import TESTFN, unlink, rmtree, temp_dir, temp_cwd 27 28 29TESTFN2 = TESTFN + "2" 30TESTFNDIR = TESTFN + "d" 31FIXEDTEST_SIZE = 1000 32DATAFILES_DIR = 'zipfile_datafiles' 33 34SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 35 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 36 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 37 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 38 39def get_files(test): 40 yield TESTFN2 41 with TemporaryFile() as f: 42 yield f 43 test.assertFalse(f.closed) 44 with io.BytesIO() as f: 45 yield f 46 test.assertFalse(f.closed) 47 48class AbstractTestsWithSourceFile: 49 @classmethod 50 def setUpClass(cls): 51 cls.line_gen = [bytes("Zipfile test line %d. random float: %f\n" % 52 (i, random()), "ascii") 53 for i in range(FIXEDTEST_SIZE)] 54 cls.data = b''.join(cls.line_gen) 55 56 def setUp(self): 57 # Make a source file with some lines 58 with open(TESTFN, "wb") as fp: 59 fp.write(self.data) 60 61 def make_test_archive(self, f, compression, compresslevel=None): 62 kwargs = {'compression': compression, 'compresslevel': compresslevel} 63 # Create the ZIP archive 64 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 65 zipfp.write(TESTFN, "another.name") 66 zipfp.write(TESTFN, TESTFN) 67 zipfp.writestr("strfile", self.data) 68 with zipfp.open('written-open-w', mode='w') as f: 69 for line in self.line_gen: 70 f.write(line) 71 72 def zip_test(self, f, compression, compresslevel=None): 73 self.make_test_archive(f, compression, compresslevel) 74 75 # Read the ZIP archive 76 with zipfile.ZipFile(f, "r", compression) as zipfp: 77 self.assertEqual(zipfp.read(TESTFN), self.data) 78 self.assertEqual(zipfp.read("another.name"), self.data) 79 self.assertEqual(zipfp.read("strfile"), self.data) 80 81 # Print the ZIP directory 82 fp = io.StringIO() 83 zipfp.printdir(file=fp) 84 directory = fp.getvalue() 85 lines = directory.splitlines() 86 self.assertEqual(len(lines), 5) # Number of files + header 87 88 self.assertIn('File Name', lines[0]) 89 self.assertIn('Modified', lines[0]) 90 self.assertIn('Size', lines[0]) 91 92 fn, date, time_, size = lines[1].split() 93 self.assertEqual(fn, 'another.name') 94 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 95 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 96 self.assertEqual(size, str(len(self.data))) 97 98 # Check the namelist 99 names = zipfp.namelist() 100 self.assertEqual(len(names), 4) 101 self.assertIn(TESTFN, names) 102 self.assertIn("another.name", names) 103 self.assertIn("strfile", names) 104 self.assertIn("written-open-w", names) 105 106 # Check infolist 107 infos = zipfp.infolist() 108 names = [i.filename for i in infos] 109 self.assertEqual(len(names), 4) 110 self.assertIn(TESTFN, names) 111 self.assertIn("another.name", names) 112 self.assertIn("strfile", names) 113 self.assertIn("written-open-w", names) 114 for i in infos: 115 self.assertEqual(i.file_size, len(self.data)) 116 117 # check getinfo 118 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 119 info = zipfp.getinfo(nm) 120 self.assertEqual(info.filename, nm) 121 self.assertEqual(info.file_size, len(self.data)) 122 123 # Check that testzip doesn't raise an exception 124 zipfp.testzip() 125 126 def test_basic(self): 127 for f in get_files(self): 128 self.zip_test(f, self.compression) 129 130 def zip_open_test(self, f, compression): 131 self.make_test_archive(f, compression) 132 133 # Read the ZIP archive 134 with zipfile.ZipFile(f, "r", compression) as zipfp: 135 zipdata1 = [] 136 with zipfp.open(TESTFN) as zipopen1: 137 while True: 138 read_data = zipopen1.read(256) 139 if not read_data: 140 break 141 zipdata1.append(read_data) 142 143 zipdata2 = [] 144 with zipfp.open("another.name") as zipopen2: 145 while True: 146 read_data = zipopen2.read(256) 147 if not read_data: 148 break 149 zipdata2.append(read_data) 150 151 self.assertEqual(b''.join(zipdata1), self.data) 152 self.assertEqual(b''.join(zipdata2), self.data) 153 154 def test_open(self): 155 for f in get_files(self): 156 self.zip_open_test(f, self.compression) 157 158 def test_open_with_pathlike(self): 159 path = pathlib.Path(TESTFN2) 160 self.zip_open_test(path, self.compression) 161 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 162 self.assertIsInstance(zipfp.filename, str) 163 164 def zip_random_open_test(self, f, compression): 165 self.make_test_archive(f, compression) 166 167 # Read the ZIP archive 168 with zipfile.ZipFile(f, "r", compression) as zipfp: 169 zipdata1 = [] 170 with zipfp.open(TESTFN) as zipopen1: 171 while True: 172 read_data = zipopen1.read(randint(1, 1024)) 173 if not read_data: 174 break 175 zipdata1.append(read_data) 176 177 self.assertEqual(b''.join(zipdata1), self.data) 178 179 def test_random_open(self): 180 for f in get_files(self): 181 self.zip_random_open_test(f, self.compression) 182 183 def zip_read1_test(self, f, compression): 184 self.make_test_archive(f, compression) 185 186 # Read the ZIP archive 187 with zipfile.ZipFile(f, "r") as zipfp, \ 188 zipfp.open(TESTFN) as zipopen: 189 zipdata = [] 190 while True: 191 read_data = zipopen.read1(-1) 192 if not read_data: 193 break 194 zipdata.append(read_data) 195 196 self.assertEqual(b''.join(zipdata), self.data) 197 198 def test_read1(self): 199 for f in get_files(self): 200 self.zip_read1_test(f, self.compression) 201 202 def zip_read1_10_test(self, f, compression): 203 self.make_test_archive(f, compression) 204 205 # Read the ZIP archive 206 with zipfile.ZipFile(f, "r") as zipfp, \ 207 zipfp.open(TESTFN) as zipopen: 208 zipdata = [] 209 while True: 210 read_data = zipopen.read1(10) 211 self.assertLessEqual(len(read_data), 10) 212 if not read_data: 213 break 214 zipdata.append(read_data) 215 216 self.assertEqual(b''.join(zipdata), self.data) 217 218 def test_read1_10(self): 219 for f in get_files(self): 220 self.zip_read1_10_test(f, self.compression) 221 222 def zip_readline_read_test(self, f, compression): 223 self.make_test_archive(f, compression) 224 225 # Read the ZIP archive 226 with zipfile.ZipFile(f, "r") as zipfp, \ 227 zipfp.open(TESTFN) as zipopen: 228 data = b'' 229 while True: 230 read = zipopen.readline() 231 if not read: 232 break 233 data += read 234 235 read = zipopen.read(100) 236 if not read: 237 break 238 data += read 239 240 self.assertEqual(data, self.data) 241 242 def test_readline_read(self): 243 # Issue #7610: calls to readline() interleaved with calls to read(). 244 for f in get_files(self): 245 self.zip_readline_read_test(f, self.compression) 246 247 def zip_readline_test(self, f, compression): 248 self.make_test_archive(f, compression) 249 250 # Read the ZIP archive 251 with zipfile.ZipFile(f, "r") as zipfp: 252 with zipfp.open(TESTFN) as zipopen: 253 for line in self.line_gen: 254 linedata = zipopen.readline() 255 self.assertEqual(linedata, line) 256 257 def test_readline(self): 258 for f in get_files(self): 259 self.zip_readline_test(f, self.compression) 260 261 def zip_readlines_test(self, f, compression): 262 self.make_test_archive(f, compression) 263 264 # Read the ZIP archive 265 with zipfile.ZipFile(f, "r") as zipfp: 266 with zipfp.open(TESTFN) as zipopen: 267 ziplines = zipopen.readlines() 268 for line, zipline in zip(self.line_gen, ziplines): 269 self.assertEqual(zipline, line) 270 271 def test_readlines(self): 272 for f in get_files(self): 273 self.zip_readlines_test(f, self.compression) 274 275 def zip_iterlines_test(self, f, compression): 276 self.make_test_archive(f, compression) 277 278 # Read the ZIP archive 279 with zipfile.ZipFile(f, "r") as zipfp: 280 with zipfp.open(TESTFN) as zipopen: 281 for line, zipline in zip(self.line_gen, zipopen): 282 self.assertEqual(zipline, line) 283 284 def test_iterlines(self): 285 for f in get_files(self): 286 self.zip_iterlines_test(f, self.compression) 287 288 def test_low_compression(self): 289 """Check for cases where compressed data is larger than original.""" 290 # Create the ZIP archive 291 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 292 zipfp.writestr("strfile", '12') 293 294 # Get an open object for strfile 295 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 296 with zipfp.open("strfile") as openobj: 297 self.assertEqual(openobj.read(1), b'1') 298 self.assertEqual(openobj.read(1), b'2') 299 300 def test_writestr_compression(self): 301 zipfp = zipfile.ZipFile(TESTFN2, "w") 302 zipfp.writestr("b.txt", "hello world", compress_type=self.compression) 303 info = zipfp.getinfo('b.txt') 304 self.assertEqual(info.compress_type, self.compression) 305 306 def test_writestr_compresslevel(self): 307 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 308 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 309 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 310 compresslevel=2) 311 312 # Compression level follows the constructor. 313 a_info = zipfp.getinfo('a.txt') 314 self.assertEqual(a_info.compress_type, self.compression) 315 self.assertEqual(a_info._compresslevel, 1) 316 317 # Compression level is overridden. 318 b_info = zipfp.getinfo('b.txt') 319 self.assertEqual(b_info.compress_type, self.compression) 320 self.assertEqual(b_info._compresslevel, 2) 321 322 def test_read_return_size(self): 323 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 324 # than requested. 325 for test_size in (1, 4095, 4096, 4097, 16384): 326 file_size = test_size + 1 327 junk = randbytes(file_size) 328 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 329 zipf.writestr('foo', junk) 330 with zipf.open('foo', 'r') as fp: 331 buf = fp.read(test_size) 332 self.assertEqual(len(buf), test_size) 333 334 def test_truncated_zipfile(self): 335 fp = io.BytesIO() 336 with zipfile.ZipFile(fp, mode='w') as zipf: 337 zipf.writestr('strfile', self.data, compress_type=self.compression) 338 end_offset = fp.tell() 339 zipfiledata = fp.getvalue() 340 341 fp = io.BytesIO(zipfiledata) 342 with zipfile.ZipFile(fp) as zipf: 343 with zipf.open('strfile') as zipopen: 344 fp.truncate(end_offset - 20) 345 with self.assertRaises(EOFError): 346 zipopen.read() 347 348 fp = io.BytesIO(zipfiledata) 349 with zipfile.ZipFile(fp) as zipf: 350 with zipf.open('strfile') as zipopen: 351 fp.truncate(end_offset - 20) 352 with self.assertRaises(EOFError): 353 while zipopen.read(100): 354 pass 355 356 fp = io.BytesIO(zipfiledata) 357 with zipfile.ZipFile(fp) as zipf: 358 with zipf.open('strfile') as zipopen: 359 fp.truncate(end_offset - 20) 360 with self.assertRaises(EOFError): 361 while zipopen.read1(100): 362 pass 363 364 def test_repr(self): 365 fname = 'file.name' 366 for f in get_files(self): 367 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 368 zipfp.write(TESTFN, fname) 369 r = repr(zipfp) 370 self.assertIn("mode='w'", r) 371 372 with zipfile.ZipFile(f, 'r') as zipfp: 373 r = repr(zipfp) 374 if isinstance(f, str): 375 self.assertIn('filename=%r' % f, r) 376 else: 377 self.assertIn('file=%r' % f, r) 378 self.assertIn("mode='r'", r) 379 r = repr(zipfp.getinfo(fname)) 380 self.assertIn('filename=%r' % fname, r) 381 self.assertIn('filemode=', r) 382 self.assertIn('file_size=', r) 383 if self.compression != zipfile.ZIP_STORED: 384 self.assertIn('compress_type=', r) 385 self.assertIn('compress_size=', r) 386 with zipfp.open(fname) as zipopen: 387 r = repr(zipopen) 388 self.assertIn('name=%r' % fname, r) 389 self.assertIn("mode='r'", r) 390 if self.compression != zipfile.ZIP_STORED: 391 self.assertIn('compress_type=', r) 392 self.assertIn('[closed]', repr(zipopen)) 393 self.assertIn('[closed]', repr(zipfp)) 394 395 def test_compresslevel_basic(self): 396 for f in get_files(self): 397 self.zip_test(f, self.compression, compresslevel=9) 398 399 def test_per_file_compresslevel(self): 400 """Check that files within a Zip archive can have different 401 compression levels.""" 402 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 403 zipfp.write(TESTFN, 'compress_1') 404 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 405 one_info = zipfp.getinfo('compress_1') 406 nine_info = zipfp.getinfo('compress_9') 407 self.assertEqual(one_info._compresslevel, 1) 408 self.assertEqual(nine_info._compresslevel, 9) 409 410 def test_writing_errors(self): 411 class BrokenFile(io.BytesIO): 412 def write(self, data): 413 nonlocal count 414 if count is not None: 415 if count == stop: 416 raise OSError 417 count += 1 418 super().write(data) 419 420 stop = 0 421 while True: 422 testfile = BrokenFile() 423 count = None 424 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 425 with zipfp.open('file1', 'w') as f: 426 f.write(b'data1') 427 count = 0 428 try: 429 with zipfp.open('file2', 'w') as f: 430 f.write(b'data2') 431 except OSError: 432 stop += 1 433 else: 434 break 435 finally: 436 count = None 437 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 438 self.assertEqual(zipfp.namelist(), ['file1']) 439 self.assertEqual(zipfp.read('file1'), b'data1') 440 441 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 442 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 443 self.assertEqual(zipfp.read('file1'), b'data1') 444 self.assertEqual(zipfp.read('file2'), b'data2') 445 446 447 def tearDown(self): 448 unlink(TESTFN) 449 unlink(TESTFN2) 450 451 452class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 453 unittest.TestCase): 454 compression = zipfile.ZIP_STORED 455 test_low_compression = None 456 457 def zip_test_writestr_permissions(self, f, compression): 458 # Make sure that writestr and open(... mode='w') create files with 459 # mode 0600, when they are passed a name rather than a ZipInfo 460 # instance. 461 462 self.make_test_archive(f, compression) 463 with zipfile.ZipFile(f, "r") as zipfp: 464 zinfo = zipfp.getinfo('strfile') 465 self.assertEqual(zinfo.external_attr, 0o600 << 16) 466 467 zinfo2 = zipfp.getinfo('written-open-w') 468 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 469 470 def test_writestr_permissions(self): 471 for f in get_files(self): 472 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 473 474 def test_absolute_arcnames(self): 475 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 476 zipfp.write(TESTFN, "/absolute") 477 478 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 479 self.assertEqual(zipfp.namelist(), ["absolute"]) 480 481 def test_append_to_zip_file(self): 482 """Test appending to an existing zipfile.""" 483 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 484 zipfp.write(TESTFN, TESTFN) 485 486 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 487 zipfp.writestr("strfile", self.data) 488 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 489 490 def test_append_to_non_zip_file(self): 491 """Test appending to an existing file that is not a zipfile.""" 492 # NOTE: this test fails if len(d) < 22 because of the first 493 # line "fpin.seek(-22, 2)" in _EndRecData 494 data = b'I am not a ZipFile!'*10 495 with open(TESTFN2, 'wb') as f: 496 f.write(data) 497 498 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 499 zipfp.write(TESTFN, TESTFN) 500 501 with open(TESTFN2, 'rb') as f: 502 f.seek(len(data)) 503 with zipfile.ZipFile(f, "r") as zipfp: 504 self.assertEqual(zipfp.namelist(), [TESTFN]) 505 self.assertEqual(zipfp.read(TESTFN), self.data) 506 with open(TESTFN2, 'rb') as f: 507 self.assertEqual(f.read(len(data)), data) 508 zipfiledata = f.read() 509 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 510 self.assertEqual(zipfp.namelist(), [TESTFN]) 511 self.assertEqual(zipfp.read(TESTFN), self.data) 512 513 def test_read_concatenated_zip_file(self): 514 with io.BytesIO() as bio: 515 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 516 zipfp.write(TESTFN, TESTFN) 517 zipfiledata = bio.getvalue() 518 data = b'I am not a ZipFile!'*10 519 with open(TESTFN2, 'wb') as f: 520 f.write(data) 521 f.write(zipfiledata) 522 523 with zipfile.ZipFile(TESTFN2) as zipfp: 524 self.assertEqual(zipfp.namelist(), [TESTFN]) 525 self.assertEqual(zipfp.read(TESTFN), self.data) 526 527 def test_append_to_concatenated_zip_file(self): 528 with io.BytesIO() as bio: 529 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 530 zipfp.write(TESTFN, TESTFN) 531 zipfiledata = bio.getvalue() 532 data = b'I am not a ZipFile!'*1000000 533 with open(TESTFN2, 'wb') as f: 534 f.write(data) 535 f.write(zipfiledata) 536 537 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 538 self.assertEqual(zipfp.namelist(), [TESTFN]) 539 zipfp.writestr('strfile', self.data) 540 541 with open(TESTFN2, 'rb') as f: 542 self.assertEqual(f.read(len(data)), data) 543 zipfiledata = f.read() 544 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 545 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 546 self.assertEqual(zipfp.read(TESTFN), self.data) 547 self.assertEqual(zipfp.read('strfile'), self.data) 548 549 def test_ignores_newline_at_end(self): 550 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 551 zipfp.write(TESTFN, TESTFN) 552 with open(TESTFN2, 'a', encoding='utf-8') as f: 553 f.write("\r\n\00\00\00") 554 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 555 self.assertIsInstance(zipfp, zipfile.ZipFile) 556 557 def test_ignores_stuff_appended_past_comments(self): 558 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 559 zipfp.comment = b"this is a comment" 560 zipfp.write(TESTFN, TESTFN) 561 with open(TESTFN2, 'a', encoding='utf-8') as f: 562 f.write("abcdef\r\n") 563 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 564 self.assertIsInstance(zipfp, zipfile.ZipFile) 565 self.assertEqual(zipfp.comment, b"this is a comment") 566 567 def test_write_default_name(self): 568 """Check that calling ZipFile.write without arcname specified 569 produces the expected result.""" 570 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 571 zipfp.write(TESTFN) 572 with open(TESTFN, "rb") as f: 573 self.assertEqual(zipfp.read(TESTFN), f.read()) 574 575 def test_io_on_closed_zipextfile(self): 576 fname = "somefile.txt" 577 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 578 zipfp.writestr(fname, "bogus") 579 580 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 581 with zipfp.open(fname) as fid: 582 fid.close() 583 self.assertRaises(ValueError, fid.read) 584 self.assertRaises(ValueError, fid.seek, 0) 585 self.assertRaises(ValueError, fid.tell) 586 self.assertRaises(ValueError, fid.readable) 587 self.assertRaises(ValueError, fid.seekable) 588 589 def test_write_to_readonly(self): 590 """Check that trying to call write() on a readonly ZipFile object 591 raises a ValueError.""" 592 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 593 zipfp.writestr("somefile.txt", "bogus") 594 595 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 596 self.assertRaises(ValueError, zipfp.write, TESTFN) 597 598 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 599 with self.assertRaises(ValueError): 600 zipfp.open(TESTFN, mode='w') 601 602 def test_add_file_before_1980(self): 603 # Set atime and mtime to 1970-01-01 604 os.utime(TESTFN, (0, 0)) 605 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 606 self.assertRaises(ValueError, zipfp.write, TESTFN) 607 608 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 609 zipfp.write(TESTFN) 610 zinfo = zipfp.getinfo(TESTFN) 611 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 612 613 def test_add_file_after_2107(self): 614 # Set atime and mtime to 2108-12-30 615 ts = 4386268800 616 try: 617 time.localtime(ts) 618 except OverflowError: 619 self.skipTest(f'time.localtime({ts}) raises OverflowError') 620 try: 621 os.utime(TESTFN, (ts, ts)) 622 except OverflowError: 623 self.skipTest('Host fs cannot set timestamp to required value.') 624 625 mtime_ns = os.stat(TESTFN).st_mtime_ns 626 if mtime_ns != (4386268800 * 10**9): 627 # XFS filesystem is limited to 32-bit timestamp, but the syscall 628 # didn't fail. Moreover, there is a VFS bug which returns 629 # a cached timestamp which is different than the value on disk. 630 # 631 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 632 # 633 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 634 # https://bugs.python.org/issue39460#msg360952 635 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 636 637 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 638 self.assertRaises(struct.error, zipfp.write, TESTFN) 639 640 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 641 zipfp.write(TESTFN) 642 zinfo = zipfp.getinfo(TESTFN) 643 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 644 645 646@requires_zlib() 647class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 648 unittest.TestCase): 649 compression = zipfile.ZIP_DEFLATED 650 651 def test_per_file_compression(self): 652 """Check that files within a Zip archive can have different 653 compression options.""" 654 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 655 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 656 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 657 sinfo = zipfp.getinfo('storeme') 658 dinfo = zipfp.getinfo('deflateme') 659 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 660 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 661 662@requires_bz2() 663class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 664 unittest.TestCase): 665 compression = zipfile.ZIP_BZIP2 666 667@requires_lzma() 668class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 669 unittest.TestCase): 670 compression = zipfile.ZIP_LZMA 671 672 673class AbstractTestZip64InSmallFiles: 674 # These tests test the ZIP64 functionality without using large files, 675 # see test_zipfile64 for proper tests. 676 677 @classmethod 678 def setUpClass(cls): 679 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 680 for i in range(0, FIXEDTEST_SIZE)) 681 cls.data = b'\n'.join(line_gen) 682 683 def setUp(self): 684 self._limit = zipfile.ZIP64_LIMIT 685 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 686 zipfile.ZIP64_LIMIT = 1000 687 zipfile.ZIP_FILECOUNT_LIMIT = 9 688 689 # Make a source file with some lines 690 with open(TESTFN, "wb") as fp: 691 fp.write(self.data) 692 693 def zip_test(self, f, compression): 694 # Create the ZIP archive 695 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 696 zipfp.write(TESTFN, "another.name") 697 zipfp.write(TESTFN, TESTFN) 698 zipfp.writestr("strfile", self.data) 699 700 # Read the ZIP archive 701 with zipfile.ZipFile(f, "r", compression) as zipfp: 702 self.assertEqual(zipfp.read(TESTFN), self.data) 703 self.assertEqual(zipfp.read("another.name"), self.data) 704 self.assertEqual(zipfp.read("strfile"), self.data) 705 706 # Print the ZIP directory 707 fp = io.StringIO() 708 zipfp.printdir(fp) 709 710 directory = fp.getvalue() 711 lines = directory.splitlines() 712 self.assertEqual(len(lines), 4) # Number of files + header 713 714 self.assertIn('File Name', lines[0]) 715 self.assertIn('Modified', lines[0]) 716 self.assertIn('Size', lines[0]) 717 718 fn, date, time_, size = lines[1].split() 719 self.assertEqual(fn, 'another.name') 720 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 721 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 722 self.assertEqual(size, str(len(self.data))) 723 724 # Check the namelist 725 names = zipfp.namelist() 726 self.assertEqual(len(names), 3) 727 self.assertIn(TESTFN, names) 728 self.assertIn("another.name", names) 729 self.assertIn("strfile", names) 730 731 # Check infolist 732 infos = zipfp.infolist() 733 names = [i.filename for i in infos] 734 self.assertEqual(len(names), 3) 735 self.assertIn(TESTFN, names) 736 self.assertIn("another.name", names) 737 self.assertIn("strfile", names) 738 for i in infos: 739 self.assertEqual(i.file_size, len(self.data)) 740 741 # check getinfo 742 for nm in (TESTFN, "another.name", "strfile"): 743 info = zipfp.getinfo(nm) 744 self.assertEqual(info.filename, nm) 745 self.assertEqual(info.file_size, len(self.data)) 746 747 # Check that testzip doesn't raise an exception 748 zipfp.testzip() 749 750 def test_basic(self): 751 for f in get_files(self): 752 self.zip_test(f, self.compression) 753 754 def test_too_many_files(self): 755 # This test checks that more than 64k files can be added to an archive, 756 # and that the resulting archive can be read properly by ZipFile 757 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 758 allowZip64=True) 759 zipf.debug = 100 760 numfiles = 15 761 for i in range(numfiles): 762 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 763 self.assertEqual(len(zipf.namelist()), numfiles) 764 zipf.close() 765 766 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 767 self.assertEqual(len(zipf2.namelist()), numfiles) 768 for i in range(numfiles): 769 content = zipf2.read("foo%08d" % i).decode('ascii') 770 self.assertEqual(content, "%d" % (i**3 % 57)) 771 zipf2.close() 772 773 def test_too_many_files_append(self): 774 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 775 allowZip64=False) 776 zipf.debug = 100 777 numfiles = 9 778 for i in range(numfiles): 779 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 780 self.assertEqual(len(zipf.namelist()), numfiles) 781 with self.assertRaises(zipfile.LargeZipFile): 782 zipf.writestr("foo%08d" % numfiles, b'') 783 self.assertEqual(len(zipf.namelist()), numfiles) 784 zipf.close() 785 786 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 787 allowZip64=False) 788 zipf.debug = 100 789 self.assertEqual(len(zipf.namelist()), numfiles) 790 with self.assertRaises(zipfile.LargeZipFile): 791 zipf.writestr("foo%08d" % numfiles, b'') 792 self.assertEqual(len(zipf.namelist()), numfiles) 793 zipf.close() 794 795 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 796 allowZip64=True) 797 zipf.debug = 100 798 self.assertEqual(len(zipf.namelist()), numfiles) 799 numfiles2 = 15 800 for i in range(numfiles, numfiles2): 801 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 802 self.assertEqual(len(zipf.namelist()), numfiles2) 803 zipf.close() 804 805 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 806 self.assertEqual(len(zipf2.namelist()), numfiles2) 807 for i in range(numfiles2): 808 content = zipf2.read("foo%08d" % i).decode('ascii') 809 self.assertEqual(content, "%d" % (i**3 % 57)) 810 zipf2.close() 811 812 def tearDown(self): 813 zipfile.ZIP64_LIMIT = self._limit 814 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 815 unlink(TESTFN) 816 unlink(TESTFN2) 817 818 819class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 820 unittest.TestCase): 821 compression = zipfile.ZIP_STORED 822 823 def large_file_exception_test(self, f, compression): 824 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 825 self.assertRaises(zipfile.LargeZipFile, 826 zipfp.write, TESTFN, "another.name") 827 828 def large_file_exception_test2(self, f, compression): 829 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 830 self.assertRaises(zipfile.LargeZipFile, 831 zipfp.writestr, "another.name", self.data) 832 833 def test_large_file_exception(self): 834 for f in get_files(self): 835 self.large_file_exception_test(f, zipfile.ZIP_STORED) 836 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 837 838 def test_absolute_arcnames(self): 839 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 840 allowZip64=True) as zipfp: 841 zipfp.write(TESTFN, "/absolute") 842 843 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 844 self.assertEqual(zipfp.namelist(), ["absolute"]) 845 846 def test_append(self): 847 # Test that appending to the Zip64 archive doesn't change 848 # extra fields of existing entries. 849 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 850 zipfp.writestr("strfile", self.data) 851 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 852 zinfo = zipfp.getinfo("strfile") 853 extra = zinfo.extra 854 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 855 zipfp.writestr("strfile2", self.data) 856 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 857 zinfo = zipfp.getinfo("strfile") 858 self.assertEqual(zinfo.extra, extra) 859 860 def make_zip64_file( 861 self, file_size_64_set=False, file_size_extra=False, 862 compress_size_64_set=False, compress_size_extra=False, 863 header_offset_64_set=False, header_offset_extra=False, 864 ): 865 """Generate bytes sequence for a zip with (incomplete) zip64 data. 866 867 The actual values (not the zip 64 0xffffffff values) stored in the file 868 are: 869 file_size: 8 870 compress_size: 8 871 header_offset: 0 872 """ 873 actual_size = 8 874 actual_header_offset = 0 875 local_zip64_fields = [] 876 central_zip64_fields = [] 877 878 file_size = actual_size 879 if file_size_64_set: 880 file_size = 0xffffffff 881 if file_size_extra: 882 local_zip64_fields.append(actual_size) 883 central_zip64_fields.append(actual_size) 884 file_size = struct.pack("<L", file_size) 885 886 compress_size = actual_size 887 if compress_size_64_set: 888 compress_size = 0xffffffff 889 if compress_size_extra: 890 local_zip64_fields.append(actual_size) 891 central_zip64_fields.append(actual_size) 892 compress_size = struct.pack("<L", compress_size) 893 894 header_offset = actual_header_offset 895 if header_offset_64_set: 896 header_offset = 0xffffffff 897 if header_offset_extra: 898 central_zip64_fields.append(actual_header_offset) 899 header_offset = struct.pack("<L", header_offset) 900 901 local_extra = struct.pack( 902 '<HH' + 'Q'*len(local_zip64_fields), 903 0x0001, 904 8*len(local_zip64_fields), 905 *local_zip64_fields 906 ) 907 908 central_extra = struct.pack( 909 '<HH' + 'Q'*len(central_zip64_fields), 910 0x0001, 911 8*len(central_zip64_fields), 912 *central_zip64_fields 913 ) 914 915 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 916 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 917 918 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 919 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 920 921 filename = b"test.txt" 922 content = b"test1234" 923 filename_length = struct.pack("<H", len(filename)) 924 zip64_contents = ( 925 # Local file header 926 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 927 + compress_size 928 + file_size 929 + filename_length 930 + local_extra_length 931 + filename 932 + local_extra 933 + content 934 # Central directory: 935 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 936 + compress_size 937 + file_size 938 + filename_length 939 + central_extra_length 940 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 941 + header_offset 942 + filename 943 + central_extra 944 # Zip64 end of central directory 945 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 946 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 947 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 948 + central_dir_size 949 + offset_to_central_dir 950 # Zip64 end of central directory locator 951 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 952 + b"\x00\x00\x00" 953 # end of central directory 954 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 955 + b"\x00\x00\x00\x00" 956 ) 957 return zip64_contents 958 959 def test_bad_zip64_extra(self): 960 """Missing zip64 extra records raises an exception. 961 962 There are 4 fields that the zip64 format handles (the disk number is 963 not used in this module and so is ignored here). According to the zip 964 spec: 965 The order of the fields in the zip64 extended 966 information record is fixed, but the fields MUST 967 only appear if the corresponding Local or Central 968 directory record field is set to 0xFFFF or 0xFFFFFFFF. 969 970 If the zip64 extra content doesn't contain enough entries for the 971 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 972 This test mismatches the length of the zip64 extra field and the number 973 of fields set to indicate the presence of zip64 data. 974 """ 975 # zip64 file size present, no fields in extra, expecting one, equals 976 # missing file size. 977 missing_file_size_extra = self.make_zip64_file( 978 file_size_64_set=True, 979 ) 980 with self.assertRaises(zipfile.BadZipFile) as e: 981 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 982 self.assertIn('file size', str(e.exception).lower()) 983 984 # zip64 file size present, zip64 compress size present, one field in 985 # extra, expecting two, equals missing compress size. 986 missing_compress_size_extra = self.make_zip64_file( 987 file_size_64_set=True, 988 file_size_extra=True, 989 compress_size_64_set=True, 990 ) 991 with self.assertRaises(zipfile.BadZipFile) as e: 992 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 993 self.assertIn('compress size', str(e.exception).lower()) 994 995 # zip64 compress size present, no fields in extra, expecting one, 996 # equals missing compress size. 997 missing_compress_size_extra = self.make_zip64_file( 998 compress_size_64_set=True, 999 ) 1000 with self.assertRaises(zipfile.BadZipFile) as e: 1001 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1002 self.assertIn('compress size', str(e.exception).lower()) 1003 1004 # zip64 file size present, zip64 compress size present, zip64 header 1005 # offset present, two fields in extra, expecting three, equals missing 1006 # header offset 1007 missing_header_offset_extra = self.make_zip64_file( 1008 file_size_64_set=True, 1009 file_size_extra=True, 1010 compress_size_64_set=True, 1011 compress_size_extra=True, 1012 header_offset_64_set=True, 1013 ) 1014 with self.assertRaises(zipfile.BadZipFile) as e: 1015 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1016 self.assertIn('header offset', str(e.exception).lower()) 1017 1018 # zip64 compress size present, zip64 header offset present, one field 1019 # in extra, expecting two, equals missing header offset 1020 missing_header_offset_extra = self.make_zip64_file( 1021 file_size_64_set=False, 1022 compress_size_64_set=True, 1023 compress_size_extra=True, 1024 header_offset_64_set=True, 1025 ) 1026 with self.assertRaises(zipfile.BadZipFile) as e: 1027 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1028 self.assertIn('header offset', str(e.exception).lower()) 1029 1030 # zip64 file size present, zip64 header offset present, one field in 1031 # extra, expecting two, equals missing header offset 1032 missing_header_offset_extra = self.make_zip64_file( 1033 file_size_64_set=True, 1034 file_size_extra=True, 1035 compress_size_64_set=False, 1036 header_offset_64_set=True, 1037 ) 1038 with self.assertRaises(zipfile.BadZipFile) as e: 1039 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1040 self.assertIn('header offset', str(e.exception).lower()) 1041 1042 # zip64 header offset present, no fields in extra, expecting one, 1043 # equals missing header offset 1044 missing_header_offset_extra = self.make_zip64_file( 1045 file_size_64_set=False, 1046 compress_size_64_set=False, 1047 header_offset_64_set=True, 1048 ) 1049 with self.assertRaises(zipfile.BadZipFile) as e: 1050 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1051 self.assertIn('header offset', str(e.exception).lower()) 1052 1053 def test_generated_valid_zip64_extra(self): 1054 # These values are what is set in the make_zip64_file method. 1055 expected_file_size = 8 1056 expected_compress_size = 8 1057 expected_header_offset = 0 1058 expected_content = b"test1234" 1059 1060 # Loop through the various valid combinations of zip64 masks 1061 # present and extra fields present. 1062 params = ( 1063 {"file_size_64_set": True, "file_size_extra": True}, 1064 {"compress_size_64_set": True, "compress_size_extra": True}, 1065 {"header_offset_64_set": True, "header_offset_extra": True}, 1066 ) 1067 1068 for r in range(1, len(params) + 1): 1069 for combo in itertools.combinations(params, r): 1070 kwargs = {} 1071 for c in combo: 1072 kwargs.update(c) 1073 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1074 zinfo = zf.infolist()[0] 1075 self.assertEqual(zinfo.file_size, expected_file_size) 1076 self.assertEqual(zinfo.compress_size, expected_compress_size) 1077 self.assertEqual(zinfo.header_offset, expected_header_offset) 1078 self.assertEqual(zf.read(zinfo), expected_content) 1079 1080 1081@requires_zlib() 1082class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1083 unittest.TestCase): 1084 compression = zipfile.ZIP_DEFLATED 1085 1086@requires_bz2() 1087class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1088 unittest.TestCase): 1089 compression = zipfile.ZIP_BZIP2 1090 1091@requires_lzma() 1092class LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1093 unittest.TestCase): 1094 compression = zipfile.ZIP_LZMA 1095 1096 1097class AbstractWriterTests: 1098 1099 def tearDown(self): 1100 unlink(TESTFN2) 1101 1102 def test_close_after_close(self): 1103 data = b'content' 1104 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1105 w = zipf.open('test', 'w') 1106 w.write(data) 1107 w.close() 1108 self.assertTrue(w.closed) 1109 w.close() 1110 self.assertTrue(w.closed) 1111 self.assertEqual(zipf.read('test'), data) 1112 1113 def test_write_after_close(self): 1114 data = b'content' 1115 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1116 w = zipf.open('test', 'w') 1117 w.write(data) 1118 w.close() 1119 self.assertTrue(w.closed) 1120 self.assertRaises(ValueError, w.write, b'') 1121 self.assertEqual(zipf.read('test'), data) 1122 1123 def test_issue44439(self): 1124 q = array.array('Q', [1, 2, 3, 4, 5]) 1125 LENGTH = len(q) * q.itemsize 1126 with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip: 1127 with zip.open('data', 'w') as data: 1128 self.assertEqual(data.write(q), LENGTH) 1129 self.assertEqual(zip.getinfo('data').file_size, LENGTH) 1130 1131class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1132 compression = zipfile.ZIP_STORED 1133 1134@requires_zlib() 1135class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1136 compression = zipfile.ZIP_DEFLATED 1137 1138@requires_bz2() 1139class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1140 compression = zipfile.ZIP_BZIP2 1141 1142@requires_lzma() 1143class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1144 compression = zipfile.ZIP_LZMA 1145 1146 1147class PyZipFileTests(unittest.TestCase): 1148 def assertCompiledIn(self, name, namelist): 1149 if name + 'o' not in namelist: 1150 self.assertIn(name + 'c', namelist) 1151 1152 def requiresWriteAccess(self, path): 1153 # effective_ids unavailable on windows 1154 if not os.access(path, os.W_OK, 1155 effective_ids=os.access in os.supports_effective_ids): 1156 self.skipTest('requires write access to the installed location') 1157 filename = os.path.join(path, 'test_zipfile.try') 1158 try: 1159 fd = os.open(filename, os.O_WRONLY | os.O_CREAT) 1160 os.close(fd) 1161 except Exception: 1162 self.skipTest('requires write access to the installed location') 1163 unlink(filename) 1164 1165 def test_write_pyfile(self): 1166 self.requiresWriteAccess(os.path.dirname(__file__)) 1167 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1168 fn = __file__ 1169 if fn.endswith('.pyc'): 1170 path_split = fn.split(os.sep) 1171 if os.altsep is not None: 1172 path_split.extend(fn.split(os.altsep)) 1173 if '__pycache__' in path_split: 1174 fn = importlib.util.source_from_cache(fn) 1175 else: 1176 fn = fn[:-1] 1177 1178 zipfp.writepy(fn) 1179 1180 bn = os.path.basename(fn) 1181 self.assertNotIn(bn, zipfp.namelist()) 1182 self.assertCompiledIn(bn, zipfp.namelist()) 1183 1184 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1185 fn = __file__ 1186 if fn.endswith('.pyc'): 1187 fn = fn[:-1] 1188 1189 zipfp.writepy(fn, "testpackage") 1190 1191 bn = "%s/%s" % ("testpackage", os.path.basename(fn)) 1192 self.assertNotIn(bn, zipfp.namelist()) 1193 self.assertCompiledIn(bn, zipfp.namelist()) 1194 1195 def test_write_python_package(self): 1196 import email 1197 packagedir = os.path.dirname(email.__file__) 1198 self.requiresWriteAccess(packagedir) 1199 1200 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1201 zipfp.writepy(packagedir) 1202 1203 # Check for a couple of modules at different levels of the 1204 # hierarchy 1205 names = zipfp.namelist() 1206 self.assertCompiledIn('email/__init__.py', names) 1207 self.assertCompiledIn('email/mime/text.py', names) 1208 1209 def test_write_filtered_python_package(self): 1210 import test 1211 packagedir = os.path.dirname(test.__file__) 1212 self.requiresWriteAccess(packagedir) 1213 1214 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1215 1216 # first make sure that the test folder gives error messages 1217 # (on the badsyntax_... files) 1218 with captured_stdout() as reportSIO: 1219 zipfp.writepy(packagedir) 1220 reportStr = reportSIO.getvalue() 1221 self.assertTrue('SyntaxError' in reportStr) 1222 1223 # then check that the filter works on the whole package 1224 with captured_stdout() as reportSIO: 1225 zipfp.writepy(packagedir, filterfunc=lambda whatever: False) 1226 reportStr = reportSIO.getvalue() 1227 self.assertTrue('SyntaxError' not in reportStr) 1228 1229 # then check that the filter works on individual files 1230 def filter(path): 1231 return not os.path.basename(path).startswith("bad") 1232 with captured_stdout() as reportSIO, self.assertWarns(UserWarning): 1233 zipfp.writepy(packagedir, filterfunc=filter) 1234 reportStr = reportSIO.getvalue() 1235 if reportStr: 1236 print(reportStr) 1237 self.assertTrue('SyntaxError' not in reportStr) 1238 1239 def test_write_with_optimization(self): 1240 import email 1241 packagedir = os.path.dirname(email.__file__) 1242 self.requiresWriteAccess(packagedir) 1243 optlevel = 1 if __debug__ else 0 1244 ext = '.pyc' 1245 1246 with TemporaryFile() as t, \ 1247 zipfile.PyZipFile(t, "w", optimize=optlevel) as zipfp: 1248 zipfp.writepy(packagedir) 1249 1250 names = zipfp.namelist() 1251 self.assertIn('email/__init__' + ext, names) 1252 self.assertIn('email/mime/text' + ext, names) 1253 1254 def test_write_python_directory(self): 1255 os.mkdir(TESTFN2) 1256 try: 1257 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1258 fp.write("print(42)\n") 1259 1260 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1261 fp.write("print(42 * 42)\n") 1262 1263 with open(os.path.join(TESTFN2, "mod2.txt"), "w", encoding='utf-8') as fp: 1264 fp.write("bla bla bla\n") 1265 1266 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1267 zipfp.writepy(TESTFN2) 1268 1269 names = zipfp.namelist() 1270 self.assertCompiledIn('mod1.py', names) 1271 self.assertCompiledIn('mod2.py', names) 1272 self.assertNotIn('mod2.txt', names) 1273 1274 finally: 1275 rmtree(TESTFN2) 1276 1277 def test_write_python_directory_filtered(self): 1278 os.mkdir(TESTFN2) 1279 try: 1280 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1281 fp.write("print(42)\n") 1282 1283 with open(os.path.join(TESTFN2, "mod2.py"), "w", encoding='utf-8') as fp: 1284 fp.write("print(42 * 42)\n") 1285 1286 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1287 zipfp.writepy(TESTFN2, filterfunc=lambda fn: 1288 not fn.endswith('mod2.py')) 1289 1290 names = zipfp.namelist() 1291 self.assertCompiledIn('mod1.py', names) 1292 self.assertNotIn('mod2.py', names) 1293 1294 finally: 1295 rmtree(TESTFN2) 1296 1297 def test_write_non_pyfile(self): 1298 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1299 with open(TESTFN, 'w', encoding='utf-8') as f: 1300 f.write('most definitely not a python file') 1301 self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) 1302 unlink(TESTFN) 1303 1304 def test_write_pyfile_bad_syntax(self): 1305 os.mkdir(TESTFN2) 1306 try: 1307 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1308 fp.write("Bad syntax in python file\n") 1309 1310 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1311 # syntax errors are printed to stdout 1312 with captured_stdout() as s: 1313 zipfp.writepy(os.path.join(TESTFN2, "mod1.py")) 1314 1315 self.assertIn("SyntaxError", s.getvalue()) 1316 1317 # as it will not have compiled the python file, it will 1318 # include the .py file not .pyc 1319 names = zipfp.namelist() 1320 self.assertIn('mod1.py', names) 1321 self.assertNotIn('mod1.pyc', names) 1322 1323 finally: 1324 rmtree(TESTFN2) 1325 1326 def test_write_pathlike(self): 1327 os.mkdir(TESTFN2) 1328 try: 1329 with open(os.path.join(TESTFN2, "mod1.py"), "w", encoding='utf-8') as fp: 1330 fp.write("print(42)\n") 1331 1332 with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp: 1333 zipfp.writepy(pathlib.Path(TESTFN2) / "mod1.py") 1334 names = zipfp.namelist() 1335 self.assertCompiledIn('mod1.py', names) 1336 finally: 1337 rmtree(TESTFN2) 1338 1339 1340class ExtractTests(unittest.TestCase): 1341 1342 def make_test_file(self): 1343 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1344 for fpath, fdata in SMALL_TEST_DATA: 1345 zipfp.writestr(fpath, fdata) 1346 1347 def test_extract(self): 1348 with temp_cwd(): 1349 self.make_test_file() 1350 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1351 for fpath, fdata in SMALL_TEST_DATA: 1352 writtenfile = zipfp.extract(fpath) 1353 1354 # make sure it was written to the right place 1355 correctfile = os.path.join(os.getcwd(), fpath) 1356 correctfile = os.path.normpath(correctfile) 1357 1358 self.assertEqual(writtenfile, correctfile) 1359 1360 # make sure correct data is in correct file 1361 with open(writtenfile, "rb") as f: 1362 self.assertEqual(fdata.encode(), f.read()) 1363 1364 unlink(writtenfile) 1365 1366 def _test_extract_with_target(self, target): 1367 self.make_test_file() 1368 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1369 for fpath, fdata in SMALL_TEST_DATA: 1370 writtenfile = zipfp.extract(fpath, target) 1371 1372 # make sure it was written to the right place 1373 correctfile = os.path.join(target, fpath) 1374 correctfile = os.path.normpath(correctfile) 1375 self.assertTrue(os.path.samefile(writtenfile, correctfile), (writtenfile, target)) 1376 1377 # make sure correct data is in correct file 1378 with open(writtenfile, "rb") as f: 1379 self.assertEqual(fdata.encode(), f.read()) 1380 1381 unlink(writtenfile) 1382 1383 unlink(TESTFN2) 1384 1385 def test_extract_with_target(self): 1386 with temp_dir() as extdir: 1387 self._test_extract_with_target(extdir) 1388 1389 def test_extract_with_target_pathlike(self): 1390 with temp_dir() as extdir: 1391 self._test_extract_with_target(pathlib.Path(extdir)) 1392 1393 def test_extract_all(self): 1394 with temp_cwd(): 1395 self.make_test_file() 1396 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1397 zipfp.extractall() 1398 for fpath, fdata in SMALL_TEST_DATA: 1399 outfile = os.path.join(os.getcwd(), fpath) 1400 1401 with open(outfile, "rb") as f: 1402 self.assertEqual(fdata.encode(), f.read()) 1403 1404 unlink(outfile) 1405 1406 def _test_extract_all_with_target(self, target): 1407 self.make_test_file() 1408 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1409 zipfp.extractall(target) 1410 for fpath, fdata in SMALL_TEST_DATA: 1411 outfile = os.path.join(target, fpath) 1412 1413 with open(outfile, "rb") as f: 1414 self.assertEqual(fdata.encode(), f.read()) 1415 1416 unlink(outfile) 1417 1418 unlink(TESTFN2) 1419 1420 def test_extract_all_with_target(self): 1421 with temp_dir() as extdir: 1422 self._test_extract_all_with_target(extdir) 1423 1424 def test_extract_all_with_target_pathlike(self): 1425 with temp_dir() as extdir: 1426 self._test_extract_all_with_target(pathlib.Path(extdir)) 1427 1428 def check_file(self, filename, content): 1429 self.assertTrue(os.path.isfile(filename)) 1430 with open(filename, 'rb') as f: 1431 self.assertEqual(f.read(), content) 1432 1433 def test_sanitize_windows_name(self): 1434 san = zipfile.ZipFile._sanitize_windows_name 1435 # Passing pathsep in allows this test to work regardless of platform. 1436 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1437 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1438 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1439 1440 def test_extract_hackers_arcnames_common_cases(self): 1441 common_hacknames = [ 1442 ('../foo/bar', 'foo/bar'), 1443 ('foo/../bar', 'foo/bar'), 1444 ('foo/../../bar', 'foo/bar'), 1445 ('foo/bar/..', 'foo/bar'), 1446 ('./../foo/bar', 'foo/bar'), 1447 ('/foo/bar', 'foo/bar'), 1448 ('/foo/../bar', 'foo/bar'), 1449 ('/foo/../../bar', 'foo/bar'), 1450 ] 1451 self._test_extract_hackers_arcnames(common_hacknames) 1452 1453 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1454 def test_extract_hackers_arcnames_windows_only(self): 1455 """Test combination of path fixing and windows name sanitization.""" 1456 windows_hacknames = [ 1457 (r'..\foo\bar', 'foo/bar'), 1458 (r'..\/foo\/bar', 'foo/bar'), 1459 (r'foo/\..\/bar', 'foo/bar'), 1460 (r'foo\/../\bar', 'foo/bar'), 1461 (r'C:foo/bar', 'foo/bar'), 1462 (r'C:/foo/bar', 'foo/bar'), 1463 (r'C://foo/bar', 'foo/bar'), 1464 (r'C:\foo\bar', 'foo/bar'), 1465 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1466 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1467 (r'///conky/mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1468 (r'\\\conky\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1469 (r'//conky//mountpoint/foo/bar', 'conky/mountpoint/foo/bar'), 1470 (r'\\conky\\mountpoint\foo\bar', 'conky/mountpoint/foo/bar'), 1471 (r'//?/C:/foo/bar', 'foo/bar'), 1472 (r'\\?\C:\foo\bar', 'foo/bar'), 1473 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1474 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1475 ('../../foo../../ba..r', 'foo/ba..r'), 1476 ] 1477 self._test_extract_hackers_arcnames(windows_hacknames) 1478 1479 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1480 def test_extract_hackers_arcnames_posix_only(self): 1481 posix_hacknames = [ 1482 ('//foo/bar', 'foo/bar'), 1483 ('../../foo../../ba..r', 'foo../ba..r'), 1484 (r'foo/..\bar', r'foo/..\bar'), 1485 ] 1486 self._test_extract_hackers_arcnames(posix_hacknames) 1487 1488 def _test_extract_hackers_arcnames(self, hacknames): 1489 for arcname, fixedname in hacknames: 1490 content = b'foobar' + arcname.encode() 1491 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp: 1492 zinfo = zipfile.ZipInfo() 1493 # preserve backslashes 1494 zinfo.filename = arcname 1495 zinfo.external_attr = 0o600 << 16 1496 zipfp.writestr(zinfo, content) 1497 1498 arcname = arcname.replace(os.sep, "/") 1499 targetpath = os.path.join('target', 'subdir', 'subsub') 1500 correctfile = os.path.join(targetpath, *fixedname.split('/')) 1501 1502 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1503 writtenfile = zipfp.extract(arcname, targetpath) 1504 self.assertEqual(writtenfile, correctfile, 1505 msg='extract %r: %r != %r' % 1506 (arcname, writtenfile, correctfile)) 1507 self.check_file(correctfile, content) 1508 rmtree('target') 1509 1510 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1511 zipfp.extractall(targetpath) 1512 self.check_file(correctfile, content) 1513 rmtree('target') 1514 1515 correctfile = os.path.join(os.getcwd(), *fixedname.split('/')) 1516 1517 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1518 writtenfile = zipfp.extract(arcname) 1519 self.assertEqual(writtenfile, correctfile, 1520 msg="extract %r" % arcname) 1521 self.check_file(correctfile, content) 1522 rmtree(fixedname.split('/')[0]) 1523 1524 with zipfile.ZipFile(TESTFN2, 'r') as zipfp: 1525 zipfp.extractall() 1526 self.check_file(correctfile, content) 1527 rmtree(fixedname.split('/')[0]) 1528 1529 unlink(TESTFN2) 1530 1531 1532class OtherTests(unittest.TestCase): 1533 def test_open_via_zip_info(self): 1534 # Create the ZIP archive 1535 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 1536 zipfp.writestr("name", "foo") 1537 with self.assertWarns(UserWarning): 1538 zipfp.writestr("name", "bar") 1539 self.assertEqual(zipfp.namelist(), ["name"] * 2) 1540 1541 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1542 infos = zipfp.infolist() 1543 data = b"" 1544 for info in infos: 1545 with zipfp.open(info) as zipopen: 1546 data += zipopen.read() 1547 self.assertIn(data, {b"foobar", b"barfoo"}) 1548 data = b"" 1549 for info in infos: 1550 data += zipfp.read(info) 1551 self.assertIn(data, {b"foobar", b"barfoo"}) 1552 1553 def test_writestr_extended_local_header_issue1202(self): 1554 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1555 for data in 'abcdefghijklmnop': 1556 zinfo = zipfile.ZipInfo(data) 1557 zinfo.flag_bits |= 0x08 # Include an extended local header. 1558 orig_zip.writestr(zinfo, data) 1559 1560 def test_close(self): 1561 """Check that the zipfile is closed after the 'with' block.""" 1562 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1563 for fpath, fdata in SMALL_TEST_DATA: 1564 zipfp.writestr(fpath, fdata) 1565 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1566 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1567 1568 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1569 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1570 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1571 1572 def test_close_on_exception(self): 1573 """Check that the zipfile is closed if an exception is raised in the 1574 'with' block.""" 1575 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1576 for fpath, fdata in SMALL_TEST_DATA: 1577 zipfp.writestr(fpath, fdata) 1578 1579 try: 1580 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1581 raise zipfile.BadZipFile() 1582 except zipfile.BadZipFile: 1583 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1584 1585 def test_unsupported_version(self): 1586 # File has an extract_version of 120 1587 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1588 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1589 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1590 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1591 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1592 1593 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1594 io.BytesIO(data), 'r') 1595 1596 @requires_zlib() 1597 def test_read_unicode_filenames(self): 1598 # bug #10801 1599 fname = findfile('zip_cp437_header.zip') 1600 with zipfile.ZipFile(fname) as zipfp: 1601 for name in zipfp.namelist(): 1602 zipfp.open(name).close() 1603 1604 def test_write_unicode_filenames(self): 1605 with zipfile.ZipFile(TESTFN, "w") as zf: 1606 zf.writestr("foo.txt", "Test for unicode filename") 1607 zf.writestr("\xf6.txt", "Test for unicode filename") 1608 self.assertIsInstance(zf.infolist()[0].filename, str) 1609 1610 with zipfile.ZipFile(TESTFN, "r") as zf: 1611 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1612 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1613 1614 def test_read_after_write_unicode_filenames(self): 1615 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1616 zipfp.writestr('приклад', b'sample') 1617 self.assertEqual(zipfp.read('приклад'), b'sample') 1618 1619 def test_exclusive_create_zip_file(self): 1620 """Test exclusive creating a new zipfile.""" 1621 unlink(TESTFN2) 1622 filename = 'testfile.txt' 1623 content = b'hello, world. this is some content.' 1624 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1625 zipfp.writestr(filename, content) 1626 with self.assertRaises(FileExistsError): 1627 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1628 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1629 self.assertEqual(zipfp.namelist(), [filename]) 1630 self.assertEqual(zipfp.read(filename), content) 1631 1632 def test_create_non_existent_file_for_append(self): 1633 if os.path.exists(TESTFN): 1634 os.unlink(TESTFN) 1635 1636 filename = 'testfile.txt' 1637 content = b'hello, world. this is some content.' 1638 1639 try: 1640 with zipfile.ZipFile(TESTFN, 'a') as zf: 1641 zf.writestr(filename, content) 1642 except OSError: 1643 self.fail('Could not append data to a non-existent zip file.') 1644 1645 self.assertTrue(os.path.exists(TESTFN)) 1646 1647 with zipfile.ZipFile(TESTFN, 'r') as zf: 1648 self.assertEqual(zf.read(filename), content) 1649 1650 def test_close_erroneous_file(self): 1651 # This test checks that the ZipFile constructor closes the file object 1652 # it opens if there's an error in the file. If it doesn't, the 1653 # traceback holds a reference to the ZipFile object and, indirectly, 1654 # the file object. 1655 # On Windows, this causes the os.unlink() call to fail because the 1656 # underlying file is still open. This is SF bug #412214. 1657 # 1658 with open(TESTFN, "w", encoding="utf-8") as fp: 1659 fp.write("this is not a legal zip file\n") 1660 try: 1661 zf = zipfile.ZipFile(TESTFN) 1662 except zipfile.BadZipFile: 1663 pass 1664 1665 def test_is_zip_erroneous_file(self): 1666 """Check that is_zipfile() correctly identifies non-zip files.""" 1667 # - passing a filename 1668 with open(TESTFN, "w", encoding='utf-8') as fp: 1669 fp.write("this is not a legal zip file\n") 1670 self.assertFalse(zipfile.is_zipfile(TESTFN)) 1671 # - passing a path-like object 1672 self.assertFalse(zipfile.is_zipfile(pathlib.Path(TESTFN))) 1673 # - passing a file object 1674 with open(TESTFN, "rb") as fp: 1675 self.assertFalse(zipfile.is_zipfile(fp)) 1676 # - passing a file-like object 1677 fp = io.BytesIO() 1678 fp.write(b"this is not a legal zip file\n") 1679 self.assertFalse(zipfile.is_zipfile(fp)) 1680 fp.seek(0, 0) 1681 self.assertFalse(zipfile.is_zipfile(fp)) 1682 1683 def test_damaged_zipfile(self): 1684 """Check that zipfiles with missing bytes at the end raise BadZipFile.""" 1685 # - Create a valid zip file 1686 fp = io.BytesIO() 1687 with zipfile.ZipFile(fp, mode="w") as zipf: 1688 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1689 zipfiledata = fp.getvalue() 1690 1691 # - Now create copies of it missing the last N bytes and make sure 1692 # a BadZipFile exception is raised when we try to open it 1693 for N in range(len(zipfiledata)): 1694 fp = io.BytesIO(zipfiledata[:N]) 1695 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, fp) 1696 1697 def test_is_zip_valid_file(self): 1698 """Check that is_zipfile() correctly identifies zip files.""" 1699 # - passing a filename 1700 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1701 zipf.writestr("foo.txt", b"O, for a Muse of Fire!") 1702 1703 self.assertTrue(zipfile.is_zipfile(TESTFN)) 1704 # - passing a file object 1705 with open(TESTFN, "rb") as fp: 1706 self.assertTrue(zipfile.is_zipfile(fp)) 1707 fp.seek(0, 0) 1708 zip_contents = fp.read() 1709 # - passing a file-like object 1710 fp = io.BytesIO() 1711 fp.write(zip_contents) 1712 self.assertTrue(zipfile.is_zipfile(fp)) 1713 fp.seek(0, 0) 1714 self.assertTrue(zipfile.is_zipfile(fp)) 1715 1716 def test_non_existent_file_raises_OSError(self): 1717 # make sure we don't raise an AttributeError when a partially-constructed 1718 # ZipFile instance is finalized; this tests for regression on SF tracker 1719 # bug #403871. 1720 1721 # The bug we're testing for caused an AttributeError to be raised 1722 # when a ZipFile instance was created for a file that did not 1723 # exist; the .fp member was not initialized but was needed by the 1724 # __del__() method. Since the AttributeError is in the __del__(), 1725 # it is ignored, but the user should be sufficiently annoyed by 1726 # the message on the output that regression will be noticed 1727 # quickly. 1728 self.assertRaises(OSError, zipfile.ZipFile, TESTFN) 1729 1730 def test_empty_file_raises_BadZipFile(self): 1731 f = open(TESTFN, 'w', encoding='utf-8') 1732 f.close() 1733 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1734 1735 with open(TESTFN, 'w', encoding='utf-8') as fp: 1736 fp.write("short file") 1737 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN) 1738 1739 def test_closed_zip_raises_ValueError(self): 1740 """Verify that testzip() doesn't swallow inappropriate exceptions.""" 1741 data = io.BytesIO() 1742 with zipfile.ZipFile(data, mode="w") as zipf: 1743 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1744 1745 # This is correct; calling .read on a closed ZipFile should raise 1746 # a ValueError, and so should calling .testzip. An earlier 1747 # version of .testzip would swallow this exception (and any other) 1748 # and report that the first file in the archive was corrupt. 1749 self.assertRaises(ValueError, zipf.read, "foo.txt") 1750 self.assertRaises(ValueError, zipf.open, "foo.txt") 1751 self.assertRaises(ValueError, zipf.testzip) 1752 self.assertRaises(ValueError, zipf.writestr, "bogus.txt", "bogus") 1753 with open(TESTFN, 'w', encoding='utf-8') as f: 1754 f.write('zipfile test data') 1755 self.assertRaises(ValueError, zipf.write, TESTFN) 1756 1757 def test_bad_constructor_mode(self): 1758 """Check that bad modes passed to ZipFile constructor are caught.""" 1759 self.assertRaises(ValueError, zipfile.ZipFile, TESTFN, "q") 1760 1761 def test_bad_open_mode(self): 1762 """Check that bad modes passed to ZipFile.open are caught.""" 1763 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1764 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1765 1766 with zipfile.ZipFile(TESTFN, mode="r") as zipf: 1767 # read the data to make sure the file is there 1768 zipf.read("foo.txt") 1769 self.assertRaises(ValueError, zipf.open, "foo.txt", "q") 1770 # universal newlines support is removed 1771 self.assertRaises(ValueError, zipf.open, "foo.txt", "U") 1772 self.assertRaises(ValueError, zipf.open, "foo.txt", "rU") 1773 1774 def test_read0(self): 1775 """Check that calling read(0) on a ZipExtFile object returns an empty 1776 string and doesn't advance file pointer.""" 1777 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1778 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1779 # read the data to make sure the file is there 1780 with zipf.open("foo.txt") as f: 1781 for i in range(FIXEDTEST_SIZE): 1782 self.assertEqual(f.read(0), b'') 1783 1784 self.assertEqual(f.read(), b"O, for a Muse of Fire!") 1785 1786 def test_open_non_existent_item(self): 1787 """Check that attempting to call open() for an item that doesn't 1788 exist in the archive raises a RuntimeError.""" 1789 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1790 self.assertRaises(KeyError, zipf.open, "foo.txt", "r") 1791 1792 def test_bad_compression_mode(self): 1793 """Check that bad compression methods passed to ZipFile.open are 1794 caught.""" 1795 self.assertRaises(NotImplementedError, zipfile.ZipFile, TESTFN, "w", -1) 1796 1797 def test_unsupported_compression(self): 1798 # data is declared as shrunk, but actually deflated 1799 data = (b'PK\x03\x04.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00' 1800 b'\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00x\x03\x00PK\x01' 1801 b'\x02.\x03.\x00\x00\x00\x01\x00\xe4C\xa1@\x00\x00\x00\x00\x02\x00\x00' 1802 b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 1803 b'\x80\x01\x00\x00\x00\x00xPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00' 1804 b'/\x00\x00\x00!\x00\x00\x00\x00\x00') 1805 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 1806 self.assertRaises(NotImplementedError, zipf.open, 'x') 1807 1808 def test_null_byte_in_filename(self): 1809 """Check that a filename containing a null byte is properly 1810 terminated.""" 1811 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1812 zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") 1813 self.assertEqual(zipf.namelist(), ['foo.txt']) 1814 1815 def test_struct_sizes(self): 1816 """Check that ZIP internal structure sizes are calculated correctly.""" 1817 self.assertEqual(zipfile.sizeEndCentDir, 22) 1818 self.assertEqual(zipfile.sizeCentralDir, 46) 1819 self.assertEqual(zipfile.sizeEndCentDir64, 56) 1820 self.assertEqual(zipfile.sizeEndCentDir64Locator, 20) 1821 1822 def test_comments(self): 1823 """Check that comments on the archive are handled properly.""" 1824 1825 # check default comment is empty 1826 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1827 self.assertEqual(zipf.comment, b'') 1828 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1829 1830 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1831 self.assertEqual(zipfr.comment, b'') 1832 1833 # check a simple short comment 1834 comment = b'Bravely taking to his feet, he beat a very brave retreat.' 1835 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1836 zipf.comment = comment 1837 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1838 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1839 self.assertEqual(zipf.comment, comment) 1840 1841 # check a comment of max length 1842 comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) 1843 comment2 = comment2.encode("ascii") 1844 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1845 zipf.comment = comment2 1846 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1847 1848 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1849 self.assertEqual(zipfr.comment, comment2) 1850 1851 # check a comment that is too long is truncated 1852 with zipfile.ZipFile(TESTFN, mode="w") as zipf: 1853 with self.assertWarns(UserWarning): 1854 zipf.comment = comment2 + b'oops' 1855 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1856 with zipfile.ZipFile(TESTFN, mode="r") as zipfr: 1857 self.assertEqual(zipfr.comment, comment2) 1858 1859 # check that comments are correctly modified in append mode 1860 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1861 zipf.comment = b"original comment" 1862 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1863 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1864 zipf.comment = b"an updated comment" 1865 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1866 self.assertEqual(zipf.comment, b"an updated comment") 1867 1868 # check that comments are correctly shortened in append mode 1869 # and the file is indeed truncated 1870 with zipfile.ZipFile(TESTFN,mode="w") as zipf: 1871 zipf.comment = b"original comment that's longer" 1872 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1873 original_zip_size = os.path.getsize(TESTFN) 1874 with zipfile.ZipFile(TESTFN,mode="a") as zipf: 1875 zipf.comment = b"shorter comment" 1876 self.assertTrue(original_zip_size > os.path.getsize(TESTFN)) 1877 with zipfile.ZipFile(TESTFN,mode="r") as zipf: 1878 self.assertEqual(zipf.comment, b"shorter comment") 1879 1880 def test_unicode_comment(self): 1881 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1882 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1883 with self.assertRaises(TypeError): 1884 zipf.comment = "this is an error" 1885 1886 def test_change_comment_in_empty_archive(self): 1887 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1888 self.assertFalse(zipf.filelist) 1889 zipf.comment = b"this is a comment" 1890 with zipfile.ZipFile(TESTFN, "r") as zipf: 1891 self.assertEqual(zipf.comment, b"this is a comment") 1892 1893 def test_change_comment_in_nonempty_archive(self): 1894 with zipfile.ZipFile(TESTFN, "w", zipfile.ZIP_STORED) as zipf: 1895 zipf.writestr("foo.txt", "O, for a Muse of Fire!") 1896 with zipfile.ZipFile(TESTFN, "a", zipfile.ZIP_STORED) as zipf: 1897 self.assertTrue(zipf.filelist) 1898 zipf.comment = b"this is a comment" 1899 with zipfile.ZipFile(TESTFN, "r") as zipf: 1900 self.assertEqual(zipf.comment, b"this is a comment") 1901 1902 def test_empty_zipfile(self): 1903 # Check that creating a file in 'w' or 'a' mode and closing without 1904 # adding any files to the archives creates a valid empty ZIP file 1905 zipf = zipfile.ZipFile(TESTFN, mode="w") 1906 zipf.close() 1907 try: 1908 zipf = zipfile.ZipFile(TESTFN, mode="r") 1909 except zipfile.BadZipFile: 1910 self.fail("Unable to create empty ZIP file in 'w' mode") 1911 1912 zipf = zipfile.ZipFile(TESTFN, mode="a") 1913 zipf.close() 1914 try: 1915 zipf = zipfile.ZipFile(TESTFN, mode="r") 1916 except: 1917 self.fail("Unable to create empty ZIP file in 'a' mode") 1918 1919 def test_open_empty_file(self): 1920 # Issue 1710703: Check that opening a file with less than 22 bytes 1921 # raises a BadZipFile exception (rather than the previously unhelpful 1922 # OSError) 1923 f = open(TESTFN, 'w', encoding='utf-8') 1924 f.close() 1925 self.assertRaises(zipfile.BadZipFile, zipfile.ZipFile, TESTFN, 'r') 1926 1927 def test_create_zipinfo_before_1980(self): 1928 self.assertRaises(ValueError, 1929 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 1930 1931 def test_create_empty_zipinfo_repr(self): 1932 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 1933 zi = zipfile.ZipInfo(filename="empty") 1934 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 1935 1936 def test_create_empty_zipinfo_default_attributes(self): 1937 """Ensure all required attributes are set.""" 1938 zi = zipfile.ZipInfo() 1939 self.assertEqual(zi.orig_filename, "NoName") 1940 self.assertEqual(zi.filename, "NoName") 1941 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 1942 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 1943 self.assertEqual(zi.comment, b"") 1944 self.assertEqual(zi.extra, b"") 1945 self.assertIn(zi.create_system, (0, 3)) 1946 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 1947 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 1948 self.assertEqual(zi.reserved, 0) 1949 self.assertEqual(zi.flag_bits, 0) 1950 self.assertEqual(zi.volume, 0) 1951 self.assertEqual(zi.internal_attr, 0) 1952 self.assertEqual(zi.external_attr, 0) 1953 1954 # Before bpo-26185, both were missing 1955 self.assertEqual(zi.file_size, 0) 1956 self.assertEqual(zi.compress_size, 0) 1957 1958 def test_zipfile_with_short_extra_field(self): 1959 """If an extra field in the header is less than 4 bytes, skip it.""" 1960 zipdata = ( 1961 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 1962 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 1963 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 1964 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 1965 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 1966 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 1967 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 1968 ) 1969 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 1970 # testzip returns the name of the first corrupt file, or None 1971 self.assertIsNone(zipf.testzip()) 1972 1973 def test_open_conflicting_handles(self): 1974 # It's only possible to open one writable file handle at a time 1975 msg1 = b"It's fun to charter an accountant!" 1976 msg2 = b"And sail the wide accountant sea" 1977 msg3 = b"To find, explore the funds offshore" 1978 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf: 1979 with zipf.open('foo', mode='w') as w2: 1980 w2.write(msg1) 1981 with zipf.open('bar', mode='w') as w1: 1982 with self.assertRaises(ValueError): 1983 zipf.open('handle', mode='w') 1984 with self.assertRaises(ValueError): 1985 zipf.open('foo', mode='r') 1986 with self.assertRaises(ValueError): 1987 zipf.writestr('str', 'abcde') 1988 with self.assertRaises(ValueError): 1989 zipf.write(__file__, 'file') 1990 with self.assertRaises(ValueError): 1991 zipf.close() 1992 w1.write(msg2) 1993 with zipf.open('baz', mode='w') as w2: 1994 w2.write(msg3) 1995 1996 with zipfile.ZipFile(TESTFN2, 'r') as zipf: 1997 self.assertEqual(zipf.read('foo'), msg1) 1998 self.assertEqual(zipf.read('bar'), msg2) 1999 self.assertEqual(zipf.read('baz'), msg3) 2000 self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz']) 2001 2002 def test_seek_tell(self): 2003 # Test seek functionality 2004 txt = b"Where's Bruce?" 2005 bloc = txt.find(b"Bruce") 2006 # Check seek on a file 2007 with zipfile.ZipFile(TESTFN, "w") as zipf: 2008 zipf.writestr("foo.txt", txt) 2009 with zipfile.ZipFile(TESTFN, "r") as zipf: 2010 with zipf.open("foo.txt", "r") as fp: 2011 fp.seek(bloc, os.SEEK_SET) 2012 self.assertEqual(fp.tell(), bloc) 2013 fp.seek(-bloc, os.SEEK_CUR) 2014 self.assertEqual(fp.tell(), 0) 2015 fp.seek(bloc, os.SEEK_CUR) 2016 self.assertEqual(fp.tell(), bloc) 2017 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2018 fp.seek(0, os.SEEK_END) 2019 self.assertEqual(fp.tell(), len(txt)) 2020 fp.seek(0, os.SEEK_SET) 2021 self.assertEqual(fp.tell(), 0) 2022 # Check seek on memory file 2023 data = io.BytesIO() 2024 with zipfile.ZipFile(data, mode="w") as zipf: 2025 zipf.writestr("foo.txt", txt) 2026 with zipfile.ZipFile(data, mode="r") as zipf: 2027 with zipf.open("foo.txt", "r") as fp: 2028 fp.seek(bloc, os.SEEK_SET) 2029 self.assertEqual(fp.tell(), bloc) 2030 fp.seek(-bloc, os.SEEK_CUR) 2031 self.assertEqual(fp.tell(), 0) 2032 fp.seek(bloc, os.SEEK_CUR) 2033 self.assertEqual(fp.tell(), bloc) 2034 self.assertEqual(fp.read(5), txt[bloc:bloc+5]) 2035 fp.seek(0, os.SEEK_END) 2036 self.assertEqual(fp.tell(), len(txt)) 2037 fp.seek(0, os.SEEK_SET) 2038 self.assertEqual(fp.tell(), 0) 2039 2040 @requires_bz2() 2041 def test_decompress_without_3rd_party_library(self): 2042 data = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2043 zip_file = io.BytesIO(data) 2044 with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_BZIP2) as zf: 2045 zf.writestr('a.txt', b'a') 2046 with mock.patch('zipfile.bz2', None): 2047 with zipfile.ZipFile(zip_file) as zf: 2048 self.assertRaises(RuntimeError, zf.extract, 'a.txt') 2049 2050 def tearDown(self): 2051 unlink(TESTFN) 2052 unlink(TESTFN2) 2053 2054 2055class AbstractBadCrcTests: 2056 def test_testzip_with_bad_crc(self): 2057 """Tests that files with bad CRCs return their name from testzip.""" 2058 zipdata = self.zip_with_bad_crc 2059 2060 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2061 # testzip returns the name of the first corrupt file, or None 2062 self.assertEqual('afile', zipf.testzip()) 2063 2064 def test_read_with_bad_crc(self): 2065 """Tests that files with bad CRCs raise a BadZipFile exception when read.""" 2066 zipdata = self.zip_with_bad_crc 2067 2068 # Using ZipFile.read() 2069 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2070 self.assertRaises(zipfile.BadZipFile, zipf.read, 'afile') 2071 2072 # Using ZipExtFile.read() 2073 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2074 with zipf.open('afile', 'r') as corrupt_file: 2075 self.assertRaises(zipfile.BadZipFile, corrupt_file.read) 2076 2077 # Same with small reads (in order to exercise the buffering logic) 2078 with zipfile.ZipFile(io.BytesIO(zipdata), mode="r") as zipf: 2079 with zipf.open('afile', 'r') as corrupt_file: 2080 corrupt_file.MIN_READ_SIZE = 2 2081 with self.assertRaises(zipfile.BadZipFile): 2082 while corrupt_file.read(2): 2083 pass 2084 2085 2086class StoredBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2087 compression = zipfile.ZIP_STORED 2088 zip_with_bad_crc = ( 2089 b'PK\003\004\024\0\0\0\0\0 \213\212;:r' 2090 b'\253\377\f\0\0\0\f\0\0\0\005\0\0\000af' 2091 b'ilehello,AworldP' 2092 b'K\001\002\024\003\024\0\0\0\0\0 \213\212;:' 2093 b'r\253\377\f\0\0\0\f\0\0\0\005\0\0\0\0' 2094 b'\0\0\0\0\0\0\0\200\001\0\0\0\000afi' 2095 b'lePK\005\006\0\0\0\0\001\0\001\0003\000' 2096 b'\0\0/\0\0\0\0\0') 2097 2098@requires_zlib() 2099class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2100 compression = zipfile.ZIP_DEFLATED 2101 zip_with_bad_crc = ( 2102 b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA' 2103 b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2104 b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0' 2105 b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n' 2106 b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05' 2107 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00' 2108 b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00' 2109 b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00') 2110 2111@requires_bz2() 2112class Bzip2BadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2113 compression = zipfile.ZIP_BZIP2 2114 zip_with_bad_crc = ( 2115 b'PK\x03\x04\x14\x03\x00\x00\x0c\x00nu\x0c=FA' 2116 b'KE8\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2117 b'ileBZh91AY&SY\xd4\xa8\xca' 2118 b'\x7f\x00\x00\x0f\x11\x80@\x00\x06D\x90\x80 \x00 \xa5' 2119 b'P\xd9!\x03\x03\x13\x13\x13\x89\xa9\xa9\xc2u5:\x9f' 2120 b'\x8b\xb9"\x9c(HjTe?\x80PK\x01\x02\x14' 2121 b'\x03\x14\x03\x00\x00\x0c\x00nu\x0c=FAKE8' 2122 b'\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00' 2123 b'\x00 \x80\x80\x81\x00\x00\x00\x00afilePK' 2124 b'\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00\x00[\x00' 2125 b'\x00\x00\x00\x00') 2126 2127@requires_lzma() 2128class LzmaBadCrcTests(AbstractBadCrcTests, unittest.TestCase): 2129 compression = zipfile.ZIP_LZMA 2130 zip_with_bad_crc = ( 2131 b'PK\x03\x04\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2132 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af' 2133 b'ile\t\x04\x05\x00]\x00\x00\x00\x04\x004\x19I' 2134 b'\xee\x8d\xe9\x17\x89:3`\tq!.8\x00PK' 2135 b'\x01\x02\x14\x03\x14\x03\x00\x00\x0e\x00nu\x0c=FA' 2136 b'KE\x1b\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00\x00\x00' 2137 b'\x00\x00\x00\x00 \x80\x80\x81\x00\x00\x00\x00afil' 2138 b'ePK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x003\x00\x00' 2139 b'\x00>\x00\x00\x00\x00\x00') 2140 2141 2142class DecryptionTests(unittest.TestCase): 2143 """Check that ZIP decryption works. Since the library does not 2144 support encryption at the moment, we use a pre-generated encrypted 2145 ZIP file.""" 2146 2147 data = ( 2148 b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00' 2149 b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y' 2150 b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl' 2151 b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00' 2152 b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81' 2153 b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00' 2154 b'\x00\x00L\x00\x00\x00\x00\x00' ) 2155 data2 = ( 2156 b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02' 2157 b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04' 2158 b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0' 2159 b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03' 2160 b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00' 2161 b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze' 2162 b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01' 2163 b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' ) 2164 2165 plain = b'zipfile.py encryption test' 2166 plain2 = b'\x00'*512 2167 2168 def setUp(self): 2169 with open(TESTFN, "wb") as fp: 2170 fp.write(self.data) 2171 self.zip = zipfile.ZipFile(TESTFN, "r") 2172 with open(TESTFN2, "wb") as fp: 2173 fp.write(self.data2) 2174 self.zip2 = zipfile.ZipFile(TESTFN2, "r") 2175 2176 def tearDown(self): 2177 self.zip.close() 2178 os.unlink(TESTFN) 2179 self.zip2.close() 2180 os.unlink(TESTFN2) 2181 2182 def test_no_password(self): 2183 # Reading the encrypted file without password 2184 # must generate a RunTime exception 2185 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2186 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2187 2188 def test_bad_password(self): 2189 self.zip.setpassword(b"perl") 2190 self.assertRaises(RuntimeError, self.zip.read, "test.txt") 2191 self.zip2.setpassword(b"perl") 2192 self.assertRaises(RuntimeError, self.zip2.read, "zero") 2193 2194 @requires_zlib() 2195 def test_good_password(self): 2196 self.zip.setpassword(b"python") 2197 self.assertEqual(self.zip.read("test.txt"), self.plain) 2198 self.zip2.setpassword(b"12345") 2199 self.assertEqual(self.zip2.read("zero"), self.plain2) 2200 2201 def test_unicode_password(self): 2202 self.assertRaises(TypeError, self.zip.setpassword, "unicode") 2203 self.assertRaises(TypeError, self.zip.read, "test.txt", "python") 2204 self.assertRaises(TypeError, self.zip.open, "test.txt", pwd="python") 2205 self.assertRaises(TypeError, self.zip.extract, "test.txt", pwd="python") 2206 2207 def test_seek_tell(self): 2208 self.zip.setpassword(b"python") 2209 txt = self.plain 2210 test_word = b'encryption' 2211 bloc = txt.find(test_word) 2212 bloc_len = len(test_word) 2213 with self.zip.open("test.txt", "r") as fp: 2214 fp.seek(bloc, os.SEEK_SET) 2215 self.assertEqual(fp.tell(), bloc) 2216 fp.seek(-bloc, os.SEEK_CUR) 2217 self.assertEqual(fp.tell(), 0) 2218 fp.seek(bloc, os.SEEK_CUR) 2219 self.assertEqual(fp.tell(), bloc) 2220 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2221 2222 # Make sure that the second read after seeking back beyond 2223 # _readbuffer returns the same content (ie. rewind to the start of 2224 # the file to read forward to the required position). 2225 old_read_size = fp.MIN_READ_SIZE 2226 fp.MIN_READ_SIZE = 1 2227 fp._readbuffer = b'' 2228 fp._offset = 0 2229 fp.seek(0, os.SEEK_SET) 2230 self.assertEqual(fp.tell(), 0) 2231 fp.seek(bloc, os.SEEK_CUR) 2232 self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len]) 2233 fp.MIN_READ_SIZE = old_read_size 2234 2235 fp.seek(0, os.SEEK_END) 2236 self.assertEqual(fp.tell(), len(txt)) 2237 fp.seek(0, os.SEEK_SET) 2238 self.assertEqual(fp.tell(), 0) 2239 2240 # Read the file completely to definitely call any eof integrity 2241 # checks (crc) and make sure they still pass. 2242 fp.read() 2243 2244 2245class AbstractTestsWithRandomBinaryFiles: 2246 @classmethod 2247 def setUpClass(cls): 2248 datacount = randint(16, 64)*1024 + randint(1, 1024) 2249 cls.data = b''.join(struct.pack('<f', random()*randint(-1000, 1000)) 2250 for i in range(datacount)) 2251 2252 def setUp(self): 2253 # Make a source file with some lines 2254 with open(TESTFN, "wb") as fp: 2255 fp.write(self.data) 2256 2257 def tearDown(self): 2258 unlink(TESTFN) 2259 unlink(TESTFN2) 2260 2261 def make_test_archive(self, f, compression): 2262 # Create the ZIP archive 2263 with zipfile.ZipFile(f, "w", compression) as zipfp: 2264 zipfp.write(TESTFN, "another.name") 2265 zipfp.write(TESTFN, TESTFN) 2266 2267 def zip_test(self, f, compression): 2268 self.make_test_archive(f, compression) 2269 2270 # Read the ZIP archive 2271 with zipfile.ZipFile(f, "r", compression) as zipfp: 2272 testdata = zipfp.read(TESTFN) 2273 self.assertEqual(len(testdata), len(self.data)) 2274 self.assertEqual(testdata, self.data) 2275 self.assertEqual(zipfp.read("another.name"), self.data) 2276 2277 def test_read(self): 2278 for f in get_files(self): 2279 self.zip_test(f, self.compression) 2280 2281 def zip_open_test(self, f, compression): 2282 self.make_test_archive(f, compression) 2283 2284 # Read the ZIP archive 2285 with zipfile.ZipFile(f, "r", compression) as zipfp: 2286 zipdata1 = [] 2287 with zipfp.open(TESTFN) as zipopen1: 2288 while True: 2289 read_data = zipopen1.read(256) 2290 if not read_data: 2291 break 2292 zipdata1.append(read_data) 2293 2294 zipdata2 = [] 2295 with zipfp.open("another.name") as zipopen2: 2296 while True: 2297 read_data = zipopen2.read(256) 2298 if not read_data: 2299 break 2300 zipdata2.append(read_data) 2301 2302 testdata1 = b''.join(zipdata1) 2303 self.assertEqual(len(testdata1), len(self.data)) 2304 self.assertEqual(testdata1, self.data) 2305 2306 testdata2 = b''.join(zipdata2) 2307 self.assertEqual(len(testdata2), len(self.data)) 2308 self.assertEqual(testdata2, self.data) 2309 2310 def test_open(self): 2311 for f in get_files(self): 2312 self.zip_open_test(f, self.compression) 2313 2314 def zip_random_open_test(self, f, compression): 2315 self.make_test_archive(f, compression) 2316 2317 # Read the ZIP archive 2318 with zipfile.ZipFile(f, "r", compression) as zipfp: 2319 zipdata1 = [] 2320 with zipfp.open(TESTFN) as zipopen1: 2321 while True: 2322 read_data = zipopen1.read(randint(1, 1024)) 2323 if not read_data: 2324 break 2325 zipdata1.append(read_data) 2326 2327 testdata = b''.join(zipdata1) 2328 self.assertEqual(len(testdata), len(self.data)) 2329 self.assertEqual(testdata, self.data) 2330 2331 def test_random_open(self): 2332 for f in get_files(self): 2333 self.zip_random_open_test(f, self.compression) 2334 2335 2336class StoredTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2337 unittest.TestCase): 2338 compression = zipfile.ZIP_STORED 2339 2340@requires_zlib() 2341class DeflateTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2342 unittest.TestCase): 2343 compression = zipfile.ZIP_DEFLATED 2344 2345@requires_bz2() 2346class Bzip2TestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2347 unittest.TestCase): 2348 compression = zipfile.ZIP_BZIP2 2349 2350@requires_lzma() 2351class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2352 unittest.TestCase): 2353 compression = zipfile.ZIP_LZMA 2354 2355 2356# Provide the tell() method but not seek() 2357class Tellable: 2358 def __init__(self, fp): 2359 self.fp = fp 2360 self.offset = 0 2361 2362 def write(self, data): 2363 n = self.fp.write(data) 2364 self.offset += n 2365 return n 2366 2367 def tell(self): 2368 return self.offset 2369 2370 def flush(self): 2371 self.fp.flush() 2372 2373class Unseekable: 2374 def __init__(self, fp): 2375 self.fp = fp 2376 2377 def write(self, data): 2378 return self.fp.write(data) 2379 2380 def flush(self): 2381 self.fp.flush() 2382 2383class UnseekableTests(unittest.TestCase): 2384 def test_writestr(self): 2385 for wrapper in (lambda f: f), Tellable, Unseekable: 2386 with self.subTest(wrapper=wrapper): 2387 f = io.BytesIO() 2388 f.write(b'abc') 2389 bf = io.BufferedWriter(f) 2390 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2391 zipfp.writestr('ones', b'111') 2392 zipfp.writestr('twos', b'222') 2393 self.assertEqual(f.getvalue()[:5], b'abcPK') 2394 with zipfile.ZipFile(f, mode='r') as zipf: 2395 with zipf.open('ones') as zopen: 2396 self.assertEqual(zopen.read(), b'111') 2397 with zipf.open('twos') as zopen: 2398 self.assertEqual(zopen.read(), b'222') 2399 2400 def test_write(self): 2401 for wrapper in (lambda f: f), Tellable, Unseekable: 2402 with self.subTest(wrapper=wrapper): 2403 f = io.BytesIO() 2404 f.write(b'abc') 2405 bf = io.BufferedWriter(f) 2406 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2407 self.addCleanup(unlink, TESTFN) 2408 with open(TESTFN, 'wb') as f2: 2409 f2.write(b'111') 2410 zipfp.write(TESTFN, 'ones') 2411 with open(TESTFN, 'wb') as f2: 2412 f2.write(b'222') 2413 zipfp.write(TESTFN, 'twos') 2414 self.assertEqual(f.getvalue()[:5], b'abcPK') 2415 with zipfile.ZipFile(f, mode='r') as zipf: 2416 with zipf.open('ones') as zopen: 2417 self.assertEqual(zopen.read(), b'111') 2418 with zipf.open('twos') as zopen: 2419 self.assertEqual(zopen.read(), b'222') 2420 2421 def test_open_write(self): 2422 for wrapper in (lambda f: f), Tellable, Unseekable: 2423 with self.subTest(wrapper=wrapper): 2424 f = io.BytesIO() 2425 f.write(b'abc') 2426 bf = io.BufferedWriter(f) 2427 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipf: 2428 with zipf.open('ones', 'w') as zopen: 2429 zopen.write(b'111') 2430 with zipf.open('twos', 'w') as zopen: 2431 zopen.write(b'222') 2432 self.assertEqual(f.getvalue()[:5], b'abcPK') 2433 with zipfile.ZipFile(f) as zipf: 2434 self.assertEqual(zipf.read('ones'), b'111') 2435 self.assertEqual(zipf.read('twos'), b'222') 2436 2437 2438@requires_zlib() 2439class TestsWithMultipleOpens(unittest.TestCase): 2440 @classmethod 2441 def setUpClass(cls): 2442 cls.data1 = b'111' + randbytes(10000) 2443 cls.data2 = b'222' + randbytes(10000) 2444 2445 def make_test_archive(self, f): 2446 # Create the ZIP archive 2447 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipfp: 2448 zipfp.writestr('ones', self.data1) 2449 zipfp.writestr('twos', self.data2) 2450 2451 def test_same_file(self): 2452 # Verify that (when the ZipFile is in control of creating file objects) 2453 # multiple open() calls can be made without interfering with each other. 2454 for f in get_files(self): 2455 self.make_test_archive(f) 2456 with zipfile.ZipFile(f, mode="r") as zipf: 2457 with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2: 2458 data1 = zopen1.read(500) 2459 data2 = zopen2.read(500) 2460 data1 += zopen1.read() 2461 data2 += zopen2.read() 2462 self.assertEqual(data1, data2) 2463 self.assertEqual(data1, self.data1) 2464 2465 def test_different_file(self): 2466 # Verify that (when the ZipFile is in control of creating file objects) 2467 # multiple open() calls can be made without interfering with each other. 2468 for f in get_files(self): 2469 self.make_test_archive(f) 2470 with zipfile.ZipFile(f, mode="r") as zipf: 2471 with zipf.open('ones') as zopen1, zipf.open('twos') as zopen2: 2472 data1 = zopen1.read(500) 2473 data2 = zopen2.read(500) 2474 data1 += zopen1.read() 2475 data2 += zopen2.read() 2476 self.assertEqual(data1, self.data1) 2477 self.assertEqual(data2, self.data2) 2478 2479 def test_interleaved(self): 2480 # Verify that (when the ZipFile is in control of creating file objects) 2481 # multiple open() calls can be made without interfering with each other. 2482 for f in get_files(self): 2483 self.make_test_archive(f) 2484 with zipfile.ZipFile(f, mode="r") as zipf: 2485 with zipf.open('ones') as zopen1: 2486 data1 = zopen1.read(500) 2487 with zipf.open('twos') as zopen2: 2488 data2 = zopen2.read(500) 2489 data1 += zopen1.read() 2490 data2 += zopen2.read() 2491 self.assertEqual(data1, self.data1) 2492 self.assertEqual(data2, self.data2) 2493 2494 def test_read_after_close(self): 2495 for f in get_files(self): 2496 self.make_test_archive(f) 2497 with contextlib.ExitStack() as stack: 2498 with zipfile.ZipFile(f, 'r') as zipf: 2499 zopen1 = stack.enter_context(zipf.open('ones')) 2500 zopen2 = stack.enter_context(zipf.open('twos')) 2501 data1 = zopen1.read(500) 2502 data2 = zopen2.read(500) 2503 data1 += zopen1.read() 2504 data2 += zopen2.read() 2505 self.assertEqual(data1, self.data1) 2506 self.assertEqual(data2, self.data2) 2507 2508 def test_read_after_write(self): 2509 for f in get_files(self): 2510 with zipfile.ZipFile(f, 'w', zipfile.ZIP_DEFLATED) as zipf: 2511 zipf.writestr('ones', self.data1) 2512 zipf.writestr('twos', self.data2) 2513 with zipf.open('ones') as zopen1: 2514 data1 = zopen1.read(500) 2515 self.assertEqual(data1, self.data1[:500]) 2516 with zipfile.ZipFile(f, 'r') as zipf: 2517 data1 = zipf.read('ones') 2518 data2 = zipf.read('twos') 2519 self.assertEqual(data1, self.data1) 2520 self.assertEqual(data2, self.data2) 2521 2522 def test_write_after_read(self): 2523 for f in get_files(self): 2524 with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as zipf: 2525 zipf.writestr('ones', self.data1) 2526 with zipf.open('ones') as zopen1: 2527 zopen1.read(500) 2528 zipf.writestr('twos', self.data2) 2529 with zipfile.ZipFile(f, 'r') as zipf: 2530 data1 = zipf.read('ones') 2531 data2 = zipf.read('twos') 2532 self.assertEqual(data1, self.data1) 2533 self.assertEqual(data2, self.data2) 2534 2535 def test_many_opens(self): 2536 # Verify that read() and open() promptly close the file descriptor, 2537 # and don't rely on the garbage collector to free resources. 2538 self.make_test_archive(TESTFN2) 2539 with zipfile.ZipFile(TESTFN2, mode="r") as zipf: 2540 for x in range(100): 2541 zipf.read('ones') 2542 with zipf.open('ones') as zopen1: 2543 pass 2544 with open(os.devnull, "rb") as f: 2545 self.assertLess(f.fileno(), 100) 2546 2547 def test_write_while_reading(self): 2548 with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as zipf: 2549 zipf.writestr('ones', self.data1) 2550 with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as zipf: 2551 with zipf.open('ones', 'r') as r1: 2552 data1 = r1.read(500) 2553 with zipf.open('twos', 'w') as w1: 2554 w1.write(self.data2) 2555 data1 += r1.read() 2556 self.assertEqual(data1, self.data1) 2557 with zipfile.ZipFile(TESTFN2) as zipf: 2558 self.assertEqual(zipf.read('twos'), self.data2) 2559 2560 def tearDown(self): 2561 unlink(TESTFN2) 2562 2563 2564class TestWithDirectory(unittest.TestCase): 2565 def setUp(self): 2566 os.mkdir(TESTFN2) 2567 2568 def test_extract_dir(self): 2569 with zipfile.ZipFile(findfile("zipdir.zip")) as zipf: 2570 zipf.extractall(TESTFN2) 2571 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) 2572 self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) 2573 self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c"))) 2574 2575 def test_bug_6050(self): 2576 # Extraction should succeed if directories already exist 2577 os.mkdir(os.path.join(TESTFN2, "a")) 2578 self.test_extract_dir() 2579 2580 def test_write_dir(self): 2581 dirpath = os.path.join(TESTFN2, "x") 2582 os.mkdir(dirpath) 2583 mode = os.stat(dirpath).st_mode & 0xFFFF 2584 with zipfile.ZipFile(TESTFN, "w") as zipf: 2585 zipf.write(dirpath) 2586 zinfo = zipf.filelist[0] 2587 self.assertTrue(zinfo.filename.endswith("/x/")) 2588 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2589 zipf.write(dirpath, "y") 2590 zinfo = zipf.filelist[1] 2591 self.assertTrue(zinfo.filename, "y/") 2592 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2593 with zipfile.ZipFile(TESTFN, "r") as zipf: 2594 zinfo = zipf.filelist[0] 2595 self.assertTrue(zinfo.filename.endswith("/x/")) 2596 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2597 zinfo = zipf.filelist[1] 2598 self.assertTrue(zinfo.filename, "y/") 2599 self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10) 2600 target = os.path.join(TESTFN2, "target") 2601 os.mkdir(target) 2602 zipf.extractall(target) 2603 self.assertTrue(os.path.isdir(os.path.join(target, "y"))) 2604 self.assertEqual(len(os.listdir(target)), 2) 2605 2606 def test_writestr_dir(self): 2607 os.mkdir(os.path.join(TESTFN2, "x")) 2608 with zipfile.ZipFile(TESTFN, "w") as zipf: 2609 zipf.writestr("x/", b'') 2610 zinfo = zipf.filelist[0] 2611 self.assertEqual(zinfo.filename, "x/") 2612 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2613 with zipfile.ZipFile(TESTFN, "r") as zipf: 2614 zinfo = zipf.filelist[0] 2615 self.assertTrue(zinfo.filename.endswith("x/")) 2616 self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10) 2617 target = os.path.join(TESTFN2, "target") 2618 os.mkdir(target) 2619 zipf.extractall(target) 2620 self.assertTrue(os.path.isdir(os.path.join(target, "x"))) 2621 self.assertEqual(os.listdir(target), ["x"]) 2622 2623 def tearDown(self): 2624 rmtree(TESTFN2) 2625 if os.path.exists(TESTFN): 2626 unlink(TESTFN) 2627 2628 2629class ZipInfoTests(unittest.TestCase): 2630 def test_from_file(self): 2631 zi = zipfile.ZipInfo.from_file(__file__) 2632 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2633 self.assertFalse(zi.is_dir()) 2634 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2635 2636 def test_from_file_pathlike(self): 2637 zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__)) 2638 self.assertEqual(posixpath.basename(zi.filename), 'test_zipfile.py') 2639 self.assertFalse(zi.is_dir()) 2640 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2641 2642 def test_from_file_bytes(self): 2643 zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test') 2644 self.assertEqual(posixpath.basename(zi.filename), 'test') 2645 self.assertFalse(zi.is_dir()) 2646 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2647 2648 def test_from_file_fileno(self): 2649 with open(__file__, 'rb') as f: 2650 zi = zipfile.ZipInfo.from_file(f.fileno(), 'test') 2651 self.assertEqual(posixpath.basename(zi.filename), 'test') 2652 self.assertFalse(zi.is_dir()) 2653 self.assertEqual(zi.file_size, os.path.getsize(__file__)) 2654 2655 def test_from_dir(self): 2656 dirpath = os.path.dirname(os.path.abspath(__file__)) 2657 zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests') 2658 self.assertEqual(zi.filename, 'stdlib_tests/') 2659 self.assertTrue(zi.is_dir()) 2660 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2661 self.assertEqual(zi.file_size, 0) 2662 2663 2664class CommandLineTest(unittest.TestCase): 2665 2666 def zipfilecmd(self, *args, **kwargs): 2667 rc, out, err = script_helper.assert_python_ok('-m', 'zipfile', *args, 2668 **kwargs) 2669 return out.replace(os.linesep.encode(), b'\n') 2670 2671 def zipfilecmd_failure(self, *args): 2672 return script_helper.assert_python_failure('-m', 'zipfile', *args) 2673 2674 def test_bad_use(self): 2675 rc, out, err = self.zipfilecmd_failure() 2676 self.assertEqual(out, b'') 2677 self.assertIn(b'usage', err.lower()) 2678 self.assertIn(b'error', err.lower()) 2679 self.assertIn(b'required', err.lower()) 2680 rc, out, err = self.zipfilecmd_failure('-l', '') 2681 self.assertEqual(out, b'') 2682 self.assertNotEqual(err.strip(), b'') 2683 2684 def test_test_command(self): 2685 zip_name = findfile('zipdir.zip') 2686 for opt in '-t', '--test': 2687 out = self.zipfilecmd(opt, zip_name) 2688 self.assertEqual(out.rstrip(), b'Done testing') 2689 zip_name = findfile('testtar.tar') 2690 rc, out, err = self.zipfilecmd_failure('-t', zip_name) 2691 self.assertEqual(out, b'') 2692 2693 def test_list_command(self): 2694 zip_name = findfile('zipdir.zip') 2695 t = io.StringIO() 2696 with zipfile.ZipFile(zip_name, 'r') as tf: 2697 tf.printdir(t) 2698 expected = t.getvalue().encode('ascii', 'backslashreplace') 2699 for opt in '-l', '--list': 2700 out = self.zipfilecmd(opt, zip_name, 2701 PYTHONIOENCODING='ascii:backslashreplace') 2702 self.assertEqual(out, expected) 2703 2704 @requires_zlib() 2705 def test_create_command(self): 2706 self.addCleanup(unlink, TESTFN) 2707 with open(TESTFN, 'w', encoding='utf-8') as f: 2708 f.write('test 1') 2709 os.mkdir(TESTFNDIR) 2710 self.addCleanup(rmtree, TESTFNDIR) 2711 with open(os.path.join(TESTFNDIR, 'file.txt'), 'w', encoding='utf-8') as f: 2712 f.write('test 2') 2713 files = [TESTFN, TESTFNDIR] 2714 namelist = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt'] 2715 for opt in '-c', '--create': 2716 try: 2717 out = self.zipfilecmd(opt, TESTFN2, *files) 2718 self.assertEqual(out, b'') 2719 with zipfile.ZipFile(TESTFN2) as zf: 2720 self.assertEqual(zf.namelist(), namelist) 2721 self.assertEqual(zf.read(namelist[0]), b'test 1') 2722 self.assertEqual(zf.read(namelist[2]), b'test 2') 2723 finally: 2724 unlink(TESTFN2) 2725 2726 def test_extract_command(self): 2727 zip_name = findfile('zipdir.zip') 2728 for opt in '-e', '--extract': 2729 with temp_dir() as extdir: 2730 out = self.zipfilecmd(opt, zip_name, extdir) 2731 self.assertEqual(out, b'') 2732 with zipfile.ZipFile(zip_name) as zf: 2733 for zi in zf.infolist(): 2734 path = os.path.join(extdir, 2735 zi.filename.replace('/', os.sep)) 2736 if zi.is_dir(): 2737 self.assertTrue(os.path.isdir(path)) 2738 else: 2739 self.assertTrue(os.path.isfile(path)) 2740 with open(path, 'rb') as f: 2741 self.assertEqual(f.read(), zf.read(zi)) 2742 2743 2744class TestExecutablePrependedZip(unittest.TestCase): 2745 """Test our ability to open zip files with an executable prepended.""" 2746 2747 def setUp(self): 2748 self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata') 2749 self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata') 2750 2751 def _test_zip_works(self, name): 2752 # bpo28494 sanity check: ensure is_zipfile works on these. 2753 self.assertTrue(zipfile.is_zipfile(name), 2754 f'is_zipfile failed on {name}') 2755 # Ensure we can operate on these via ZipFile. 2756 with zipfile.ZipFile(name) as zipfp: 2757 for n in zipfp.namelist(): 2758 data = zipfp.read(n) 2759 self.assertIn(b'FAVORITE_NUMBER', data) 2760 2761 def test_read_zip_with_exe_prepended(self): 2762 self._test_zip_works(self.exe_zip) 2763 2764 def test_read_zip64_with_exe_prepended(self): 2765 self._test_zip_works(self.exe_zip64) 2766 2767 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2768 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2769 'Test relies on #!/bin/bash working.') 2770 def test_execute_zip2(self): 2771 output = subprocess.check_output([self.exe_zip, sys.executable]) 2772 self.assertIn(b'number in executable: 5', output) 2773 2774 @unittest.skipUnless(sys.executable, 'sys.executable required.') 2775 @unittest.skipUnless(os.access('/bin/bash', os.X_OK), 2776 'Test relies on #!/bin/bash working.') 2777 def test_execute_zip64(self): 2778 output = subprocess.check_output([self.exe_zip64, sys.executable]) 2779 self.assertIn(b'number in executable: 5', output) 2780 2781 2782# Poor man's technique to consume a (smallish) iterable. 2783consume = tuple 2784 2785 2786# from jaraco.itertools 5.0 2787class jaraco: 2788 class itertools: 2789 class Counter: 2790 def __init__(self, i): 2791 self.count = 0 2792 self._orig_iter = iter(i) 2793 2794 def __iter__(self): 2795 return self 2796 2797 def __next__(self): 2798 result = next(self._orig_iter) 2799 self.count += 1 2800 return result 2801 2802 2803def add_dirs(zf): 2804 """ 2805 Given a writable zip file zf, inject directory entries for 2806 any directories implied by the presence of children. 2807 """ 2808 for name in zipfile.CompleteDirs._implied_dirs(zf.namelist()): 2809 zf.writestr(name, b"") 2810 return zf 2811 2812 2813def build_alpharep_fixture(): 2814 """ 2815 Create a zip file with this structure: 2816 2817 . 2818 ├── a.txt 2819 ├── b 2820 │ ├── c.txt 2821 │ ├── d 2822 │ │ └── e.txt 2823 │ └── f.txt 2824 └── g 2825 └── h 2826 └── i.txt 2827 2828 This fixture has the following key characteristics: 2829 2830 - a file at the root (a) 2831 - a file two levels deep (b/d/e) 2832 - multiple files in a directory (b/c, b/f) 2833 - a directory containing only a directory (g/h) 2834 2835 "alpha" because it uses alphabet 2836 "rep" because it's a representative example 2837 """ 2838 data = io.BytesIO() 2839 zf = zipfile.ZipFile(data, "w") 2840 zf.writestr("a.txt", b"content of a") 2841 zf.writestr("b/c.txt", b"content of c") 2842 zf.writestr("b/d/e.txt", b"content of e") 2843 zf.writestr("b/f.txt", b"content of f") 2844 zf.writestr("g/h/i.txt", b"content of i") 2845 zf.filename = "alpharep.zip" 2846 return zf 2847 2848 2849def pass_alpharep(meth): 2850 """ 2851 Given a method, wrap it in a for loop that invokes method 2852 with each subtest. 2853 """ 2854 2855 @functools.wraps(meth) 2856 def wrapper(self): 2857 for alpharep in self.zipfile_alpharep(): 2858 meth(self, alpharep=alpharep) 2859 2860 return wrapper 2861 2862 2863class TestPath(unittest.TestCase): 2864 def setUp(self): 2865 self.fixtures = contextlib.ExitStack() 2866 self.addCleanup(self.fixtures.close) 2867 2868 def zipfile_alpharep(self): 2869 with self.subTest(): 2870 yield build_alpharep_fixture() 2871 with self.subTest(): 2872 yield add_dirs(build_alpharep_fixture()) 2873 2874 def zipfile_ondisk(self, alpharep): 2875 tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir())) 2876 buffer = alpharep.fp 2877 alpharep.close() 2878 path = tmpdir / alpharep.filename 2879 with path.open("wb") as strm: 2880 strm.write(buffer.getvalue()) 2881 return path 2882 2883 @pass_alpharep 2884 def test_iterdir_and_types(self, alpharep): 2885 root = zipfile.Path(alpharep) 2886 assert root.is_dir() 2887 a, b, g = root.iterdir() 2888 assert a.is_file() 2889 assert b.is_dir() 2890 assert g.is_dir() 2891 c, f, d = b.iterdir() 2892 assert c.is_file() and f.is_file() 2893 (e,) = d.iterdir() 2894 assert e.is_file() 2895 (h,) = g.iterdir() 2896 (i,) = h.iterdir() 2897 assert i.is_file() 2898 2899 @pass_alpharep 2900 def test_is_file_missing(self, alpharep): 2901 root = zipfile.Path(alpharep) 2902 assert not root.joinpath('missing.txt').is_file() 2903 2904 @pass_alpharep 2905 def test_iterdir_on_file(self, alpharep): 2906 root = zipfile.Path(alpharep) 2907 a, b, g = root.iterdir() 2908 with self.assertRaises(ValueError): 2909 a.iterdir() 2910 2911 @pass_alpharep 2912 def test_subdir_is_dir(self, alpharep): 2913 root = zipfile.Path(alpharep) 2914 assert (root / 'b').is_dir() 2915 assert (root / 'b/').is_dir() 2916 assert (root / 'g').is_dir() 2917 assert (root / 'g/').is_dir() 2918 2919 @pass_alpharep 2920 def test_open(self, alpharep): 2921 root = zipfile.Path(alpharep) 2922 a, b, g = root.iterdir() 2923 with a.open(encoding="utf-8") as strm: 2924 data = strm.read() 2925 assert data == "content of a" 2926 2927 def test_open_write(self): 2928 """ 2929 If the zipfile is open for write, it should be possible to 2930 write bytes or text to it. 2931 """ 2932 zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w')) 2933 with zf.joinpath('file.bin').open('wb') as strm: 2934 strm.write(b'binary contents') 2935 with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm: 2936 strm.write('text file') 2937 2938 def test_open_extant_directory(self): 2939 """ 2940 Attempting to open a directory raises IsADirectoryError. 2941 """ 2942 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2943 with self.assertRaises(IsADirectoryError): 2944 zf.joinpath('b').open() 2945 2946 @pass_alpharep 2947 def test_open_binary_invalid_args(self, alpharep): 2948 root = zipfile.Path(alpharep) 2949 with self.assertRaises(ValueError): 2950 root.joinpath('a.txt').open('rb', encoding='utf-8') 2951 with self.assertRaises(ValueError): 2952 root.joinpath('a.txt').open('rb', 'utf-8') 2953 2954 def test_open_missing_directory(self): 2955 """ 2956 Attempting to open a missing directory raises FileNotFoundError. 2957 """ 2958 zf = zipfile.Path(add_dirs(build_alpharep_fixture())) 2959 with self.assertRaises(FileNotFoundError): 2960 zf.joinpath('z').open() 2961 2962 @pass_alpharep 2963 def test_read(self, alpharep): 2964 root = zipfile.Path(alpharep) 2965 a, b, g = root.iterdir() 2966 assert a.read_text(encoding="utf-8") == "content of a" 2967 assert a.read_bytes() == b"content of a" 2968 2969 @pass_alpharep 2970 def test_joinpath(self, alpharep): 2971 root = zipfile.Path(alpharep) 2972 a = root.joinpath("a.txt") 2973 assert a.is_file() 2974 e = root.joinpath("b").joinpath("d").joinpath("e.txt") 2975 assert e.read_text(encoding="utf-8") == "content of e" 2976 2977 @pass_alpharep 2978 def test_joinpath_multiple(self, alpharep): 2979 root = zipfile.Path(alpharep) 2980 e = root.joinpath("b", "d", "e.txt") 2981 assert e.read_text(encoding="utf-8") == "content of e" 2982 2983 @pass_alpharep 2984 def test_traverse_truediv(self, alpharep): 2985 root = zipfile.Path(alpharep) 2986 a = root / "a.txt" 2987 assert a.is_file() 2988 e = root / "b" / "d" / "e.txt" 2989 assert e.read_text(encoding="utf-8") == "content of e" 2990 2991 @pass_alpharep 2992 def test_traverse_simplediv(self, alpharep): 2993 """ 2994 Disable the __future__.division when testing traversal. 2995 """ 2996 code = compile( 2997 source="zipfile.Path(alpharep) / 'a'", 2998 filename="(test)", 2999 mode="eval", 3000 dont_inherit=True, 3001 ) 3002 eval(code) 3003 3004 @pass_alpharep 3005 def test_pathlike_construction(self, alpharep): 3006 """ 3007 zipfile.Path should be constructable from a path-like object 3008 """ 3009 zipfile_ondisk = self.zipfile_ondisk(alpharep) 3010 pathlike = pathlib.Path(str(zipfile_ondisk)) 3011 zipfile.Path(pathlike) 3012 3013 @pass_alpharep 3014 def test_traverse_pathlike(self, alpharep): 3015 root = zipfile.Path(alpharep) 3016 root / pathlib.Path("a") 3017 3018 @pass_alpharep 3019 def test_parent(self, alpharep): 3020 root = zipfile.Path(alpharep) 3021 assert (root / 'a').parent.at == '' 3022 assert (root / 'a' / 'b').parent.at == 'a/' 3023 3024 @pass_alpharep 3025 def test_dir_parent(self, alpharep): 3026 root = zipfile.Path(alpharep) 3027 assert (root / 'b').parent.at == '' 3028 assert (root / 'b/').parent.at == '' 3029 3030 @pass_alpharep 3031 def test_missing_dir_parent(self, alpharep): 3032 root = zipfile.Path(alpharep) 3033 assert (root / 'missing dir/').parent.at == '' 3034 3035 @pass_alpharep 3036 def test_mutability(self, alpharep): 3037 """ 3038 If the underlying zipfile is changed, the Path object should 3039 reflect that change. 3040 """ 3041 root = zipfile.Path(alpharep) 3042 a, b, g = root.iterdir() 3043 alpharep.writestr('foo.txt', 'foo') 3044 alpharep.writestr('bar/baz.txt', 'baz') 3045 assert any(child.name == 'foo.txt' for child in root.iterdir()) 3046 assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo' 3047 (baz,) = (root / 'bar').iterdir() 3048 assert baz.read_text(encoding="utf-8") == 'baz' 3049 3050 HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13 3051 3052 def huge_zipfile(self): 3053 """Create a read-only zipfile with a huge number of entries entries.""" 3054 strm = io.BytesIO() 3055 zf = zipfile.ZipFile(strm, "w") 3056 for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)): 3057 zf.writestr(entry, entry) 3058 zf.mode = 'r' 3059 return zf 3060 3061 def test_joinpath_constant_time(self): 3062 """ 3063 Ensure joinpath on items in zipfile is linear time. 3064 """ 3065 root = zipfile.Path(self.huge_zipfile()) 3066 entries = jaraco.itertools.Counter(root.iterdir()) 3067 for entry in entries: 3068 entry.joinpath('suffix') 3069 # Check the file iterated all items 3070 assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES 3071 3072 # @func_timeout.func_set_timeout(3) 3073 def test_implied_dirs_performance(self): 3074 data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)] 3075 zipfile.CompleteDirs._implied_dirs(data) 3076 3077 @pass_alpharep 3078 def test_read_does_not_close(self, alpharep): 3079 alpharep = self.zipfile_ondisk(alpharep) 3080 with zipfile.ZipFile(alpharep) as file: 3081 for rep in range(2): 3082 zipfile.Path(file, 'a.txt').read_text(encoding="utf-8") 3083 3084 @pass_alpharep 3085 def test_subclass(self, alpharep): 3086 class Subclass(zipfile.Path): 3087 pass 3088 3089 root = Subclass(alpharep) 3090 assert isinstance(root / 'b', Subclass) 3091 3092 @pass_alpharep 3093 def test_filename(self, alpharep): 3094 root = zipfile.Path(alpharep) 3095 assert root.filename == pathlib.Path('alpharep.zip') 3096 3097 @pass_alpharep 3098 def test_root_name(self, alpharep): 3099 """ 3100 The name of the root should be the name of the zipfile 3101 """ 3102 root = zipfile.Path(alpharep) 3103 assert root.name == 'alpharep.zip' == root.filename.name 3104 3105 @pass_alpharep 3106 def test_root_parent(self, alpharep): 3107 root = zipfile.Path(alpharep) 3108 assert root.parent == pathlib.Path('.') 3109 root.root.filename = 'foo/bar.zip' 3110 assert root.parent == pathlib.Path('foo') 3111 3112 @pass_alpharep 3113 def test_root_unnamed(self, alpharep): 3114 """ 3115 It is an error to attempt to get the name 3116 or parent of an unnamed zipfile. 3117 """ 3118 alpharep.filename = None 3119 root = zipfile.Path(alpharep) 3120 with self.assertRaises(TypeError): 3121 root.name 3122 with self.assertRaises(TypeError): 3123 root.parent 3124 3125 # .name and .parent should still work on subs 3126 sub = root / "b" 3127 assert sub.name == "b" 3128 assert sub.parent 3129 3130 @pass_alpharep 3131 def test_inheritance(self, alpharep): 3132 cls = type('PathChild', (zipfile.Path,), {}) 3133 for alpharep in self.zipfile_alpharep(): 3134 file = cls(alpharep).joinpath('some dir').parent 3135 assert isinstance(file, cls) 3136 3137 3138if __name__ == "__main__": 3139 unittest.main() 3140