"""Tests for ``zipfile.Path`` and its supporting ``zipfile._path`` module."""

import io
import itertools
import contextlib
import pathlib
import pickle
import stat
import sys
import time
import unittest
import zipfile
import zipfile._path

from test.support.os_helper import temp_dir, FakePath

from ._functools import compose
from ._itertools import Counter

from ._test_params import parameterize, Invoked


class jaraco:
    # Namespace shim: lets the tests below spell ``jaraco.itertools.Counter``
    # while actually using the Counter imported from the sibling
    # ``_itertools`` module.
    class itertools:
        Counter = Counter


def _make_link(info: zipfile.ZipInfo):  # type: ignore[name-defined]
    """Flag *info* as a symbolic link.

    Sets ``stat.S_IFLNK`` in the upper 16 bits of ``info.external_attr``.
    The entry is mutated in place; nothing is returned.
    """
    info.external_attr |= stat.S_IFLNK << 16


def build_alpharep_fixture():
    """
    Create a zip file with this structure:

    .
    ├── a.txt
    ├── n.txt (-> a.txt)
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    ├── g
    │   └── h
    │       └── i.txt
    └── j
        ├── k.bin
        ├── l.baz
        └── m.bar

    This fixture has the following key characteristics:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - multiple files in a directory (b/c, b/f)
    - a directory containing only a directory (g/h)
    - a directory with files of different extensions (j/klm)
    - a symlink (n) pointing to (a)

    "alpha" because it uses alphabet
    "rep" because it's a representative example

    Returns the still-open, in-memory ``zipfile.ZipFile`` (mode ``"w"``)
    with ``filename`` set to ``"alpharep.zip"``.
    """
    data = io.BytesIO()
    zf = zipfile.ZipFile(data, "w")
    zf.writestr("a.txt", b"content of a")
    zf.writestr("b/c.txt", b"content of c")
    zf.writestr("b/d/e.txt", b"content of e")
    zf.writestr("b/f.txt", b"content of f")
    zf.writestr("g/h/i.txt", b"content of i")
    zf.writestr("j/k.bin", b"content of k")
    zf.writestr("j/l.baz", b"content of l")
    zf.writestr("j/m.bar", b"content of m")
    zf.writestr("n.txt", b"a.txt")
    # n.txt was the last entry written; turn it into a symlink entry.
    _make_link(zf.infolist()[-1])

    zf.filename = "alpharep.zip"
    return zf


# Two ways to build the fixture: as-is, and passed through
# ``CompleteDirs.inject``.
alpharep_generators = [
    Invoked.wrap(build_alpharep_fixture),
    Invoked.wrap(compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture)),
]

# Decorator supplying the ``alpharep`` argument from the generators above
# (see ``_test_params.parameterize``).
pass_alpharep = parameterize(['alpharep'], alpharep_generators)


class TestPath(unittest.TestCase):
    def setUp(self):
        # Collect per-test context managers (e.g. temp dirs) and unwind
        # them at cleanup time.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_ondisk(self, alpharep):
        # Write the in-memory archive to a fresh temp dir and return the
        # path of the resulting file.  The zipfile is closed first so the
        # buffer holds the finished archive.
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        buffer = alpharep.fp
        alpharep.close()
        path = tmpdir / alpharep.filename
        with path.open("wb") as strm:
            strm.write(buffer.getvalue())
        return path

    @pass_alpharep
    def test_iterdir_and_types(self, alpharep):
        # Walk the fixture tree and check is_dir()/is_file() at each level.
        root = zipfile.Path(alpharep)
        assert root.is_dir()
        a, n, b, g, j = root.iterdir()
        assert a.is_file()
        assert b.is_dir()
        assert g.is_dir()
        c, f, d = b.iterdir()
        assert c.is_file() and f.is_file()
        (e,) = d.iterdir()
        assert e.is_file()
        (h,) = g.iterdir()
        (i,) = h.iterdir()
        assert i.is_file()

    @pass_alpharep
    def test_is_file_missing(self, alpharep):
        # A name absent from the archive is not a file.
        root = zipfile.Path(alpharep)
        assert not root.joinpath('missing.txt').is_file()

    @pass_alpharep
    def test_iterdir_on_file(self, alpharep):
        # iterdir() on a regular file raises ValueError.
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        with self.assertRaises(ValueError):
            a.iterdir()

    @pass_alpharep
    def test_subdir_is_dir(self, alpharep):
        # Directories are recognized with or without a trailing slash.
        root = zipfile.Path(alpharep)
        assert (root / 'b').is_dir()
        assert (root / 'b/').is_dir()
        assert (root / 'g').is_dir()
        assert (root / 'g/').is_dir()

    @pass_alpharep
    def test_open(self, alpharep):
        # Text-mode open works with encoding given by keyword or position.
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        with a.open(encoding="utf-8") as strm:
            data = strm.read()
        self.assertEqual(data, "content of a")
        with a.open('r', "utf-8") as strm:  # not a kw, no gh-101144 TypeError
            data = strm.read()
        self.assertEqual(data, "content of a")

    def test_open_encoding_utf16(self):
        # A UTF-16 member decodes correctly with the encoding passed
        # positionally and by keyword.
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
        zf.filename = "test_open_utf16.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        u16 = path.joinpath("16.txt")
        with u16.open('r', "utf-16") as strm:
            data = strm.read()
        assert data == "This was utf-16"
        with u16.open(encoding="utf-16") as strm:
            data = strm.read()
        assert data == "This was utf-16"

    def test_open_encoding_errors(self):
        # The ``errors`` argument is honored by read_text() and open(),
        # and a doubly-specified encoding is rejected.
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
        zf.filename = "test_read_text_encoding_errors.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        bad_utf8 = path.joinpath("bad-utf8.bin")

        # encoding= as a positional argument for gh-101144.
        data = bad_utf8.read_text("utf-8", errors="ignore")
        assert data == "invalid utf-8: ."
        with bad_utf8.open("r", "utf-8", errors="surrogateescape") as f:
            assert f.read() == "invalid utf-8: \udcff\udcff."

        # encoding= both positional and keyword is an error; gh-101144.
        with self.assertRaisesRegex(TypeError, "encoding"):
            bad_utf8.read_text("utf-8", encoding="utf-8")

        # both keyword arguments work.
        with bad_utf8.open("r", encoding="utf-8", errors="strict") as f:
            # error during decoding with wrong codec.
            with self.assertRaises(UnicodeDecodeError):
                f.read()

    @unittest.skipIf(
        not getattr(sys.flags, 'warn_default_encoding', 0),
        "Requires warn_default_encoding",
    )
    @pass_alpharep
    def test_encoding_warnings(self, alpharep):
        """EncodingWarning must blame the read_text and open calls."""
        assert sys.flags.warn_default_encoding
        root = zipfile.Path(alpharep)
        with self.assertWarns(EncodingWarning) as wc:
            root.joinpath("a.txt").read_text()
        # The warning must be attributed to this file, not to zipfile.
        assert __file__ == wc.filename
        with self.assertWarns(EncodingWarning) as wc:
            root.joinpath("a.txt").open("r").close()
        assert __file__ == wc.filename

    def test_open_write(self):
        """
        If the zipfile is open for write, it should be possible to
        write bytes or text to it.
        """
        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
        with zf.joinpath('file.bin').open('wb') as strm:
            strm.write(b'binary contents')
        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
            strm.write('text file')

    @pass_alpharep
    def test_open_extant_directory(self, alpharep):
        """
        Attempting to open a directory raises IsADirectoryError.
        """
        zf = zipfile.Path(alpharep)
        with self.assertRaises(IsADirectoryError):
            zf.joinpath('b').open()

    @pass_alpharep
    def test_open_binary_invalid_args(self, alpharep):
        # Binary mode rejects an encoding, however it is passed.
        root = zipfile.Path(alpharep)
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', encoding='utf-8')
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', 'utf-8')

    @pass_alpharep
    def test_open_missing_directory(self, alpharep):
        """
        Attempting to open a missing directory raises FileNotFoundError.
        """
        zf = zipfile.Path(alpharep)
        with self.assertRaises(FileNotFoundError):
            zf.joinpath('z').open()

    @pass_alpharep
    def test_read(self, alpharep):
        # read_text() and read_bytes() return the member's content.
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        assert a.read_text(encoding="utf-8") == "content of a"
        # Also check positional encoding arg (gh-101144).
        assert a.read_text("utf-8") == "content of a"
        assert a.read_bytes() == b"content of a"

    @pass_alpharep
    def test_joinpath(self, alpharep):
        # Chained single-segment joinpath() calls reach nested files.
        root = zipfile.Path(alpharep)
        a = root.joinpath("a.txt")
        assert a.is_file()
        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_joinpath_multiple(self, alpharep):
        # joinpath() accepts several segments in one call.
        root = zipfile.Path(alpharep)
        e = root.joinpath("b", "d", "e.txt")
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_traverse_truediv(self, alpharep):
        # The ``/`` operator traverses just like joinpath().
        root = zipfile.Path(alpharep)
        a = root / "a.txt"
        assert a.is_file()
        e = root / "b" / "d" / "e.txt"
        assert e.read_text(encoding="utf-8") == "content of e"

    @pass_alpharep
    def test_pathlike_construction(self, alpharep):
        """
        zipfile.Path should be constructable from a path-like object
        """
        zipfile_ondisk = self.zipfile_ondisk(alpharep)
        pathlike = FakePath(str(zipfile_ondisk))
        zipfile.Path(pathlike)

    @pass_alpharep
    def test_traverse_pathlike(self, alpharep):
        # ``/`` accepts a path-like operand; passing means no exception.
        root = zipfile.Path(alpharep)
        root / FakePath("a")

    @pass_alpharep
    def test_parent(self, alpharep):
        # .parent.at is '' at the top level and 'dir/' one level down.
        root = zipfile.Path(alpharep)
        assert (root / 'a').parent.at == ''
        assert (root / 'a' / 'b').parent.at == 'a/'

    @pass_alpharep
    def test_dir_parent(self, alpharep):
        # A trailing slash on a directory does not change its parent.
        root = zipfile.Path(alpharep)
        assert (root / 'b').parent.at == ''
        assert (root / 'b/').parent.at == ''

    @pass_alpharep
    def test_missing_dir_parent(self, alpharep):
        # .parent works for a directory that is not in the archive.
        root = zipfile.Path(alpharep)
        assert (root / 'missing dir/').parent.at == ''

    @pass_alpharep
    def test_mutability(self, alpharep):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        root = zipfile.Path(alpharep)
        a, n, b, g, j = root.iterdir()
        alpharep.writestr('foo.txt', 'foo')
        alpharep.writestr('bar/baz.txt', 'baz')
        assert any(child.name == 'foo.txt' for child in root.iterdir())
        assert (root / 'foo.txt').read_text(encoding="utf-8") == 'foo'
        (baz,) = (root / 'bar').iterdir()
        assert baz.read_text(encoding="utf-8") == 'baz'

    HUGE_ZIPFILE_NUM_ENTRIES = 2**13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute so the still-open archive presents
        # itself as read-only.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Call joinpath on every entry of a huge zipfile.

        Exercises joinpath across HUGE_ZIPFILE_NUM_ENTRIES entries; no
        timing is measured here, the only assertion is that every entry
        was visited.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES

    @pass_alpharep
    def test_read_does_not_close(self, alpharep):
        # Reading through a Path twice on the same open ZipFile must
        # work, i.e. the first read leaves the archive usable.
        alpharep = self.zipfile_ondisk(alpharep)
        with zipfile.ZipFile(alpharep) as file:
            for _ in range(2):
                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")

    @pass_alpharep
    def test_subclass(self, alpharep):
        # Traversal from a subclass instance yields the subclass.
        class Subclass(zipfile.Path):
            pass

        root = Subclass(alpharep)
        assert isinstance(root / 'b', Subclass)

    @pass_alpharep
    def test_filename(self, alpharep):
        # .filename is a pathlib.Path built from the zipfile's filename.
        root = zipfile.Path(alpharep)
        assert root.filename == pathlib.Path('alpharep.zip')

    @pass_alpharep
    def test_root_name(self, alpharep):
        """
        The name of the root should be the name of the zipfile
        """
        root = zipfile.Path(alpharep)
        assert root.name == 'alpharep.zip' == root.filename.name

    @pass_alpharep
    def test_suffix(self, alpharep):
        """
        The suffix of the root should be the suffix of the zipfile.
        The suffix of each nested file is the final component's last suffix, if any.
        Includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffix == '.zip' == root.filename.suffix

        b = root / "b.txt"
        assert b.suffix == ".txt"

        c = root / "c" / "filename.tar.gz"
        assert c.suffix == ".gz"

        d = root / "d"
        assert d.suffix == ""

    @pass_alpharep
    def test_suffixes(self, alpharep):
        """
        The suffixes of the root should be the suffixes of the zipfile.
        The suffixes of each nested file are all suffixes of the final
        component, in order; a leading-dot name has none.
        Each includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        assert root.suffixes == ['.zip'] == root.filename.suffixes

        b = root / 'b.txt'
        assert b.suffixes == ['.txt']

        c = root / 'c' / 'filename.tar.gz'
        assert c.suffixes == ['.tar', '.gz']

        d = root / 'd'
        assert d.suffixes == []

        e = root / '.hgrc'
        assert e.suffixes == []

    @pass_alpharep
    def test_suffix_no_filename(self, alpharep):
        # suffix/suffixes of a child still work when the archive is unnamed.
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        assert root.joinpath('example').suffix == ""
        assert root.joinpath('example').suffixes == []

    @pass_alpharep
    def test_stem(self, alpharep):
        """
        The final path component, without its suffix
        """
        root = zipfile.Path(alpharep)
        assert root.stem == 'alpharep' == root.filename.stem

        b = root / "b.txt"
        assert b.stem == "b"

        c = root / "c" / "filename.tar.gz"
        assert c.stem == "filename.tar"

        d = root / "d"
        assert d.stem == "d"

        assert (root / ".gitignore").stem == ".gitignore"

    @pass_alpharep
    def test_root_parent(self, alpharep):
        # The root's parent is the directory part of the zipfile's filename.
        root = zipfile.Path(alpharep)
        assert root.parent == pathlib.Path('.')
        root.root.filename = 'foo/bar.zip'
        assert root.parent == pathlib.Path('foo')

    @pass_alpharep
    def test_root_unnamed(self, alpharep):
        """
        It is an error to attempt to get the name
        or parent of an unnamed zipfile.
        """
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        with self.assertRaises(TypeError):
            root.name
        with self.assertRaises(TypeError):
            root.parent

        # .name and .parent should still work on subs
        sub = root / "b"
        assert sub.name == "b"
        assert sub.parent

    @pass_alpharep
    def test_match_and_glob(self, alpharep):
        # match() applies to the path itself; glob() finds members.
        root = zipfile.Path(alpharep)
        assert not root.match("*.txt")

        assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]
        assert list(root.glob("b/*.txt")) == [
            zipfile.Path(alpharep, "b/c.txt"),
            zipfile.Path(alpharep, "b/f.txt"),
        ]

    @pass_alpharep
    def test_glob_recursive(self, alpharep):
        # ``**`` globbing yields only matching files and agrees with rglob().
        root = zipfile.Path(alpharep)
        files = list(root.glob("**/*.txt"))
        # Guard against a vacuous pass should the glob match nothing.
        assert files
        assert all(each.match("*.txt") for each in files)

        assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))

    @pass_alpharep
    def test_glob_dirs(self, alpharep):
        # Directories are matched and returned with a trailing slash.
        root = zipfile.Path(alpharep)
        assert list(root.glob('b')) == [zipfile.Path(alpharep, "b/")]
        assert list(root.glob('b*')) == [zipfile.Path(alpharep, "b/")]

    @pass_alpharep
    def test_glob_subdir(self, alpharep):
        # Nested directories are matched, literally and by wildcard.
        root = zipfile.Path(alpharep)
        assert list(root.glob('g/h')) == [zipfile.Path(alpharep, "g/h/")]
        assert list(root.glob('g*/h*')) == [zipfile.Path(alpharep, "g/h/")]

    @pass_alpharep
    def test_glob_subdirs(self, alpharep):
        # ``*`` does not cross directory separators in glob(); rglob()
        # searches at any depth.
        root = zipfile.Path(alpharep)

        assert list(root.glob("*/i.txt")) == []
        assert list(root.rglob("*/i.txt")) == [zipfile.Path(alpharep, "g/h/i.txt")]

    @pass_alpharep
    def test_glob_does_not_overmatch_dot(self, alpharep):
        # A literal ``.`` in the pattern matches only a dot.
        root = zipfile.Path(alpharep)

        assert list(root.glob("*.xt")) == []

    @pass_alpharep
    def test_glob_single_char(self, alpharep):
        # ``?`` and bracket expressions match exactly one character.
        root = zipfile.Path(alpharep)

        assert list(root.glob("a?txt")) == [zipfile.Path(alpharep, "a.txt")]
        assert list(root.glob("a[.]txt")) == [zipfile.Path(alpharep, "a.txt")]
        assert list(root.glob("a[?]txt")) == []

    @pass_alpharep
    def test_glob_chars(self, alpharep):
        # Character classes combine with ``?`` within a path segment.
        root = zipfile.Path(alpharep)

        assert list(root.glob("j/?.b[ai][nz]")) == [
            zipfile.Path(alpharep, "j/k.bin"),
            zipfile.Path(alpharep, "j/l.baz"),
        ]

    def test_glob_empty(self):
        # An empty pattern is rejected with ValueError.
        root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
        with self.assertRaises(ValueError):
            root.glob('')

    @pass_alpharep
    def test_eq_hash(self, alpharep):
        # Paths compare equal by archive and location, and are hashable.
        root = zipfile.Path(alpharep)
        assert root == zipfile.Path(alpharep)

        assert root != (root / "a.txt")
        assert (root / "a.txt") == (root / "a.txt")

        root = zipfile.Path(alpharep)
        assert root in {root}

    @pass_alpharep
    def test_is_symlink(self, alpharep):
        # Only the entry flagged by _make_link reports as a symlink.
        root = zipfile.Path(alpharep)
        assert not root.joinpath('a.txt').is_symlink()
        assert root.joinpath('n.txt').is_symlink()

    @pass_alpharep
    def test_relative_to(self, alpharep):
        # relative_to() yields the remainder below the given ancestor.
        root = zipfile.Path(alpharep)
        relative = root.joinpath("b", "c.txt").relative_to(root / "b")
        assert str(relative) == "c.txt"

        relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
        assert str(relative) == "d/e.txt"

    @pass_alpharep
    def test_inheritance(self, alpharep):
        # joinpath() and .parent preserve a dynamically created subclass.
        cls = type('PathChild', (zipfile.Path,), {})
        file = cls(alpharep).joinpath('some dir').parent
        assert isinstance(file, cls)

    @parameterize(
        ['alpharep', 'path_type', 'subpath'],
        itertools.product(
            alpharep_generators,
            [str, FakePath],
            ['', 'b/'],
        ),
    )
    def test_pickle(self, alpharep, path_type, subpath):
        # A Path built from an on-disk archive survives a pickle
        # round trip and can still be read afterwards.
        zipfile_ondisk = path_type(str(self.zipfile_ondisk(alpharep)))

        saved_1 = pickle.dumps(zipfile.Path(zipfile_ondisk, at=subpath))
        restored_1 = pickle.loads(saved_1)
        first, *rest = restored_1.iterdir()
        assert first.read_text(encoding='utf-8').startswith('content of ')

    @pass_alpharep
    def test_extract_orig_with_implied_dirs(self, alpharep):
        """
        A zip file wrapped in a Path should extract even with implied dirs.
        """
        source_path = self.zipfile_ondisk(alpharep)
        zf = zipfile.ZipFile(source_path)
        # wrap the zipfile for its side effect
        zipfile.Path(zf)
        zf.extractall(source_path.parent)

    @pass_alpharep
    def test_getinfo_missing(self, alpharep):
        """
        Validate behavior of getinfo on original zipfile after wrapping.
        """
        zipfile.Path(alpharep)
        with self.assertRaises(KeyError):
            alpharep.getinfo('does-not-exist')

    def test_malformed_paths(self):
        """
        Path should handle malformed paths gracefully.

        Paths with leading slashes are not visible.

        Paths with dots are treated like regular files.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        zf.writestr("/one-slash.txt", b"content")
        zf.writestr("//two-slash.txt", b"content")
        zf.writestr("../parent.txt", b"content")
        zf.filename = ''
        root = zipfile.Path(zf)
        assert list(map(str, root.iterdir())) == ['../']
        assert root.joinpath('..').joinpath('parent.txt').read_bytes() == b'content'

    def test_unsupported_names(self):
        """
        Path segments with special characters are readable.

        On some platforms or file systems, characters like
        ``:`` and ``?`` are not allowed, but they are valid
        in the zip file.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        zf.writestr("path?", b"content")
        zf.writestr("V: NMS.flac", b"fLaC...")
        zf.filename = ''
        root = zipfile.Path(zf)
        contents = root.iterdir()
        assert next(contents).name == 'path?'
        assert next(contents).name == 'V: NMS.flac'
        assert root.joinpath('V: NMS.flac').read_bytes() == b"fLaC..."

    def test_backslash_not_separator(self):
        """
        In a zip file, backslashes are not separators.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        # DirtyZipInfo keeps the backslash in the stored name.
        zf.writestr(DirtyZipInfo.for_name("foo\\bar", zf), b"content")
        zf.filename = ''
        root = zipfile.Path(zf)
        (first,) = root.iterdir()
        assert not first.is_dir()
        assert first.name == 'foo\\bar'

    @pass_alpharep
    def test_interface(self, alpharep):
        # zipfile.Path satisfies the importlib.resources Traversable ABC.
        from importlib.resources.abc import Traversable

        zf = zipfile.Path(alpharep)
        assert isinstance(zf, Traversable)


class DirtyZipInfo(zipfile.ZipInfo):
    """
    Bypass name sanitization.
    """

    def __init__(self, filename, *args, **kwargs):
        super().__init__(filename, *args, **kwargs)
        # Re-assign the caller's name verbatim after the base initializer
        # has run, so the stored name is exactly what was passed in.
        self.filename = filename

    @classmethod
    def for_name(cls, name, archive):
        """
        Construct the same way that ZipFile.writestr does.

        *name* is the member name to store; *archive* supplies the
        ``compression`` and ``compresslevel`` to copy onto the entry.
        Returns the new ``DirtyZipInfo``.

        TODO: extract this functionality and re-use
        """
        self = cls(filename=name, date_time=time.localtime(time.time())[:6])
        self.compress_type = archive.compression
        self.compress_level = archive.compresslevel
        if self.filename.endswith('/'):  # pragma: no cover
            self.external_attr = 0o40775 << 16  # drwxrwxr-x
            self.external_attr |= 0x10  # MS-DOS directory flag
        else:
            self.external_attr = 0o600 << 16  # ?rw-------
        return self