import sys
import os
import io
from hashlib import sha256
from contextlib import contextmanager
from random import Random
import pathlib
import shutil
import re
import warnings
import stat

import unittest
import unittest.mock
import tarfile

from test import support
from test.support import os_helper
from test.support import script_helper
from test.support import warnings_helper

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import zlib
except ImportError:
    zlib = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None

def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    return sha256(data).hexdigest()

# Scratch locations used by the tests; all of them live next to TESTFN.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")
SPACE = b" "

# Expected digests of member payloads, compared against sha256sum().
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)


class TarTest:
    """Mixin for the uncompressed archive.

    Supplies the archive path, the mode suffix, the raw file opener and
    the TarFile opener.  ``mode`` joins the ``prefix`` defined by a
    subclass with ``suffix``.
    """
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        return self.prefix + self.suffix

@support.requires_gzip()
class GzipTest:
    """Mixin overriding the TarTest attributes for the gzip archive."""
    tarname = gzipname
    suffix = 'gz'
    # Guarded so that the class body still evaluates without gzip.
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen

@support.requires_bz2()
class Bz2Test:
    """Mixin overriding the TarTest attributes for the bz2 archive."""
    tarname = bz2name
    suffix = 'bz2'
    # Guarded so that the class body still evaluates without bz2.
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open

@support.requires_lzma()
class LzmaTest:
    """Mixin overriding the TarTest attributes for the xz archive."""
    tarname = xzname
    suffix = 'xz'
    # Guarded so that the class body still evaluates without lzma.
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen


class ReadTest(TarTest):
    """Base for read tests: opens self.tarname as self.tar around each test."""

    prefix = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests of the file objects returned by extractfile() for ustar members."""

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Compare the lines read through extractfile() with the lines of
        # the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR,
                         filter='data')
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the member first so that seek(0) really has to move
            # the position; the returned bytes themselves are not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")

    def test_add_dir_getmember(self):
        # bpo-21987
        self.add_dir_and_getmember('bar')
        self.add_dir_and_getmember('a'*101)

    @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"),
                         "Missing getuid or getgid implementation")
    def add_dir_and_getmember(self, name):
        # Archive an empty directory called *name* and check that
        # getmember() returns equal results with and without a trailing
        # slash.
        def fixed_owner(tarinfo):
            # Passed to TarFile.add() as its ``filter`` callback; named so
            # that it does not shadow the builtin filter().
            tarinfo.uid = tarinfo.gid = 100
            return tarinfo

        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as tar:
                tar.format = tarfile.USTAR_FORMAT
                try:
                    os.mkdir(name)
                    tar.add(name, filter=fixed_owner)
                finally:
                    os.rmdir(name)
            with tarfile.open(tmpname) as tar:
                self.assertEqual(
                    tar.getmember(name),
                    tar.getmember(name + '/')
                )

class GzipUstarReadTest(GzipTest, UstarReadTest):
    pass

class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    pass

class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    pass


class ListTest(ReadTest, unittest.TestCase):
    """Tests of TarFile.list(), with sys.stdout swapped for an ASCII buffer."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def test_list(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=False)
        out = tio.detach().getvalue()
        self.assertIn(b'ustar/conttype', out)
        self.assertIn(b'ustar/regtype', out)
        self.assertIn(b'ustar/lnktype', out)
        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
        self.assertIn(b'./ustar/linktest2/symtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype', out)
        # Make sure it puts trailing slash for directory
        self.assertIn(b'ustar/dirtype/', out)
        self.assertIn(b'ustar/dirtype-with-size/', out)
        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')
        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        self.assertNotIn(b'link to', out)
        self.assertNotIn(b'->', out)

    def test_list_verbose(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=True)
        out = tio.detach().getvalue()
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                               br'ustar/\w+type ?\r?\n') * 2)
        # Make sure it prints the source of link with verbose flag
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
                      (b'/123' * 125) + b'/longname', out)
        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
                      (b'/123' * 125) + b'/longname', out)

    def test_list_members(self):
        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo
        with support.swap_attr(sys, 'stdout', tio):
            self.tar.list(verbose=False, members=members(self.tar))
        out = tio.detach().getvalue()
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)


class GzipListTest(GzipTest, ListTest):
    pass


class Bz2ListTest(Bz2Test, ListTest):
    pass


class LzmaListTest(LzmaTest, ListTest):
    pass


class CommonReadTest(ReadTest):

    def test_is_tarfile_erroneous(self):
        # Create (or truncate to) an empty file, which is not a tar archive.
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

377 def test_is_tarfile_valid(self): 378 # is_tarfile works on filenames 379 self.assertTrue(tarfile.is_tarfile(self.tarname)) 380 381 # is_tarfile works on path-like objects 382 self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname))) 383 384 # is_tarfile works on file objects 385 with open(self.tarname, "rb") as fobj: 386 self.assertTrue(tarfile.is_tarfile(fobj)) 387 388 # is_tarfile works on file-like objects 389 with open(self.tarname, "rb") as fobj: 390 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 391 392 def test_is_tarfile_keeps_position(self): 393 # Test for issue44289: tarfile.is_tarfile() modifies 394 # file object's current position 395 with open(self.tarname, "rb") as fobj: 396 tarfile.is_tarfile(fobj) 397 self.assertEqual(fobj.tell(), 0) 398 399 with open(self.tarname, "rb") as fobj: 400 file_like = io.BytesIO(fobj.read()) 401 tarfile.is_tarfile(file_like) 402 self.assertEqual(file_like.tell(), 0) 403 404 def test_empty_tarfile(self): 405 # Test for issue6123: Allow opening empty archives. 406 # This test checks if tarfile.open() is able to open an empty tar 407 # archive successfully. Note that an empty tar archive is not the 408 # same as an empty file! 409 with tarfile.open(tmpname, self.mode.replace("r", "w")): 410 pass 411 try: 412 tar = tarfile.open(tmpname, self.mode) 413 tar.getnames() 414 except tarfile.ReadError: 415 self.fail("tarfile.open() failed on empty archive") 416 else: 417 self.assertListEqual(tar.getmembers(), []) 418 finally: 419 tar.close() 420 421 def test_non_existent_tarfile(self): 422 # Test for issue11513: prevent non-existent gzipped tarfiles raising 423 # multiple exceptions. 424 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 425 tarfile.open("xxx", self.mode) 426 427 def test_null_tarfile(self): 428 # Test for issue6123: Allow opening empty archives. 429 # This test guarantees that tarfile.open() does not treat an empty 430 # file as an empty tar archive. 
431 with open(tmpname, "wb"): 432 pass 433 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 434 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 435 436 def test_ignore_zeros(self): 437 # Test TarFile's ignore_zeros option. 438 # generate 512 pseudorandom bytes 439 data = Random(0).randbytes(512) 440 for char in (b'\0', b'a'): 441 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 442 # are ignored correctly. 443 with self.open(tmpname, "w") as fobj: 444 fobj.write(char * 1024) 445 tarinfo = tarfile.TarInfo("foo") 446 tarinfo.size = len(data) 447 fobj.write(tarinfo.tobuf()) 448 fobj.write(data) 449 450 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 451 try: 452 self.assertListEqual(tar.getnames(), ["foo"], 453 "ignore_zeros=True should have skipped the %r-blocks" % 454 char) 455 finally: 456 tar.close() 457 458 def test_premature_end_of_archive(self): 459 for size in (512, 600, 1024, 1200): 460 with tarfile.open(tmpname, "w:") as tar: 461 t = tarfile.TarInfo("foo") 462 t.size = 1024 463 tar.addfile(t, io.BytesIO(b"a" * 1024)) 464 465 with open(tmpname, "r+b") as fobj: 466 fobj.truncate(size) 467 468 with tarfile.open(tmpname) as tar: 469 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 470 for t in tar: 471 pass 472 473 with tarfile.open(tmpname) as tar: 474 t = tar.next() 475 476 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 477 tar.extract(t, TEMPDIR, filter='data') 478 479 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 480 tar.extractfile(t).read() 481 482 def test_length_zero_header(self): 483 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 484 # with an exception 485 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 486 with tarfile.open(support.findfile('recursion.tar')) as tar: 487 pass 488 489class MiscReadTestBase(CommonReadTest): 490 def 
requires_name_attribute(self): 491 pass 492 493 def test_no_name_argument(self): 494 self.requires_name_attribute() 495 with open(self.tarname, "rb") as fobj: 496 self.assertIsInstance(fobj.name, str) 497 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 498 self.assertIsInstance(tar.name, str) 499 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 500 501 def test_no_name_attribute(self): 502 with open(self.tarname, "rb") as fobj: 503 data = fobj.read() 504 fobj = io.BytesIO(data) 505 self.assertRaises(AttributeError, getattr, fobj, "name") 506 tar = tarfile.open(fileobj=fobj, mode=self.mode) 507 self.assertIsNone(tar.name) 508 509 def test_empty_name_attribute(self): 510 with open(self.tarname, "rb") as fobj: 511 data = fobj.read() 512 fobj = io.BytesIO(data) 513 fobj.name = "" 514 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 515 self.assertIsNone(tar.name) 516 517 def test_int_name_attribute(self): 518 # Issue 21044: tarfile.open() should handle fileobj with an integer 519 # 'name' attribute. 
520 fd = os.open(self.tarname, os.O_RDONLY) 521 with open(fd, 'rb') as fobj: 522 self.assertIsInstance(fobj.name, int) 523 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 524 self.assertIsNone(tar.name) 525 526 def test_bytes_name_attribute(self): 527 self.requires_name_attribute() 528 tarname = os.fsencode(self.tarname) 529 with open(tarname, 'rb') as fobj: 530 self.assertIsInstance(fobj.name, bytes) 531 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 532 self.assertIsInstance(tar.name, bytes) 533 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 534 535 def test_pathlike_name(self): 536 tarname = pathlib.Path(self.tarname) 537 with tarfile.open(tarname, mode=self.mode) as tar: 538 self.assertIsInstance(tar.name, str) 539 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 540 with self.taropen(tarname) as tar: 541 self.assertIsInstance(tar.name, str) 542 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 543 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 544 self.assertIsInstance(tar.name, str) 545 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 546 if self.suffix == '': 547 with tarfile.TarFile(tarname, mode='r') as tar: 548 self.assertIsInstance(tar.name, str) 549 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 550 551 def test_illegal_mode_arg(self): 552 with open(tmpname, 'wb'): 553 pass 554 with self.assertRaisesRegex(ValueError, 'mode must be '): 555 tar = self.taropen(tmpname, 'q') 556 with self.assertRaisesRegex(ValueError, 'mode must be '): 557 tar = self.taropen(tmpname, 'rw') 558 with self.assertRaisesRegex(ValueError, 'mode must be '): 559 tar = self.taropen(tmpname, '') 560 561 def test_fileobj_with_offset(self): 562 # Skip the first member and store values from the second member 563 # of the testtar. 
564 tar = tarfile.open(self.tarname, mode=self.mode) 565 try: 566 tar.next() 567 t = tar.next() 568 name = t.name 569 offset = t.offset 570 with tar.extractfile(t) as f: 571 data = f.read() 572 finally: 573 tar.close() 574 575 # Open the testtar and seek to the offset of the second member. 576 with self.open(self.tarname) as fobj: 577 fobj.seek(offset) 578 579 # Test if the tarfile starts with the second member. 580 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 581 t = tar.next() 582 self.assertEqual(t.name, name) 583 # Read to the end of fileobj and test if seeking back to the 584 # beginning works. 585 tar.getmembers() 586 self.assertEqual(tar.extractfile(t).read(), data, 587 "seek back did not work") 588 589 def test_fail_comp(self): 590 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 591 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 592 with open(tarname, "rb") as fobj: 593 self.assertRaises(tarfile.ReadError, tarfile.open, 594 fileobj=fobj, mode=self.mode) 595 596 def test_v7_dirtype(self): 597 # Test old style dirtype member (bug #1336623): 598 # Old V7 tars create directory members using an AREGTYPE 599 # header with a "/" appended to the filename field. 600 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 601 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 602 "v7 dirtype failed") 603 604 def test_xstar_type(self): 605 # The xstar format stores extra atime and ctime fields inside the 606 # space reserved for the prefix field. The prefix field must be 607 # ignored in this case, otherwise it will mess up the name. 
608 try: 609 self.tar.getmember("misc/regtype-xstar") 610 except KeyError: 611 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 612 613 def test_check_members(self): 614 for tarinfo in self.tar: 615 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 616 "wrong mtime for %s" % tarinfo.name) 617 if not tarinfo.name.startswith("ustar/"): 618 continue 619 self.assertEqual(tarinfo.uname, "tarfile", 620 "wrong uname for %s" % tarinfo.name) 621 622 def test_find_members(self): 623 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 624 "could not find all members") 625 626 @unittest.skipUnless(hasattr(os, "link"), 627 "Missing hardlink implementation") 628 @os_helper.skip_unless_symlink 629 def test_extract_hardlink(self): 630 # Test hardlink extraction (e.g. bug #857297). 631 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 632 tar.extract("ustar/regtype", TEMPDIR, filter='data') 633 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 634 635 tar.extract("ustar/lnktype", TEMPDIR, filter='data') 636 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 637 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 638 data = f.read() 639 self.assertEqual(sha256sum(data), sha256_regtype) 640 641 tar.extract("ustar/symtype", TEMPDIR, filter='data') 642 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 643 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 644 data = f.read() 645 self.assertEqual(sha256sum(data), sha256_regtype) 646 647 @os_helper.skip_unless_working_chmod 648 def test_extractall(self): 649 # Test if extractall() correctly restores directory permissions 650 # and times (see issue1735). 
651 tar = tarfile.open(tarname, encoding="iso8859-1") 652 DIR = os.path.join(TEMPDIR, "extractall") 653 os.mkdir(DIR) 654 try: 655 directories = [t for t in tar if t.isdir()] 656 tar.extractall(DIR, directories, filter='fully_trusted') 657 for tarinfo in directories: 658 path = os.path.join(DIR, tarinfo.name) 659 if sys.platform != "win32": 660 # Win32 has no support for fine grained permissions. 661 self.assertEqual(tarinfo.mode & 0o777, 662 os.stat(path).st_mode & 0o777, 663 tarinfo.name) 664 def format_mtime(mtime): 665 if isinstance(mtime, float): 666 return "{} ({})".format(mtime, mtime.hex()) 667 else: 668 return "{!r} (int)".format(mtime) 669 file_mtime = os.path.getmtime(path) 670 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 671 format_mtime(tarinfo.mtime), 672 format_mtime(file_mtime), 673 path) 674 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 675 finally: 676 tar.close() 677 os_helper.rmtree(DIR) 678 679 @os_helper.skip_unless_working_chmod 680 def test_extract_directory(self): 681 dirtype = "ustar/dirtype" 682 DIR = os.path.join(TEMPDIR, "extractdir") 683 os.mkdir(DIR) 684 try: 685 with tarfile.open(tarname, encoding="iso8859-1") as tar: 686 tarinfo = tar.getmember(dirtype) 687 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 688 extracted = os.path.join(DIR, dirtype) 689 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 690 if sys.platform != "win32": 691 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 692 finally: 693 os_helper.rmtree(DIR) 694 695 def test_extractall_pathlike_name(self): 696 DIR = pathlib.Path(TEMPDIR) / "extractall" 697 with os_helper.temp_dir(DIR), \ 698 tarfile.open(tarname, encoding="iso8859-1") as tar: 699 directories = [t for t in tar if t.isdir()] 700 tar.extractall(DIR, directories, filter='fully_trusted') 701 for tarinfo in directories: 702 path = DIR / tarinfo.name 703 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 704 705 def test_extract_pathlike_name(self): 
706 dirtype = "ustar/dirtype" 707 DIR = pathlib.Path(TEMPDIR) / "extractall" 708 with os_helper.temp_dir(DIR), \ 709 tarfile.open(tarname, encoding="iso8859-1") as tar: 710 tarinfo = tar.getmember(dirtype) 711 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 712 extracted = DIR / dirtype 713 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 714 715 def test_init_close_fobj(self): 716 # Issue #7341: Close the internal file object in the TarFile 717 # constructor in case of an error. For the test we rely on 718 # the fact that opening an empty file raises a ReadError. 719 empty = os.path.join(TEMPDIR, "empty") 720 with open(empty, "wb") as fobj: 721 fobj.write(b"") 722 723 try: 724 tar = object.__new__(tarfile.TarFile) 725 try: 726 tar.__init__(empty) 727 except tarfile.ReadError: 728 self.assertTrue(tar.fileobj.closed) 729 else: 730 self.fail("ReadError not raised") 731 finally: 732 os_helper.unlink(empty) 733 734 def test_parallel_iteration(self): 735 # Issue #16601: Restarting iteration over tarfile continued 736 # from where it left off. 
737 with tarfile.open(self.tarname) as tar: 738 for m1, m2 in zip(tar, tar): 739 self.assertEqual(m1.offset, m2.offset) 740 self.assertEqual(m1.get_info(), m2.get_info()) 741 742 @unittest.skipIf(zlib is None, "requires zlib") 743 def test_zlib_error_does_not_leak(self): 744 # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when 745 # parsing certain types of invalid data 746 with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock: 747 mock.side_effect = zlib.error 748 with self.assertRaises(tarfile.ReadError): 749 tarfile.open(self.tarname) 750 751 def test_next_on_empty_tarfile(self): 752 fd = io.BytesIO() 753 tf = tarfile.open(fileobj=fd, mode="w") 754 tf.close() 755 756 fd.seek(0) 757 with tarfile.open(fileobj=fd, mode="r|") as tf: 758 self.assertEqual(tf.next(), None) 759 760 fd.seek(0) 761 with tarfile.open(fileobj=fd, mode="r") as tf: 762 self.assertEqual(tf.next(), None) 763 764class MiscReadTest(MiscReadTestBase, unittest.TestCase): 765 test_fail_comp = None 766 767class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 768 pass 769 770class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 771 def requires_name_attribute(self): 772 self.skipTest("BZ2File have no name attribute") 773 774class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 775 def requires_name_attribute(self): 776 self.skipTest("LZMAFile have no name attribute") 777 778 779class StreamReadTest(CommonReadTest, unittest.TestCase): 780 781 prefix="r|" 782 783 def test_read_through(self): 784 # Issue #11224: A poorly designed _FileInFile.read() method 785 # caused seeking errors with stream tar files. 
786 for tarinfo in self.tar: 787 if not tarinfo.isreg(): 788 continue 789 with self.tar.extractfile(tarinfo) as fobj: 790 while True: 791 try: 792 buf = fobj.read(512) 793 except tarfile.StreamError: 794 self.fail("simple read-through using " 795 "TarFile.extractfile() failed") 796 if not buf: 797 break 798 799 def test_fileobj_regular_file(self): 800 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 801 with self.tar.extractfile(tarinfo) as fobj: 802 data = fobj.read() 803 self.assertEqual(len(data), tarinfo.size, 804 "regular file extraction failed") 805 self.assertEqual(sha256sum(data), sha256_regtype, 806 "regular file extraction failed") 807 808 def test_provoke_stream_error(self): 809 tarinfos = self.tar.getmembers() 810 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 811 self.assertRaises(tarfile.StreamError, f.read) 812 813 def test_compare_members(self): 814 tar1 = tarfile.open(tarname, encoding="iso8859-1") 815 try: 816 tar2 = self.tar 817 818 while True: 819 t1 = tar1.next() 820 t2 = tar2.next() 821 if t1 is None: 822 break 823 self.assertIsNotNone(t2, "stream.next() failed.") 824 825 if t2.islnk() or t2.issym(): 826 with self.assertRaises(tarfile.StreamError): 827 tar2.extractfile(t2) 828 continue 829 830 v1 = tar1.extractfile(t1) 831 v2 = tar2.extractfile(t2) 832 if v1 is None: 833 continue 834 self.assertIsNotNone(v2, "stream.extractfile() failed") 835 self.assertEqual(v1.read(), v2.read(), 836 "stream extraction failed") 837 finally: 838 tar1.close() 839 840class GzipStreamReadTest(GzipTest, StreamReadTest): 841 pass 842 843class Bz2StreamReadTest(Bz2Test, StreamReadTest): 844 pass 845 846class LzmaStreamReadTest(LzmaTest, StreamReadTest): 847 pass 848 849 850class DetectReadTest(TarTest, unittest.TestCase): 851 def _testfunc_file(self, name, mode): 852 try: 853 tar = tarfile.open(name, mode) 854 except tarfile.ReadError as e: 855 self.fail() 856 else: 857 tar.close() 858 859 def _testfunc_fileobj(self, name, mode): 
860 try: 861 with open(name, "rb") as f: 862 tar = tarfile.open(name, mode, fileobj=f) 863 except tarfile.ReadError as e: 864 self.fail() 865 else: 866 tar.close() 867 868 def _test_modes(self, testfunc): 869 if self.suffix: 870 with self.assertRaises(tarfile.ReadError): 871 tarfile.open(tarname, mode="r:" + self.suffix) 872 with self.assertRaises(tarfile.ReadError): 873 tarfile.open(tarname, mode="r|" + self.suffix) 874 with self.assertRaises(tarfile.ReadError): 875 tarfile.open(self.tarname, mode="r:") 876 with self.assertRaises(tarfile.ReadError): 877 tarfile.open(self.tarname, mode="r|") 878 testfunc(self.tarname, "r") 879 testfunc(self.tarname, "r:" + self.suffix) 880 testfunc(self.tarname, "r:*") 881 testfunc(self.tarname, "r|" + self.suffix) 882 testfunc(self.tarname, "r|*") 883 884 def test_detect_file(self): 885 self._test_modes(self._testfunc_file) 886 887 def test_detect_fileobj(self): 888 self._test_modes(self._testfunc_fileobj) 889 890class GzipDetectReadTest(GzipTest, DetectReadTest): 891 pass 892 893class Bz2DetectReadTest(Bz2Test, DetectReadTest): 894 def test_detect_stream_bz2(self): 895 # Originally, tarfile's stream detection looked for the string 896 # "BZh91" at the start of the file. This is incorrect because 897 # the '9' represents the blocksize (900,000 bytes). If the file was 898 # compressed using another blocksize autodetection fails. 899 with open(tarname, "rb") as fobj: 900 data = fobj.read() 901 902 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
903 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 904 fobj.write(data) 905 906 self._testfunc_file(tmpname, "r|*") 907 908class LzmaDetectReadTest(LzmaTest, DetectReadTest): 909 pass 910 911 912class MemberReadTest(ReadTest, unittest.TestCase): 913 914 def _test_member(self, tarinfo, chksum=None, **kwargs): 915 if chksum is not None: 916 with self.tar.extractfile(tarinfo) as f: 917 self.assertEqual(sha256sum(f.read()), chksum, 918 "wrong sha256sum for %s" % tarinfo.name) 919 920 kwargs["mtime"] = 0o7606136617 921 kwargs["uid"] = 1000 922 kwargs["gid"] = 100 923 if "old-v7" not in tarinfo.name: 924 # V7 tar can't handle alphabetic owners. 925 kwargs["uname"] = "tarfile" 926 kwargs["gname"] = "tarfile" 927 for k, v in kwargs.items(): 928 self.assertEqual(getattr(tarinfo, k), v, 929 "wrong value in %s field of %s" % (k, tarinfo.name)) 930 931 def test_find_regtype(self): 932 tarinfo = self.tar.getmember("ustar/regtype") 933 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 934 935 def test_find_conttype(self): 936 tarinfo = self.tar.getmember("ustar/conttype") 937 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 938 939 def test_find_dirtype(self): 940 tarinfo = self.tar.getmember("ustar/dirtype") 941 self._test_member(tarinfo, size=0) 942 943 def test_find_dirtype_with_size(self): 944 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 945 self._test_member(tarinfo, size=255) 946 947 def test_find_lnktype(self): 948 tarinfo = self.tar.getmember("ustar/lnktype") 949 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 950 951 def test_find_symtype(self): 952 tarinfo = self.tar.getmember("ustar/symtype") 953 self._test_member(tarinfo, size=0, linkname="regtype") 954 955 def test_find_blktype(self): 956 tarinfo = self.tar.getmember("ustar/blktype") 957 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 958 959 def test_find_chrtype(self): 960 tarinfo = self.tar.getmember("ustar/chrtype") 961 
self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 962 963 def test_find_fifotype(self): 964 tarinfo = self.tar.getmember("ustar/fifotype") 965 self._test_member(tarinfo, size=0) 966 967 def test_find_sparse(self): 968 tarinfo = self.tar.getmember("ustar/sparse") 969 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 970 971 def test_find_gnusparse(self): 972 tarinfo = self.tar.getmember("gnu/sparse") 973 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 974 975 def test_find_gnusparse_00(self): 976 tarinfo = self.tar.getmember("gnu/sparse-0.0") 977 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 978 979 def test_find_gnusparse_01(self): 980 tarinfo = self.tar.getmember("gnu/sparse-0.1") 981 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 982 983 def test_find_gnusparse_10(self): 984 tarinfo = self.tar.getmember("gnu/sparse-1.0") 985 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 986 987 def test_find_umlauts(self): 988 tarinfo = self.tar.getmember("ustar/umlauts-" 989 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 990 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 991 992 def test_find_ustar_longname(self): 993 name = "ustar/" + "12345/" * 39 + "1234567/longname" 994 self.assertIn(name, self.tar.getnames()) 995 996 def test_find_regtype_oldv7(self): 997 tarinfo = self.tar.getmember("misc/regtype-old-v7") 998 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 999 1000 def test_find_pax_umlauts(self): 1001 self.tar.close() 1002 self.tar = tarfile.open(self.tarname, mode=self.mode, 1003 encoding="iso8859-1") 1004 tarinfo = self.tar.getmember("pax/umlauts-" 1005 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1006 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 1007 1008 1009class LongnameTest: 1010 1011 def test_read_longname(self): 1012 # Test reading of longname (bug #1471427). 
1013 longname = self.subdir + "/" + "123/" * 125 + "longname" 1014 try: 1015 tarinfo = self.tar.getmember(longname) 1016 except KeyError: 1017 self.fail("longname not found") 1018 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 1019 "read longname as dirtype") 1020 1021 def test_read_longlink(self): 1022 longname = self.subdir + "/" + "123/" * 125 + "longname" 1023 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 1024 try: 1025 tarinfo = self.tar.getmember(longlink) 1026 except KeyError: 1027 self.fail("longlink not found") 1028 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 1029 1030 def test_truncated_longname(self): 1031 longname = self.subdir + "/" + "123/" * 125 + "longname" 1032 tarinfo = self.tar.getmember(longname) 1033 offset = tarinfo.offset 1034 self.tar.fileobj.seek(offset) 1035 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 1036 with self.assertRaises(tarfile.ReadError): 1037 tarfile.open(name="foo.tar", fileobj=fobj) 1038 1039 def test_header_offset(self): 1040 # Test if the start offset of the TarInfo object includes 1041 # the preceding extended header. 1042 longname = self.subdir + "/" + "123/" * 125 + "longname" 1043 offset = self.tar.getmember(longname).offset 1044 with open(tarname, "rb") as fobj: 1045 fobj.seek(offset) 1046 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 1047 "iso8859-1", "strict") 1048 self.assertEqual(tarinfo.type, self.longnametype) 1049 1050 def test_longname_directory(self): 1051 # Test reading a longlink directory. Issue #47231. 
1052 longdir = ('a' * 101) + '/' 1053 with os_helper.temp_cwd(): 1054 with tarfile.open(tmpname, 'w') as tar: 1055 tar.format = self.format 1056 try: 1057 os.mkdir(longdir) 1058 tar.add(longdir) 1059 finally: 1060 os.rmdir(longdir.rstrip("/")) 1061 with tarfile.open(tmpname) as tar: 1062 self.assertIsNotNone(tar.getmember(longdir)) 1063 self.assertIsNotNone(tar.getmember(longdir.removesuffix('/'))) 1064 1065class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 1066 1067 subdir = "gnu" 1068 longnametype = tarfile.GNUTYPE_LONGNAME 1069 format = tarfile.GNU_FORMAT 1070 1071 # Since 3.2 tarfile is supposed to accurately restore sparse members and 1072 # produce files with holes. This is what we actually want to test here. 1073 # Unfortunately, not all platforms/filesystems support sparse files, and 1074 # even on platforms that do it is non-trivial to make reliable assertions 1075 # about holes in files. Therefore, we first do one basic test which works 1076 # an all platforms, and after that a test that will work only on 1077 # platforms/filesystems that prove to support sparse files. 
    def _test_sparse_file(self, name):
        # Extract the member and check its content hash first; this part
        # works whether or not the filesystem can store holes.
        self.tar.extract(name, TEMPDIR, filter='data')
        filename = os.path.join(TEMPDIR, name)
        with open(filename, "rb") as fobj:
            data = fobj.read()
        self.assertEqual(sha256sum(data), sha256_sparse,
                "wrong sha256sum for %s" % name)

        # Only where holes are known to work: the extracted file must
        # occupy fewer bytes on disk than its logical size.
        if self._fs_supports_holes():
            s = os.stat(filename)
            self.assertLess(s.st_blocks * 512, s.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        #
        # Any platform whose name does not start with "linux" is reported
        # as unsupported without probing.
        if sys.platform.startswith("linux"):
            # Linux evidently has 512 byte st_blocks units.
            name = os.path.join(TEMPDIR, "sparse-test")
            with open(name, "wb") as fobj:
                # Seek to "punch a hole" of 4 KiB
                fobj.seek(4096)
                fobj.write(b'x' * 4096)
                fobj.truncate()
            s = os.stat(name)
            os_helper.unlink(name)
            # A stored hole means fewer allocated bytes than the 8 KiB size.
            return (s.st_blocks * 512 < s.st_size)
        else:
            return False


class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Runs the shared LongnameTest checks against the "pax" members and
    # adds tests that read pax header records.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE
    format = tarfile.PAX_FORMAT

    def test_pax_global_headers(self):
        # Check uname, gname and the VENDOR.umlauts record reported for
        # three consecutive members of the test archive.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        finally:
            tar.close()

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        tar = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            # mtime must come back as a float, not merely compare equal.
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
        finally:
            tar.close()

    def test_pax_header_bad_formats(self):
        # Opening an archive whose pax record is malformed must raise
        # ReadError.  Each entry below is substituted for the valid
        # record b"11 foo=bar\n" in a freshly written archive.
        pax_header_replacements = (
            b" foo=bar\n",
            b"0 \n",
            b"1 \n",
            b"2 \n",
            b"3 =\n",
            b"4 =a\n",
            b"1000000 foo=bar\n",
            b"0 foo=bar\n",
            b"-12 foo=bar\n",
            b"000000000000000000000000036 foo=bar\n",
        )
        pax_headers = {"foo": "bar"}

        for replacement in pax_header_replacements:
            with self.subTest(header=replacement):
                # Write a one-member pax archive carrying the valid record.
                tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                                   encoding="iso8859-1")
                try:
                    t = tarfile.TarInfo()
                    t.name = "pax"  # plain ASCII member name
                    t.uid = 1
                    t.pax_headers = pax_headers
                    tar.addfile(t)
                finally:
                    tar.close()

                # Patch the raw bytes: swap the valid record for the bad one.
                with open(tmpname, "rb") as f:
                    data = f.read()
                    self.assertIn(b"11 foo=bar\n", data)
                    data = data.replace(b"11 foo=bar\n", replacement)

                with open(tmpname, "wb") as f:
                    f.truncate()
                    f.write(data)

                with self.assertRaisesRegex(tarfile.ReadError, r"method tar: ReadError\('invalid header'\)"):
                    tarfile.open(tmpname, encoding="iso8859-1")


class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.
1215 1216 def test_fileobj_no_close(self): 1217 fobj = io.BytesIO() 1218 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1219 tar.addfile(tarfile.TarInfo("foo")) 1220 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1221 # Issue #20238: Incomplete gzip output with mode="w:gz" 1222 data = fobj.getvalue() 1223 del tar 1224 support.gc_collect() 1225 self.assertFalse(fobj.closed) 1226 self.assertEqual(data, fobj.getvalue()) 1227 1228 def test_eof_marker(self): 1229 # Make sure an end of archive marker is written (two zero blocks). 1230 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1231 # So, we create an archive that has exactly 10240 bytes without the 1232 # marker, and has 20480 bytes once the marker is written. 1233 with tarfile.open(tmpname, self.mode) as tar: 1234 t = tarfile.TarInfo("foo") 1235 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1236 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1237 1238 with self.open(tmpname, "rb") as fobj: 1239 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1240 1241 1242class WriteTest(WriteTestBase, unittest.TestCase): 1243 1244 prefix = "w:" 1245 1246 def test_100_char_name(self): 1247 # The name field in a tar header stores strings of at most 100 chars. 1248 # If a string is shorter than 100 chars it has to be padded with '\0', 1249 # which implies that a string of exactly 100 chars is stored without 1250 # a trailing '\0'. 1251 name = "0123456789" * 10 1252 tar = tarfile.open(tmpname, self.mode) 1253 try: 1254 t = tarfile.TarInfo(name) 1255 tar.addfile(t) 1256 finally: 1257 tar.close() 1258 1259 tar = tarfile.open(tmpname) 1260 try: 1261 self.assertEqual(tar.getnames()[0], name, 1262 "failed to store 100 char filename") 1263 finally: 1264 tar.close() 1265 1266 def test_tar_size(self): 1267 # Test for bug #1013882. 
1268 tar = tarfile.open(tmpname, self.mode) 1269 try: 1270 path = os.path.join(TEMPDIR, "file") 1271 with open(path, "wb") as fobj: 1272 fobj.write(b"aaa") 1273 tar.add(path) 1274 finally: 1275 tar.close() 1276 self.assertGreater(os.path.getsize(tmpname), 0, 1277 "tarfile is empty") 1278 1279 # The test_*_size tests test for bug #1167128. 1280 def test_file_size(self): 1281 tar = tarfile.open(tmpname, self.mode) 1282 try: 1283 path = os.path.join(TEMPDIR, "file") 1284 with open(path, "wb"): 1285 pass 1286 tarinfo = tar.gettarinfo(path) 1287 self.assertEqual(tarinfo.size, 0) 1288 1289 with open(path, "wb") as fobj: 1290 fobj.write(b"aaa") 1291 tarinfo = tar.gettarinfo(path) 1292 self.assertEqual(tarinfo.size, 3) 1293 finally: 1294 tar.close() 1295 1296 def test_directory_size(self): 1297 path = os.path.join(TEMPDIR, "directory") 1298 os.mkdir(path) 1299 try: 1300 tar = tarfile.open(tmpname, self.mode) 1301 try: 1302 tarinfo = tar.gettarinfo(path) 1303 self.assertEqual(tarinfo.size, 0) 1304 finally: 1305 tar.close() 1306 finally: 1307 os_helper.rmdir(path) 1308 1309 # mock the following: 1310 # os.listdir: so we know that files are in the wrong order 1311 def test_ordered_recursion(self): 1312 path = os.path.join(TEMPDIR, "directory") 1313 os.mkdir(path) 1314 open(os.path.join(path, "1"), "a").close() 1315 open(os.path.join(path, "2"), "a").close() 1316 try: 1317 tar = tarfile.open(tmpname, self.mode) 1318 try: 1319 with unittest.mock.patch('os.listdir') as mock_listdir: 1320 mock_listdir.return_value = ["2", "1"] 1321 tar.add(path) 1322 paths = [] 1323 for m in tar.getmembers(): 1324 paths.append(os.path.split(m.name)[-1]) 1325 self.assertEqual(paths, ["directory", "1", "2"]); 1326 finally: 1327 tar.close() 1328 finally: 1329 os_helper.unlink(os.path.join(path, "1")) 1330 os_helper.unlink(os.path.join(path, "2")) 1331 os_helper.rmdir(path) 1332 1333 def test_gettarinfo_pathlike_name(self): 1334 with tarfile.open(tmpname, self.mode) as tar: 1335 path = 
pathlib.Path(TEMPDIR) / "file" 1336 with open(path, "wb") as fobj: 1337 fobj.write(b"aaa") 1338 tarinfo = tar.gettarinfo(path) 1339 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1340 self.assertIsInstance(tarinfo.name, str) 1341 self.assertEqual(tarinfo.name, tarinfo2.name) 1342 self.assertEqual(tarinfo.size, 3) 1343 1344 @unittest.skipUnless(hasattr(os, "link"), 1345 "Missing hardlink implementation") 1346 def test_link_size(self): 1347 link = os.path.join(TEMPDIR, "link") 1348 target = os.path.join(TEMPDIR, "link_target") 1349 with open(target, "wb") as fobj: 1350 fobj.write(b"aaa") 1351 try: 1352 os.link(target, link) 1353 except PermissionError as e: 1354 self.skipTest('os.link(): %s' % e) 1355 try: 1356 tar = tarfile.open(tmpname, self.mode) 1357 try: 1358 # Record the link target in the inodes list. 1359 tar.gettarinfo(target) 1360 tarinfo = tar.gettarinfo(link) 1361 self.assertEqual(tarinfo.size, 0) 1362 finally: 1363 tar.close() 1364 finally: 1365 os_helper.unlink(target) 1366 os_helper.unlink(link) 1367 1368 @os_helper.skip_unless_symlink 1369 def test_symlink_size(self): 1370 path = os.path.join(TEMPDIR, "symlink") 1371 os.symlink("link_target", path) 1372 try: 1373 tar = tarfile.open(tmpname, self.mode) 1374 try: 1375 tarinfo = tar.gettarinfo(path) 1376 self.assertEqual(tarinfo.size, 0) 1377 finally: 1378 tar.close() 1379 finally: 1380 os_helper.unlink(path) 1381 1382 def test_add_self(self): 1383 # Test for #1257255. 
1384 dstname = os.path.abspath(tmpname) 1385 tar = tarfile.open(tmpname, self.mode) 1386 try: 1387 self.assertEqual(tar.name, dstname, 1388 "archive name must be absolute") 1389 tar.add(dstname) 1390 self.assertEqual(tar.getnames(), [], 1391 "added the archive to itself") 1392 1393 with os_helper.change_cwd(TEMPDIR): 1394 tar.add(dstname) 1395 self.assertEqual(tar.getnames(), [], 1396 "added the archive to itself") 1397 finally: 1398 tar.close() 1399 1400 def test_filter(self): 1401 tempdir = os.path.join(TEMPDIR, "filter") 1402 os.mkdir(tempdir) 1403 try: 1404 for name in ("foo", "bar", "baz"): 1405 name = os.path.join(tempdir, name) 1406 os_helper.create_empty_file(name) 1407 1408 def filter(tarinfo): 1409 if os.path.basename(tarinfo.name) == "bar": 1410 return 1411 tarinfo.uid = 123 1412 tarinfo.uname = "foo" 1413 return tarinfo 1414 1415 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1416 try: 1417 tar.add(tempdir, arcname="empty_dir", filter=filter) 1418 finally: 1419 tar.close() 1420 1421 # Verify that filter is a keyword-only argument 1422 with self.assertRaises(TypeError): 1423 tar.add(tempdir, "empty_dir", True, None, filter) 1424 1425 tar = tarfile.open(tmpname, "r") 1426 try: 1427 for tarinfo in tar: 1428 self.assertEqual(tarinfo.uid, 123) 1429 self.assertEqual(tarinfo.uname, "foo") 1430 self.assertEqual(len(tar.getmembers()), 3) 1431 finally: 1432 tar.close() 1433 finally: 1434 os_helper.rmtree(tempdir) 1435 1436 # Guarantee that stored pathnames are not modified. Don't 1437 # remove ./ or ../ or double slashes. Still make absolute 1438 # pathnames relative. 1439 # For details see bug #6054. 1440 def _test_pathname(self, path, cmp_path=None, dir=False): 1441 # Create a tarfile with an empty member named path 1442 # and compare the stored name with the original. 
1443 foo = os.path.join(TEMPDIR, "foo") 1444 if not dir: 1445 os_helper.create_empty_file(foo) 1446 else: 1447 os.mkdir(foo) 1448 1449 tar = tarfile.open(tmpname, self.mode) 1450 try: 1451 tar.add(foo, arcname=path) 1452 finally: 1453 tar.close() 1454 1455 tar = tarfile.open(tmpname, "r") 1456 try: 1457 t = tar.next() 1458 finally: 1459 tar.close() 1460 1461 if not dir: 1462 os_helper.unlink(foo) 1463 else: 1464 os_helper.rmdir(foo) 1465 1466 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1467 1468 1469 @os_helper.skip_unless_symlink 1470 def test_extractall_symlinks(self): 1471 # Test if extractall works properly when tarfile contains symlinks 1472 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1473 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1474 os.mkdir(tempdir) 1475 try: 1476 source_file = os.path.join(tempdir,'source') 1477 target_file = os.path.join(tempdir,'symlink') 1478 with open(source_file,'w') as f: 1479 f.write('something\n') 1480 os.symlink(source_file, target_file) 1481 with tarfile.open(temparchive, 'w') as tar: 1482 tar.add(source_file, arcname="source") 1483 tar.add(target_file, arcname="symlink") 1484 # Let's extract it to the location which contains the symlink 1485 with tarfile.open(temparchive, errorlevel=2) as tar: 1486 # this should not raise OSError: [Errno 17] File exists 1487 try: 1488 tar.extractall(path=tempdir, 1489 filter='fully_trusted') 1490 except OSError: 1491 self.fail("extractall failed with symlinked files") 1492 finally: 1493 os_helper.unlink(temparchive) 1494 os_helper.rmtree(tempdir) 1495 1496 def test_pathnames(self): 1497 self._test_pathname("foo") 1498 self._test_pathname(os.path.join("foo", ".", "bar")) 1499 self._test_pathname(os.path.join("foo", "..", "bar")) 1500 self._test_pathname(os.path.join(".", "foo")) 1501 self._test_pathname(os.path.join(".", "foo", ".")) 1502 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1503 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 
1504 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1505 self._test_pathname(os.path.join("..", "foo")) 1506 self._test_pathname(os.path.join("..", "foo", "..")) 1507 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1508 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1509 1510 self._test_pathname("foo" + os.sep + os.sep + "bar") 1511 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1512 1513 def test_abs_pathnames(self): 1514 if sys.platform == "win32": 1515 self._test_pathname("C:\\foo", "foo") 1516 else: 1517 self._test_pathname("/foo", "foo") 1518 self._test_pathname("///foo", "foo") 1519 1520 def test_cwd(self): 1521 # Test adding the current working directory. 1522 with os_helper.change_cwd(TEMPDIR): 1523 tar = tarfile.open(tmpname, self.mode) 1524 try: 1525 tar.add(".") 1526 finally: 1527 tar.close() 1528 1529 tar = tarfile.open(tmpname, "r") 1530 try: 1531 for t in tar: 1532 if t.name != ".": 1533 self.assertTrue(t.name.startswith("./"), t.name) 1534 finally: 1535 tar.close() 1536 1537 def test_open_nonwritable_fileobj(self): 1538 for exctype in OSError, EOFError, RuntimeError: 1539 class BadFile(io.BytesIO): 1540 first = True 1541 def write(self, data): 1542 if self.first: 1543 self.first = False 1544 raise exctype 1545 1546 f = BadFile() 1547 with self.assertRaises(exctype): 1548 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1549 format=tarfile.PAX_FORMAT, 1550 pax_headers={'non': 'empty'}) 1551 self.assertFalse(f.closed) 1552 1553 1554class GzipWriteTest(GzipTest, WriteTest): 1555 pass 1556 1557 1558class Bz2WriteTest(Bz2Test, WriteTest): 1559 pass 1560 1561 1562class LzmaWriteTest(LzmaTest, WriteTest): 1563 pass 1564 1565 1566class StreamWriteTest(WriteTestBase, unittest.TestCase): 1567 1568 prefix = "w|" 1569 decompressor = None 1570 1571 def test_stream_padding(self): 1572 # Test for bug #1543303. 
1573 tar = tarfile.open(tmpname, self.mode) 1574 tar.close() 1575 if self.decompressor: 1576 dec = self.decompressor() 1577 with open(tmpname, "rb") as fobj: 1578 data = fobj.read() 1579 data = dec.decompress(data) 1580 self.assertFalse(dec.unused_data, "found trailing data") 1581 else: 1582 with self.open(tmpname) as fobj: 1583 data = fobj.read() 1584 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1585 "incorrect zero padding") 1586 1587 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1588 "Missing umask implementation") 1589 @unittest.skipIf( 1590 support.is_emscripten or support.is_wasi, 1591 "Emscripten's/WASI's umask is a stub." 1592 ) 1593 def test_file_mode(self): 1594 # Test for issue #8464: Create files with correct 1595 # permissions. 1596 if os.path.exists(tmpname): 1597 os_helper.unlink(tmpname) 1598 1599 original_umask = os.umask(0o022) 1600 try: 1601 tar = tarfile.open(tmpname, self.mode) 1602 tar.close() 1603 mode = os.stat(tmpname).st_mode & 0o777 1604 self.assertEqual(mode, 0o644, "wrong file permissions") 1605 finally: 1606 os.umask(original_umask) 1607 1608 1609class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1610 def test_source_directory_not_leaked(self): 1611 """ 1612 Ensure the source directory is not included in the tar header 1613 per bpo-41316. 1614 """ 1615 tarfile.open(tmpname, self.mode).close() 1616 payload = pathlib.Path(tmpname).read_text(encoding='latin-1') 1617 assert os.path.dirname(tmpname) not in payload 1618 1619 1620class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1621 decompressor = bz2.BZ2Decompressor if bz2 else None 1622 1623class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1624 decompressor = lzma.LZMADecompressor if lzma else None 1625 1626 1627class GNUWriteTest(unittest.TestCase): 1628 # This testcase checks for correct creation of GNU Longname 1629 # and Longlink extended headers (cp. bug #812325). 
1630 1631 def _length(self, s): 1632 blocks = len(s) // 512 + 1 1633 return blocks * 512 1634 1635 def _calc_size(self, name, link=None): 1636 # Initial tar header 1637 count = 512 1638 1639 if len(name) > tarfile.LENGTH_NAME: 1640 # GNU longname extended header + longname 1641 count += 512 1642 count += self._length(name) 1643 if link is not None and len(link) > tarfile.LENGTH_LINK: 1644 # GNU longlink extended header + longlink 1645 count += 512 1646 count += self._length(link) 1647 return count 1648 1649 def _test(self, name, link=None): 1650 tarinfo = tarfile.TarInfo(name) 1651 if link: 1652 tarinfo.linkname = link 1653 tarinfo.type = tarfile.LNKTYPE 1654 1655 tar = tarfile.open(tmpname, "w") 1656 try: 1657 tar.format = tarfile.GNU_FORMAT 1658 tar.addfile(tarinfo) 1659 1660 v1 = self._calc_size(name, link) 1661 v2 = tar.offset 1662 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1663 finally: 1664 tar.close() 1665 1666 tar = tarfile.open(tmpname) 1667 try: 1668 member = tar.next() 1669 self.assertIsNotNone(member, 1670 "unable to read longname member") 1671 self.assertEqual(tarinfo.name, member.name, 1672 "unable to read longname member") 1673 self.assertEqual(tarinfo.linkname, member.linkname, 1674 "unable to read longname member") 1675 finally: 1676 tar.close() 1677 1678 def test_longname_1023(self): 1679 self._test(("longnam/" * 127) + "longnam") 1680 1681 def test_longname_1024(self): 1682 self._test(("longnam/" * 127) + "longname") 1683 1684 def test_longname_1025(self): 1685 self._test(("longnam/" * 127) + "longname_") 1686 1687 def test_longlink_1023(self): 1688 self._test("name", ("longlnk/" * 127) + "longlnk") 1689 1690 def test_longlink_1024(self): 1691 self._test("name", ("longlnk/" * 127) + "longlink") 1692 1693 def test_longlink_1025(self): 1694 self._test("name", ("longlnk/" * 127) + "longlink_") 1695 1696 def test_longnamelink_1023(self): 1697 self._test(("longnam/" * 127) + "longnam", 1698 ("longlnk/" * 127) + "longlnk") 1699 
    def test_longnamelink_1024(self):
        # Both name and link are 1024 characters long.
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        # Both name and link are 1025 characters long.
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")


class DeviceHeaderTest(WriteTestBase, unittest.TestCase):

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        tempdir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(tempdir)
        try:
            # Write one block-device member and one regular-file member.
            tar = tarfile.open(tmpname, self.mode)
            try:
                input_blk = tarfile.TarInfo(name="my_block_device")
                input_reg = tarfile.TarInfo(name="my_regular_file")
                input_blk.type = tarfile.BLKTYPE
                input_reg.type = tarfile.REGTYPE
                tar.addfile(input_blk)
                tar.addfile(input_reg)
            finally:
                tar.close()

            # devmajor and devminor should be *interpreted* as 0 in both...
            tar = tarfile.open(tmpname, "r")
            try:
                output_blk = tar.getmember("my_block_device")
                output_reg = tar.getmember("my_regular_file")
            finally:
                tar.close()
            self.assertEqual(output_blk.devmajor, 0)
            self.assertEqual(output_blk.devminor, 0)
            self.assertEqual(output_reg.devmajor, 0)
            self.assertEqual(output_reg.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                buf = infile.read()
            # Raw header bytes of each member, from its offset up to the
            # start of its data.
            buf_blk = buf[output_blk.offset:output_blk.offset_data]
            buf_reg = buf[output_reg.offset:output_reg.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            device_headers = slice(329, 329 + 16)
            self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
            self.assertEqual(buf_reg[device_headers], b"\0" * 16)
        finally:
            os_helper.rmtree(tempdir)


class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for the "x" modes: the archive is created, and opening an
    # already existing archive is expected to raise FileExistsError.

    prefix = "x:"

    # Payload file added to the archive by every test; created once per
    # class in setUpClass and removed in tearDownClass.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Every test starts without an existing archive.
        os_helper.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second open of the now existing archive must fail...
        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        # ...and the archive must still list its single member.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create_taropen(self):
        # Same as test_create, but through the class's taropen callable.
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn("spameggs42", names[0])

    def test_create_pathlike_name(self):
        # The archive name and the added file are both pathlib.Path
        # objects; tobj.name must still be an absolute str.
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create_taropen_pathlike_name(self):
        # Path-like variant of test_create_taropen.
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])


class GzipCreateTest(GzipTest, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating and when
        # reading the archive.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj:
            tobj.add(self.file_path)
        with tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj:
            pass


class Bz2CreateTest(Bz2Test, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating and when
        # reading the archive.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj:
            tobj.add(self.file_path)
        with tarfile.open(tmpname, 'r:bz2', compresslevel=1) as tobj:
            pass


class LzmaCreateTest(LzmaTest, CreateTest):

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        with tarfile.open(tmpname, self.mode, preset=1) as tobj:
            tobj.add(self.file_path)


class CreateWithXModeTest(CreateTest):
    # Re-runs CreateTest with the bare "x" mode string.

    prefix = "x"

    # The taropen tests pass "x" literally and do not use self.mode, so
    # the inherited attributes are replaced with None for this class.
    test_create_taropen = None
    test_create_existing_taropen = None


@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.
1883 1884 def setUp(self): 1885 self.foo = os.path.join(TEMPDIR, "foo") 1886 self.bar = os.path.join(TEMPDIR, "bar") 1887 1888 with open(self.foo, "wb") as fobj: 1889 fobj.write(b"foo") 1890 1891 try: 1892 os.link(self.foo, self.bar) 1893 except PermissionError as e: 1894 self.skipTest('os.link(): %s' % e) 1895 1896 self.tar = tarfile.open(tmpname, "w") 1897 self.tar.add(self.foo) 1898 1899 def tearDown(self): 1900 self.tar.close() 1901 os_helper.unlink(self.foo) 1902 os_helper.unlink(self.bar) 1903 1904 def test_add_twice(self): 1905 # The same name will be added as a REGTYPE every 1906 # time regardless of st_nlink. 1907 tarinfo = self.tar.gettarinfo(self.foo) 1908 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1909 "add file as regular failed") 1910 1911 def test_add_hardlink(self): 1912 tarinfo = self.tar.gettarinfo(self.bar) 1913 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1914 "add file as hardlink failed") 1915 1916 def test_dereference_hardlink(self): 1917 self.tar.dereference = True 1918 tarinfo = self.tar.gettarinfo(self.bar) 1919 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1920 "dereferencing hardlink failed") 1921 1922 1923class PaxWriteTest(GNUWriteTest): 1924 1925 def _test(self, name, link=None): 1926 # See GNUWriteTest. 
1927 tarinfo = tarfile.TarInfo(name) 1928 if link: 1929 tarinfo.linkname = link 1930 tarinfo.type = tarfile.LNKTYPE 1931 1932 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1933 try: 1934 tar.addfile(tarinfo) 1935 finally: 1936 tar.close() 1937 1938 tar = tarfile.open(tmpname) 1939 try: 1940 if link: 1941 l = tar.getmembers()[0].linkname 1942 self.assertEqual(link, l, "PAX longlink creation failed") 1943 else: 1944 n = tar.getmembers()[0].name 1945 self.assertEqual(name, n, "PAX longname creation failed") 1946 finally: 1947 tar.close() 1948 1949 def test_pax_global_header(self): 1950 pax_headers = { 1951 "foo": "bar", 1952 "uid": "0", 1953 "mtime": "1.23", 1954 "test": "\xe4\xf6\xfc", 1955 "\xe4\xf6\xfc": "test"} 1956 1957 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1958 pax_headers=pax_headers) 1959 try: 1960 tar.addfile(tarfile.TarInfo("test")) 1961 finally: 1962 tar.close() 1963 1964 # Test if the global header was written correctly. 1965 tar = tarfile.open(tmpname, encoding="iso8859-1") 1966 try: 1967 self.assertEqual(tar.pax_headers, pax_headers) 1968 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1969 # Test if all the fields are strings. 1970 for key, val in tar.pax_headers.items(): 1971 self.assertIsNot(type(key), bytes) 1972 self.assertIsNot(type(val), bytes) 1973 if key in tarfile.PAX_NUMBER_FIELDS: 1974 try: 1975 tarfile.PAX_NUMBER_FIELDS[key](val) 1976 except (TypeError, ValueError): 1977 self.fail("unable to convert pax header field") 1978 finally: 1979 tar.close() 1980 1981 def test_pax_extended_header(self): 1982 # The fields from the pax header have priority over the 1983 # TarInfo. 
1984 pax_headers = {"path": "foo", "uid": "123"} 1985 1986 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1987 encoding="iso8859-1") 1988 try: 1989 t = tarfile.TarInfo() 1990 t.name = "\xe4\xf6\xfc" # non-ASCII 1991 t.uid = 8**8 # too large 1992 t.pax_headers = pax_headers 1993 tar.addfile(t) 1994 finally: 1995 tar.close() 1996 1997 tar = tarfile.open(tmpname, encoding="iso8859-1") 1998 try: 1999 t = tar.getmembers()[0] 2000 self.assertEqual(t.pax_headers, pax_headers) 2001 self.assertEqual(t.name, "foo") 2002 self.assertEqual(t.uid, 123) 2003 finally: 2004 tar.close() 2005 2006 def test_create_pax_header(self): 2007 # The ustar header should contain values that can be 2008 # represented reasonably, even if a better (e.g. higher 2009 # precision) version is set in the pax header. 2010 # Issue #45863 2011 2012 # values that should be kept 2013 t = tarfile.TarInfo() 2014 t.name = "foo" 2015 t.mtime = 1000.1 2016 t.size = 100 2017 t.uid = 123 2018 t.gid = 124 2019 info = t.get_info() 2020 header = t.create_pax_header(info, encoding="iso8859-1") 2021 self.assertEqual(info['name'], "foo") 2022 # mtime should be rounded to nearest second 2023 self.assertIsInstance(info['mtime'], int) 2024 self.assertEqual(info['mtime'], 1000) 2025 self.assertEqual(info['size'], 100) 2026 self.assertEqual(info['uid'], 123) 2027 self.assertEqual(info['gid'], 124) 2028 self.assertEqual(header, 2029 b'././@PaxHeader' + bytes(86) \ 2030 + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x' \ 2031 + bytes(100) + b'ustar\x0000'+ bytes(247) \ 2032 + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97) \ 2033 + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0' \ 2034 + bytes(100) + b'ustar\x0000' + bytes(247)) 2035 2036 # values that should be changed 2037 t = tarfile.TarInfo() 2038 t.name = "foo\u3374" # can't be represented in ascii 2039 t.mtime = 10**10 # too big 2040 t.size = 10**10 # too big 2041 t.uid = 8**8 # too big 
2042 t.gid = 8**8+1 # too big 2043 info = t.get_info() 2044 header = t.create_pax_header(info, encoding="iso8859-1") 2045 # name is kept as-is in info but should be added to pax header 2046 self.assertEqual(info['name'], "foo\u3374") 2047 self.assertEqual(info['mtime'], 0) 2048 self.assertEqual(info['size'], 0) 2049 self.assertEqual(info['uid'], 0) 2050 self.assertEqual(info['gid'], 0) 2051 self.assertEqual(header, 2052 b'././@PaxHeader' + bytes(86) \ 2053 + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x' \ 2054 + bytes(100) + b'ustar\x0000' + bytes(247) \ 2055 + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n' \ 2056 + b'16 gid=16777217\n20 size=10000000000\n' \ 2057 + b'21 mtime=10000000000\n'+ bytes(424) + b'foo?' + bytes(96) \ 2058 + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0' \ 2059 + bytes(100) + b'ustar\x0000' + bytes(247)) 2060 2061 2062class UnicodeTest: 2063 2064 def test_iso8859_1_filename(self): 2065 self._test_unicode_filename("iso8859-1") 2066 2067 def test_utf7_filename(self): 2068 self._test_unicode_filename("utf7") 2069 2070 def test_utf8_filename(self): 2071 self._test_unicode_filename("utf-8") 2072 2073 def _test_unicode_filename(self, encoding): 2074 tar = tarfile.open(tmpname, "w", format=self.format, 2075 encoding=encoding, errors="strict") 2076 try: 2077 name = "\xe4\xf6\xfc" 2078 tar.addfile(tarfile.TarInfo(name)) 2079 finally: 2080 tar.close() 2081 2082 tar = tarfile.open(tmpname, encoding=encoding) 2083 try: 2084 self.assertEqual(tar.getmembers()[0].name, name) 2085 finally: 2086 tar.close() 2087 2088 def test_unicode_filename_error(self): 2089 tar = tarfile.open(tmpname, "w", format=self.format, 2090 encoding="ascii", errors="strict") 2091 try: 2092 tarinfo = tarfile.TarInfo() 2093 2094 tarinfo.name = "\xe4\xf6\xfc" 2095 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 2096 2097 tarinfo.name = "foo" 2098 tarinfo.uname = "\xe4\xf6\xfc" 2099 self.assertRaises(UnicodeError, 
tar.addfile, tarinfo) 2100 finally: 2101 tar.close() 2102 2103 def test_unicode_argument(self): 2104 tar = tarfile.open(tarname, "r", 2105 encoding="iso8859-1", errors="strict") 2106 try: 2107 for t in tar: 2108 self.assertIs(type(t.name), str) 2109 self.assertIs(type(t.linkname), str) 2110 self.assertIs(type(t.uname), str) 2111 self.assertIs(type(t.gname), str) 2112 finally: 2113 tar.close() 2114 2115 def test_uname_unicode(self): 2116 t = tarfile.TarInfo("foo") 2117 t.uname = "\xe4\xf6\xfc" 2118 t.gname = "\xe4\xf6\xfc" 2119 2120 tar = tarfile.open(tmpname, mode="w", format=self.format, 2121 encoding="iso8859-1") 2122 try: 2123 tar.addfile(t) 2124 finally: 2125 tar.close() 2126 2127 tar = tarfile.open(tmpname, encoding="iso8859-1") 2128 try: 2129 t = tar.getmember("foo") 2130 self.assertEqual(t.uname, "\xe4\xf6\xfc") 2131 self.assertEqual(t.gname, "\xe4\xf6\xfc") 2132 2133 if self.format != tarfile.PAX_FORMAT: 2134 tar.close() 2135 tar = tarfile.open(tmpname, encoding="ascii") 2136 t = tar.getmember("foo") 2137 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 2138 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 2139 finally: 2140 tar.close() 2141 2142 2143class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 2144 2145 format = tarfile.USTAR_FORMAT 2146 2147 # Test whether the utf-8 encoded version of a filename exceeds the 100 2148 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 2149 # bytes). 
2150 def test_unicode_name1(self): 2151 self._test_ustar_name("0123456789" * 10) 2152 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 2153 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 2154 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 2155 2156 def test_unicode_name2(self): 2157 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 2158 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 2159 2160 # Test whether the utf-8 encoded version of a filename exceeds the 155 2161 # bytes prefix + '/' + 100 bytes name limit. 2162 def test_unicode_longname1(self): 2163 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 2164 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 2165 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 2166 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 2167 2168 def test_unicode_longname2(self): 2169 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 2170 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 2171 2172 def test_unicode_longname3(self): 2173 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 2174 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 2175 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 2176 2177 def test_unicode_longname4(self): 2178 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2179 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2180 2181 def _test_ustar_name(self, name, exc=None): 2182 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2183 t = tarfile.TarInfo(name) 2184 if exc is None: 2185 tar.addfile(t) 2186 
else: 2187 self.assertRaises(exc, tar.addfile, t) 2188 2189 if exc is None: 2190 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2191 for t in tar: 2192 self.assertEqual(name, t.name) 2193 break 2194 2195 # Test the same as above for the 100 bytes link field. 2196 def test_unicode_link1(self): 2197 self._test_ustar_link("0123456789" * 10) 2198 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2199 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2200 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2201 2202 def test_unicode_link2(self): 2203 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2204 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2205 2206 def _test_ustar_link(self, name, exc=None): 2207 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2208 t = tarfile.TarInfo("foo") 2209 t.linkname = name 2210 if exc is None: 2211 tar.addfile(t) 2212 else: 2213 self.assertRaises(exc, tar.addfile, t) 2214 2215 if exc is None: 2216 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2217 for t in tar: 2218 self.assertEqual(name, t.linkname) 2219 break 2220 2221 2222class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2223 2224 format = tarfile.GNU_FORMAT 2225 2226 def test_bad_pax_header(self): 2227 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2228 # without a hdrcharset=BINARY header. 2229 for encoding, name in ( 2230 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2231 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2232 with tarfile.open(tarname, encoding=encoding, 2233 errors="surrogateescape") as tar: 2234 try: 2235 t = tar.getmember(name) 2236 except KeyError: 2237 self.fail("unable to read bad GNU tar pax header") 2238 2239 2240class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2241 2242 format = tarfile.PAX_FORMAT 2243 2244 # PAX_FORMAT ignores encoding in write mode. 
2245 test_unicode_filename_error = None 2246 2247 def test_binary_header(self): 2248 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 2249 for encoding, name in ( 2250 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2251 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2252 with tarfile.open(tarname, encoding=encoding, 2253 errors="surrogateescape") as tar: 2254 try: 2255 t = tar.getmember(name) 2256 except KeyError: 2257 self.fail("unable to read POSIX.1-2008 binary header") 2258 2259 2260class AppendTestBase: 2261 # Test append mode (cp. patch #1652681). 2262 2263 def setUp(self): 2264 self.tarname = tmpname 2265 if os.path.exists(self.tarname): 2266 os_helper.unlink(self.tarname) 2267 2268 def _create_testtar(self, mode="w:"): 2269 with tarfile.open(tarname, encoding="iso8859-1") as src: 2270 t = src.getmember("ustar/regtype") 2271 t.name = "foo" 2272 with src.extractfile(t) as f: 2273 with tarfile.open(self.tarname, mode) as tar: 2274 tar.addfile(t, f) 2275 2276 def test_append_compressed(self): 2277 self._create_testtar("w:" + self.suffix) 2278 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2279 2280class AppendTest(AppendTestBase, unittest.TestCase): 2281 test_append_compressed = None 2282 2283 def _add_testfile(self, fileobj=None): 2284 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2285 tar.addfile(tarfile.TarInfo("bar")) 2286 2287 def _test(self, names=["bar"], fileobj=None): 2288 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2289 self.assertEqual(tar.getnames(), names) 2290 2291 def test_non_existing(self): 2292 self._add_testfile() 2293 self._test() 2294 2295 def test_empty(self): 2296 tarfile.open(self.tarname, "w:").close() 2297 self._add_testfile() 2298 self._test() 2299 2300 def test_empty_fileobj(self): 2301 fobj = io.BytesIO(b"\0" * 1024) 2302 self._add_testfile(fobj) 2303 fobj.seek(0) 2304 self._test(fileobj=fobj) 2305 2306 def test_fileobj(self): 2307 self._create_testtar() 2308 
with open(self.tarname, "rb") as fobj: 2309 data = fobj.read() 2310 fobj = io.BytesIO(data) 2311 self._add_testfile(fobj) 2312 fobj.seek(0) 2313 self._test(names=["foo", "bar"], fileobj=fobj) 2314 2315 def test_existing(self): 2316 self._create_testtar() 2317 self._add_testfile() 2318 self._test(names=["foo", "bar"]) 2319 2320 # Append mode is supposed to fail if the tarfile to append to 2321 # does not end with a zero block. 2322 def _test_error(self, data): 2323 with open(self.tarname, "wb") as fobj: 2324 fobj.write(data) 2325 self.assertRaises(tarfile.ReadError, self._add_testfile) 2326 2327 def test_null(self): 2328 self._test_error(b"") 2329 2330 def test_incomplete(self): 2331 self._test_error(b"\0" * 13) 2332 2333 def test_premature_eof(self): 2334 data = tarfile.TarInfo("foo").tobuf() 2335 self._test_error(data) 2336 2337 def test_trailing_garbage(self): 2338 data = tarfile.TarInfo("foo").tobuf() 2339 self._test_error(data + b"\0" * 13) 2340 2341 def test_invalid(self): 2342 self._test_error(b"a" * 512) 2343 2344class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2345 pass 2346 2347class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2348 pass 2349 2350class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2351 pass 2352 2353 2354class LimitsTest(unittest.TestCase): 2355 2356 def test_ustar_limits(self): 2357 # 100 char name 2358 tarinfo = tarfile.TarInfo("0123456789" * 10) 2359 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2360 2361 # 101 char name that cannot be stored 2362 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2363 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2364 2365 # 256 char name with a slash at pos 156 2366 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2367 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2368 2369 # 256 char name that cannot be stored 2370 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2371 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2372 2373 
# 512 char name 2374 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2375 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2376 2377 # 512 char linkname 2378 tarinfo = tarfile.TarInfo("longlink") 2379 tarinfo.linkname = "123/" * 126 + "longname" 2380 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2381 2382 # uid > 8 digits 2383 tarinfo = tarfile.TarInfo("name") 2384 tarinfo.uid = 0o10000000 2385 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2386 2387 def test_gnu_limits(self): 2388 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2389 tarinfo.tobuf(tarfile.GNU_FORMAT) 2390 2391 tarinfo = tarfile.TarInfo("longlink") 2392 tarinfo.linkname = "123/" * 126 + "longname" 2393 tarinfo.tobuf(tarfile.GNU_FORMAT) 2394 2395 # uid >= 256 ** 7 2396 tarinfo = tarfile.TarInfo("name") 2397 tarinfo.uid = 0o4000000000000000000 2398 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2399 2400 def test_pax_limits(self): 2401 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2402 tarinfo.tobuf(tarfile.PAX_FORMAT) 2403 2404 tarinfo = tarfile.TarInfo("longlink") 2405 tarinfo.linkname = "123/" * 126 + "longname" 2406 tarinfo.tobuf(tarfile.PAX_FORMAT) 2407 2408 tarinfo = tarfile.TarInfo("name") 2409 tarinfo.uid = 0o4000000000000000000 2410 tarinfo.tobuf(tarfile.PAX_FORMAT) 2411 2412 2413class MiscTest(unittest.TestCase): 2414 2415 def test_char_fields(self): 2416 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2417 b"foo\0\0\0\0\0") 2418 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2419 b"foo") 2420 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2421 "foo") 2422 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2423 "foo") 2424 2425 def test_read_number_fields(self): 2426 # Issue 13158: Test if GNU tar specific base-256 number fields 2427 # are decoded correctly. 
2428 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2429 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2430 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2431 0o10000000) 2432 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2433 0xffffffff) 2434 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2435 -1) 2436 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2437 -100) 2438 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2439 -0x100000000000000) 2440 2441 # Issue 24514: Test if empty number fields are converted to zero. 2442 self.assertEqual(tarfile.nti(b"\0"), 0) 2443 self.assertEqual(tarfile.nti(b" \0"), 0) 2444 2445 def test_write_number_fields(self): 2446 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2447 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2448 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2449 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2450 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2451 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2452 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2453 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2454 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2455 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2456 self.assertEqual(tarfile.itn(-0x100000000000000, 2457 format=tarfile.GNU_FORMAT), 2458 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2459 2460 # Issue 32713: Test if itn() supports float values outside the 2461 # non-GNU format range 2462 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2463 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2464 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2465 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2466 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2467 2468 def test_number_field_limits(self): 2469 with self.assertRaises(ValueError): 2470 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2471 with self.assertRaises(ValueError): 2472 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2473 with self.assertRaises(ValueError): 2474 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2475 with self.assertRaises(ValueError): 2476 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2477 2478 def test__all__(self): 2479 not_exported = { 2480 'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE', 2481 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME', 2482 'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2483 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE', 2484 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE', 2485 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES', 2486 'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS', 2487 'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 2488 'copyfileobj', 'filemode', 'EmptyHeaderError', 2489 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 2490 'SubsequentHeaderError', 'ExFileObject', 'main', 2491 "fully_trusted_filter", "data_filter", 2492 "tar_filter", "FilterError", "AbsoluteLinkError", 2493 "OutsideDestinationError", "SpecialFileError", "AbsolutePathError", 2494 "LinkOutsideDestinationError", "LinkFallbackError", 2495 } 2496 support.check__all__(self, tarfile, not_exported=not_exported) 2497 2498 def test_useful_error_message_when_modules_missing(self): 2499 fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz') 2500 with self.assertRaises(tarfile.ReadError) as excinfo: 2501 error = tarfile.CompressionError('lzma module is not available'), 2502 with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error): 2503 tarfile.open(fname) 2504 2505 self.assertIn( 2506 "\n- method xz: CompressionError('lzma module is not available')\n", 2507 str(excinfo.exception), 2508 ) 2509 2510 @unittest.skipUnless(os_helper.can_symlink(), 'requires symlink support') 2511 
@unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod") 2512 @unittest.mock.patch('os.chmod') 2513 def test_deferred_directory_attributes_update(self, mock_chmod): 2514 # Regression test for gh-127987: setting attributes on arbitrary files 2515 tempdir = os.path.join(TEMPDIR, 'test127987') 2516 def mock_chmod_side_effect(path, mode, **kwargs): 2517 target_path = os.path.realpath(path) 2518 if os.path.commonpath([target_path, tempdir]) != tempdir: 2519 raise Exception("should not try to chmod anything outside the destination", target_path) 2520 mock_chmod.side_effect = mock_chmod_side_effect 2521 2522 outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir') 2523 with ArchiveMaker() as arc: 2524 arc.add('x', symlink_to='.') 2525 arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt') 2526 arc.add('x', symlink_to=outside_tree_dir) 2527 2528 os.makedirs(outside_tree_dir) 2529 try: 2530 arc.open().extractall(path=tempdir, filter='tar') 2531 finally: 2532 os_helper.rmtree(outside_tree_dir) 2533 os_helper.rmtree(tempdir) 2534 2535 2536class CommandLineTest(unittest.TestCase): 2537 2538 def tarfilecmd(self, *args, **kwargs): 2539 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2540 **kwargs) 2541 return out.replace(os.linesep.encode(), b'\n') 2542 2543 def tarfilecmd_failure(self, *args): 2544 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2545 2546 def make_simple_tarfile(self, tar_name): 2547 files = [support.findfile('tokenize_tests.txt'), 2548 support.findfile('tokenize_tests-no-coding-cookie-' 2549 'and-utf8-bom-sig-only.txt')] 2550 self.addCleanup(os_helper.unlink, tar_name) 2551 with tarfile.open(tar_name, 'w') as tf: 2552 for tardata in files: 2553 tf.add(tardata, arcname=os.path.basename(tardata)) 2554 2555 def make_evil_tarfile(self, tar_name): 2556 files = [support.findfile('tokenize_tests.txt')] 2557 self.addCleanup(os_helper.unlink, tar_name) 2558 with tarfile.open(tar_name, 'w') as tf: 2559 benign = 
tarfile.TarInfo('benign') 2560 tf.addfile(benign, fileobj=io.BytesIO(b'')) 2561 evil = tarfile.TarInfo('../evil') 2562 tf.addfile(evil, fileobj=io.BytesIO(b'')) 2563 2564 def test_bad_use(self): 2565 rc, out, err = self.tarfilecmd_failure() 2566 self.assertEqual(out, b'') 2567 self.assertIn(b'usage', err.lower()) 2568 self.assertIn(b'error', err.lower()) 2569 self.assertIn(b'required', err.lower()) 2570 rc, out, err = self.tarfilecmd_failure('-l', '') 2571 self.assertEqual(out, b'') 2572 self.assertNotEqual(err.strip(), b'') 2573 2574 def test_test_command(self): 2575 for tar_name in testtarnames: 2576 for opt in '-t', '--test': 2577 out = self.tarfilecmd(opt, tar_name) 2578 self.assertEqual(out, b'') 2579 2580 def test_test_command_verbose(self): 2581 for tar_name in testtarnames: 2582 for opt in '-v', '--verbose': 2583 out = self.tarfilecmd(opt, '-t', tar_name, 2584 PYTHONIOENCODING='utf-8') 2585 self.assertIn(b'is a tar archive.\n', out) 2586 2587 def test_test_command_invalid_file(self): 2588 zipname = support.findfile('zipdir.zip') 2589 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2590 self.assertIn(b' is not a tar archive.', err) 2591 self.assertEqual(out, b'') 2592 self.assertEqual(rc, 1) 2593 2594 for tar_name in testtarnames: 2595 with self.subTest(tar_name=tar_name): 2596 with open(tar_name, 'rb') as f: 2597 data = f.read() 2598 try: 2599 with open(tmpname, 'wb') as f: 2600 f.write(data[:511]) 2601 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2602 self.assertEqual(out, b'') 2603 self.assertEqual(rc, 1) 2604 finally: 2605 os_helper.unlink(tmpname) 2606 2607 def test_list_command(self): 2608 for tar_name in testtarnames: 2609 with support.captured_stdout() as t: 2610 with tarfile.open(tar_name, 'r') as tf: 2611 tf.list(verbose=False) 2612 expected = t.getvalue().encode('ascii', 'backslashreplace') 2613 for opt in '-l', '--list': 2614 out = self.tarfilecmd(opt, tar_name, 2615 PYTHONIOENCODING='ascii') 2616 self.assertEqual(out, expected) 
2617 2618 def test_list_command_verbose(self): 2619 for tar_name in testtarnames: 2620 with support.captured_stdout() as t: 2621 with tarfile.open(tar_name, 'r') as tf: 2622 tf.list(verbose=True) 2623 expected = t.getvalue().encode('ascii', 'backslashreplace') 2624 for opt in '-v', '--verbose': 2625 out = self.tarfilecmd(opt, '-l', tar_name, 2626 PYTHONIOENCODING='ascii') 2627 self.assertEqual(out, expected) 2628 2629 def test_list_command_invalid_file(self): 2630 zipname = support.findfile('zipdir.zip') 2631 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2632 self.assertIn(b' is not a tar archive.', err) 2633 self.assertEqual(out, b'') 2634 self.assertEqual(rc, 1) 2635 2636 def test_create_command(self): 2637 files = [support.findfile('tokenize_tests.txt'), 2638 support.findfile('tokenize_tests-no-coding-cookie-' 2639 'and-utf8-bom-sig-only.txt')] 2640 for opt in '-c', '--create': 2641 try: 2642 out = self.tarfilecmd(opt, tmpname, *files) 2643 self.assertEqual(out, b'') 2644 with tarfile.open(tmpname) as tar: 2645 tar.getmembers() 2646 finally: 2647 os_helper.unlink(tmpname) 2648 2649 def test_create_command_verbose(self): 2650 files = [support.findfile('tokenize_tests.txt'), 2651 support.findfile('tokenize_tests-no-coding-cookie-' 2652 'and-utf8-bom-sig-only.txt')] 2653 for opt in '-v', '--verbose': 2654 try: 2655 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2656 PYTHONIOENCODING='utf-8') 2657 self.assertIn(b' file created.', out) 2658 with tarfile.open(tmpname) as tar: 2659 tar.getmembers() 2660 finally: 2661 os_helper.unlink(tmpname) 2662 2663 def test_create_command_dotless_filename(self): 2664 files = [support.findfile('tokenize_tests.txt')] 2665 try: 2666 out = self.tarfilecmd('-c', dotlessname, *files) 2667 self.assertEqual(out, b'') 2668 with tarfile.open(dotlessname) as tar: 2669 tar.getmembers() 2670 finally: 2671 os_helper.unlink(dotlessname) 2672 2673 def test_create_command_dot_started_filename(self): 2674 tar_name = os.path.join(TEMPDIR, 
".testtar") 2675 files = [support.findfile('tokenize_tests.txt')] 2676 try: 2677 out = self.tarfilecmd('-c', tar_name, *files) 2678 self.assertEqual(out, b'') 2679 with tarfile.open(tar_name) as tar: 2680 tar.getmembers() 2681 finally: 2682 os_helper.unlink(tar_name) 2683 2684 def test_create_command_compressed(self): 2685 files = [support.findfile('tokenize_tests.txt'), 2686 support.findfile('tokenize_tests-no-coding-cookie-' 2687 'and-utf8-bom-sig-only.txt')] 2688 for filetype in (GzipTest, Bz2Test, LzmaTest): 2689 if not filetype.open: 2690 continue 2691 try: 2692 tar_name = tmpname + '.' + filetype.suffix 2693 out = self.tarfilecmd('-c', tar_name, *files) 2694 with filetype.taropen(tar_name) as tar: 2695 tar.getmembers() 2696 finally: 2697 os_helper.unlink(tar_name) 2698 2699 def test_extract_command(self): 2700 self.make_simple_tarfile(tmpname) 2701 for opt in '-e', '--extract': 2702 try: 2703 with os_helper.temp_cwd(tarextdir): 2704 out = self.tarfilecmd(opt, tmpname) 2705 self.assertEqual(out, b'') 2706 finally: 2707 os_helper.rmtree(tarextdir) 2708 2709 def test_extract_command_verbose(self): 2710 self.make_simple_tarfile(tmpname) 2711 for opt in '-v', '--verbose': 2712 try: 2713 with os_helper.temp_cwd(tarextdir): 2714 out = self.tarfilecmd(opt, '-e', tmpname, 2715 PYTHONIOENCODING='utf-8') 2716 self.assertIn(b' file is extracted.', out) 2717 finally: 2718 os_helper.rmtree(tarextdir) 2719 2720 def test_extract_command_filter(self): 2721 self.make_evil_tarfile(tmpname) 2722 # Make an inner directory, so the member named '../evil' 2723 # is still extracted into `tarextdir` 2724 destdir = os.path.join(tarextdir, 'dest') 2725 os.mkdir(tarextdir) 2726 try: 2727 with os_helper.temp_cwd(destdir): 2728 self.tarfilecmd_failure('-e', tmpname, 2729 '-v', 2730 '--filter', 'data') 2731 out = self.tarfilecmd('-e', tmpname, 2732 '-v', 2733 '--filter', 'fully_trusted', 2734 PYTHONIOENCODING='utf-8') 2735 self.assertIn(b' file is extracted.', out) 2736 finally: 2737 
os_helper.rmtree(tarextdir) 2738 2739 def test_extract_command_different_directory(self): 2740 self.make_simple_tarfile(tmpname) 2741 try: 2742 with os_helper.temp_cwd(tarextdir): 2743 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2744 self.assertEqual(out, b'') 2745 finally: 2746 os_helper.rmtree(tarextdir) 2747 2748 def test_extract_command_invalid_file(self): 2749 zipname = support.findfile('zipdir.zip') 2750 with os_helper.temp_cwd(tarextdir): 2751 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2752 self.assertIn(b' is not a tar archive.', err) 2753 self.assertEqual(out, b'') 2754 self.assertEqual(rc, 1) 2755 2756 2757class ContextManagerTest(unittest.TestCase): 2758 2759 def test_basic(self): 2760 with tarfile.open(tarname) as tar: 2761 self.assertFalse(tar.closed, "closed inside runtime context") 2762 self.assertTrue(tar.closed, "context manager failed") 2763 2764 def test_closed(self): 2765 # The __enter__() method is supposed to raise OSError 2766 # if the TarFile object is already closed. 2767 tar = tarfile.open(tarname) 2768 tar.close() 2769 with self.assertRaises(OSError): 2770 with tar: 2771 pass 2772 2773 def test_exception(self): 2774 # Test if the OSError exception is passed through properly. 2775 with self.assertRaises(Exception) as exc: 2776 with tarfile.open(tarname) as tar: 2777 raise OSError 2778 self.assertIsInstance(exc.exception, OSError, 2779 "wrong exception raised in context manager") 2780 self.assertTrue(tar.closed, "context manager failed") 2781 2782 def test_no_eof(self): 2783 # __exit__() must not write end-of-archive blocks if an 2784 # exception was raised. 2785 try: 2786 with tarfile.open(tmpname, "w") as tar: 2787 raise Exception 2788 except: 2789 pass 2790 self.assertEqual(os.path.getsize(tmpname), 0, 2791 "context manager wrote an end-of-archive block") 2792 self.assertTrue(tar.closed, "context manager failed") 2793 2794 def test_eof(self): 2795 # __exit__() must write end-of-archive blocks, i.e. 
call 2796 # TarFile.close() if there was no error. 2797 with tarfile.open(tmpname, "w"): 2798 pass 2799 self.assertNotEqual(os.path.getsize(tmpname), 0, 2800 "context manager wrote no end-of-archive block") 2801 2802 def test_fileobj(self): 2803 # Test that __exit__() did not close the external file 2804 # object. 2805 with open(tmpname, "wb") as fobj: 2806 try: 2807 with tarfile.open(fileobj=fobj, mode="w") as tar: 2808 raise Exception 2809 except: 2810 pass 2811 self.assertFalse(fobj.closed, "external file object was closed") 2812 self.assertTrue(tar.closed, "context manager failed") 2813 2814 2815@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2816class LinkEmulationTest(ReadTest, unittest.TestCase): 2817 2818 # Test for issue #8741 regression. On platforms that do not support 2819 # symbolic or hard links tarfile tries to extract these types of members 2820 # as the regular files they point to. 2821 def _test_link_extraction(self, name): 2822 self.tar.extract(name, TEMPDIR, filter='fully_trusted') 2823 with open(os.path.join(TEMPDIR, name), "rb") as f: 2824 data = f.read() 2825 self.assertEqual(sha256sum(data), sha256_regtype) 2826 2827 # See issues #1578269, #8879, and #17689 for some history on these skips 2828 @unittest.skipIf(hasattr(os.path, "islink"), 2829 "Skip emulation - has os.path.islink but not os.link") 2830 def test_hardlink_extraction1(self): 2831 self._test_link_extraction("ustar/lnktype") 2832 2833 @unittest.skipIf(hasattr(os.path, "islink"), 2834 "Skip emulation - has os.path.islink but not os.link") 2835 def test_hardlink_extraction2(self): 2836 self._test_link_extraction("./ustar/linktest2/lnktype") 2837 2838 @unittest.skipIf(hasattr(os, "symlink"), 2839 "Skip emulation if symlink exists") 2840 def test_symlink_extraction1(self): 2841 self._test_link_extraction("ustar/symtype") 2842 2843 @unittest.skipIf(hasattr(os, "symlink"), 2844 "Skip emulation if symlink exists") 2845 def test_symlink_extraction2(self): 2846 
self._test_link_extraction("./ustar/linktest2/symtype") 2847 2848 2849class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2850 # Issue5068: The _BZ2Proxy.read() method loops forever 2851 # on an empty or partial bzipped file. 2852 2853 def _test_partial_input(self, mode): 2854 class MyBytesIO(io.BytesIO): 2855 hit_eof = False 2856 def read(self, n): 2857 if self.hit_eof: 2858 raise AssertionError("infinite loop detected in " 2859 "tarfile.open()") 2860 self.hit_eof = self.tell() == len(self.getvalue()) 2861 return super(MyBytesIO, self).read(n) 2862 def seek(self, *args): 2863 self.hit_eof = False 2864 return super(MyBytesIO, self).seek(*args) 2865 2866 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2867 for x in range(len(data) + 1): 2868 try: 2869 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2870 except tarfile.ReadError: 2871 pass # we have no interest in ReadErrors 2872 2873 def test_partial_input(self): 2874 self._test_partial_input("r") 2875 2876 def test_partial_input_bz2(self): 2877 self._test_partial_input("r:bz2") 2878 2879 2880def root_is_uid_gid_0(): 2881 try: 2882 import pwd, grp 2883 except ImportError: 2884 return False 2885 if pwd.getpwuid(0)[0] != 'root': 2886 return False 2887 if grp.getgrgid(0)[0] != 'root': 2888 return False 2889 return True 2890 2891 2892@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2893@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2894class NumericOwnerTest(unittest.TestCase): 2895 # mock the following: 2896 # os.chown: so we can test what's being called 2897 # os.chmod: so the modes are not actually changed. 
@staticmethod
def _make_test_archive(filename_1, dirname_1, filename_2):
    """Write a small archive to ``tmpname`` and return that path.

    The archive holds a regular file, a directory, and a regular file
    within that directory.  Each member gets its own numeric uid/gid,
    while the user and group names are always 'root'.
    """
    payload = io.BytesIO(b"content")

    # (name, uid, gid, type, file object) for every member to store
    entries = (
        (filename_1, 99, 98, tarfile.REGTYPE, payload),
        (dirname_1, 77, 76, tarfile.DIRTYPE, None),
        (filename_2, 88, 87, tarfile.REGTYPE, payload),
    )
    with tarfile.open(tmpname, 'w') as archive:
        for entry_name, entry_uid, entry_gid, entry_type, source in entries:
            info = tarfile.TarInfo(entry_name)
            info.uid = entry_uid
            info.gid = entry_gid
            info.uname = 'root'
            info.gname = 'root'
            info.type = entry_type
            archive.addfile(info, source)

    # callers get the full pathname of the archive
    return tmpname
@unittest.mock.patch('os.chown')
@unittest.mock.patch('os.chmod')
@unittest.mock.patch('os.geteuid')
def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                    mock_chown):
    """extract(numeric_owner=True) calls chown with the stored uid/gid."""
    with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                            filename_2):
        for member_name in (filename_1, filename_2):
            tarfl.extract(member_name, TEMPDIR, numeric_owner=True,
                          filter='fully_trusted')

    # chown is expected on the filesystem paths below TEMPDIR
    expected_calls = [
        unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
        unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
    ]
    mock_chown.assert_has_calls(expected_calls, any_order=True)
class ReplaceTests(ReadTest, unittest.TestCase):
    """Tests for TarInfo.replace()."""

    def test_replace_name(self):
        # replace() hands back a modified copy; neither the member we hold
        # nor the one looked up again from the archive changes its name.
        original = self.tar.getmember('ustar/regtype')
        renamed = original.replace(name='misc/other')
        self.assertEqual(renamed.name, 'misc/other')
        self.assertEqual(original.name, 'ustar/regtype')
        looked_up = self.tar.getmember('ustar/regtype')
        self.assertEqual(looked_up.name, 'ustar/regtype')

    def test_replace_deep(self):
        # By default the copy has its own pax_headers dict.
        original = self.tar.getmember('pax/regtype1')
        copied = original.replace()
        copied.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        # With deep=False the pax_headers dict is shared with the original.
        original = self.tar.getmember('pax/regtype1')
        copied = original.replace(deep=False)
        copied.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        # Each replaceable attribute can be set to None on the copy while
        # the original keeps its value.
        original = self.tar.getmember('ustar/regtype')
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                changes = {attr_name: None}
                copied = original.replace(**changes)
                self.assertEqual(getattr(copied, attr_name), None)
                self.assertNotEqual(getattr(original, attr_name), None)

    def test_replace_internal(self):
        # Internal attributes such as `offset` are not accepted.
        original = self.tar.getmember('ustar/regtype')
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
def test_extractall_none_mtime(self):
    """Check the mtime of files extracted from members with mtime=None.

    mtimes of extracted files should be later than 'now' -- the mtime
    of a previously created directory.
    """
    now = pathlib.Path(TEMPDIR).stat().st_mtime
    with self.extract_with_none('mtime') as DIR:
        for path in pathlib.Path(DIR).glob('**/*'):
            with self.subTest(path=path):
                try:
                    mtime = path.stat().st_mtime
                except OSError:
                    # Some systems can't stat symlinks, ignore those
                    if not path.is_symlink():
                        raise
                else:
                    # Use the value read inside the guarded stat() call;
                    # a second stat() here would not be protected by the
                    # OSError handling above.
                    self.assertGreaterEqual(mtime, now)
def test_add(self):
    """When addfile() encounters None metadata, it raises a ValueError.

    For each archive format the unmodified TarInfo is added first; only if
    that works is each metadata attribute replaced by None in turn, and
    addfile() is then expected to raise ValueError mentioning the attribute.
    """
    bio = io.BytesIO()
    for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                      tarfile.PAX_FORMAT):
        # The TarFile is managed by the `with` statement so that it is
        # closed for every format instead of being left open.
        with (self.subTest(tarformat=tarformat),
              tarfile.open(fileobj=bio, mode='w', format=tarformat) as tar):
            tarinfo = tar.gettarinfo(tarname)
            try:
                tar.addfile(tarinfo)
            except Exception:
                if tarformat == tarfile.USTAR_FORMAT:
                    # In the old, limited format, adding might fail for
                    # reasons like the UID being too large
                    pass
                else:
                    raise
            else:
                for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                  'uname', 'gname'):
                    with self.subTest(attr_name=attr_name):
                        replaced = tarinfo.replace(**{attr_name: None})
                        with self.assertRaisesRegex(ValueError,
                                                    f"{attr_name}"):
                            tar.addfile(replaced)
3200 for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, 3201 {'uname'}, {'gname'}, 3202 {'uid', 'uname'}, {'gid', 'gname'}): 3203 with (self.subTest(attr_names=attr_names), 3204 tarfile.open(tarname, encoding="iso8859-1") as tar): 3205 tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3206 with support.swap_attr(sys, 'stdout', tio_prev): 3207 tar.list() 3208 for member in tar.getmembers(): 3209 for attr_name in attr_names: 3210 setattr(member, attr_name, None) 3211 tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3212 with support.swap_attr(sys, 'stdout', tio_new): 3213 tar.list() 3214 for expected, got in zip(tio_prev.detach().getvalue().split(), 3215 tio_new.detach().getvalue().split()): 3216 if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected): 3217 self.assertEqual(got, b'????-??-??') 3218 elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected): 3219 self.assertEqual(got, b'??:??:??') 3220 elif attr_names == {'mode'} and re.match( 3221 rb'.([r-][w-][x-]){3}', expected): 3222 self.assertEqual(got, b'??????????') 3223 elif attr_names == {'uname'} and expected.startswith( 3224 (b'tarfile/', b'lars/', b'foo/')): 3225 exp_user, exp_group = expected.split(b'/') 3226 got_user, got_group = got.split(b'/') 3227 self.assertEqual(got_group, exp_group) 3228 self.assertRegex(got_user, b'[0-9]+') 3229 elif attr_names == {'gname'} and expected.endswith( 3230 (b'/tarfile', b'/users', b'/bar')): 3231 exp_user, exp_group = expected.split(b'/') 3232 got_user, got_group = got.split(b'/') 3233 self.assertEqual(got_user, exp_user) 3234 self.assertRegex(got_group, b'[0-9]+') 3235 elif attr_names == {'uid'} and expected.startswith( 3236 (b'1000/')): 3237 exp_user, exp_group = expected.split(b'/') 3238 got_user, got_group = got.split(b'/') 3239 self.assertEqual(got_group, exp_group) 3240 self.assertEqual(got_user, b'None') 3241 elif attr_names == {'gid'} and expected.endswith((b'/100')): 3242 exp_user, exp_group = 
expected.split(b'/') 3243 got_user, got_group = got.split(b'/') 3244 self.assertEqual(got_user, exp_user) 3245 self.assertEqual(got_group, b'None') 3246 elif attr_names == {'uid', 'uname'} and expected.startswith( 3247 (b'tarfile/', b'lars/', b'foo/', b'1000/')): 3248 exp_user, exp_group = expected.split(b'/') 3249 got_user, got_group = got.split(b'/') 3250 self.assertEqual(got_group, exp_group) 3251 self.assertEqual(got_user, b'None') 3252 elif attr_names == {'gname', 'gid'} and expected.endswith( 3253 (b'/tarfile', b'/users', b'/bar', b'/100')): 3254 exp_user, exp_group = expected.split(b'/') 3255 got_user, got_group = got.split(b'/') 3256 self.assertEqual(got_user, exp_user) 3257 self.assertEqual(got_group, b'None') 3258 else: 3259 # In other cases the output should be the same 3260 self.assertEqual(expected, got) 3261 3262def _filemode_to_int(mode): 3263 """Inverse of `stat.filemode` (for permission bits) 3264 3265 Using mode strings rather than numbers makes the later tests more readable. 3266 """ 3267 str_mode = mode[1:] 3268 result = ( 3269 {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] 3270 | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] 3271 | {'x': stat.S_IXUSR, '-': 0, 3272 's': stat.S_IXUSR | stat.S_ISUID, 3273 'S': stat.S_ISUID}[str_mode[2]] 3274 | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] 3275 | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] 3276 | {'x': stat.S_IXGRP, '-': 0, 3277 's': stat.S_IXGRP | stat.S_ISGID, 3278 'S': stat.S_ISGID}[str_mode[5]] 3279 | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] 3280 | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] 3281 | {'x': stat.S_IXOTH, '-': 0, 3282 't': stat.S_IXOTH | stat.S_ISVTX, 3283 'T': stat.S_ISVTX}[str_mode[8]] 3284 ) 3285 # check we did this right 3286 assert stat.filemode(result)[1:] == mode[1:] 3287 3288 return result 3289 3290class ArchiveMaker: 3291 """Helper to create a tar file with specific contents 3292 3293 Usage: 3294 3295 with ArchiveMaker() as t: 3296 t.add('filename', ...) 3297 3298 with t.open() as tar: 3299 ... 
# `tar` is now a TarFile with 'filename' in it! 3300 """ 3301 def __init__(self): 3302 self.bio = io.BytesIO() 3303 3304 def __enter__(self): 3305 self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) 3306 return self 3307 3308 def __exit__(self, *exc): 3309 self.tar_w.close() 3310 self.contents = self.bio.getvalue() 3311 self.bio = None 3312 3313 def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, 3314 mode=None, size=None, content=None, **kwargs): 3315 """Add a member to the test archive. Call within `with`. 3316 3317 Provides many shortcuts: 3318 - default `type` is based on symlink_to, hardlink_to, and trailing `/` 3319 in name (which is stripped) 3320 - size & content defaults are based on each other 3321 - content can be str or bytes 3322 - mode should be textual ('-rwxrwxrwx') 3323 3324 (add more! this is unstable internal test-only API) 3325 """ 3326 name = str(name) 3327 tarinfo = tarfile.TarInfo(name).replace(**kwargs) 3328 if content is not None: 3329 if isinstance(content, str): 3330 content = content.encode() 3331 size = len(content) 3332 if size is not None: 3333 tarinfo.size = size 3334 if content is None: 3335 content = bytes(tarinfo.size) 3336 if mode: 3337 tarinfo.mode = _filemode_to_int(mode) 3338 if symlink_to is not None: 3339 type = tarfile.SYMTYPE 3340 tarinfo.linkname = str(symlink_to) 3341 if hardlink_to is not None: 3342 type = tarfile.LNKTYPE 3343 tarinfo.linkname = str(hardlink_to) 3344 if name.endswith('/') and type is None: 3345 type = tarfile.DIRTYPE 3346 if type is not None: 3347 tarinfo.type = type 3348 if tarinfo.isreg(): 3349 fileobj = io.BytesIO(content) 3350 else: 3351 fileobj = None 3352 self.tar_w.addfile(tarinfo, fileobj) 3353 3354 def open(self, **kwargs): 3355 """Open the resulting archive as TarFile. 
@contextmanager
def check_context(self, tar, filter, *, check_flag=True):
    """Extracts `tar` to `self.destdir` and allows checking the result

    If an error occurs, it must be checked using `expect_exception`

    Otherwise, all resulting files must be checked using `expect_file`,
    except the destination directory itself and parent directories of
    other files.
    When checking directories, do so before their contents.

    A file called 'flag' is made in outerdir (i.e. outside destdir)
    before extraction; it should not be altered nor should its contents
    be read/copied.

    `tar` is closed when the context exits.  With `check_flag` false the
    flag file's content is not verified; that is only accepted for the
    'fully_trusted' filter.
    """
    with os_helper.temp_dir(self.outerdir):
        flag_path = self.outerdir / 'flag'
        flag_path.write_text('capture me')
        try:
            tar.extractall(self.destdir, filter=filter)
        except Exception as exc:
            # Remember the failure; it is re-raised on exit unless
            # the with-body clears `reraise_exception`.
            self.raised_exception = exc
            self.reraise_exception = True
            self.expected_paths = set()
        else:
            self.raised_exception = None
            self.reraise_exception = False
            # Everything found below outerdir has to be accounted for,
            # apart from destdir itself and the flag file.
            self.expected_paths = set(self.outerdir.glob('**/*'))
            self.expected_paths.discard(self.destdir)
            self.expected_paths.discard(flag_path)
        try:
            yield self
        finally:
            tar.close()
            if self.reraise_exception:
                raise self.raised_exception
            # Any path still left here was never checked by the body.
            self.assertEqual(self.expected_paths, set())
        if check_flag:
            self.assertEqual(flag_path.read_text(), 'capture me')
        else:
            assert filter == 'fully_trusted'
def test_absolute(self):
    """A member stored under an absolute path.

    Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
    """
    with ArchiveMaker() as arc:
        arc.add(self.outerdir / 'escaped.evil')

    # Without filtering, the file lands next to (not inside) destdir.
    with self.check_context(arc.open(), 'fully_trusted'):
        self.expect_file('../escaped.evil')

    for filter_name in ('tar', 'data'):
        with self.check_context(arc.open(), filter_name):
            outer = str(self.outerdir)
            if not outer.startswith('/'):
                # On this system, absolute paths don't have leading
                # slashes, so there's nothing to strip.  Unpacking to an
                # absolute path is refused nonetheless.
                self.expect_exception(
                    tarfile.AbsolutePathError,
                    """['"].*escaped.evil['"] has an absolute path""")
            else:
                # Leading slashes are stripped, as e.g. GNU tar does
                # (without --absolute-filenames).
                outerdir_stripped = outer.lstrip('/')
                self.expect_file(f'{outerdir_stripped}/escaped.evil')
@symlink_test
@os_helper.skip_unless_symlink
def test_realpath_limit_attack(self):
    """Extraction of an archive whose symlink chain overflows PATH_MAX.

    (CVE-2025-4517)

    The archive uses a chain of symlinks and long directory names, then a
    hardlink and plain members that would reach files outside the
    destination.  The 'tar' and 'data' filters must fail with OSError or
    KeyError.
    """
    # Imported here because the module-level imports do not include errno.
    import errno

    with ArchiveMaker() as arc:
        # populate the symlinks and dirs that expand in os.path.realpath()
        # The component length is chosen so that in common cases, the
        # unexpanded path fits in PATH_MAX, but it overflows when the
        # final symlink is expanded
        steps = "abcdefghijklmnop"
        if sys.platform == 'win32':
            component = 'd' * 25
        elif 'PC_PATH_MAX' in os.pathconf_names:
            max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX")
            path_sep_len = 1
            dest_len = len(str(self.destdir)) + path_sep_len
            component_len = ((max_path_len - dest_len)
                             // (len(steps) + path_sep_len))
            component = 'd' * component_len
        else:
            # f-string so the platform name actually appears in the message
            raise NotImplementedError(
                f"Need to guess component length for {sys.platform}")
        path = ""
        for i in steps:
            arc.add(os.path.join(path, component), type=tarfile.DIRTYPE,
                    mode='drwxrwxrwx')
            arc.add(os.path.join(path, i), symlink_to=component)
            path = os.path.join(path, component)
        # create the final symlink that exceeds PATH_MAX and simply points
        # to the top dir.
        # this link will never be expanded by
        # os.path.realpath(strict=False), nor anything after it.
        linkpath = os.path.join(*steps, "l"*254)
        parent_segments = [".."] * len(steps)
        arc.add(linkpath, symlink_to=os.path.join(*parent_segments))
        # make a symlink outside to keep the tar command happy
        arc.add("escape", symlink_to=os.path.join(linkpath, ".."))
        # use the symlinks above, that are not checked, to create a hardlink
        # to a file outside of the destination path
        arc.add("flaglink", hardlink_to=os.path.join("escape", "flag"))
        # now that we have the hardlink we can overwrite the file
        arc.add("flaglink", content='overwrite')
        # we can also create new files as well!
        arc.add("escape/newfile", content='new')

    with (self.subTest('fully_trusted'),
          self.check_context(arc.open(), filter='fully_trusted',
                             check_flag=False)):
        if sys.platform == 'win32':
            self.expect_exception((FileNotFoundError, FileExistsError))
        elif self.raised_exception:
            # Cannot symlink/hardlink: tarfile falls back to getmember()
            self.expect_exception(KeyError)
            # Otherwise, this block should never enter.
        else:
            self.expect_any_tree(component)
            self.expect_file('flaglink', content='overwrite')
            self.expect_file('../newfile', content='new')
            self.expect_file('escape', type=tarfile.SYMTYPE)
            self.expect_file('a', symlink_to=component)

    for filter in 'tar', 'data':
        with (self.subTest(filter),
              self.check_context(arc.open(), filter=filter)):
            exc = self.expect_exception((OSError, KeyError))
            if isinstance(exc, OSError):
                if sys.platform == 'win32':
                    # 3: ERROR_PATH_NOT_FOUND
                    # 5: ERROR_ACCESS_DENIED
                    # 206: ERROR_FILENAME_EXCED_RANGE
                    self.assertIn(exc.winerror, (3, 5, 206))
                else:
                    self.assertEqual(exc.errno, errno.ENAMETOOLONG)
@symlink_test
def test_sly_relative0(self):
    """A symlink member whose own name starts with '..'.

    Inspired by 'relative0' in jwilk/traversal-archives
    """
    with ArchiveMaker() as arc:
        arc.add('../moo', symlink_to='..//tmp/moo')

    try:
        with self.check_context(arc.open(), filter='fully_trusted'):
            if not os_helper.can_symlink():
                # The symlink can't be extracted and is ignored
                pass
            elif isinstance(self.raised_exception, FileExistsError):
                # XXX TarFile happens to fail creating a parent
                # directory.
                # This might be a bug, but fixing it would hurt
                # security.
                # Note that e.g. GNU `tar` rejects '..' components,
                # so you could argue this is an invalid archive and we
                # just raise a bad type of exception.
                self.expect_exception(FileExistsError)
            else:
                self.expect_file('../moo', symlink_to='..//tmp/moo')
    except FileExistsError:
        pass

    for filter_name in ('tar', 'data'):
        with self.check_context(arc.open(), filter_name):
            expected_message = (
                "'../moo' would be extracted to "
                + "'.*moo', which is outside "
                + "the destination")
            self.expect_exception(
                tarfile.OutsideDestinationError, expected_message)
3758 with ArchiveMaker() as arc: 3759 arc.add('targetdir/target', size=3) 3760 # a hardlink's linkname is relative to the archive 3761 arc.add('linkdir/hardlink', hardlink_to=os.path.join( 3762 'targetdir', 'target')) 3763 # a symlink's linkname is relative to the link's directory 3764 arc.add('linkdir/symlink', symlink_to=os.path.join( 3765 '..', 'targetdir', 'target')) 3766 3767 for filter in 'tar', 'data', 'fully_trusted': 3768 with self.check_context(arc.open(), filter): 3769 self.expect_file('targetdir/target', size=3) 3770 self.expect_file('linkdir/hardlink', size=3) 3771 if os_helper.can_symlink(): 3772 self.expect_file('linkdir/symlink', size=3, 3773 symlink_to='../targetdir/target') 3774 else: 3775 self.expect_file('linkdir/symlink', size=3) 3776 3777 @symlink_test 3778 def test_chains(self): 3779 # Test chaining of symlinks/hardlinks. 3780 # Symlinks are created before the files they point to. 3781 with ArchiveMaker() as arc: 3782 arc.add('linkdir/symlink', symlink_to='hardlink') 3783 arc.add('symlink2', symlink_to=os.path.join( 3784 'linkdir', 'hardlink2')) 3785 arc.add('targetdir/target', size=3) 3786 arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target')) 3787 arc.add('linkdir/hardlink2', hardlink_to=os.path.join('linkdir', 'symlink')) 3788 3789 for filter in 'tar', 'data', 'fully_trusted': 3790 with self.check_context(arc.open(), filter): 3791 self.expect_file('targetdir/target', size=3) 3792 self.expect_file('linkdir/hardlink', size=3) 3793 self.expect_file('linkdir/hardlink2', size=3) 3794 if os_helper.can_symlink(): 3795 self.expect_file('linkdir/symlink', size=3, 3796 symlink_to='hardlink') 3797 self.expect_file('symlink2', size=3, 3798 symlink_to='linkdir/hardlink2') 3799 else: 3800 self.expect_file('linkdir/symlink', size=3) 3801 self.expect_file('symlink2', size=3) 3802 3803 @symlink_test 3804 def test_sneaky_hardlink_fallback(self): 3805 # (CVE-2025-4330) 3806 # Test that when hardlink extraction falls back to extracting 
members 3807 # from the archive, the extracted member is (re-)filtered. 3808 with ArchiveMaker() as arc: 3809 # Create a directory structure so the c/escape symlink stays 3810 # inside the path 3811 arc.add("a/t/dummy") 3812 # Create b/ directory 3813 arc.add("b/") 3814 # Point "c" to the bottom of the tree in "a" 3815 arc.add("c", symlink_to=os.path.join("a", "t")) 3816 # link to non-existant location under "a" 3817 arc.add("c/escape", symlink_to=os.path.join("..", "..", 3818 "link_here")) 3819 # Move "c" to point to "b" ("c/escape" no longer exists) 3820 arc.add("c", symlink_to="b") 3821 # Attempt to create a hard link to "c/escape". Since it doesn't 3822 # exist it will attempt to extract "cescape" but at "boom". 3823 arc.add("boom", hardlink_to=os.path.join("c", "escape")) 3824 3825 with self.check_context(arc.open(), 'data'): 3826 if not os_helper.can_symlink(): 3827 # When 'c/escape' is extracted, 'c' is a regular 3828 # directory, and 'c/escape' *would* point outside 3829 # the destination if symlinks were allowed. 
@symlink_test
def test_exfiltration_via_symlink(self):
    # (CVE-2025-4138)
    # Test changing symlinks that result in a symlink pointing outside
    # the extraction directory, unless prevented by 'data' filter's
    # normalization.
    with ArchiveMaker() as arc:
        arc.add("escape", symlink_to=os.path.join(
            'link', 'link', '..', '..', 'link-here'))
        arc.add("link", symlink_to='./')

    # One subTest per filter: a failure reports which filter broke and
    # the other filters are still exercised.
    for filter_name in 'tar', 'data', 'fully_trusted':
        with self.subTest(filter=filter_name), \
                self.check_context(arc.open(), filter_name):
            if os_helper.can_symlink():
                self.expect_file("link", symlink_to='./')
                if filter_name == 'data':
                    # The 'data' filter stores the normalized target.
                    self.expect_file("escape", symlink_to='link-here')
                else:
                    self.expect_file("escape",
                                     symlink_to='link/link/../../link-here')
            else:
                # Nothing is extracted.
                pass
def test_link_fallback_normalizes(self):
    # Make sure hardlink fallbacks work for non-normalized paths for all
    # filters
    with ArchiveMaker() as arc:
        arc.add("dir/")
        arc.add("dir/../afile")
        arc.add("link1", hardlink_to='dir/../afile')
        arc.add("link2", hardlink_to='dir/../dir/../afile')

    # One subTest per filter: a failure reports which filter broke and
    # the other filters are still exercised.  The context object is not
    # needed here, so it is not bound to a name.
    for filter_name in 'tar', 'data', 'fully_trusted':
        with self.subTest(filter=filter_name), \
                self.check_context(arc.open(), filter_name):
            self.expect_file("dir/")
            self.expect_file("afile")
            self.expect_file("link1")
            self.expect_file("link2")
mode='?---r-----') 3934 arc.add('no_bits', mode='?---------') 3935 arc.add('dir/', mode='?---rwsrwt') 3936 3937 # On some systems, setting the sticky bit is a no-op. 3938 # Check if that's the case. 3939 tmp_filename = os.path.join(TEMPDIR, "tmp.file") 3940 with open(tmp_filename, 'w'): 3941 pass 3942 os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX) 3943 have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX) 3944 os.unlink(tmp_filename) 3945 3946 os.mkdir(tmp_filename) 3947 os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX) 3948 have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX) 3949 os.rmdir(tmp_filename) 3950 3951 with self.check_context(arc.open(), 'fully_trusted'): 3952 if have_sticky_files: 3953 self.expect_file('all_bits', mode='?rwsrwsrwt') 3954 else: 3955 self.expect_file('all_bits', mode='?rwsrwsrwx') 3956 self.expect_file('perm_bits', mode='?rwxrwxrwx') 3957 self.expect_file('exec_group_other', mode='?rw-rwxrwx') 3958 self.expect_file('read_group_only', mode='?---r-----') 3959 self.expect_file('no_bits', mode='?---------') 3960 if have_sticky_dirs: 3961 self.expect_file('dir/', mode='?---rwsrwt') 3962 else: 3963 self.expect_file('dir/', mode='?---rwsrwx') 3964 3965 with self.check_context(arc.open(), 'tar'): 3966 self.expect_file('all_bits', mode='?rwxr-xr-x') 3967 self.expect_file('perm_bits', mode='?rwxr-xr-x') 3968 self.expect_file('exec_group_other', mode='?rw-r-xr-x') 3969 self.expect_file('read_group_only', mode='?---r-----') 3970 self.expect_file('no_bits', mode='?---------') 3971 self.expect_file('dir/', mode='?---r-xr-x') 3972 3973 with self.check_context(arc.open(), 'data'): 3974 normal_dir_mode = stat.filemode(stat.S_IMODE( 3975 self.outerdir.stat().st_mode)) 3976 self.expect_file('all_bits', mode='?rwxr-xr-x') 3977 self.expect_file('perm_bits', mode='?rwxr-xr-x') 3978 self.expect_file('exec_group_other', mode='?rw-r--r--') 3979 self.expect_file('read_group_only', 
def test_pipe(self):
    # Test handling of a special file
    with ArchiveMaker() as arc:
        arc.add('foo', type=tarfile.FIFOTYPE)

    # The permissive filters let the FIFO through.  Each filter runs in
    # its own subTest so a failure names the filter and the other one is
    # still checked.
    for filter_name in 'fully_trusted', 'tar':
        with self.subTest(filter=filter_name), \
                self.check_context(arc.open(), filter_name):
            if hasattr(os, 'mkfifo'):
                self.expect_file('foo', type=tarfile.FIFOTYPE)
            else:
                # The pipe can't be extracted and is skipped.
                pass

    # The 'data' filter rejects special files outright.
    with self.check_context(arc.open(), 'data'):
        self.expect_exception(
            tarfile.SpecialFileError,
            "'foo' is a special file")
def test_data_filter(self):
    # For every member of the test archive, the 'data' filter must either
    # raise a FilterError or hand back a TarInfo whose name and type are
    # the very same objects as the input's.
    with tarfile.TarFile.open(tarname, encoding="iso8859-1") as archive:
        for member in archive.getmembers():
            try:
                result = tarfile.data_filter(member, '')
            except tarfile.FilterError:
                # Rejected members are not checked any further.
                pass
            else:
                self.assertIs(result.name, member.name)
                self.assertIs(result.type, member.type)
def test_stateful_filter(self):
    # Stateful filters should be possible.
    # (This doesn't really test tarfile. Rather, it demonstrates
    # that third parties can implement a stateful filter.)
    class CountingFilter:
        # Context manager + callable: counts the members that pass the
        # 'data' filter and records that it has been exited.
        def __enter__(self):
            self.num_files_processed = 0
            return self

        def __exit__(self, *exc_info):
            self.done = True

        def __call__(self, tarinfo, path):
            try:
                accepted = tarfile.data_filter(tarinfo, path)
            except tarfile.FilterError:
                # Rejected members are dropped and not counted.
                return None
            else:
                self.num_files_processed += 1
                return accepted

    with ArchiveMaker() as arc:
        arc.add('good')
        arc.add('bad', symlink_to='/')
        arc.add('good')

    with CountingFilter() as counter, \
            self.check_context(arc.open(), counter):
        self.expect_file('good')

    # Both 'good' members were counted; 'bad' was filtered out.
    self.assertEqual(counter.num_files_processed, 2)
    self.assertEqual(counter.done, True)
extracterror_filter): 4182 pass 4183 4184 with self.check_context(arc.open(errorlevel=1), filtererror_filter): 4185 self.expect_exception(tarfile.FilterError) 4186 4187 with self.check_context(arc.open(errorlevel=1), oserror_filter): 4188 self.expect_exception(OSError) 4189 4190 with self.check_context(arc.open(errorlevel=1), tarerror_filter): 4191 self.expect_exception(tarfile.TarError) 4192 4193 with self.check_context(arc.open(errorlevel=1), valueerror_filter): 4194 self.expect_exception(ValueError) 4195 4196 # If 2, all non-fatal errors are raised as well. 4197 4198 with self.check_context(arc.open(errorlevel=2), extracterror_filter): 4199 self.expect_exception(tarfile.ExtractError) 4200 4201 with self.check_context(arc.open(errorlevel=2), filtererror_filter): 4202 self.expect_exception(tarfile.FilterError) 4203 4204 with self.check_context(arc.open(errorlevel=2), oserror_filter): 4205 self.expect_exception(OSError) 4206 4207 with self.check_context(arc.open(errorlevel=2), tarerror_filter): 4208 self.expect_exception(tarfile.TarError) 4209 4210 with self.check_context(arc.open(errorlevel=2), valueerror_filter): 4211 self.expect_exception(ValueError) 4212 4213 # We only handle ExtractionError, FilterError & OSError specially. 
class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
    # Supplies three small tar archives plus the open()/extractall() hooks.
    # NOTE(review): presumably these are consumed by the shared
    # archiver_tests.OverwriteTests mixin -- confirm against that module.
    testdir = os.path.join(TEMPDIR, "testoverwrite")

    @classmethod
    def setUpClass(cls):
        def scratch_archive(filename):
            # Path under TEMPDIR, unlinked again at class cleanup.
            path = os.path.join(TEMPDIR, filename)
            cls.addClassCleanup(os_helper.unlink, path)
            return path

        def write_single_file(path, member_name):
            # Archive holding one regular member with a fixed payload.
            payload = b'newcontent'
            with tarfile.open(path, 'w') as tar:
                member = tarfile.TarInfo(member_name)
                member.size = len(payload)
                tar.addfile(member, io.BytesIO(payload))

        # Archive with a regular file named 'test'.
        cls.ar_with_file = scratch_archive('tar-with-file.tar')
        write_single_file(cls.ar_with_file, 'test')

        # Archive with a directory entry named 'test'.
        cls.ar_with_dir = scratch_archive('tar-with-dir.tar')
        with tarfile.open(cls.ar_with_dir, 'w') as tar:
            tar.addfile(tar.gettarinfo(os.curdir, 'test'))

        # Archive with 'test/file' but no explicit entry for 'test/'.
        cls.ar_with_implicit_dir = scratch_archive(
            'tar-with-implicit-dir.tar')
        write_single_file(cls.ar_with_implicit_dir, 'test/file')

    def open(self, path):
        # Open the archive at *path* for reading.
        return tarfile.open(path, mode='r')

    def extractall(self, ar):
        # Extract everything into the test directory without filtering.
        ar.extractall(self.testdir, filter='fully_trusted')
4273 + tarfile.POSIX_MAGIC 4274 # uname: 32 bytes 4275 + tarfile.NUL * 32 4276 # gname: 32 bytes 4277 + tarfile.NUL * 32 4278 # devmajor, space, null terminator: 8 bytes 4279 + tarfile.NUL * 6 + SPACE + tarfile.NUL 4280 # devminor, space, null terminator: 8 bytes 4281 + tarfile.NUL * 6 + SPACE + tarfile.NUL 4282 # prefix: 155 bytes 4283 + tarfile.NUL * tarfile.LENGTH_PREFIX 4284 # padding: 12 bytes 4285 + tarfile.NUL * 12 4286 ) 4287 invalid_gnu_header = ( 4288 # name: 100 bytes 4289 tarfile.NUL * tarfile.LENGTH_NAME 4290 # mode, null terminator: 8 bytes 4291 + b"0000755" + tarfile.NUL 4292 # uid, null terminator: 8 bytes 4293 + b"0000001" + tarfile.NUL 4294 # gid, space, null terminator: 8 bytes 4295 + b"0000001" + tarfile.NUL 4296 # size, space: 12 bytes 4297 + b"\xff" * 11 + SPACE 4298 # mtime, space: 12 bytes 4299 + tarfile.NUL * 11 + SPACE 4300 # chksum: 8 bytes 4301 + b"0011327" + tarfile.NUL 4302 # type: 1 byte 4303 + tarfile.REGTYPE 4304 # linkname: 100 bytes 4305 + tarfile.NUL * tarfile.LENGTH_LINK 4306 # magic: 8 bytes 4307 + tarfile.GNU_MAGIC 4308 # uname: 32 bytes 4309 + tarfile.NUL * 32 4310 # gname: 32 bytes 4311 + tarfile.NUL * 32 4312 # devmajor, null terminator: 8 bytes 4313 + tarfile.NUL * 8 4314 # devminor, null terminator: 8 bytes 4315 + tarfile.NUL * 8 4316 # padding: 167 bytes 4317 + tarfile.NUL * 167 4318 ) 4319 invalid_v7_header = ( 4320 # name: 100 bytes 4321 tarfile.NUL * tarfile.LENGTH_NAME 4322 # mode, space, null terminator: 8 bytes 4323 + b"000755" + SPACE + tarfile.NUL 4324 # uid, space, null terminator: 8 bytes 4325 + b"000001" + SPACE + tarfile.NUL 4326 # gid, space, null terminator: 8 bytes 4327 + b"000001" + SPACE + tarfile.NUL 4328 # size, space: 12 bytes 4329 + b"\xff" * 11 + SPACE 4330 # mtime, space: 12 bytes 4331 + tarfile.NUL * 11 + SPACE 4332 # chksum: 8 bytes 4333 + b"0010070" + tarfile.NUL 4334 # type: 1 byte 4335 + tarfile.REGTYPE 4336 # linkname: 100 bytes 4337 + tarfile.NUL * tarfile.LENGTH_LINK 4338 # padding: 255 
bytes 4339 + tarfile.NUL * 255 4340 ) 4341 valid_gnu_header = tarfile.TarInfo("filename").tobuf(tarfile.GNU_FORMAT) 4342 data_block = b"\xff" * tarfile.BLOCKSIZE 4343 4344 def _write_buffer(self, buffer): 4345 with open(self.tarname, "wb") as f: 4346 f.write(buffer) 4347 4348 def _get_members(self, ignore_zeros=None): 4349 with open(self.tarname, "rb") as f: 4350 with tarfile.open( 4351 mode="r", fileobj=f, ignore_zeros=ignore_zeros 4352 ) as tar: 4353 return tar.getmembers() 4354 4355 def _assert_raises_read_error_exception(self): 4356 with self.assertRaisesRegex( 4357 tarfile.ReadError, "file could not be opened successfully" 4358 ): 4359 self._get_members() 4360 4361 def test_invalid_offset_header_validations(self): 4362 for tar_format, invalid_header in ( 4363 ("posix", self.invalid_posix_header), 4364 ("gnu", self.invalid_gnu_header), 4365 ("v7", self.invalid_v7_header), 4366 ): 4367 with self.subTest(format=tar_format): 4368 self._write_buffer(invalid_header) 4369 self._assert_raises_read_error_exception() 4370 4371 def test_early_stop_at_invalid_offset_header(self): 4372 buffer = self.valid_gnu_header + self.invalid_gnu_header + self.valid_gnu_header 4373 self._write_buffer(buffer) 4374 members = self._get_members() 4375 self.assertEqual(len(members), 1) 4376 self.assertEqual(members[0].name, "filename") 4377 self.assertEqual(members[0].offset, 0) 4378 4379 def test_ignore_invalid_archive(self): 4380 # 3 invalid headers with their respective data 4381 buffer = (self.invalid_gnu_header + self.data_block) * 3 4382 self._write_buffer(buffer) 4383 members = self._get_members(ignore_zeros=True) 4384 self.assertEqual(len(members), 0) 4385 4386 def test_ignore_invalid_offset_headers(self): 4387 for first_block, second_block, expected_offset in ( 4388 ( 4389 (self.valid_gnu_header), 4390 (self.invalid_gnu_header + self.data_block), 4391 0, 4392 ), 4393 ( 4394 (self.invalid_gnu_header + self.data_block), 4395 (self.valid_gnu_header), 4396 1024, 4397 ), 4398 ): 4399 
def setUpModule():
    """Create TEMPDIR and the compressed copies of the test archive.

    Sets the module-level ``testtarnames`` list: the plain test archive
    followed by one compressed copy per compression mixin whose ``open``
    attribute is truthy.
    """
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        raw = source.read()

    # Create compressed tarfiles.
    for mixin in (GzipTest, Bz2Test, LzmaTest):
        if not mixin.open:
            # No opener available for this format.
            continue
        target = mixin.tarname
        os_helper.unlink(target)
        testtarnames.append(target)
        with mixin.open(target, "wb") as compressed:
            compressed.write(raw)