1import errno 2import sys 3import os 4import io 5from hashlib import sha256 6from contextlib import contextmanager, ExitStack 7from random import Random 8import pathlib 9import shutil 10import re 11import warnings 12import stat 13 14import unittest 15import unittest.mock 16import tarfile 17 18from test import archiver_tests 19from test import support 20from test.support import os_helper 21from test.support import script_helper 22from test.support import warnings_helper 23 24# Check for our compression modules. 25try: 26 import gzip 27except ImportError: 28 gzip = None 29try: 30 import zlib 31except ImportError: 32 zlib = None 33try: 34 import bz2 35except ImportError: 36 bz2 = None 37try: 38 import lzma 39except ImportError: 40 lzma = None 41 42def sha256sum(data): 43 return sha256(data).hexdigest() 44 45TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir" 46tarextdir = TEMPDIR + '-extract-test' 47tarname = support.findfile("testtar.tar", subdir="archivetestdata") 48gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 49bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 50xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 51tmpname = os.path.join(TEMPDIR, "tmp.tar") 52dotlessname = os.path.join(TEMPDIR, "testtar") 53 54sha256_regtype = ( 55 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 56) 57sha256_sparse = ( 58 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 59) 60 61 62class TarTest: 63 tarname = tarname 64 suffix = '' 65 open = io.FileIO 66 taropen = tarfile.TarFile.taropen 67 68 @property 69 def mode(self): 70 return self.prefix + self.suffix 71 72@support.requires_gzip() 73class GzipTest: 74 tarname = gzipname 75 suffix = 'gz' 76 open = gzip.GzipFile if gzip else None 77 taropen = tarfile.TarFile.gzopen 78 79@support.requires_bz2() 80class Bz2Test: 81 tarname = bz2name 82 suffix = 'bz2' 83 open = bz2.BZ2File if bz2 else None 84 taropen = tarfile.TarFile.bz2open 85 86@support.requires_lzma() 87class LzmaTest: 88 tarname = 
xzname 89 suffix = 'xz' 90 open = lzma.LZMAFile if lzma else None 91 taropen = tarfile.TarFile.xzopen 92 93 94class ReadTest(TarTest): 95 96 prefix = "r:" 97 98 def setUp(self): 99 self.tar = tarfile.open(self.tarname, mode=self.mode, 100 encoding="iso8859-1") 101 102 def tearDown(self): 103 self.tar.close() 104 105class StreamModeTest(ReadTest): 106 107 # Only needs to change how the tarfile is opened to set 108 # stream mode 109 def setUp(self): 110 self.tar = tarfile.open(self.tarname, mode=self.mode, 111 encoding="iso8859-1", 112 stream=True) 113 114class UstarReadTest(ReadTest, unittest.TestCase): 115 116 def test_fileobj_regular_file(self): 117 tarinfo = self.tar.getmember("ustar/regtype") 118 with self.tar.extractfile(tarinfo) as fobj: 119 data = fobj.read() 120 self.assertEqual(len(data), tarinfo.size, 121 "regular file extraction failed") 122 self.assertEqual(sha256sum(data), sha256_regtype, 123 "regular file extraction failed") 124 125 def test_fileobj_readlines(self): 126 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 127 tarinfo = self.tar.getmember("ustar/regtype") 128 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 129 lines1 = fobj1.readlines() 130 131 with self.tar.extractfile(tarinfo) as fobj: 132 fobj2 = io.TextIOWrapper(fobj) 133 lines2 = fobj2.readlines() 134 self.assertEqual(lines1, lines2, 135 "fileobj.readlines() failed") 136 self.assertEqual(len(lines2), 114, 137 "fileobj.readlines() failed") 138 self.assertEqual(lines2[83], 139 "I will gladly admit that Python is not the fastest " 140 "running scripting language.\n", 141 "fileobj.readlines() failed") 142 143 def test_fileobj_iter(self): 144 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 145 tarinfo = self.tar.getmember("ustar/regtype") 146 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 147 lines1 = fobj1.readlines() 148 with self.tar.extractfile(tarinfo) as fobj2: 149 lines2 = list(io.TextIOWrapper(fobj2)) 150 self.assertEqual(lines1, 
lines2, 151 "fileobj.__iter__() failed") 152 153 def test_fileobj_seek(self): 154 self.tar.extract("ustar/regtype", TEMPDIR, 155 filter='data') 156 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 157 data = fobj.read() 158 159 tarinfo = self.tar.getmember("ustar/regtype") 160 with self.tar.extractfile(tarinfo) as fobj: 161 text = fobj.read() 162 fobj.seek(0) 163 self.assertEqual(0, fobj.tell(), 164 "seek() to file's start failed") 165 fobj.seek(2048, 0) 166 self.assertEqual(2048, fobj.tell(), 167 "seek() to absolute position failed") 168 fobj.seek(-1024, 1) 169 self.assertEqual(1024, fobj.tell(), 170 "seek() to negative relative position failed") 171 fobj.seek(1024, 1) 172 self.assertEqual(2048, fobj.tell(), 173 "seek() to positive relative position failed") 174 s = fobj.read(10) 175 self.assertEqual(s, data[2048:2058], 176 "read() after seek failed") 177 fobj.seek(0, 2) 178 self.assertEqual(tarinfo.size, fobj.tell(), 179 "seek() to file's end failed") 180 self.assertEqual(fobj.read(), b"", 181 "read() at file's end did not return empty string") 182 fobj.seek(-tarinfo.size, 2) 183 self.assertEqual(0, fobj.tell(), 184 "relative seek() to file's end failed") 185 fobj.seek(512) 186 s1 = fobj.readlines() 187 fobj.seek(512) 188 s2 = fobj.readlines() 189 self.assertEqual(s1, s2, 190 "readlines() after seek failed") 191 fobj.seek(0) 192 self.assertEqual(len(fobj.readline()), fobj.tell(), 193 "tell() after readline() failed") 194 fobj.seek(512) 195 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 196 "tell() after seek() and readline() failed") 197 fobj.seek(0) 198 line = fobj.readline() 199 self.assertEqual(fobj.read(), data[len(line):], 200 "read() after readline() failed") 201 202 def test_fileobj_text(self): 203 with self.tar.extractfile("ustar/regtype") as fobj: 204 fobj = io.TextIOWrapper(fobj) 205 data = fobj.read().encode("iso8859-1") 206 self.assertEqual(sha256sum(data), sha256_regtype) 207 try: 208 fobj.seek(100) 209 except 
AttributeError: 210 # Issue #13815: seek() complained about a missing 211 # flush() method. 212 self.fail("seeking failed in text mode") 213 214 # Test if symbolic and hard links are resolved by extractfile(). The 215 # test link members each point to a regular member whose data is 216 # supposed to be exported. 217 def _test_fileobj_link(self, lnktype, regtype): 218 with self.tar.extractfile(lnktype) as a, \ 219 self.tar.extractfile(regtype) as b: 220 self.assertEqual(a.name, b.name) 221 222 def test_fileobj_link1(self): 223 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 224 225 def test_fileobj_link2(self): 226 self._test_fileobj_link("./ustar/linktest2/lnktype", 227 "ustar/linktest1/regtype") 228 229 def test_fileobj_symlink1(self): 230 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 231 232 def test_fileobj_symlink2(self): 233 self._test_fileobj_link("./ustar/linktest2/symtype", 234 "ustar/linktest1/regtype") 235 236 def test_issue14160(self): 237 self._test_fileobj_link("symtype2", "ustar/regtype") 238 239 def test_add_dir_getmember(self): 240 # bpo-21987 241 self.add_dir_and_getmember('bar') 242 self.add_dir_and_getmember('a'*101) 243 244 @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"), 245 "Missing getuid or getgid implementation") 246 def add_dir_and_getmember(self, name): 247 def filter(tarinfo): 248 tarinfo.uid = tarinfo.gid = 100 249 return tarinfo 250 251 with os_helper.temp_cwd(): 252 with tarfile.open(tmpname, 'w') as tar: 253 tar.format = tarfile.USTAR_FORMAT 254 try: 255 os.mkdir(name) 256 tar.add(name, filter=filter) 257 finally: 258 os.rmdir(name) 259 with tarfile.open(tmpname) as tar: 260 self.assertEqual( 261 tar.getmember(name), 262 tar.getmember(name + '/') 263 ) 264 265class GzipUstarReadTest(GzipTest, UstarReadTest): 266 pass 267 268class Bz2UstarReadTest(Bz2Test, UstarReadTest): 269 pass 270 271class LzmaUstarReadTest(LzmaTest, UstarReadTest): 272 pass 273 274 275class ListTest(ReadTest, 
unittest.TestCase): 276 277 # Override setUp to use default encoding (UTF-8) 278 def setUp(self): 279 self.tar = tarfile.open(self.tarname, mode=self.mode) 280 281 def test_list(self): 282 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 283 with support.swap_attr(sys, 'stdout', tio): 284 self.tar.list(verbose=False) 285 out = tio.detach().getvalue() 286 self.assertIn(b'ustar/conttype', out) 287 self.assertIn(b'ustar/regtype', out) 288 self.assertIn(b'ustar/lnktype', out) 289 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 290 self.assertIn(b'./ustar/linktest2/symtype', out) 291 self.assertIn(b'./ustar/linktest2/lnktype', out) 292 # Make sure it puts trailing slash for directory 293 self.assertIn(b'ustar/dirtype/', out) 294 self.assertIn(b'ustar/dirtype-with-size/', out) 295 # Make sure it is able to print unencodable characters 296 def conv(b): 297 s = b.decode(self.tar.encoding, 'surrogateescape') 298 return s.encode('ascii', 'backslashreplace') 299 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 300 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 301 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 302 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 303 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 304 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 305 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 306 # Make sure it prints files separated by one newline without any 307 # 'ls -l'-like accessories if verbose flag is not being used 308 # ... 309 # ustar/conttype 310 # ustar/regtype 311 # ... 
312 self.assertRegex(out, br'ustar/conttype ?\r?\n' 313 br'ustar/regtype ?\r?\n') 314 # Make sure it does not print the source of link without verbose flag 315 self.assertNotIn(b'link to', out) 316 self.assertNotIn(b'->', out) 317 318 def test_list_verbose(self): 319 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 320 with support.swap_attr(sys, 'stdout', tio): 321 self.tar.list(verbose=True) 322 out = tio.detach().getvalue() 323 # Make sure it prints files separated by one newline with 'ls -l'-like 324 # accessories if verbose flag is being used 325 # ... 326 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 327 # -rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 328 # drwxr-xr-x tarfile/tarfile 0 2003-01-05 15:19:43 ustar/dirtype/ 329 # ... 330 # 331 # Array of values to modify the regex below: 332 # ((file_type, file_permissions, file_length), ...) 333 type_perm_lengths = ( 334 (br'\?', b'rw-r--r--', b'7011'), (b'-', b'rw-r--r--', b'7011'), 335 (b'd', b'rwxr-xr-x', b'0'), (b'd', b'rwxr-xr-x', b'255'), 336 (br'\?', b'rw-r--r--', b'0'), (b'l', b'rwxrwxrwx', b'0'), 337 (b'b', b'rw-rw----', b'3,0'), (b'c', b'rw-rw-rw-', b'1,3'), 338 (b'p', b'rw-r--r--', b'0')) 339 self.assertRegex(out, b''.join( 340 [(tp + (br'%s tarfile/tarfile\s+%s ' % (perm, ln) + 341 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 342 br'ustar/\w+type[/>\sa-z-]*\n')) for tp, perm, ln 343 in type_perm_lengths])) 344 # Make sure it prints the source of link with verbose flag 345 self.assertIn(b'ustar/symtype -> regtype', out) 346 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 347 self.assertIn(b'./ustar/linktest2/lnktype link to ' 348 b'./ustar/linktest1/regtype', out) 349 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 350 (b'/123' * 125) + b'/longname', out) 351 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 352 (b'/123' * 125) + b'/longname', out) 353 354 def test_list_members(self): 355 tio 
= io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 356 def members(tar): 357 for tarinfo in tar.getmembers(): 358 if 'reg' in tarinfo.name: 359 yield tarinfo 360 with support.swap_attr(sys, 'stdout', tio): 361 self.tar.list(verbose=False, members=members(self.tar)) 362 out = tio.detach().getvalue() 363 self.assertIn(b'ustar/regtype', out) 364 self.assertNotIn(b'ustar/conttype', out) 365 366 367class GzipListTest(GzipTest, ListTest): 368 pass 369 370 371class Bz2ListTest(Bz2Test, ListTest): 372 pass 373 374 375class LzmaListTest(LzmaTest, ListTest): 376 pass 377 378 379class CommonReadTest(ReadTest): 380 381 def test_is_tarfile_erroneous(self): 382 with open(tmpname, "wb"): 383 pass 384 385 # is_tarfile works on filenames 386 self.assertFalse(tarfile.is_tarfile(tmpname)) 387 388 # is_tarfile works on path-like objects 389 self.assertFalse(tarfile.is_tarfile(os_helper.FakePath(tmpname))) 390 391 # is_tarfile works on file objects 392 with open(tmpname, "rb") as fobj: 393 self.assertFalse(tarfile.is_tarfile(fobj)) 394 395 # is_tarfile works on file-like objects 396 self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid"))) 397 398 def test_is_tarfile_valid(self): 399 # is_tarfile works on filenames 400 self.assertTrue(tarfile.is_tarfile(self.tarname)) 401 402 # is_tarfile works on path-like objects 403 self.assertTrue(tarfile.is_tarfile(os_helper.FakePath(self.tarname))) 404 405 # is_tarfile works on file objects 406 with open(self.tarname, "rb") as fobj: 407 self.assertTrue(tarfile.is_tarfile(fobj)) 408 409 # is_tarfile works on file-like objects 410 with open(self.tarname, "rb") as fobj: 411 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 412 413 def test_is_tarfile_keeps_position(self): 414 # Test for issue44289: tarfile.is_tarfile() modifies 415 # file object's current position 416 with open(self.tarname, "rb") as fobj: 417 tarfile.is_tarfile(fobj) 418 self.assertEqual(fobj.tell(), 0) 419 420 with open(self.tarname, "rb") as fobj: 421 file_like 
= io.BytesIO(fobj.read()) 422 tarfile.is_tarfile(file_like) 423 self.assertEqual(file_like.tell(), 0) 424 425 def test_empty_tarfile(self): 426 # Test for issue6123: Allow opening empty archives. 427 # This test checks if tarfile.open() is able to open an empty tar 428 # archive successfully. Note that an empty tar archive is not the 429 # same as an empty file! 430 with tarfile.open(tmpname, self.mode.replace("r", "w")): 431 pass 432 try: 433 tar = tarfile.open(tmpname, self.mode) 434 tar.getnames() 435 except tarfile.ReadError: 436 self.fail("tarfile.open() failed on empty archive") 437 else: 438 self.assertListEqual(tar.getmembers(), []) 439 finally: 440 tar.close() 441 442 def test_non_existent_tarfile(self): 443 # Test for issue11513: prevent non-existent gzipped tarfiles raising 444 # multiple exceptions. 445 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 446 tarfile.open("xxx", self.mode) 447 448 def test_null_tarfile(self): 449 # Test for issue6123: Allow opening empty archives. 450 # This test guarantees that tarfile.open() does not treat an empty 451 # file as an empty tar archive. 452 with open(tmpname, "wb"): 453 pass 454 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 455 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 456 457 def test_ignore_zeros(self): 458 # Test TarFile's ignore_zeros option. 459 # generate 512 pseudorandom bytes 460 data = Random(0).randbytes(512) 461 for char in (b'\0', b'a'): 462 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 463 # are ignored correctly. 
464 with self.open(tmpname, "w") as fobj: 465 fobj.write(char * 1024) 466 tarinfo = tarfile.TarInfo("foo") 467 tarinfo.size = len(data) 468 fobj.write(tarinfo.tobuf()) 469 fobj.write(data) 470 471 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 472 try: 473 self.assertListEqual(tar.getnames(), ["foo"], 474 "ignore_zeros=True should have skipped the %r-blocks" % 475 char) 476 finally: 477 tar.close() 478 479 def test_premature_end_of_archive(self): 480 for size in (512, 600, 1024, 1200): 481 with tarfile.open(tmpname, "w:") as tar: 482 t = tarfile.TarInfo("foo") 483 t.size = 1024 484 tar.addfile(t, io.BytesIO(b"a" * 1024)) 485 486 with open(tmpname, "r+b") as fobj: 487 fobj.truncate(size) 488 489 with tarfile.open(tmpname) as tar: 490 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 491 for t in tar: 492 pass 493 494 with tarfile.open(tmpname) as tar: 495 t = tar.next() 496 497 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 498 tar.extract(t, TEMPDIR, filter='data') 499 500 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 501 tar.extractfile(t).read() 502 503 def test_length_zero_header(self): 504 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 505 # with an exception 506 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 507 with tarfile.open(support.findfile('recursion.tar', subdir='archivetestdata')): 508 pass 509 510 def test_extractfile_attrs(self): 511 # gh-74468: TarFile.name must name a file, not a parent archive. 
512 file = self.tar.getmember('ustar/regtype') 513 with self.tar.extractfile(file) as fobj: 514 self.assertEqual(fobj.name, 'ustar/regtype') 515 self.assertRaises(AttributeError, fobj.fileno) 516 self.assertEqual(fobj.mode, 'rb') 517 self.assertIs(fobj.readable(), True) 518 self.assertIs(fobj.writable(), False) 519 if self.is_stream: 520 self.assertRaises(AttributeError, fobj.seekable) 521 else: 522 self.assertIs(fobj.seekable(), True) 523 self.assertIs(fobj.closed, False) 524 self.assertIs(fobj.closed, True) 525 self.assertEqual(fobj.name, 'ustar/regtype') 526 self.assertRaises(AttributeError, fobj.fileno) 527 self.assertEqual(fobj.mode, 'rb') 528 self.assertIs(fobj.readable(), True) 529 self.assertIs(fobj.writable(), False) 530 if self.is_stream: 531 self.assertRaises(AttributeError, fobj.seekable) 532 else: 533 self.assertIs(fobj.seekable(), True) 534 535 536class MiscReadTestBase(CommonReadTest): 537 is_stream = False 538 539 def test_no_name_argument(self): 540 with open(self.tarname, "rb") as fobj: 541 self.assertIsInstance(fobj.name, str) 542 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 543 self.assertIsInstance(tar.name, str) 544 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 545 546 def test_no_name_attribute(self): 547 with open(self.tarname, "rb") as fobj: 548 data = fobj.read() 549 fobj = io.BytesIO(data) 550 self.assertRaises(AttributeError, getattr, fobj, "name") 551 tar = tarfile.open(fileobj=fobj, mode=self.mode) 552 self.assertIsNone(tar.name) 553 554 def test_empty_name_attribute(self): 555 with open(self.tarname, "rb") as fobj: 556 data = fobj.read() 557 fobj = io.BytesIO(data) 558 fobj.name = "" 559 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 560 self.assertIsNone(tar.name) 561 562 def test_int_name_attribute(self): 563 # Issue 21044: tarfile.open() should handle fileobj with an integer 564 # 'name' attribute. 
565 fd = os.open(self.tarname, os.O_RDONLY) 566 with open(fd, 'rb') as fobj: 567 self.assertIsInstance(fobj.name, int) 568 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 569 self.assertIsNone(tar.name) 570 571 def test_bytes_name_attribute(self): 572 tarname = os.fsencode(self.tarname) 573 with open(tarname, 'rb') as fobj: 574 self.assertIsInstance(fobj.name, bytes) 575 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 576 self.assertIsInstance(tar.name, bytes) 577 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 578 579 def test_pathlike_name(self, tarname=None): 580 if tarname is None: 581 tarname = self.tarname 582 expected = os.path.abspath(tarname) 583 tarname = os_helper.FakePath(tarname) 584 with tarfile.open(tarname, mode=self.mode) as tar: 585 self.assertEqual(tar.name, expected) 586 with self.taropen(tarname) as tar: 587 self.assertEqual(tar.name, expected) 588 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 589 self.assertEqual(tar.name, expected) 590 if self.suffix == '': 591 with tarfile.TarFile(tarname, mode='r') as tar: 592 self.assertEqual(tar.name, expected) 593 594 def test_pathlike_bytes_name(self): 595 self.test_pathlike_name(os.fsencode(self.tarname)) 596 597 def test_illegal_mode_arg(self): 598 with open(tmpname, 'wb'): 599 pass 600 with self.assertRaisesRegex(ValueError, 'mode must be '): 601 tar = self.taropen(tmpname, 'q') 602 with self.assertRaisesRegex(ValueError, 'mode must be '): 603 tar = self.taropen(tmpname, 'rw') 604 with self.assertRaisesRegex(ValueError, 'mode must be '): 605 tar = self.taropen(tmpname, '') 606 607 def test_fileobj_with_offset(self): 608 # Skip the first member and store values from the second member 609 # of the testtar. 
610 tar = tarfile.open(self.tarname, mode=self.mode) 611 try: 612 tar.next() 613 t = tar.next() 614 name = t.name 615 offset = t.offset 616 with tar.extractfile(t) as f: 617 data = f.read() 618 finally: 619 tar.close() 620 621 # Open the testtar and seek to the offset of the second member. 622 with self.open(self.tarname) as fobj: 623 fobj.seek(offset) 624 625 # Test if the tarfile starts with the second member. 626 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 627 t = tar.next() 628 self.assertEqual(t.name, name) 629 # Read to the end of fileobj and test if seeking back to the 630 # beginning works. 631 tar.getmembers() 632 self.assertEqual(tar.extractfile(t).read(), data, 633 "seek back did not work") 634 635 def test_fail_comp(self): 636 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 637 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 638 with open(tarname, "rb") as fobj: 639 self.assertRaises(tarfile.ReadError, tarfile.open, 640 fileobj=fobj, mode=self.mode) 641 642 def test_v7_dirtype(self): 643 # Test old style dirtype member (bug #1336623): 644 # Old V7 tars create directory members using an AREGTYPE 645 # header with a "/" appended to the filename field. 646 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 647 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 648 "v7 dirtype failed") 649 650 def test_xstar_type(self): 651 # The xstar format stores extra atime and ctime fields inside the 652 # space reserved for the prefix field. The prefix field must be 653 # ignored in this case, otherwise it will mess up the name. 
654 try: 655 self.tar.getmember("misc/regtype-xstar") 656 except KeyError: 657 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 658 659 def test_check_members(self): 660 for tarinfo in self.tar: 661 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 662 "wrong mtime for %s" % tarinfo.name) 663 if not tarinfo.name.startswith("ustar/"): 664 continue 665 self.assertEqual(tarinfo.uname, "tarfile", 666 "wrong uname for %s" % tarinfo.name) 667 668 def test_find_members(self): 669 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 670 "could not find all members") 671 672 @unittest.skipUnless(hasattr(os, "link"), 673 "Missing hardlink implementation") 674 @os_helper.skip_unless_symlink 675 def test_extract_hardlink(self): 676 # Test hardlink extraction (e.g. bug #857297). 677 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 678 tar.extract("ustar/regtype", TEMPDIR, filter='data') 679 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 680 681 tar.extract("ustar/lnktype", TEMPDIR, filter='data') 682 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 683 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 684 data = f.read() 685 self.assertEqual(sha256sum(data), sha256_regtype) 686 687 tar.extract("ustar/symtype", TEMPDIR, filter='data') 688 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 689 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 690 data = f.read() 691 self.assertEqual(sha256sum(data), sha256_regtype) 692 693 @os_helper.skip_unless_working_chmod 694 def test_extractall(self): 695 # Test if extractall() correctly restores directory permissions 696 # and times (see issue1735). 
697 tar = tarfile.open(tarname, encoding="iso8859-1") 698 DIR = os.path.join(TEMPDIR, "extractall") 699 os.mkdir(DIR) 700 try: 701 directories = [t for t in tar if t.isdir()] 702 tar.extractall(DIR, directories, filter='fully_trusted') 703 for tarinfo in directories: 704 path = os.path.join(DIR, tarinfo.name) 705 if sys.platform != "win32": 706 # Win32 has no support for fine grained permissions. 707 self.assertEqual(tarinfo.mode & 0o777, 708 os.stat(path).st_mode & 0o777, 709 tarinfo.name) 710 def format_mtime(mtime): 711 if isinstance(mtime, float): 712 return "{} ({})".format(mtime, mtime.hex()) 713 else: 714 return "{!r} (int)".format(mtime) 715 file_mtime = os.path.getmtime(path) 716 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 717 format_mtime(tarinfo.mtime), 718 format_mtime(file_mtime), 719 path) 720 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 721 finally: 722 tar.close() 723 os_helper.rmtree(DIR) 724 725 @os_helper.skip_unless_working_chmod 726 def test_extract_directory(self): 727 dirtype = "ustar/dirtype" 728 DIR = os.path.join(TEMPDIR, "extractdir") 729 os.mkdir(DIR) 730 try: 731 with tarfile.open(tarname, encoding="iso8859-1") as tar: 732 tarinfo = tar.getmember(dirtype) 733 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 734 extracted = os.path.join(DIR, dirtype) 735 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 736 if sys.platform != "win32": 737 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 738 finally: 739 os_helper.rmtree(DIR) 740 741 def test_deprecation_if_no_filter_passed_to_extractall(self): 742 DIR = pathlib.Path(TEMPDIR) / "extractall" 743 with ( 744 os_helper.temp_dir(DIR), 745 tarfile.open(tarname, encoding="iso8859-1") as tar 746 ): 747 directories = [t for t in tar if t.isdir()] 748 with self.assertWarnsRegex(DeprecationWarning, "Use the filter argument") as cm: 749 tar.extractall(DIR, directories) 750 # check that the stacklevel of the deprecation warning is correct: 751 
self.assertEqual(cm.filename, __file__) 752 753 def test_deprecation_if_no_filter_passed_to_extract(self): 754 dirtype = "ustar/dirtype" 755 DIR = pathlib.Path(TEMPDIR) / "extractall" 756 with ( 757 os_helper.temp_dir(DIR), 758 tarfile.open(tarname, encoding="iso8859-1") as tar 759 ): 760 tarinfo = tar.getmember(dirtype) 761 with self.assertWarnsRegex(DeprecationWarning, "Use the filter argument") as cm: 762 tar.extract(tarinfo, path=DIR) 763 # check that the stacklevel of the deprecation warning is correct: 764 self.assertEqual(cm.filename, __file__) 765 766 def test_extractall_pathlike_dir(self): 767 DIR = os.path.join(TEMPDIR, "extractall") 768 with os_helper.temp_dir(DIR), \ 769 tarfile.open(tarname, encoding="iso8859-1") as tar: 770 directories = [t for t in tar if t.isdir()] 771 tar.extractall(os_helper.FakePath(DIR), directories, filter='fully_trusted') 772 for tarinfo in directories: 773 path = os.path.join(DIR, tarinfo.name) 774 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 775 776 def test_extract_pathlike_dir(self): 777 dirtype = "ustar/dirtype" 778 DIR = os.path.join(TEMPDIR, "extractall") 779 with os_helper.temp_dir(DIR), \ 780 tarfile.open(tarname, encoding="iso8859-1") as tar: 781 tarinfo = tar.getmember(dirtype) 782 tar.extract(tarinfo, path=os_helper.FakePath(DIR), filter='fully_trusted') 783 extracted = os.path.join(DIR, dirtype) 784 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 785 786 def test_init_close_fobj(self): 787 # Issue #7341: Close the internal file object in the TarFile 788 # constructor in case of an error. For the test we rely on 789 # the fact that opening an empty file raises a ReadError. 
790 empty = os.path.join(TEMPDIR, "empty") 791 with open(empty, "wb") as fobj: 792 fobj.write(b"") 793 794 try: 795 tar = object.__new__(tarfile.TarFile) 796 try: 797 tar.__init__(empty) 798 except tarfile.ReadError: 799 self.assertTrue(tar.fileobj.closed) 800 else: 801 self.fail("ReadError not raised") 802 finally: 803 os_helper.unlink(empty) 804 805 def test_parallel_iteration(self): 806 # Issue #16601: Restarting iteration over tarfile continued 807 # from where it left off. 808 with tarfile.open(self.tarname) as tar: 809 for m1, m2 in zip(tar, tar): 810 self.assertEqual(m1.offset, m2.offset) 811 self.assertEqual(m1.get_info(), m2.get_info()) 812 813 @unittest.skipIf(zlib is None, "requires zlib") 814 def test_zlib_error_does_not_leak(self): 815 # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when 816 # parsing certain types of invalid data 817 with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock: 818 mock.side_effect = zlib.error 819 with self.assertRaises(tarfile.ReadError): 820 tarfile.open(self.tarname) 821 822 def test_next_on_empty_tarfile(self): 823 fd = io.BytesIO() 824 tf = tarfile.open(fileobj=fd, mode="w") 825 tf.close() 826 827 fd.seek(0) 828 with tarfile.open(fileobj=fd, mode="r|") as tf: 829 self.assertEqual(tf.next(), None) 830 831 fd.seek(0) 832 with tarfile.open(fileobj=fd, mode="r") as tf: 833 self.assertEqual(tf.next(), None) 834 835class MiscReadTest(MiscReadTestBase, unittest.TestCase): 836 test_fail_comp = None 837 838class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 839 pass 840 841class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 842 pass 843 844class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 845 pass 846 847 848class StreamReadTest(CommonReadTest, unittest.TestCase): 849 850 prefix="r|" 851 is_stream = True 852 853 def test_read_through(self): 854 # Issue #11224: A poorly designed _FileInFile.read() method 855 # caused seeking errors with stream tar 
files. 856 for tarinfo in self.tar: 857 if not tarinfo.isreg(): 858 continue 859 with self.tar.extractfile(tarinfo) as fobj: 860 while True: 861 try: 862 buf = fobj.read(512) 863 except tarfile.StreamError: 864 self.fail("simple read-through using " 865 "TarFile.extractfile() failed") 866 if not buf: 867 break 868 869 def test_fileobj_regular_file(self): 870 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 871 with self.tar.extractfile(tarinfo) as fobj: 872 data = fobj.read() 873 self.assertEqual(len(data), tarinfo.size, 874 "regular file extraction failed") 875 self.assertEqual(sha256sum(data), sha256_regtype, 876 "regular file extraction failed") 877 878 def test_provoke_stream_error(self): 879 tarinfos = self.tar.getmembers() 880 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 881 self.assertRaises(tarfile.StreamError, f.read) 882 883 def test_compare_members(self): 884 tar1 = tarfile.open(tarname, encoding="iso8859-1") 885 try: 886 tar2 = self.tar 887 888 while True: 889 t1 = tar1.next() 890 t2 = tar2.next() 891 if t1 is None: 892 break 893 self.assertIsNotNone(t2, "stream.next() failed.") 894 895 if t2.islnk() or t2.issym(): 896 with self.assertRaises(tarfile.StreamError): 897 tar2.extractfile(t2) 898 continue 899 900 v1 = tar1.extractfile(t1) 901 v2 = tar2.extractfile(t2) 902 if v1 is None: 903 continue 904 self.assertIsNotNone(v2, "stream.extractfile() failed") 905 self.assertEqual(v1.read(), v2.read(), 906 "stream extraction failed") 907 finally: 908 tar1.close() 909 910class GzipStreamReadTest(GzipTest, StreamReadTest): 911 pass 912 913class Bz2StreamReadTest(Bz2Test, StreamReadTest): 914 pass 915 916class LzmaStreamReadTest(LzmaTest, StreamReadTest): 917 pass 918 919class TarStreamModeReadTest(StreamModeTest, unittest.TestCase): 920 921 def test_stream_mode_no_cache(self): 922 for _ in self.tar: 923 pass 924 self.assertEqual(self.tar.members, []) 925 926class GzipStreamModeReadTest(GzipTest, TarStreamModeReadTest): 927 
pass 928 929class Bz2StreamModeReadTest(Bz2Test, TarStreamModeReadTest): 930 pass 931 932class LzmaStreamModeReadTest(LzmaTest, TarStreamModeReadTest): 933 pass 934 935class DetectReadTest(TarTest, unittest.TestCase): 936 def _testfunc_file(self, name, mode): 937 try: 938 tar = tarfile.open(name, mode) 939 except tarfile.ReadError as e: 940 self.fail() 941 else: 942 tar.close() 943 944 def _testfunc_fileobj(self, name, mode): 945 try: 946 with open(name, "rb") as f: 947 tar = tarfile.open(name, mode, fileobj=f) 948 except tarfile.ReadError as e: 949 self.fail() 950 else: 951 tar.close() 952 953 def _test_modes(self, testfunc): 954 if self.suffix: 955 with self.assertRaises(tarfile.ReadError): 956 tarfile.open(tarname, mode="r:" + self.suffix) 957 with self.assertRaises(tarfile.ReadError): 958 tarfile.open(tarname, mode="r|" + self.suffix) 959 with self.assertRaises(tarfile.ReadError): 960 tarfile.open(self.tarname, mode="r:") 961 with self.assertRaises(tarfile.ReadError): 962 tarfile.open(self.tarname, mode="r|") 963 testfunc(self.tarname, "r") 964 testfunc(self.tarname, "r:" + self.suffix) 965 testfunc(self.tarname, "r:*") 966 testfunc(self.tarname, "r|" + self.suffix) 967 testfunc(self.tarname, "r|*") 968 969 def test_detect_file(self): 970 self._test_modes(self._testfunc_file) 971 972 def test_detect_fileobj(self): 973 self._test_modes(self._testfunc_fileobj) 974 975class GzipDetectReadTest(GzipTest, DetectReadTest): 976 pass 977 978class Bz2DetectReadTest(Bz2Test, DetectReadTest): 979 def test_detect_stream_bz2(self): 980 # Originally, tarfile's stream detection looked for the string 981 # "BZh91" at the start of the file. This is incorrect because 982 # the '9' represents the blocksize (900,000 bytes). If the file was 983 # compressed using another blocksize autodetection fails. 984 with open(tarname, "rb") as fobj: 985 data = fobj.read() 986 987 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of self.tar by name and compares their
    # header fields (and, where a checksum is given, their contents)
    # against known values.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check the contents and header fields of *tarinfo*.

        When *chksum* is given, the member's data must hash to it.  Every
        keyword argument names a TarInfo attribute and its expected value;
        mtime, uid and gid (plus uname and gname, unless the member name
        contains "old-v7") are always checked as well.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as member_file:
                digest = sha256sum(member_file.read())
            self.assertEqual(digest, chksum,
                             "wrong sha256sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")

        for field, value in expected.items():
            self.assertEqual(
                getattr(tarinfo, field), value,
                "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member_name = "ustar/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = "pax/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)
def test_header_offset(self):
    # The offset recorded on the TarInfo must point at the extended
    # header that precedes the member: the 512-byte block found there in
    # the reference archive has to be of this class's longname type.
    longname = self.subdir + "/" + "123/" * 125 + "longname"
    member = self.tar.getmember(longname)
    with open(tarname, "rb") as archive:
        archive.seek(member.offset)
        header_block = archive.read(512)
    header = tarfile.TarInfo.frombuf(header_block, "iso8859-1", "strict")
    self.assertEqual(header.type, self.longnametype)
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME
    format = tarfile.GNU_FORMAT

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        """Extract member *name* into TEMPDIR and verify its contents.

        The extracted data must match the known sparse-file checksum.  If
        the filesystem stores holes, the file must additionally occupy
        fewer bytes on disk than its apparent size.
        """
        self.tar.extract(name, TEMPDIR, filter='data')
        filename = os.path.join(TEMPDIR, name)
        with open(filename, "rb") as fobj:
            data = fobj.read()
        self.assertEqual(sha256sum(data), sha256_sparse,
                         "wrong sha256sum for %s" % name)

        if self._fs_supports_holes():
            s = os.stat(filename)
            self.assertLess(s.st_blocks * 512, s.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith(("linux", "android")):
            return False

        # Linux evidently has 512 byte st_blocks units.
        name = os.path.join(TEMPDIR, "sparse-test")
        try:
            with open(name, "wb") as fobj:
                # Seek to "punch a hole" of 4 KiB
                fobj.seek(4096)
                fobj.write(b'x' * 4096)
                fobj.truncate()
            s = os.stat(name)
        finally:
            # Remove the probe file even when writing or stat() failed.
            os_helper.unlink(name)
        return s.st_blocks * 512 < s.st_size
def test_pax_header_bad_formats(self):
    # Replace the well-formed pax record b"11 foo=bar\n" written by
    # tarfile with malformed variants (missing, zero, negative, too small,
    # too large or zero-padded length prefixes) and check that opening
    # the damaged archive is rejected with an "invalid header" ReadError.
    pax_header_replacements = (
        b" foo=bar\n",
        b"0 \n",
        b"1 \n",
        b"2 \n",
        b"3 =\n",
        b"4 =a\n",
        b"1000000 foo=bar\n",
        b"0 foo=bar\n",
        b"-12 foo=bar\n",
        b"000000000000000000000000036 foo=bar\n",
    )
    pax_headers = {"foo": "bar"}

    # The pristine archive is the same for every replacement, so build
    # and read it once.
    with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                      encoding="iso8859-1") as tar:
        t = tarfile.TarInfo()
        t.name = "pax"
        t.uid = 1
        t.pax_headers = pax_headers
        tar.addfile(t)

    with open(tmpname, "rb") as f:
        pristine = f.read()
    self.assertIn(b"11 foo=bar\n", pristine)

    for replacement in pax_header_replacements:
        with self.subTest(header=replacement):
            # Opening with "wb" already truncates the file.
            with open(tmpname, "wb") as f:
                f.write(pristine.replace(b"11 foo=bar\n", replacement))

            with self.assertRaisesRegex(tarfile.ReadError, r"method tar: ReadError\('invalid header'\)"):
                tarfile.open(tmpname, encoding="iso8859-1")
def test_100_char_name(self):
    # The name field in a tar header stores strings of at most 100 chars.
    # If a string is shorter than 100 chars it has to be padded with '\0',
    # which implies that a string of exactly 100 chars is stored without
    # a trailing '\0'.
    name = "0123456789" * 10
    with tarfile.open(tmpname, self.mode) as tar:
        tar.addfile(tarfile.TarInfo(name))

    # Read the archive back and make sure the name round-trips unchanged.
    with tarfile.open(tmpname) as tar:
        self.assertEqual(tar.getnames()[0], name,
                         "failed to store 100 char filename")
def test_ordered_recursion(self):
    # os.listdir() is mocked to report the directory entries in reverse
    # order; the members recorded by TarFile.add() must nevertheless come
    # out sorted.
    path = os.path.join(TEMPDIR, "directory")
    os.mkdir(path)
    entries = ("1", "2")
    try:
        # Create the fixture files inside the try block so that they are
        # cleaned up even if a later step fails.
        for entry in entries:
            open(os.path.join(path, entry), "a").close()

        with tarfile.open(tmpname, self.mode) as tar:
            with unittest.mock.patch('os.listdir',
                                     return_value=["2", "1"]):
                tar.add(path)
            paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
            self.assertEqual(paths, ["directory", "1", "2"])
    finally:
        for entry in entries:
            os_helper.unlink(os.path.join(path, entry))
        os_helper.rmdir(path)
@unittest.skipUnless(hasattr(os, "link"),
                     "Missing hardlink implementation")
def test_link_size(self):
    # A hard link to a file already seen by the archive must be reported
    # with size 0.
    link = os.path.join(TEMPDIR, "link")
    target = os.path.join(TEMPDIR, "link_target")
    with open(target, "wb") as fobj:
        fobj.write(b"aaa")
    try:
        # The link is created inside the cleanup scope so that the target
        # file is removed even when the test is skipped.
        try:
            os.link(target, link)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        with tarfile.open(tmpname, self.mode) as tar:
            # Record the link target in the inodes list.
            tar.gettarinfo(target)
            tarinfo = tar.gettarinfo(link)
            self.assertEqual(tarinfo.size, 0)
    finally:
        os_helper.unlink(target)
        # NOTE(review): relies on os_helper.unlink() ignoring a missing
        # file when the link was never created -- confirm.
        os_helper.unlink(link)
def test_filter(self):
    # TarFile.add() must apply the *filter* callable to every member:
    # members for which it returns None are left out, the others are
    # stored with the modified TarInfo.
    tempdir = os.path.join(TEMPDIR, "filter")
    os.mkdir(tempdir)
    try:
        for name in ("foo", "bar", "baz"):
            os_helper.create_empty_file(os.path.join(tempdir, name))

        # Named so that it does not shadow the builtin filter().
        def drop_bar(tarinfo):
            if os.path.basename(tarinfo.name) == "bar":
                return None
            tarinfo.uid = 123
            tarinfo.uname = "foo"
            return tarinfo

        with tarfile.open(tmpname, self.mode, encoding="iso8859-1") as tar:
            tar.add(tempdir, arcname="empty_dir", filter=drop_bar)

        # Verify that filter is a keyword-only argument
        with self.assertRaises(TypeError):
            tar.add(tempdir, "empty_dir", True, None, drop_bar)

        with tarfile.open(tmpname, "r") as tar:
            for tarinfo in tar:
                self.assertEqual(tarinfo.uid, 123)
                self.assertEqual(tarinfo.uname, "foo")
            # The directory plus "foo" and "baz"; "bar" was filtered out.
            self.assertEqual(len(tar.getmembers()), 3)
    finally:
        os_helper.rmtree(tempdir)
@os_helper.skip_unless_symlink
def test_extractall_symlinks(self):
    # extractall() must cope with an archive that contains a symlink when
    # extracting over a directory in which that symlink already exists.
    workdir = os.path.join(TEMPDIR, "testsymlinks")
    archive = os.path.join(TEMPDIR, "testsymlinks.tar")
    os.mkdir(workdir)
    try:
        source_file = os.path.join(workdir, 'source')
        target_file = os.path.join(workdir, 'symlink')
        with open(source_file, 'w') as f:
            f.write('something\n')
        os.symlink(source_file, target_file)

        with tarfile.open(archive, 'w') as tar:
            for fs_path, arcname in ((source_file, "source"),
                                     (target_file, "symlink")):
                tar.add(fs_path, arcname=arcname)

        # Let's extract it to the location which contains the symlink
        with tarfile.open(archive, errorlevel=2) as tar:
            try:
                # this should not raise OSError: [Errno 17] File exists
                tar.extractall(path=workdir, filter='fully_trusted')
            except OSError:
                self.fail("extractall failed with symlinked files")
    finally:
        os_helper.unlink(archive)
        os_helper.rmtree(workdir)
def test_open_nonwritable_fileobj(self):
    # When the first write to a caller-supplied file object fails,
    # tarfile.open() must propagate the exception and must leave the
    # file object open.
    for exctype in OSError, EOFError, RuntimeError:
        # One subTest per exception type so a failure names the culprit.
        with self.subTest(exctype=exctype):
            class BadFile(io.BytesIO):
                first = True

                def write(self, data):
                    # Only the very first write fails.
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw compressed
        # file can be searched as text.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion: a bare assert statement is stripped
        # when Python runs with -O.
        self.assertNotIn(os.path.dirname(tmpname), payload)
class Bz2CompressWriteTest(Bz2Test, _CompressedWriteTest, unittest.TestCase):
    # Runs the bz2 header check with the "w:" mode prefix.
    prefix = "w:"

    def test_compression_levels(self):
        # Check the header for a low, a middle and a high level.
        for level in (1, 5, 9):
            self._test_bz2_header(level)
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        """Return the size of the data blocks expected to hold *s*.

        One extra unit is added to len(s) before rounding down to whole
        blocks, so a string whose length is an exact multiple of the block
        size still gets one more block.
        """
        blocks = len(s) // tarfile.BLOCKSIZE + 1
        return blocks * tarfile.BLOCKSIZE

    def _calc_size(self, name, link=None):
        """Return the expected archive offset after adding one member.

        The member itself costs one header block.  A *name* or *link*
        longer than the corresponding header field costs one extra header
        block plus the blocks holding the long string.
        """
        # Initial tar header
        count = tarfile.BLOCKSIZE

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += tarfile.BLOCKSIZE + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += tarfile.BLOCKSIZE + self._length(link)
        return count

    def _test(self, name, link=None):
        """Write one member in GNU format and read it back.

        Checks the archive offset after writing against _calc_size() and
        that name and linkname survive the round trip.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w") as tar:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            # Compare before the archive is closed.
            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                                 "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                             "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                             "unable to read longname member")

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
class CreateTest(WriteTestBase, unittest.TestCase):
    # Exercises exclusive-creation ("x") mode: creating a new archive
    # works, creating over an existing one raises FileExistsError.

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Every test starts without an archive at tmpname.
        os_helper.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def _check_single_member(self, tobj):
        """Assert that *tobj* lists exactly one member, the fixture file."""
        names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive_on_disk(self):
        """Reopen tmpname with self.taropen and check its single member."""
        with self.taropen(tmpname) as tobj:
            self._check_single_member(tobj)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive creation must fail ...
        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        # ... and the existing archive must still be readable.
        self._check_archive_on_disk()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_archive_on_disk()

    def test_create_pathlike_name(self):
        with tarfile.open(os_helper.FakePath(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(os_helper.FakePath(self.file_path))
            self._check_single_member(tobj)

        self._check_archive_on_disk()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(os_helper.FakePath(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(os_helper.FakePath(self.file_path))
            self._check_single_member(tobj)

        self._check_archive_on_disk()
tarfile.open(tmpname, 'r:gz', compresslevel=1) as tobj: 2028 pass 2029 2030 2031class Bz2CreateTest(Bz2Test, CreateTest): 2032 2033 def test_create_with_compresslevel(self): 2034 with tarfile.open(tmpname, self.mode, compresslevel=1) as tobj: 2035 tobj.add(self.file_path) 2036 with tarfile.open(tmpname, 'r:bz2', compresslevel=1) as tobj: 2037 pass 2038 2039 2040class LzmaCreateTest(LzmaTest, CreateTest): 2041 2042 # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel. 2043 # It does not allow for preset to be specified when reading. 2044 def test_create_with_preset(self): 2045 with tarfile.open(tmpname, self.mode, preset=1) as tobj: 2046 tobj.add(self.file_path) 2047 2048 2049class CreateWithXModeTest(CreateTest): 2050 2051 prefix = "x" 2052 2053 test_create_taropen = None 2054 test_create_existing_taropen = None 2055 2056 2057@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 2058class HardlinkTest(unittest.TestCase): 2059 # Test the creation of LNKTYPE (hardlink) members in an archive. 2060 2061 def setUp(self): 2062 self.foo = os.path.join(TEMPDIR, "foo") 2063 self.bar = os.path.join(TEMPDIR, "bar") 2064 2065 with open(self.foo, "wb") as fobj: 2066 fobj.write(b"foo") 2067 2068 try: 2069 os.link(self.foo, self.bar) 2070 except PermissionError as e: 2071 self.skipTest('os.link(): %s' % e) 2072 2073 self.tar = tarfile.open(tmpname, "w") 2074 self.tar.add(self.foo) 2075 2076 def tearDown(self): 2077 self.tar.close() 2078 os_helper.unlink(self.foo) 2079 os_helper.unlink(self.bar) 2080 2081 def test_add_twice(self): 2082 # The same name will be added as a REGTYPE every 2083 # time regardless of st_nlink. 
2084 tarinfo = self.tar.gettarinfo(self.foo) 2085 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 2086 "add file as regular failed") 2087 2088 def test_add_hardlink(self): 2089 tarinfo = self.tar.gettarinfo(self.bar) 2090 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 2091 "add file as hardlink failed") 2092 2093 def test_dereference_hardlink(self): 2094 self.tar.dereference = True 2095 tarinfo = self.tar.gettarinfo(self.bar) 2096 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 2097 "dereferencing hardlink failed") 2098 2099 2100class PaxWriteTest(GNUWriteTest): 2101 2102 def _test(self, name, link=None): 2103 # See GNUWriteTest. 2104 tarinfo = tarfile.TarInfo(name) 2105 if link: 2106 tarinfo.linkname = link 2107 tarinfo.type = tarfile.LNKTYPE 2108 2109 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 2110 try: 2111 tar.addfile(tarinfo) 2112 finally: 2113 tar.close() 2114 2115 tar = tarfile.open(tmpname) 2116 try: 2117 if link: 2118 l = tar.getmembers()[0].linkname 2119 self.assertEqual(link, l, "PAX longlink creation failed") 2120 else: 2121 n = tar.getmembers()[0].name 2122 self.assertEqual(name, n, "PAX longname creation failed") 2123 finally: 2124 tar.close() 2125 2126 def test_pax_global_header(self): 2127 pax_headers = { 2128 "foo": "bar", 2129 "uid": "0", 2130 "mtime": "1.23", 2131 "test": "\xe4\xf6\xfc", 2132 "\xe4\xf6\xfc": "test"} 2133 2134 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 2135 pax_headers=pax_headers) 2136 try: 2137 tar.addfile(tarfile.TarInfo("test")) 2138 finally: 2139 tar.close() 2140 2141 # Test if the global header was written correctly. 2142 tar = tarfile.open(tmpname, encoding="iso8859-1") 2143 try: 2144 self.assertEqual(tar.pax_headers, pax_headers) 2145 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 2146 # Test if all the fields are strings. 
2147 for key, val in tar.pax_headers.items(): 2148 self.assertIsNot(type(key), bytes) 2149 self.assertIsNot(type(val), bytes) 2150 if key in tarfile.PAX_NUMBER_FIELDS: 2151 try: 2152 tarfile.PAX_NUMBER_FIELDS[key](val) 2153 except (TypeError, ValueError): 2154 self.fail("unable to convert pax header field") 2155 finally: 2156 tar.close() 2157 2158 def test_pax_extended_header(self): 2159 # The fields from the pax header have priority over the 2160 # TarInfo. 2161 pax_headers = {"path": "foo", "uid": "123"} 2162 2163 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 2164 encoding="iso8859-1") 2165 try: 2166 t = tarfile.TarInfo() 2167 t.name = "\xe4\xf6\xfc" # non-ASCII 2168 t.uid = 8**8 # too large 2169 t.pax_headers = pax_headers 2170 tar.addfile(t) 2171 finally: 2172 tar.close() 2173 2174 tar = tarfile.open(tmpname, encoding="iso8859-1") 2175 try: 2176 t = tar.getmembers()[0] 2177 self.assertEqual(t.pax_headers, pax_headers) 2178 self.assertEqual(t.name, "foo") 2179 self.assertEqual(t.uid, 123) 2180 finally: 2181 tar.close() 2182 2183 def test_create_pax_header(self): 2184 # The ustar header should contain values that can be 2185 # represented reasonably, even if a better (e.g. higher 2186 # precision) version is set in the pax header. 
2187 # Issue #45863 2188 2189 # values that should be kept 2190 t = tarfile.TarInfo() 2191 t.name = "foo" 2192 t.mtime = 1000.1 2193 t.size = 100 2194 t.uid = 123 2195 t.gid = 124 2196 info = t.get_info() 2197 header = t.create_pax_header(info, encoding="iso8859-1") 2198 self.assertEqual(info['name'], "foo") 2199 # mtime should be rounded to nearest second 2200 self.assertIsInstance(info['mtime'], int) 2201 self.assertEqual(info['mtime'], 1000) 2202 self.assertEqual(info['size'], 100) 2203 self.assertEqual(info['uid'], 123) 2204 self.assertEqual(info['gid'], 124) 2205 self.assertEqual(header, 2206 b'././@PaxHeader' + bytes(86) \ 2207 + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x' \ 2208 + bytes(100) + b'ustar\x0000'+ bytes(247) \ 2209 + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97) \ 2210 + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0' \ 2211 + bytes(100) + b'ustar\x0000' + bytes(247)) 2212 2213 # values that should be changed 2214 t = tarfile.TarInfo() 2215 t.name = "foo\u3374" # can't be represented in ascii 2216 t.mtime = 10**10 # too big 2217 t.size = 10**10 # too big 2218 t.uid = 8**8 # too big 2219 t.gid = 8**8+1 # too big 2220 info = t.get_info() 2221 header = t.create_pax_header(info, encoding="iso8859-1") 2222 # name is kept as-is in info but should be added to pax header 2223 self.assertEqual(info['name'], "foo\u3374") 2224 self.assertEqual(info['mtime'], 0) 2225 self.assertEqual(info['size'], 0) 2226 self.assertEqual(info['uid'], 0) 2227 self.assertEqual(info['gid'], 0) 2228 self.assertEqual(header, 2229 b'././@PaxHeader' + bytes(86) \ 2230 + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x' \ 2231 + bytes(100) + b'ustar\x0000' + bytes(247) \ 2232 + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n' \ 2233 + b'16 gid=16777217\n20 size=10000000000\n' \ 2234 + b'21 mtime=10000000000\n'+ bytes(424) + b'foo?' 
+ bytes(96) \ 2235 + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0' \ 2236 + bytes(100) + b'ustar\x0000' + bytes(247)) 2237 2238 2239class UnicodeTest: 2240 2241 def test_iso8859_1_filename(self): 2242 self._test_unicode_filename("iso8859-1") 2243 2244 def test_utf7_filename(self): 2245 self._test_unicode_filename("utf7") 2246 2247 def test_utf8_filename(self): 2248 self._test_unicode_filename("utf-8") 2249 2250 def _test_unicode_filename(self, encoding): 2251 tar = tarfile.open(tmpname, "w", format=self.format, 2252 encoding=encoding, errors="strict") 2253 try: 2254 name = "\xe4\xf6\xfc" 2255 tar.addfile(tarfile.TarInfo(name)) 2256 finally: 2257 tar.close() 2258 2259 tar = tarfile.open(tmpname, encoding=encoding) 2260 try: 2261 self.assertEqual(tar.getmembers()[0].name, name) 2262 finally: 2263 tar.close() 2264 2265 def test_unicode_filename_error(self): 2266 tar = tarfile.open(tmpname, "w", format=self.format, 2267 encoding="ascii", errors="strict") 2268 try: 2269 tarinfo = tarfile.TarInfo() 2270 2271 tarinfo.name = "\xe4\xf6\xfc" 2272 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 2273 2274 tarinfo.name = "foo" 2275 tarinfo.uname = "\xe4\xf6\xfc" 2276 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 2277 finally: 2278 tar.close() 2279 2280 def test_unicode_argument(self): 2281 tar = tarfile.open(tarname, "r", 2282 encoding="iso8859-1", errors="strict") 2283 try: 2284 for t in tar: 2285 self.assertIs(type(t.name), str) 2286 self.assertIs(type(t.linkname), str) 2287 self.assertIs(type(t.uname), str) 2288 self.assertIs(type(t.gname), str) 2289 finally: 2290 tar.close() 2291 2292 def test_uname_unicode(self): 2293 t = tarfile.TarInfo("foo") 2294 t.uname = "\xe4\xf6\xfc" 2295 t.gname = "\xe4\xf6\xfc" 2296 2297 tar = tarfile.open(tmpname, mode="w", format=self.format, 2298 encoding="iso8859-1") 2299 try: 2300 tar.addfile(t) 2301 finally: 2302 tar.close() 2303 2304 tar = tarfile.open(tmpname, encoding="iso8859-1") 2305 try: 
2306 t = tar.getmember("foo") 2307 self.assertEqual(t.uname, "\xe4\xf6\xfc") 2308 self.assertEqual(t.gname, "\xe4\xf6\xfc") 2309 2310 if self.format != tarfile.PAX_FORMAT: 2311 tar.close() 2312 tar = tarfile.open(tmpname, encoding="ascii") 2313 t = tar.getmember("foo") 2314 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 2315 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 2316 finally: 2317 tar.close() 2318 2319 2320class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 2321 2322 format = tarfile.USTAR_FORMAT 2323 2324 # Test whether the utf-8 encoded version of a filename exceeds the 100 2325 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 2326 # bytes). 2327 def test_unicode_name1(self): 2328 self._test_ustar_name("0123456789" * 10) 2329 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 2330 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 2331 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 2332 2333 def test_unicode_name2(self): 2334 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 2335 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 2336 2337 # Test whether the utf-8 encoded version of a filename exceeds the 155 2338 # bytes prefix + '/' + 100 bytes name limit. 
2339 def test_unicode_longname1(self): 2340 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 2341 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 2342 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 2343 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 2344 2345 def test_unicode_longname2(self): 2346 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 2347 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 2348 2349 def test_unicode_longname3(self): 2350 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 2351 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 2352 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 2353 2354 def test_unicode_longname4(self): 2355 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2356 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2357 2358 def _test_ustar_name(self, name, exc=None): 2359 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2360 t = tarfile.TarInfo(name) 2361 if exc is None: 2362 tar.addfile(t) 2363 else: 2364 self.assertRaises(exc, tar.addfile, t) 2365 2366 if exc is None: 2367 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2368 for t in tar: 2369 self.assertEqual(name, t.name) 2370 break 2371 2372 # Test the same as above for the 100 bytes link field. 
2373 def test_unicode_link1(self): 2374 self._test_ustar_link("0123456789" * 10) 2375 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2376 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2377 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2378 2379 def test_unicode_link2(self): 2380 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2381 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2382 2383 def _test_ustar_link(self, name, exc=None): 2384 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2385 t = tarfile.TarInfo("foo") 2386 t.linkname = name 2387 if exc is None: 2388 tar.addfile(t) 2389 else: 2390 self.assertRaises(exc, tar.addfile, t) 2391 2392 if exc is None: 2393 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2394 for t in tar: 2395 self.assertEqual(name, t.linkname) 2396 break 2397 2398 2399class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2400 2401 format = tarfile.GNU_FORMAT 2402 2403 def test_bad_pax_header(self): 2404 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2405 # without a hdrcharset=BINARY header. 2406 for encoding, name in ( 2407 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2408 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2409 with tarfile.open(tarname, encoding=encoding, 2410 errors="surrogateescape") as tar: 2411 try: 2412 t = tar.getmember(name) 2413 except KeyError: 2414 self.fail("unable to read bad GNU tar pax header") 2415 2416 2417class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2418 2419 format = tarfile.PAX_FORMAT 2420 2421 # PAX_FORMAT ignores encoding in write mode. 2422 test_unicode_filename_error = None 2423 2424 def test_binary_header(self): 2425 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
2426 for encoding, name in ( 2427 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2428 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2429 with tarfile.open(tarname, encoding=encoding, 2430 errors="surrogateescape") as tar: 2431 try: 2432 t = tar.getmember(name) 2433 except KeyError: 2434 self.fail("unable to read POSIX.1-2008 binary header") 2435 2436 2437class AppendTestBase: 2438 # Test append mode (cp. patch #1652681). 2439 2440 def setUp(self): 2441 self.tarname = tmpname 2442 if os.path.exists(self.tarname): 2443 os_helper.unlink(self.tarname) 2444 2445 def _create_testtar(self, mode="w:"): 2446 with tarfile.open(tarname, encoding="iso8859-1") as src: 2447 t = src.getmember("ustar/regtype") 2448 t.name = "foo" 2449 with src.extractfile(t) as f: 2450 with tarfile.open(self.tarname, mode) as tar: 2451 tar.addfile(t, f) 2452 2453 def test_append_compressed(self): 2454 self._create_testtar("w:" + self.suffix) 2455 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2456 2457class AppendTest(AppendTestBase, unittest.TestCase): 2458 test_append_compressed = None 2459 2460 def _add_testfile(self, fileobj=None): 2461 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2462 tar.addfile(tarfile.TarInfo("bar")) 2463 2464 def _test(self, names=["bar"], fileobj=None): 2465 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2466 self.assertEqual(tar.getnames(), names) 2467 2468 def test_non_existing(self): 2469 self._add_testfile() 2470 self._test() 2471 2472 def test_empty(self): 2473 tarfile.open(self.tarname, "w:").close() 2474 self._add_testfile() 2475 self._test() 2476 2477 def test_empty_fileobj(self): 2478 fobj = io.BytesIO(b"\0" * 1024) 2479 self._add_testfile(fobj) 2480 fobj.seek(0) 2481 self._test(fileobj=fobj) 2482 2483 def test_fileobj(self): 2484 self._create_testtar() 2485 with open(self.tarname, "rb") as fobj: 2486 data = fobj.read() 2487 fobj = io.BytesIO(data) 2488 self._add_testfile(fobj) 2489 fobj.seek(0) 2490 
self._test(names=["foo", "bar"], fileobj=fobj) 2491 2492 def test_existing(self): 2493 self._create_testtar() 2494 self._add_testfile() 2495 self._test(names=["foo", "bar"]) 2496 2497 # Append mode is supposed to fail if the tarfile to append to 2498 # does not end with a zero block. 2499 def _test_error(self, data): 2500 with open(self.tarname, "wb") as fobj: 2501 fobj.write(data) 2502 self.assertRaises(tarfile.ReadError, self._add_testfile) 2503 2504 def test_null(self): 2505 self._test_error(b"") 2506 2507 def test_incomplete(self): 2508 self._test_error(b"\0" * 13) 2509 2510 def test_premature_eof(self): 2511 data = tarfile.TarInfo("foo").tobuf() 2512 self._test_error(data) 2513 2514 def test_trailing_garbage(self): 2515 data = tarfile.TarInfo("foo").tobuf() 2516 self._test_error(data + b"\0" * 13) 2517 2518 def test_invalid(self): 2519 self._test_error(b"a" * 512) 2520 2521class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2522 pass 2523 2524class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2525 pass 2526 2527class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2528 pass 2529 2530 2531class LimitsTest(unittest.TestCase): 2532 2533 def test_ustar_limits(self): 2534 # 100 char name 2535 tarinfo = tarfile.TarInfo("0123456789" * 10) 2536 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2537 2538 # 101 char name that cannot be stored 2539 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2540 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2541 2542 # 256 char name with a slash at pos 156 2543 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2544 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2545 2546 # 256 char name that cannot be stored 2547 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2548 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2549 2550 # 512 char name 2551 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2552 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 2553 2554 # 512 char linkname 2555 tarinfo = tarfile.TarInfo("longlink") 2556 tarinfo.linkname = "123/" * 126 + "longname" 2557 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2558 2559 # uid > 8 digits 2560 tarinfo = tarfile.TarInfo("name") 2561 tarinfo.uid = 0o10000000 2562 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2563 2564 def test_gnu_limits(self): 2565 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2566 tarinfo.tobuf(tarfile.GNU_FORMAT) 2567 2568 tarinfo = tarfile.TarInfo("longlink") 2569 tarinfo.linkname = "123/" * 126 + "longname" 2570 tarinfo.tobuf(tarfile.GNU_FORMAT) 2571 2572 # uid >= 256 ** 7 2573 tarinfo = tarfile.TarInfo("name") 2574 tarinfo.uid = 0o4000000000000000000 2575 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2576 2577 def test_pax_limits(self): 2578 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2579 tarinfo.tobuf(tarfile.PAX_FORMAT) 2580 2581 tarinfo = tarfile.TarInfo("longlink") 2582 tarinfo.linkname = "123/" * 126 + "longname" 2583 tarinfo.tobuf(tarfile.PAX_FORMAT) 2584 2585 tarinfo = tarfile.TarInfo("name") 2586 tarinfo.uid = 0o4000000000000000000 2587 tarinfo.tobuf(tarfile.PAX_FORMAT) 2588 2589 2590class MiscTest(unittest.TestCase): 2591 2592 def test_char_fields(self): 2593 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2594 b"foo\0\0\0\0\0") 2595 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2596 b"foo") 2597 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2598 "foo") 2599 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2600 "foo") 2601 2602 def test_read_number_fields(self): 2603 # Issue 13158: Test if GNU tar specific base-256 number fields 2604 # are decoded correctly. 
2605 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2606 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2607 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2608 0o10000000) 2609 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2610 0xffffffff) 2611 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2612 -1) 2613 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2614 -100) 2615 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2616 -0x100000000000000) 2617 2618 # Issue 24514: Test if empty number fields are converted to zero. 2619 self.assertEqual(tarfile.nti(b"\0"), 0) 2620 self.assertEqual(tarfile.nti(b" \0"), 0) 2621 2622 def test_write_number_fields(self): 2623 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2624 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2625 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2626 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2627 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2628 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2629 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2630 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2631 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2632 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2633 self.assertEqual(tarfile.itn(-0x100000000000000, 2634 format=tarfile.GNU_FORMAT), 2635 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2636 2637 # Issue 32713: Test if itn() supports float values outside the 2638 # non-GNU format range 2639 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2640 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2641 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2642 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2643 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2644 2645 def test_number_field_limits(self): 2646 with self.assertRaises(ValueError): 2647 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2648 with self.assertRaises(ValueError): 2649 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2650 with self.assertRaises(ValueError): 2651 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2652 with self.assertRaises(ValueError): 2653 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2654 2655 def test__all__(self): 2656 not_exported = { 2657 'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE', 2658 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME', 2659 'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2660 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE', 2661 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE', 2662 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES', 2663 'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS', 2664 'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 2665 'copyfileobj', 'filemode', 'EmptyHeaderError', 2666 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 2667 'SubsequentHeaderError', 'ExFileObject', 'main'} 2668 support.check__all__(self, tarfile, not_exported=not_exported) 2669 2670 def test_useful_error_message_when_modules_missing(self): 2671 fname = os.path.join(os.path.dirname(__file__), 'archivetestdata', 'testtar.tar.xz') 2672 with self.assertRaises(tarfile.ReadError) as excinfo: 2673 error = tarfile.CompressionError('lzma module is not available'), 2674 with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error): 2675 tarfile.open(fname) 2676 2677 self.assertIn( 2678 "\n- method xz: CompressionError('lzma module is not available')\n", 2679 str(excinfo.exception), 2680 ) 2681 2682 2683class CommandLineTest(unittest.TestCase): 2684 2685 def tarfilecmd(self, *args, **kwargs): 2686 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2687 **kwargs) 2688 return out.replace(os.linesep.encode(), b'\n') 2689 2690 def tarfilecmd_failure(self, *args): 2691 return 
script_helper.assert_python_failure('-m', 'tarfile', *args) 2692 2693 def make_simple_tarfile(self, tar_name): 2694 files = [support.findfile('tokenize_tests.txt', 2695 subdir='tokenizedata'), 2696 support.findfile('tokenize_tests-no-coding-cookie-' 2697 'and-utf8-bom-sig-only.txt', 2698 subdir='tokenizedata')] 2699 self.addCleanup(os_helper.unlink, tar_name) 2700 with tarfile.open(tar_name, 'w') as tf: 2701 for tardata in files: 2702 tf.add(tardata, arcname=os.path.basename(tardata)) 2703 2704 def make_evil_tarfile(self, tar_name): 2705 self.addCleanup(os_helper.unlink, tar_name) 2706 with tarfile.open(tar_name, 'w') as tf: 2707 benign = tarfile.TarInfo('benign') 2708 tf.addfile(benign, fileobj=io.BytesIO(b'')) 2709 evil = tarfile.TarInfo('../evil') 2710 tf.addfile(evil, fileobj=io.BytesIO(b'')) 2711 2712 def test_bad_use(self): 2713 rc, out, err = self.tarfilecmd_failure() 2714 self.assertEqual(out, b'') 2715 self.assertIn(b'usage', err.lower()) 2716 self.assertIn(b'error', err.lower()) 2717 self.assertIn(b'required', err.lower()) 2718 rc, out, err = self.tarfilecmd_failure('-l', '') 2719 self.assertEqual(out, b'') 2720 self.assertNotEqual(err.strip(), b'') 2721 2722 def test_test_command(self): 2723 for tar_name in testtarnames: 2724 for opt in '-t', '--test': 2725 out = self.tarfilecmd(opt, tar_name) 2726 self.assertEqual(out, b'') 2727 2728 def test_test_command_verbose(self): 2729 for tar_name in testtarnames: 2730 for opt in '-v', '--verbose': 2731 out = self.tarfilecmd(opt, '-t', tar_name, 2732 PYTHONIOENCODING='utf-8') 2733 self.assertIn(b'is a tar archive.\n', out) 2734 2735 def test_test_command_invalid_file(self): 2736 zipname = support.findfile('zipdir.zip', subdir='archivetestdata') 2737 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2738 self.assertIn(b' is not a tar archive.', err) 2739 self.assertEqual(out, b'') 2740 self.assertEqual(rc, 1) 2741 2742 for tar_name in testtarnames: 2743 with self.subTest(tar_name=tar_name): 2744 with 
open(tar_name, 'rb') as f: 2745 data = f.read() 2746 try: 2747 with open(tmpname, 'wb') as f: 2748 f.write(data[:511]) 2749 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2750 self.assertEqual(out, b'') 2751 self.assertEqual(rc, 1) 2752 finally: 2753 os_helper.unlink(tmpname) 2754 2755 def test_list_command(self): 2756 for tar_name in testtarnames: 2757 with support.captured_stdout() as t: 2758 with tarfile.open(tar_name, 'r') as tf: 2759 tf.list(verbose=False) 2760 expected = t.getvalue().encode('ascii', 'backslashreplace') 2761 for opt in '-l', '--list': 2762 out = self.tarfilecmd(opt, tar_name, 2763 PYTHONIOENCODING='ascii') 2764 self.assertEqual(out, expected) 2765 2766 def test_list_command_verbose(self): 2767 for tar_name in testtarnames: 2768 with support.captured_stdout() as t: 2769 with tarfile.open(tar_name, 'r') as tf: 2770 tf.list(verbose=True) 2771 expected = t.getvalue().encode('ascii', 'backslashreplace') 2772 for opt in '-v', '--verbose': 2773 out = self.tarfilecmd(opt, '-l', tar_name, 2774 PYTHONIOENCODING='ascii') 2775 self.assertEqual(out, expected) 2776 2777 def test_list_command_invalid_file(self): 2778 zipname = support.findfile('zipdir.zip', subdir='archivetestdata') 2779 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2780 self.assertIn(b' is not a tar archive.', err) 2781 self.assertEqual(out, b'') 2782 self.assertEqual(rc, 1) 2783 2784 def test_create_command(self): 2785 files = [support.findfile('tokenize_tests.txt', 2786 subdir='tokenizedata'), 2787 support.findfile('tokenize_tests-no-coding-cookie-' 2788 'and-utf8-bom-sig-only.txt', 2789 subdir='tokenizedata')] 2790 for opt in '-c', '--create': 2791 try: 2792 out = self.tarfilecmd(opt, tmpname, *files) 2793 self.assertEqual(out, b'') 2794 with tarfile.open(tmpname) as tar: 2795 tar.getmembers() 2796 finally: 2797 os_helper.unlink(tmpname) 2798 2799 def test_create_command_verbose(self): 2800 files = [support.findfile('tokenize_tests.txt', 2801 subdir='tokenizedata'), 2802 
support.findfile('tokenize_tests-no-coding-cookie-' 2803 'and-utf8-bom-sig-only.txt', 2804 subdir='tokenizedata')] 2805 for opt in '-v', '--verbose': 2806 try: 2807 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2808 PYTHONIOENCODING='utf-8') 2809 self.assertIn(b' file created.', out) 2810 with tarfile.open(tmpname) as tar: 2811 tar.getmembers() 2812 finally: 2813 os_helper.unlink(tmpname) 2814 2815 def test_create_command_dotless_filename(self): 2816 files = [support.findfile('tokenize_tests.txt', subdir='tokenizedata')] 2817 try: 2818 out = self.tarfilecmd('-c', dotlessname, *files) 2819 self.assertEqual(out, b'') 2820 with tarfile.open(dotlessname) as tar: 2821 tar.getmembers() 2822 finally: 2823 os_helper.unlink(dotlessname) 2824 2825 def test_create_command_dot_started_filename(self): 2826 tar_name = os.path.join(TEMPDIR, ".testtar") 2827 files = [support.findfile('tokenize_tests.txt', subdir='tokenizedata')] 2828 try: 2829 out = self.tarfilecmd('-c', tar_name, *files) 2830 self.assertEqual(out, b'') 2831 with tarfile.open(tar_name) as tar: 2832 tar.getmembers() 2833 finally: 2834 os_helper.unlink(tar_name) 2835 2836 def test_create_command_compressed(self): 2837 files = [support.findfile('tokenize_tests.txt', 2838 subdir='tokenizedata'), 2839 support.findfile('tokenize_tests-no-coding-cookie-' 2840 'and-utf8-bom-sig-only.txt', 2841 subdir='tokenizedata')] 2842 for filetype in (GzipTest, Bz2Test, LzmaTest): 2843 if not filetype.open: 2844 continue 2845 try: 2846 tar_name = tmpname + '.' 
+ filetype.suffix 2847 out = self.tarfilecmd('-c', tar_name, *files) 2848 with filetype.taropen(tar_name) as tar: 2849 tar.getmembers() 2850 finally: 2851 os_helper.unlink(tar_name) 2852 2853 def test_extract_command(self): 2854 self.make_simple_tarfile(tmpname) 2855 for opt in '-e', '--extract': 2856 try: 2857 with os_helper.temp_cwd(tarextdir): 2858 out = self.tarfilecmd(opt, tmpname) 2859 self.assertEqual(out, b'') 2860 finally: 2861 os_helper.rmtree(tarextdir) 2862 2863 def test_extract_command_verbose(self): 2864 self.make_simple_tarfile(tmpname) 2865 for opt in '-v', '--verbose': 2866 try: 2867 with os_helper.temp_cwd(tarextdir): 2868 out = self.tarfilecmd(opt, '-e', tmpname, 2869 PYTHONIOENCODING='utf-8') 2870 self.assertIn(b' file is extracted.', out) 2871 finally: 2872 os_helper.rmtree(tarextdir) 2873 2874 def test_extract_command_filter(self): 2875 self.make_evil_tarfile(tmpname) 2876 # Make an inner directory, so the member named '../evil' 2877 # is still extracted into `tarextdir` 2878 destdir = os.path.join(tarextdir, 'dest') 2879 os.mkdir(tarextdir) 2880 try: 2881 with os_helper.temp_cwd(destdir): 2882 self.tarfilecmd_failure('-e', tmpname, 2883 '-v', 2884 '--filter', 'data') 2885 out = self.tarfilecmd('-e', tmpname, 2886 '-v', 2887 '--filter', 'fully_trusted', 2888 PYTHONIOENCODING='utf-8') 2889 self.assertIn(b' file is extracted.', out) 2890 finally: 2891 os_helper.rmtree(tarextdir) 2892 2893 def test_extract_command_different_directory(self): 2894 self.make_simple_tarfile(tmpname) 2895 try: 2896 with os_helper.temp_cwd(tarextdir): 2897 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2898 self.assertEqual(out, b'') 2899 finally: 2900 os_helper.rmtree(tarextdir) 2901 2902 def test_extract_command_invalid_file(self): 2903 zipname = support.findfile('zipdir.zip', subdir='archivetestdata') 2904 with os_helper.temp_cwd(tarextdir): 2905 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2906 self.assertIn(b' is not a tar archive.', err) 2907 
def test_no_eof(self):
    # __exit__() must not write end-of-archive blocks if an
    # exception was raised.
    try:
        with tarfile.open(tmpname, "w") as tar:
            raise Exception
    except Exception:
        # Swallow only the exception raised on purpose above; a bare
        # "except:" would also hide KeyboardInterrupt and SystemExit.
        pass
    self.assertEqual(os.path.getsize(tmpname), 0,
            "context manager wrote an end-of-archive block")
    self.assertTrue(tar.closed, "context manager failed")
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):
    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _needs_no_islink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _needs_no_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    def _test_link_extraction(self, name):
        # Extract a single member and compare the hash of what landed on
        # disk with the hash of the reference regular file.
        self.tar.extract(name, TEMPDIR, filter='fully_trusted')
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as stream:
            digest = sha256sum(stream.read())
        self.assertEqual(digest, sha256_regtype)

    @_needs_no_islink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_needs_no_islink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_needs_no_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_needs_no_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Returns False when the ``pwd``/``grp`` modules cannot be imported or
    when the user/group database has no entry for id 0, so the function
    never raises for a missing entry.
    """
    try:
        import pwd, grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # The lookups raise KeyError when the requested id has no entry,
        # e.g. on systems without a populated passwd/group database.
        return False
    return user_name == 'root' and group_name == 'root'
@staticmethod
@contextmanager
def _setup_test(mock_geteuid):
    """Yield an open test archive and the names of its three members.

    `mock_geteuid` has its ``return_value`` set to 0 so that code asking
    for the effective uid is told it runs as root.  The yielded tuple is
    ``(tarfile, file_name, dir_name, nested_file_name)``.
    """
    # lie and say we're root
    mock_geteuid.return_value = 0

    base_name = 'numeric-owner-testfile'
    dir_name = 'dir'
    # the names we want stored in the tarfile: a file, a directory, and
    # a file within that directory
    member_names = (base_name, dir_name, os.path.join(dir_name, base_name))

    # create the tarfile with the contents we're after
    archive_path = NumericOwnerTest._make_test_archive(*member_names)

    # hand out the archive opened for reading, along with the names of
    # the items stored in it
    with tarfile.open(archive_path) as tarfl:
        yield (tarfl, *member_names)
@unittest.mock.patch('os.geteuid')
def test_keyword_only(self, mock_geteuid):
    # Passing a fourth positional argument to extract() must be rejected
    # with a TypeError.
    with self._setup_test(mock_geteuid) as prepared:
        archive, first_member, _dir_member, _nested_member = prepared
        with self.assertRaises(TypeError):
            archive.extract(first_member, TEMPDIR, False, True)
@classmethod
def setUpClass(cls):
    """Extract the reference archive once and record the resulting paths.

    The archive is extracted into ``cls.control_dir`` with the class's
    ``extraction_filter``; the relative paths found there are stored in
    ``cls.control_paths`` for later comparison.
    """
    cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
    # Opening with `with` guarantees the archive is closed even when the
    # extraction below raises.
    with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
        tar.errorlevel = 0
        with ExitStack() as cm:
            if cls.extraction_filter is None:
                # Silence the DeprecationWarning for the no-filter case.
                cm.enter_context(warnings.catch_warnings(
                    action="ignore", category=DeprecationWarning))
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
    cls.control_paths = {
        p.relative_to(cls.control_dir)
        for p in cls.control_dir.glob('**/*')}
def test_extractall_none_mtime(self):
    # mtimes of extracted files should be later than 'now' -- the mtime
    # of a previously created directory.
    now = pathlib.Path(TEMPDIR).stat().st_mtime
    with self.extract_with_none('mtime') as DIR:
        for path in pathlib.Path(DIR).glob('**/*'):
            with self.subTest(path=path):
                try:
                    mtime = path.stat().st_mtime
                except OSError:
                    # Some systems can't stat symlinks, ignore those
                    if not path.is_symlink():
                        raise
                else:
                    # Reuse the value read above instead of calling
                    # stat() again outside the OSError handling.
                    self.assertGreaterEqual(mtime, now)
def test_add(self):
    # When addfile() encounters None metadata, it raises a ValueError
    bio = io.BytesIO()
    for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                      tarfile.PAX_FORMAT):
        # The archive is used as a context manager so that it is closed
        # at the end of each subtest, whether or not a check fails.
        with (self.subTest(tarformat=tarformat),
              tarfile.open(fileobj=bio, mode='w',
                           format=tarformat) as tar):
            tarinfo = tar.gettarinfo(tarname)
            try:
                with open(tarname, 'rb') as f:
                    tar.addfile(tarinfo, f)
            except Exception:
                if tarformat == tarfile.USTAR_FORMAT:
                    # In the old, limited format, adding might fail for
                    # reasons like the UID being too large
                    pass
                else:
                    raise
            else:
                for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                  'uname', 'gname'):
                    with self.subTest(attr_name=attr_name):
                        replaced = tarinfo.replace(**{attr_name: None})
                        # The error message is expected to mention the
                        # attribute that was set to None.
                        with self.assertRaisesRegex(ValueError,
                                                    attr_name):
                            with open(tarname, 'rb') as f:
                                tar.addfile(replaced, f)
3356 for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, 3357 {'uname'}, {'gname'}, 3358 {'uid', 'uname'}, {'gid', 'gname'}): 3359 with (self.subTest(attr_names=attr_names), 3360 tarfile.open(tarname, encoding="iso8859-1") as tar): 3361 tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3362 with support.swap_attr(sys, 'stdout', tio_prev): 3363 tar.list() 3364 for member in tar.getmembers(): 3365 for attr_name in attr_names: 3366 setattr(member, attr_name, None) 3367 tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 3368 with support.swap_attr(sys, 'stdout', tio_new): 3369 tar.list() 3370 for expected, got in zip(tio_prev.detach().getvalue().split(), 3371 tio_new.detach().getvalue().split()): 3372 if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected): 3373 self.assertEqual(got, b'????-??-??') 3374 elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected): 3375 self.assertEqual(got, b'??:??:??') 3376 elif attr_names == {'mode'} and re.match( 3377 rb'.([r-][w-][x-]){3}', expected): 3378 self.assertEqual(got, b'??????????') 3379 elif attr_names == {'uname'} and expected.startswith( 3380 (b'tarfile/', b'lars/', b'foo/')): 3381 exp_user, exp_group = expected.split(b'/') 3382 got_user, got_group = got.split(b'/') 3383 self.assertEqual(got_group, exp_group) 3384 self.assertRegex(got_user, b'[0-9]+') 3385 elif attr_names == {'gname'} and expected.endswith( 3386 (b'/tarfile', b'/users', b'/bar')): 3387 exp_user, exp_group = expected.split(b'/') 3388 got_user, got_group = got.split(b'/') 3389 self.assertEqual(got_user, exp_user) 3390 self.assertRegex(got_group, b'[0-9]+') 3391 elif attr_names == {'uid'} and expected.startswith( 3392 (b'1000/')): 3393 exp_user, exp_group = expected.split(b'/') 3394 got_user, got_group = got.split(b'/') 3395 self.assertEqual(got_group, exp_group) 3396 self.assertEqual(got_user, b'None') 3397 elif attr_names == {'gid'} and expected.endswith((b'/100')): 3398 exp_user, exp_group = 
expected.split(b'/') 3399 got_user, got_group = got.split(b'/') 3400 self.assertEqual(got_user, exp_user) 3401 self.assertEqual(got_group, b'None') 3402 elif attr_names == {'uid', 'uname'} and expected.startswith( 3403 (b'tarfile/', b'lars/', b'foo/', b'1000/')): 3404 exp_user, exp_group = expected.split(b'/') 3405 got_user, got_group = got.split(b'/') 3406 self.assertEqual(got_group, exp_group) 3407 self.assertEqual(got_user, b'None') 3408 elif attr_names == {'gname', 'gid'} and expected.endswith( 3409 (b'/tarfile', b'/users', b'/bar', b'/100')): 3410 exp_user, exp_group = expected.split(b'/') 3411 got_user, got_group = got.split(b'/') 3412 self.assertEqual(got_user, exp_user) 3413 self.assertEqual(got_group, b'None') 3414 else: 3415 # In other cases the output should be the same 3416 self.assertEqual(expected, got) 3417 3418def _filemode_to_int(mode): 3419 """Inverse of `stat.filemode` (for permission bits) 3420 3421 Using mode strings rather than numbers makes the later tests more readable. 3422 """ 3423 str_mode = mode[1:] 3424 result = ( 3425 {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] 3426 | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] 3427 | {'x': stat.S_IXUSR, '-': 0, 3428 's': stat.S_IXUSR | stat.S_ISUID, 3429 'S': stat.S_ISUID}[str_mode[2]] 3430 | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] 3431 | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] 3432 | {'x': stat.S_IXGRP, '-': 0, 3433 's': stat.S_IXGRP | stat.S_ISGID, 3434 'S': stat.S_ISGID}[str_mode[5]] 3435 | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] 3436 | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] 3437 | {'x': stat.S_IXOTH, '-': 0, 3438 't': stat.S_IXOTH | stat.S_ISVTX, 3439 'T': stat.S_ISVTX}[str_mode[8]] 3440 ) 3441 # check we did this right 3442 assert stat.filemode(result)[1:] == mode[1:] 3443 3444 return result 3445 3446class ArchiveMaker: 3447 """Helper to create a tar file with specific contents 3448 3449 Usage: 3450 3451 with ArchiveMaker() as t: 3452 t.add('filename', ...) 3453 3454 with t.open() as tar: 3455 ... 
# `tar` is now a TarFile with 'filename' in it! 3456 """ 3457 def __init__(self): 3458 self.bio = io.BytesIO() 3459 3460 def __enter__(self): 3461 self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) 3462 return self 3463 3464 def __exit__(self, *exc): 3465 self.tar_w.close() 3466 self.contents = self.bio.getvalue() 3467 self.bio = None 3468 3469 def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, 3470 mode=None, size=None, **kwargs): 3471 """Add a member to the test archive. Call within `with`.""" 3472 name = str(name) 3473 tarinfo = tarfile.TarInfo(name).replace(**kwargs) 3474 if size is not None: 3475 tarinfo.size = size 3476 if mode: 3477 tarinfo.mode = _filemode_to_int(mode) 3478 if symlink_to is not None: 3479 type = tarfile.SYMTYPE 3480 tarinfo.linkname = str(symlink_to) 3481 if hardlink_to is not None: 3482 type = tarfile.LNKTYPE 3483 tarinfo.linkname = str(hardlink_to) 3484 if name.endswith('/') and type is None: 3485 type = tarfile.DIRTYPE 3486 if type is not None: 3487 tarinfo.type = type 3488 if tarinfo.isreg(): 3489 fileobj = io.BytesIO(bytes(tarinfo.size)) 3490 else: 3491 fileobj = None 3492 self.tar_w.addfile(tarinfo, fileobj) 3493 3494 def open(self, **kwargs): 3495 """Open the resulting archive as TarFile. Call after `with`.""" 3496 bio = io.BytesIO(self.contents) 3497 return tarfile.open(fileobj=bio, **kwargs) 3498 3499# Under WASI, `os_helper.can_symlink` is False to make 3500# `skip_unless_symlink` skip symlink tests. " 3501# But in the following tests we use can_symlink to *determine* which 3502# behavior is expected. 3503# Like other symlink tests, skip these on WASI for now. 3504if support.is_wasi: 3505 def symlink_test(f): 3506 return unittest.skip("WASI: Skip symlink test for now")(f) 3507else: 3508 def symlink_test(f): 3509 return f 3510 3511 3512class TestExtractionFilters(unittest.TestCase): 3513 3514 # A temporary directory for the extraction results. 
3515 # All files that "escape" the destination path should still end 3516 # up in this directory. 3517 outerdir = pathlib.Path(TEMPDIR) / 'outerdir' 3518 3519 # The destination for the extraction, within `outerdir` 3520 destdir = outerdir / 'dest' 3521 3522 @contextmanager 3523 def check_context(self, tar, filter): 3524 """Extracts `tar` to `self.destdir` and allows checking the result 3525 3526 If an error occurs, it must be checked using `expect_exception` 3527 3528 Otherwise, all resulting files must be checked using `expect_file`, 3529 except the destination directory itself and parent directories of 3530 other files. 3531 When checking directories, do so before their contents. 3532 """ 3533 with os_helper.temp_dir(self.outerdir): 3534 try: 3535 tar.extractall(self.destdir, filter=filter) 3536 except Exception as exc: 3537 self.raised_exception = exc 3538 self.expected_paths = set() 3539 else: 3540 self.raised_exception = None 3541 self.expected_paths = set(self.outerdir.glob('**/*')) 3542 self.expected_paths.discard(self.destdir) 3543 try: 3544 yield 3545 finally: 3546 tar.close() 3547 if self.raised_exception: 3548 raise self.raised_exception 3549 self.assertEqual(self.expected_paths, set()) 3550 3551 def expect_file(self, name, type=None, symlink_to=None, mode=None, 3552 size=None): 3553 """Check a single file. 
def expect_exception(self, exc_type, message_re='.'):
    """Check the stored extraction error. See check_context.

    The exception kept in `self.raised_exception` must be an `exc_type`
    whose message matches `message_re`; afterwards it is cleared.
    """
    pending = self.raised_exception
    with self.assertRaisesRegex(exc_type, message_re):
        # Re-raise the stored error so the assertion helper can inspect
        # it; if nothing was stored, the helper reports the failure.
        if pending is not None:
            raise pending
    self.raised_exception = None
@symlink_test
def test_parent_symlink(self):
    # Test interplaying symlinks
    # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
    #
    # The archive holds two symlinks and then a file whose path goes
    # through the second one. Each filter is checked in turn.
    with ArchiveMaker() as arc:

        # `current` links to `.` which is both:
        #   - the destination directory
        #   - `current` itself
        arc.add('current', symlink_to='.')

        # effectively points to ./../
        arc.add('parent', symlink_to='current/..')

        arc.add('parent/evil')

    if os_helper.can_symlink():
        with self.check_context(arc.open(), 'fully_trusted'):
            if self.raised_exception is not None:
                # Windows will refuse to create a file that's a symlink to itself
                # (and tarfile doesn't swallow that exception)
                self.expect_exception(FileExistsError)
                # The other cases will fail with this error too.
                # Skip the rest of this test.
                return
            else:
                # With no filtering, the file ends up one level above
                # the destination directory.
                self.expect_file('current', symlink_to='.')
                self.expect_file('parent', symlink_to='current/..')
                self.expect_file('../evil')

        # The 'tar' filter rejects the file that would land outside.
        with self.check_context(arc.open(), 'tar'):
            self.expect_exception(
                tarfile.OutsideDestinationError,
                """'parent/evil' would be extracted to ['"].*evil['"], """
                + "which is outside the destination")

        # The 'data' filter already rejects the `parent` link itself.
        with self.check_context(arc.open(), 'data'):
            self.expect_exception(
                tarfile.LinkOutsideDestinationError,
                """'parent' would link to ['"].*outerdir['"], """
                + "which is outside the destination")

    else:
        # No symlink support. The symlinks are ignored.
        with self.check_context(arc.open(), 'fully_trusted'):
            self.expect_file('parent/evil')
        with self.check_context(arc.open(), 'tar'):
            self.expect_file('parent/evil')
        with self.check_context(arc.open(), 'data'):
            self.expect_file('parent/evil')
first, leaving 3700 # 'testpath\file' 3701 dotdot_resolves_early = True 3702 elif os.path.exists(os.path.join(testpath, '..', 'file')): 3703 # Posix resolves 'current' to '.' first, leaving 3704 # 'testpath/../file' 3705 dotdot_resolves_early = False 3706 else: 3707 raise AssertionError('Could not determine link resolution') 3708 3709 with ArchiveMaker() as arc: 3710 3711 # `current` links to `.` which is both the destination directory 3712 # and `current` itself 3713 arc.add('current', symlink_to='.') 3714 3715 # `current/parent` is also available as `./parent`, 3716 # and effectively points to `./../` 3717 arc.add('current/parent', symlink_to='..') 3718 3719 arc.add('parent/evil') 3720 3721 with self.check_context(arc.open(), 'fully_trusted'): 3722 if os_helper.can_symlink(): 3723 self.expect_file('current', symlink_to='.') 3724 self.expect_file('parent', symlink_to='..') 3725 self.expect_file('../evil') 3726 else: 3727 self.expect_file('current/') 3728 self.expect_file('parent/evil') 3729 3730 with self.check_context(arc.open(), 'tar'): 3731 if os_helper.can_symlink(): 3732 # Fail when extracting a file outside destination 3733 self.expect_exception( 3734 tarfile.OutsideDestinationError, 3735 "'parent/evil' would be extracted to " 3736 + """['"].*evil['"], which is outside """ 3737 + "the destination") 3738 else: 3739 self.expect_file('current/') 3740 self.expect_file('parent/evil') 3741 3742 with self.check_context(arc.open(), 'data'): 3743 if os_helper.can_symlink(): 3744 if dotdot_resolves_early: 3745 # Fail when extracting a file outside destination 3746 self.expect_exception( 3747 tarfile.OutsideDestinationError, 3748 "'parent/evil' would be extracted to " 3749 + """['"].*evil['"], which is outside """ 3750 + "the destination") 3751 else: 3752 # Fail as soon as we have a symlink outside the destination 3753 self.expect_exception( 3754 tarfile.LinkOutsideDestinationError, 3755 "'current/parent' would link to " 3756 + """['"].*outerdir['"], which is outside 
@symlink_test
def test_absolute_symlink(self):
    # Test symlink to an absolute path
    # Inspired by 'dirsymlink' in jwilk/traversal-archives
    with ArchiveMaker() as arc:
        arc.add('parent', symlink_to=self.outerdir)
        arc.add('parent/evil')

    # Message expected when a member would land outside the destination.
    outside_re = ("'parent/evil' would be extracted to "
                  + """['"].*evil['"], which is outside """
                  + "the destination")

    # Without filtering, the file follows the link out of the
    # destination (or is extracted in place if links are unsupported).
    with self.check_context(arc.open(), 'fully_trusted'):
        if not os_helper.can_symlink():
            self.expect_file('parent/evil')
        else:
            self.expect_file('parent', symlink_to=self.outerdir)
            self.expect_file('../evil')

    with self.check_context(arc.open(), 'tar'):
        if not os_helper.can_symlink():
            self.expect_file('parent/evil')
        else:
            self.expect_exception(tarfile.OutsideDestinationError,
                                  outside_re)

    # The 'data' filter refuses the absolute link in either case.
    with self.check_context(arc.open(), 'data'):
        self.expect_exception(
            tarfile.AbsoluteLinkError,
            "'parent' is a link to an absolute path")
not found") 3803 3804 with self.check_context(arc.open(), 'data'): 3805 self.expect_exception( 3806 tarfile.AbsoluteLinkError, 3807 "'parent' is a link to an absolute path") 3808 3809 @symlink_test 3810 def test_sly_relative0(self): 3811 # Inspired by 'relative0' in jwilk/traversal-archives 3812 with ArchiveMaker() as arc: 3813 # points to `../../tmp/moo` 3814 arc.add('../moo', symlink_to='..//tmp/moo') 3815 3816 try: 3817 with self.check_context(arc.open(), filter='fully_trusted'): 3818 if os_helper.can_symlink(): 3819 if isinstance(self.raised_exception, FileExistsError): 3820 # XXX TarFile happens to fail creating a parent 3821 # directory. 3822 # This might be a bug, but fixing it would hurt 3823 # security. 3824 # Note that e.g. GNU `tar` rejects '..' components, 3825 # so you could argue this is an invalid archive and we 3826 # just raise an bad type of exception. 3827 self.expect_exception(FileExistsError) 3828 else: 3829 self.expect_file('../moo', symlink_to='..//tmp/moo') 3830 else: 3831 # The symlink can't be extracted and is ignored 3832 pass 3833 except FileExistsError: 3834 pass 3835 3836 for filter in 'tar', 'data': 3837 with self.check_context(arc.open(), filter): 3838 self.expect_exception( 3839 tarfile.OutsideDestinationError, 3840 "'../moo' would be extracted to " 3841 + "'.*moo', which is outside " 3842 + "the destination") 3843 3844 @symlink_test 3845 def test_sly_relative2(self): 3846 # Inspired by 'relative2' in jwilk/traversal-archives 3847 with ArchiveMaker() as arc: 3848 arc.add('tmp/') 3849 arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo') 3850 3851 with self.check_context(arc.open(), 'fully_trusted'): 3852 self.expect_file('tmp', type=tarfile.DIRTYPE) 3853 if os_helper.can_symlink(): 3854 self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') 3855 3856 for filter in 'tar', 'data': 3857 with self.check_context(arc.open(), filter): 3858 self.expect_exception( 3859 tarfile.OutsideDestinationError, 3860 "'tmp/../../moo' would be 
extracted to " 3861 + """['"].*moo['"], which is outside the """ 3862 + "destination") 3863 3864 @symlink_test 3865 def test_deep_symlink(self): 3866 # Test that symlinks and hardlinks inside a directory 3867 # point to the correct file (`target` of size 3). 3868 # If links aren't supported we get a copy of the file. 3869 with ArchiveMaker() as arc: 3870 arc.add('targetdir/target', size=3) 3871 # a hardlink's linkname is relative to the archive 3872 arc.add('linkdir/hardlink', hardlink_to=os.path.join( 3873 'targetdir', 'target')) 3874 # a symlink's linkname is relative to the link's directory 3875 arc.add('linkdir/symlink', symlink_to=os.path.join( 3876 '..', 'targetdir', 'target')) 3877 3878 for filter in 'tar', 'data', 'fully_trusted': 3879 with self.check_context(arc.open(), filter): 3880 self.expect_file('targetdir/target', size=3) 3881 self.expect_file('linkdir/hardlink', size=3) 3882 if os_helper.can_symlink(): 3883 self.expect_file('linkdir/symlink', size=3, 3884 symlink_to='../targetdir/target') 3885 else: 3886 self.expect_file('linkdir/symlink', size=3) 3887 3888 @symlink_test 3889 def test_chains(self): 3890 # Test chaining of symlinks/hardlinks. 3891 # Symlinks are created before the files they point to. 
3892 with ArchiveMaker() as arc: 3893 arc.add('linkdir/symlink', symlink_to='hardlink') 3894 arc.add('symlink2', symlink_to=os.path.join( 3895 'linkdir', 'hardlink2')) 3896 arc.add('targetdir/target', size=3) 3897 arc.add('linkdir/hardlink', hardlink_to='targetdir/target') 3898 arc.add('linkdir/hardlink2', hardlink_to='linkdir/symlink') 3899 3900 for filter in 'tar', 'data', 'fully_trusted': 3901 with self.check_context(arc.open(), filter): 3902 self.expect_file('targetdir/target', size=3) 3903 self.expect_file('linkdir/hardlink', size=3) 3904 self.expect_file('linkdir/hardlink2', size=3) 3905 if os_helper.can_symlink(): 3906 self.expect_file('linkdir/symlink', size=3, 3907 symlink_to='hardlink') 3908 self.expect_file('symlink2', size=3, 3909 symlink_to='linkdir/hardlink2') 3910 else: 3911 self.expect_file('linkdir/symlink', size=3) 3912 self.expect_file('symlink2', size=3) 3913 3914 def test_modes(self): 3915 # Test how file modes are extracted 3916 # (Note that the modes are ignored on platforms without working chmod) 3917 with ArchiveMaker() as arc: 3918 arc.add('all_bits', mode='?rwsrwsrwt') 3919 arc.add('perm_bits', mode='?rwxrwxrwx') 3920 arc.add('exec_group_other', mode='?rw-rwxrwx') 3921 arc.add('read_group_only', mode='?---r-----') 3922 arc.add('no_bits', mode='?---------') 3923 arc.add('dir/', mode='?---rwsrwt') 3924 arc.add('dir_all_bits/', mode='?rwsrwsrwt') 3925 3926 # On some systems, setting the uid, gid, and/or sticky bit is a no-ops. 3927 # Check which bits we can set, so we can compare tarfile machinery to 3928 # a simple chmod. 3929 tmp_filename = os.path.join(TEMPDIR, "tmp.file") 3930 with open(tmp_filename, 'w'): 3931 pass 3932 try: 3933 new_mode = (os.stat(tmp_filename).st_mode 3934 | stat.S_ISVTX | stat.S_ISGID | stat.S_ISUID) 3935 try: 3936 os.chmod(tmp_filename, new_mode) 3937 except OSError as exc: 3938 if exc.errno == getattr(errno, "EFTYPE", 0): 3939 # gh-108948: On FreeBSD, regular users cannot set 3940 # the sticky bit. 
3941 self.skipTest("chmod() failed with EFTYPE: " 3942 "regular users cannot set sticky bit") 3943 else: 3944 raise 3945 3946 got_mode = os.stat(tmp_filename).st_mode 3947 _t_file = 't' if (got_mode & stat.S_ISVTX) else 'x' 3948 _suid_file = 's' if (got_mode & stat.S_ISUID) else 'x' 3949 _sgid_file = 's' if (got_mode & stat.S_ISGID) else 'x' 3950 finally: 3951 os.unlink(tmp_filename) 3952 3953 os.mkdir(tmp_filename) 3954 new_mode = (os.stat(tmp_filename).st_mode 3955 | stat.S_ISVTX | stat.S_ISGID | stat.S_ISUID) 3956 os.chmod(tmp_filename, new_mode) 3957 got_mode = os.stat(tmp_filename).st_mode 3958 _t_dir = 't' if (got_mode & stat.S_ISVTX) else 'x' 3959 _suid_dir = 's' if (got_mode & stat.S_ISUID) else 'x' 3960 _sgid_dir = 's' if (got_mode & stat.S_ISGID) else 'x' 3961 os.rmdir(tmp_filename) 3962 3963 with self.check_context(arc.open(), 'fully_trusted'): 3964 self.expect_file('all_bits', 3965 mode=f'?rw{_suid_file}rw{_sgid_file}rw{_t_file}') 3966 self.expect_file('perm_bits', mode='?rwxrwxrwx') 3967 self.expect_file('exec_group_other', mode='?rw-rwxrwx') 3968 self.expect_file('read_group_only', mode='?---r-----') 3969 self.expect_file('no_bits', mode='?---------') 3970 self.expect_file('dir/', mode=f'?---rw{_sgid_dir}rw{_t_dir}') 3971 self.expect_file('dir_all_bits/', 3972 mode=f'?rw{_suid_dir}rw{_sgid_dir}rw{_t_dir}') 3973 3974 with self.check_context(arc.open(), 'tar'): 3975 self.expect_file('all_bits', mode='?rwxr-xr-x') 3976 self.expect_file('perm_bits', mode='?rwxr-xr-x') 3977 self.expect_file('exec_group_other', mode='?rw-r-xr-x') 3978 self.expect_file('read_group_only', mode='?---r-----') 3979 self.expect_file('no_bits', mode='?---------') 3980 self.expect_file('dir/', mode='?---r-xr-x') 3981 self.expect_file('dir_all_bits/', mode='?rwxr-xr-x') 3982 3983 with self.check_context(arc.open(), 'data'): 3984 normal_dir_mode = stat.filemode(stat.S_IMODE( 3985 self.outerdir.stat().st_mode)) 3986 self.expect_file('all_bits', mode='?rwxr-xr-x') 3987 
self.expect_file('perm_bits', mode='?rwxr-xr-x') 3988 self.expect_file('exec_group_other', mode='?rw-r--r--') 3989 self.expect_file('read_group_only', mode='?rw-r-----') 3990 self.expect_file('no_bits', mode='?rw-------') 3991 self.expect_file('dir/', mode=normal_dir_mode) 3992 self.expect_file('dir_all_bits/', mode=normal_dir_mode) 3993 3994 def test_pipe(self): 3995 # Test handling of a special file 3996 with ArchiveMaker() as arc: 3997 arc.add('foo', type=tarfile.FIFOTYPE) 3998 3999 for filter in 'fully_trusted', 'tar': 4000 with self.check_context(arc.open(), filter): 4001 if hasattr(os, 'mkfifo'): 4002 self.expect_file('foo', type=tarfile.FIFOTYPE) 4003 else: 4004 # The pipe can't be extracted and is skipped. 4005 pass 4006 4007 with self.check_context(arc.open(), 'data'): 4008 self.expect_exception( 4009 tarfile.SpecialFileError, 4010 "'foo' is a special file") 4011 4012 def test_special_files(self): 4013 # Creating device files is tricky. Instead of attempting that let's 4014 # only check the filter result. 4015 for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE: 4016 tarinfo = tarfile.TarInfo('foo') 4017 tarinfo.type = special_type 4018 trusted = tarfile.fully_trusted_filter(tarinfo, '') 4019 self.assertIs(trusted, tarinfo) 4020 tar = tarfile.tar_filter(tarinfo, '') 4021 self.assertEqual(tar.type, special_type) 4022 with self.assertRaises(tarfile.SpecialFileError) as cm: 4023 tarfile.data_filter(tarinfo, '') 4024 self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo) 4025 self.assertEqual(cm.exception.tarinfo.name, 'foo') 4026 4027 def test_fully_trusted_filter(self): 4028 # The 'fully_trusted' filter returns the original TarInfo objects. 4029 with tarfile.TarFile.open(tarname) as tar: 4030 for tarinfo in tar.getmembers(): 4031 filtered = tarfile.fully_trusted_filter(tarinfo, '') 4032 self.assertIs(filtered, tarinfo) 4033 4034 def test_tar_filter(self): 4035 # The 'tar' filter returns TarInfo objects with the same name/type. 
4036 # (It can also fail for particularly "evil" input, but we don't have 4037 # that in the test archive.) 4038 with tarfile.TarFile.open(tarname) as tar: 4039 for tarinfo in tar.getmembers(): 4040 filtered = tarfile.tar_filter(tarinfo, '') 4041 self.assertIs(filtered.name, tarinfo.name) 4042 self.assertIs(filtered.type, tarinfo.type) 4043 4044 def test_data_filter(self): 4045 # The 'data' filter either raises, or returns TarInfo with the same 4046 # name/type. 4047 with tarfile.TarFile.open(tarname) as tar: 4048 for tarinfo in tar.getmembers(): 4049 try: 4050 filtered = tarfile.data_filter(tarinfo, '') 4051 except tarfile.FilterError: 4052 continue 4053 self.assertIs(filtered.name, tarinfo.name) 4054 self.assertIs(filtered.type, tarinfo.type) 4055 4056 def test_default_filter_warns(self): 4057 """Ensure the default filter warns""" 4058 with ArchiveMaker() as arc: 4059 arc.add('foo') 4060 with warnings_helper.check_warnings( 4061 ('Python 3.14', DeprecationWarning)): 4062 with self.check_context(arc.open(), None): 4063 self.expect_file('foo') 4064 4065 def test_change_default_filter_on_instance(self): 4066 tar = tarfile.TarFile(tarname, 'r') 4067 def strict_filter(tarinfo, path): 4068 if tarinfo.name == 'ustar/regtype': 4069 return tarinfo 4070 else: 4071 return None 4072 tar.extraction_filter = strict_filter 4073 with self.check_context(tar, None): 4074 self.expect_file('ustar/regtype') 4075 4076 def test_change_default_filter_on_class(self): 4077 def strict_filter(tarinfo, path): 4078 if tarinfo.name == 'ustar/regtype': 4079 return tarinfo 4080 else: 4081 return None 4082 tar = tarfile.TarFile(tarname, 'r') 4083 with support.swap_attr(tarfile.TarFile, 'extraction_filter', 4084 staticmethod(strict_filter)): 4085 with self.check_context(tar, None): 4086 self.expect_file('ustar/regtype') 4087 4088 def test_change_default_filter_on_subclass(self): 4089 class TarSubclass(tarfile.TarFile): 4090 def extraction_filter(self, tarinfo, path): 4091 if tarinfo.name == 
'ustar/regtype': 4092 return tarinfo 4093 else: 4094 return None 4095 4096 tar = TarSubclass(tarname, 'r') 4097 with self.check_context(tar, None): 4098 self.expect_file('ustar/regtype') 4099 4100 def test_change_default_filter_to_string(self): 4101 tar = tarfile.TarFile(tarname, 'r') 4102 tar.extraction_filter = 'data' 4103 with self.check_context(tar, None): 4104 self.expect_exception(TypeError) 4105 4106 def test_custom_filter(self): 4107 def custom_filter(tarinfo, path): 4108 self.assertIs(path, self.destdir) 4109 if tarinfo.name == 'move_this': 4110 return tarinfo.replace(name='moved') 4111 if tarinfo.name == 'ignore_this': 4112 return None 4113 return tarinfo 4114 4115 with ArchiveMaker() as arc: 4116 arc.add('move_this') 4117 arc.add('ignore_this') 4118 arc.add('keep') 4119 with self.check_context(arc.open(), custom_filter): 4120 self.expect_file('moved') 4121 self.expect_file('keep') 4122 4123 def test_bad_filter_name(self): 4124 with ArchiveMaker() as arc: 4125 arc.add('foo') 4126 with self.check_context(arc.open(), 'bad filter name'): 4127 self.expect_exception(ValueError) 4128 4129 def test_stateful_filter(self): 4130 # Stateful filters should be possible. 4131 # (This doesn't really test tarfile. Rather, it demonstrates 4132 # that third parties can implement a stateful filter.) 
4133 class StatefulFilter: 4134 def __enter__(self): 4135 self.num_files_processed = 0 4136 return self 4137 4138 def __call__(self, tarinfo, path): 4139 try: 4140 tarinfo = tarfile.data_filter(tarinfo, path) 4141 except tarfile.FilterError: 4142 return None 4143 self.num_files_processed += 1 4144 return tarinfo 4145 4146 def __exit__(self, *exc_info): 4147 self.done = True 4148 4149 with ArchiveMaker() as arc: 4150 arc.add('good') 4151 arc.add('bad', symlink_to='/') 4152 arc.add('good') 4153 with StatefulFilter() as custom_filter: 4154 with self.check_context(arc.open(), custom_filter): 4155 self.expect_file('good') 4156 self.assertEqual(custom_filter.num_files_processed, 2) 4157 self.assertEqual(custom_filter.done, True) 4158 4159 def test_errorlevel(self): 4160 def extracterror_filter(tarinfo, path): 4161 raise tarfile.ExtractError('failed with ExtractError') 4162 def filtererror_filter(tarinfo, path): 4163 raise tarfile.FilterError('failed with FilterError') 4164 def oserror_filter(tarinfo, path): 4165 raise OSError('failed with OSError') 4166 def tarerror_filter(tarinfo, path): 4167 raise tarfile.TarError('failed with base TarError') 4168 def valueerror_filter(tarinfo, path): 4169 raise ValueError('failed with ValueError') 4170 4171 with ArchiveMaker() as arc: 4172 arc.add('file') 4173 4174 # If errorlevel is 0, errors affected by errorlevel are ignored 4175 4176 with self.check_context(arc.open(errorlevel=0), extracterror_filter): 4177 self.expect_file('file') 4178 4179 with self.check_context(arc.open(errorlevel=0), filtererror_filter): 4180 self.expect_file('file') 4181 4182 with self.check_context(arc.open(errorlevel=0), oserror_filter): 4183 self.expect_file('file') 4184 4185 with self.check_context(arc.open(errorlevel=0), tarerror_filter): 4186 self.expect_exception(tarfile.TarError) 4187 4188 with self.check_context(arc.open(errorlevel=0), valueerror_filter): 4189 self.expect_exception(ValueError) 4190 4191 # If 1, all fatal errors are raised 4192 4193 
with self.check_context(arc.open(errorlevel=1), extracterror_filter): 4194 self.expect_file('file') 4195 4196 with self.check_context(arc.open(errorlevel=1), filtererror_filter): 4197 self.expect_exception(tarfile.FilterError) 4198 4199 with self.check_context(arc.open(errorlevel=1), oserror_filter): 4200 self.expect_exception(OSError) 4201 4202 with self.check_context(arc.open(errorlevel=1), tarerror_filter): 4203 self.expect_exception(tarfile.TarError) 4204 4205 with self.check_context(arc.open(errorlevel=1), valueerror_filter): 4206 self.expect_exception(ValueError) 4207 4208 # If 2, all non-fatal errors are raised as well. 4209 4210 with self.check_context(arc.open(errorlevel=2), extracterror_filter): 4211 self.expect_exception(tarfile.ExtractError) 4212 4213 with self.check_context(arc.open(errorlevel=2), filtererror_filter): 4214 self.expect_exception(tarfile.FilterError) 4215 4216 with self.check_context(arc.open(errorlevel=2), oserror_filter): 4217 self.expect_exception(OSError) 4218 4219 with self.check_context(arc.open(errorlevel=2), tarerror_filter): 4220 self.expect_exception(tarfile.TarError) 4221 4222 with self.check_context(arc.open(errorlevel=2), valueerror_filter): 4223 self.expect_exception(ValueError) 4224 4225 # We only handle ExtractionError, FilterError & OSError specially. 
4226 4227 with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter): 4228 self.expect_exception(TypeError) # errorlevel is not int 4229 4230 4231class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase): 4232 testdir = os.path.join(TEMPDIR, "testoverwrite") 4233 4234 @classmethod 4235 def setUpClass(cls): 4236 p = cls.ar_with_file = os.path.join(TEMPDIR, 'tar-with-file.tar') 4237 cls.addClassCleanup(os_helper.unlink, p) 4238 with tarfile.open(p, 'w') as tar: 4239 t = tarfile.TarInfo('test') 4240 t.size = 10 4241 tar.addfile(t, io.BytesIO(b'newcontent')) 4242 4243 p = cls.ar_with_dir = os.path.join(TEMPDIR, 'tar-with-dir.tar') 4244 cls.addClassCleanup(os_helper.unlink, p) 4245 with tarfile.open(p, 'w') as tar: 4246 tar.addfile(tar.gettarinfo(os.curdir, 'test')) 4247 4248 p = os.path.join(TEMPDIR, 'tar-with-implicit-dir.tar') 4249 cls.ar_with_implicit_dir = p 4250 cls.addClassCleanup(os_helper.unlink, p) 4251 with tarfile.open(p, 'w') as tar: 4252 t = tarfile.TarInfo('test/file') 4253 t.size = 10 4254 tar.addfile(t, io.BytesIO(b'newcontent')) 4255 4256 def open(self, path): 4257 return tarfile.open(path, 'r') 4258 4259 def extractall(self, ar): 4260 ar.extractall(self.testdir, filter='fully_trusted') 4261 4262 4263def setUpModule(): 4264 os_helper.unlink(TEMPDIR) 4265 os.makedirs(TEMPDIR) 4266 4267 global testtarnames 4268 testtarnames = [tarname] 4269 with open(tarname, "rb") as fobj: 4270 data = fobj.read() 4271 4272 # Create compressed tarfiles. 4273 for c in GzipTest, Bz2Test, LzmaTest: 4274 if c.open: 4275 os_helper.unlink(c.tarname) 4276 testtarnames.append(c.tarname) 4277 with c.open(c.tarname, "wb") as tar: 4278 tar.write(data) 4279 4280def tearDownModule(): 4281 if os.path.exists(TEMPDIR): 4282 os_helper.rmtree(TEMPDIR) 4283 4284if __name__ == "__main__": 4285 unittest.main() 4286