1import sys 2import os 3import io 4from hashlib import md5 5from contextlib import contextmanager 6from random import Random 7import pathlib 8 9import unittest 10import unittest.mock 11import tarfile 12 13from test import support 14from test.support import script_helper 15 16# Check for our compression modules. 17try: 18 import gzip 19except ImportError: 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25try: 26 import lzma 27except ImportError: 28 lzma = None 29 30def md5sum(data): 31 return md5(data).hexdigest() 32 33TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 34tarextdir = TEMPDIR + '-extract-test' 35tarname = support.findfile("testtar.tar") 36gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 37bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 38xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 39tmpname = os.path.join(TEMPDIR, "tmp.tar") 40dotlessname = os.path.join(TEMPDIR, "testtar") 41 42md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 43md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 44 45 46class TarTest: 47 tarname = tarname 48 suffix = '' 49 open = io.FileIO 50 taropen = tarfile.TarFile.taropen 51 52 @property 53 def mode(self): 54 return self.prefix + self.suffix 55 56@support.requires_gzip 57class GzipTest: 58 tarname = gzipname 59 suffix = 'gz' 60 open = gzip.GzipFile if gzip else None 61 taropen = tarfile.TarFile.gzopen 62 63@support.requires_bz2 64class Bz2Test: 65 tarname = bz2name 66 suffix = 'bz2' 67 open = bz2.BZ2File if bz2 else None 68 taropen = tarfile.TarFile.bz2open 69 70@support.requires_lzma 71class LzmaTest: 72 tarname = xzname 73 suffix = 'xz' 74 open = lzma.LZMAFile if lzma else None 75 taropen = tarfile.TarFile.xzopen 76 77 78class ReadTest(TarTest): 79 80 prefix = "r:" 81 82 def setUp(self): 83 self.tar = tarfile.open(self.tarname, mode=self.mode, 84 encoding="iso8859-1") 85 86 def tearDown(self): 87 self.tar.close() 88 89 90class UstarReadTest(ReadTest, unittest.TestCase): 91 92 def 
test_fileobj_regular_file(self): 93 tarinfo = self.tar.getmember("ustar/regtype") 94 with self.tar.extractfile(tarinfo) as fobj: 95 data = fobj.read() 96 self.assertEqual(len(data), tarinfo.size, 97 "regular file extraction failed") 98 self.assertEqual(md5sum(data), md5_regtype, 99 "regular file extraction failed") 100 101 def test_fileobj_readlines(self): 102 self.tar.extract("ustar/regtype", TEMPDIR) 103 tarinfo = self.tar.getmember("ustar/regtype") 104 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 105 lines1 = fobj1.readlines() 106 107 with self.tar.extractfile(tarinfo) as fobj: 108 fobj2 = io.TextIOWrapper(fobj) 109 lines2 = fobj2.readlines() 110 self.assertEqual(lines1, lines2, 111 "fileobj.readlines() failed") 112 self.assertEqual(len(lines2), 114, 113 "fileobj.readlines() failed") 114 self.assertEqual(lines2[83], 115 "I will gladly admit that Python is not the fastest " 116 "running scripting language.\n", 117 "fileobj.readlines() failed") 118 119 def test_fileobj_iter(self): 120 self.tar.extract("ustar/regtype", TEMPDIR) 121 tarinfo = self.tar.getmember("ustar/regtype") 122 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 123 lines1 = fobj1.readlines() 124 with self.tar.extractfile(tarinfo) as fobj2: 125 lines2 = list(io.TextIOWrapper(fobj2)) 126 self.assertEqual(lines1, lines2, 127 "fileobj.__iter__() failed") 128 129 def test_fileobj_seek(self): 130 self.tar.extract("ustar/regtype", TEMPDIR) 131 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 132 data = fobj.read() 133 134 tarinfo = self.tar.getmember("ustar/regtype") 135 fobj = self.tar.extractfile(tarinfo) 136 137 text = fobj.read() 138 fobj.seek(0) 139 self.assertEqual(0, fobj.tell(), 140 "seek() to file's start failed") 141 fobj.seek(2048, 0) 142 self.assertEqual(2048, fobj.tell(), 143 "seek() to absolute position failed") 144 fobj.seek(-1024, 1) 145 self.assertEqual(1024, fobj.tell(), 146 "seek() to negative relative position failed") 147 
fobj.seek(1024, 1) 148 self.assertEqual(2048, fobj.tell(), 149 "seek() to positive relative position failed") 150 s = fobj.read(10) 151 self.assertEqual(s, data[2048:2058], 152 "read() after seek failed") 153 fobj.seek(0, 2) 154 self.assertEqual(tarinfo.size, fobj.tell(), 155 "seek() to file's end failed") 156 self.assertEqual(fobj.read(), b"", 157 "read() at file's end did not return empty string") 158 fobj.seek(-tarinfo.size, 2) 159 self.assertEqual(0, fobj.tell(), 160 "relative seek() to file's end failed") 161 fobj.seek(512) 162 s1 = fobj.readlines() 163 fobj.seek(512) 164 s2 = fobj.readlines() 165 self.assertEqual(s1, s2, 166 "readlines() after seek failed") 167 fobj.seek(0) 168 self.assertEqual(len(fobj.readline()), fobj.tell(), 169 "tell() after readline() failed") 170 fobj.seek(512) 171 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 172 "tell() after seek() and readline() failed") 173 fobj.seek(0) 174 line = fobj.readline() 175 self.assertEqual(fobj.read(), data[len(line):], 176 "read() after readline() failed") 177 fobj.close() 178 179 def test_fileobj_text(self): 180 with self.tar.extractfile("ustar/regtype") as fobj: 181 fobj = io.TextIOWrapper(fobj) 182 data = fobj.read().encode("iso8859-1") 183 self.assertEqual(md5sum(data), md5_regtype) 184 try: 185 fobj.seek(100) 186 except AttributeError: 187 # Issue #13815: seek() complained about a missing 188 # flush() method. 189 self.fail("seeking failed in text mode") 190 191 # Test if symbolic and hard links are resolved by extractfile(). The 192 # test link members each point to a regular member whose data is 193 # supposed to be exported. 
194 def _test_fileobj_link(self, lnktype, regtype): 195 with self.tar.extractfile(lnktype) as a, \ 196 self.tar.extractfile(regtype) as b: 197 self.assertEqual(a.name, b.name) 198 199 def test_fileobj_link1(self): 200 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 201 202 def test_fileobj_link2(self): 203 self._test_fileobj_link("./ustar/linktest2/lnktype", 204 "ustar/linktest1/regtype") 205 206 def test_fileobj_symlink1(self): 207 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 208 209 def test_fileobj_symlink2(self): 210 self._test_fileobj_link("./ustar/linktest2/symtype", 211 "ustar/linktest1/regtype") 212 213 def test_issue14160(self): 214 self._test_fileobj_link("symtype2", "ustar/regtype") 215 216class GzipUstarReadTest(GzipTest, UstarReadTest): 217 pass 218 219class Bz2UstarReadTest(Bz2Test, UstarReadTest): 220 pass 221 222class LzmaUstarReadTest(LzmaTest, UstarReadTest): 223 pass 224 225 226class ListTest(ReadTest, unittest.TestCase): 227 228 # Override setUp to use default encoding (UTF-8) 229 def setUp(self): 230 self.tar = tarfile.open(self.tarname, mode=self.mode) 231 232 def test_list(self): 233 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 234 with support.swap_attr(sys, 'stdout', tio): 235 self.tar.list(verbose=False) 236 out = tio.detach().getvalue() 237 self.assertIn(b'ustar/conttype', out) 238 self.assertIn(b'ustar/regtype', out) 239 self.assertIn(b'ustar/lnktype', out) 240 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 241 self.assertIn(b'./ustar/linktest2/symtype', out) 242 self.assertIn(b'./ustar/linktest2/lnktype', out) 243 # Make sure it puts trailing slash for directory 244 self.assertIn(b'ustar/dirtype/', out) 245 self.assertIn(b'ustar/dirtype-with-size/', out) 246 # Make sure it is able to print unencodable characters 247 def conv(b): 248 s = b.decode(self.tar.encoding, 'surrogateescape') 249 return s.encode('ascii', 'backslashreplace') 250 
self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 251 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 252 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 256 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 257 # Make sure it prints files separated by one newline without any 258 # 'ls -l'-like accessories if verbose flag is not being used 259 # ... 260 # ustar/conttype 261 # ustar/regtype 262 # ... 263 self.assertRegex(out, br'ustar/conttype ?\r?\n' 264 br'ustar/regtype ?\r?\n') 265 # Make sure it does not print the source of link without verbose flag 266 self.assertNotIn(b'link to', out) 267 self.assertNotIn(b'->', out) 268 269 def test_list_verbose(self): 270 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 271 with support.swap_attr(sys, 'stdout', tio): 272 self.tar.list(verbose=True) 273 out = tio.detach().getvalue() 274 # Make sure it prints files separated by one newline with 'ls -l'-like 275 # accessories if verbose flag is being used 276 # ... 277 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 278 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 279 # ... 
280 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 281 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 282 br'ustar/\w+type ?\r?\n') * 2) 283 # Make sure it prints the source of link with verbose flag 284 self.assertIn(b'ustar/symtype -> regtype', out) 285 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 286 self.assertIn(b'./ustar/linktest2/lnktype link to ' 287 b'./ustar/linktest1/regtype', out) 288 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 289 (b'/123' * 125) + b'/longname', out) 290 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 291 (b'/123' * 125) + b'/longname', out) 292 293 def test_list_members(self): 294 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 295 def members(tar): 296 for tarinfo in tar.getmembers(): 297 if 'reg' in tarinfo.name: 298 yield tarinfo 299 with support.swap_attr(sys, 'stdout', tio): 300 self.tar.list(verbose=False, members=members(self.tar)) 301 out = tio.detach().getvalue() 302 self.assertIn(b'ustar/regtype', out) 303 self.assertNotIn(b'ustar/conttype', out) 304 305 306class GzipListTest(GzipTest, ListTest): 307 pass 308 309 310class Bz2ListTest(Bz2Test, ListTest): 311 pass 312 313 314class LzmaListTest(LzmaTest, ListTest): 315 pass 316 317 318class CommonReadTest(ReadTest): 319 320 def test_empty_tarfile(self): 321 # Test for issue6123: Allow opening empty archives. 322 # This test checks if tarfile.open() is able to open an empty tar 323 # archive successfully. Note that an empty tar archive is not the 324 # same as an empty file! 
325 with tarfile.open(tmpname, self.mode.replace("r", "w")): 326 pass 327 try: 328 tar = tarfile.open(tmpname, self.mode) 329 tar.getnames() 330 except tarfile.ReadError: 331 self.fail("tarfile.open() failed on empty archive") 332 else: 333 self.assertListEqual(tar.getmembers(), []) 334 finally: 335 tar.close() 336 337 def test_non_existent_tarfile(self): 338 # Test for issue11513: prevent non-existent gzipped tarfiles raising 339 # multiple exceptions. 340 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 341 tarfile.open("xxx", self.mode) 342 343 def test_null_tarfile(self): 344 # Test for issue6123: Allow opening empty archives. 345 # This test guarantees that tarfile.open() does not treat an empty 346 # file as an empty tar archive. 347 with open(tmpname, "wb"): 348 pass 349 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 350 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 351 352 def test_ignore_zeros(self): 353 # Test TarFile's ignore_zeros option. 354 # generate 512 pseudorandom bytes 355 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 356 for char in (b'\0', b'a'): 357 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 358 # are ignored correctly. 
359 with self.open(tmpname, "w") as fobj: 360 fobj.write(char * 1024) 361 tarinfo = tarfile.TarInfo("foo") 362 tarinfo.size = len(data) 363 fobj.write(tarinfo.tobuf()) 364 fobj.write(data) 365 366 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 367 try: 368 self.assertListEqual(tar.getnames(), ["foo"], 369 "ignore_zeros=True should have skipped the %r-blocks" % 370 char) 371 finally: 372 tar.close() 373 374 def test_premature_end_of_archive(self): 375 for size in (512, 600, 1024, 1200): 376 with tarfile.open(tmpname, "w:") as tar: 377 t = tarfile.TarInfo("foo") 378 t.size = 1024 379 tar.addfile(t, io.BytesIO(b"a" * 1024)) 380 381 with open(tmpname, "r+b") as fobj: 382 fobj.truncate(size) 383 384 with tarfile.open(tmpname) as tar: 385 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 386 for t in tar: 387 pass 388 389 with tarfile.open(tmpname) as tar: 390 t = tar.next() 391 392 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 393 tar.extract(t, TEMPDIR) 394 395 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 396 tar.extractfile(t).read() 397 398class MiscReadTestBase(CommonReadTest): 399 def requires_name_attribute(self): 400 pass 401 402 def test_no_name_argument(self): 403 self.requires_name_attribute() 404 with open(self.tarname, "rb") as fobj: 405 self.assertIsInstance(fobj.name, str) 406 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 407 self.assertIsInstance(tar.name, str) 408 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 409 410 def test_no_name_attribute(self): 411 with open(self.tarname, "rb") as fobj: 412 data = fobj.read() 413 fobj = io.BytesIO(data) 414 self.assertRaises(AttributeError, getattr, fobj, "name") 415 tar = tarfile.open(fileobj=fobj, mode=self.mode) 416 self.assertIsNone(tar.name) 417 418 def test_empty_name_attribute(self): 419 with open(self.tarname, "rb") as fobj: 420 data = fobj.read() 421 fobj = io.BytesIO(data) 422 fobj.name = "" 423 
with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 424 self.assertIsNone(tar.name) 425 426 def test_int_name_attribute(self): 427 # Issue 21044: tarfile.open() should handle fileobj with an integer 428 # 'name' attribute. 429 fd = os.open(self.tarname, os.O_RDONLY) 430 with open(fd, 'rb') as fobj: 431 self.assertIsInstance(fobj.name, int) 432 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 433 self.assertIsNone(tar.name) 434 435 def test_bytes_name_attribute(self): 436 self.requires_name_attribute() 437 tarname = os.fsencode(self.tarname) 438 with open(tarname, 'rb') as fobj: 439 self.assertIsInstance(fobj.name, bytes) 440 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 441 self.assertIsInstance(tar.name, bytes) 442 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 443 444 def test_pathlike_name(self): 445 tarname = pathlib.Path(self.tarname) 446 with tarfile.open(tarname, mode=self.mode) as tar: 447 self.assertIsInstance(tar.name, str) 448 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 449 with self.taropen(tarname) as tar: 450 self.assertIsInstance(tar.name, str) 451 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 452 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 453 self.assertIsInstance(tar.name, str) 454 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 455 if self.suffix == '': 456 with tarfile.TarFile(tarname, mode='r') as tar: 457 self.assertIsInstance(tar.name, str) 458 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 459 460 def test_illegal_mode_arg(self): 461 with open(tmpname, 'wb'): 462 pass 463 with self.assertRaisesRegex(ValueError, 'mode must be '): 464 tar = self.taropen(tmpname, 'q') 465 with self.assertRaisesRegex(ValueError, 'mode must be '): 466 tar = self.taropen(tmpname, 'rw') 467 with self.assertRaisesRegex(ValueError, 'mode must be '): 468 tar = self.taropen(tmpname, '') 469 470 def test_fileobj_with_offset(self): 471 # Skip the first 
member and store values from the second member 472 # of the testtar. 473 tar = tarfile.open(self.tarname, mode=self.mode) 474 try: 475 tar.next() 476 t = tar.next() 477 name = t.name 478 offset = t.offset 479 with tar.extractfile(t) as f: 480 data = f.read() 481 finally: 482 tar.close() 483 484 # Open the testtar and seek to the offset of the second member. 485 with self.open(self.tarname) as fobj: 486 fobj.seek(offset) 487 488 # Test if the tarfile starts with the second member. 489 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 490 t = tar.next() 491 self.assertEqual(t.name, name) 492 # Read to the end of fileobj and test if seeking back to the 493 # beginning works. 494 tar.getmembers() 495 self.assertEqual(tar.extractfile(t).read(), data, 496 "seek back did not work") 497 tar.close() 498 499 def test_fail_comp(self): 500 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 501 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 502 with open(tarname, "rb") as fobj: 503 self.assertRaises(tarfile.ReadError, tarfile.open, 504 fileobj=fobj, mode=self.mode) 505 506 def test_v7_dirtype(self): 507 # Test old style dirtype member (bug #1336623): 508 # Old V7 tars create directory members using an AREGTYPE 509 # header with a "/" appended to the filename field. 510 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 511 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 512 "v7 dirtype failed") 513 514 def test_xstar_type(self): 515 # The xstar format stores extra atime and ctime fields inside the 516 # space reserved for the prefix field. The prefix field must be 517 # ignored in this case, otherwise it will mess up the name. 
518 try: 519 self.tar.getmember("misc/regtype-xstar") 520 except KeyError: 521 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 522 523 def test_check_members(self): 524 for tarinfo in self.tar: 525 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 526 "wrong mtime for %s" % tarinfo.name) 527 if not tarinfo.name.startswith("ustar/"): 528 continue 529 self.assertEqual(tarinfo.uname, "tarfile", 530 "wrong uname for %s" % tarinfo.name) 531 532 def test_find_members(self): 533 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 534 "could not find all members") 535 536 @unittest.skipUnless(hasattr(os, "link"), 537 "Missing hardlink implementation") 538 @support.skip_unless_symlink 539 def test_extract_hardlink(self): 540 # Test hardlink extraction (e.g. bug #857297). 541 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 542 tar.extract("ustar/regtype", TEMPDIR) 543 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 544 545 tar.extract("ustar/lnktype", TEMPDIR) 546 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 547 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 548 data = f.read() 549 self.assertEqual(md5sum(data), md5_regtype) 550 551 tar.extract("ustar/symtype", TEMPDIR) 552 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 553 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 554 data = f.read() 555 self.assertEqual(md5sum(data), md5_regtype) 556 557 def test_extractall(self): 558 # Test if extractall() correctly restores directory permissions 559 # and times (see issue1735). 
560 tar = tarfile.open(tarname, encoding="iso8859-1") 561 DIR = os.path.join(TEMPDIR, "extractall") 562 os.mkdir(DIR) 563 try: 564 directories = [t for t in tar if t.isdir()] 565 tar.extractall(DIR, directories) 566 for tarinfo in directories: 567 path = os.path.join(DIR, tarinfo.name) 568 if sys.platform != "win32": 569 # Win32 has no support for fine grained permissions. 570 self.assertEqual(tarinfo.mode & 0o777, 571 os.stat(path).st_mode & 0o777) 572 def format_mtime(mtime): 573 if isinstance(mtime, float): 574 return "{} ({})".format(mtime, mtime.hex()) 575 else: 576 return "{!r} (int)".format(mtime) 577 file_mtime = os.path.getmtime(path) 578 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 579 format_mtime(tarinfo.mtime), 580 format_mtime(file_mtime), 581 path) 582 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 583 finally: 584 tar.close() 585 support.rmtree(DIR) 586 587 def test_extract_directory(self): 588 dirtype = "ustar/dirtype" 589 DIR = os.path.join(TEMPDIR, "extractdir") 590 os.mkdir(DIR) 591 try: 592 with tarfile.open(tarname, encoding="iso8859-1") as tar: 593 tarinfo = tar.getmember(dirtype) 594 tar.extract(tarinfo, path=DIR) 595 extracted = os.path.join(DIR, dirtype) 596 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 597 if sys.platform != "win32": 598 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 599 finally: 600 support.rmtree(DIR) 601 602 def test_extractall_pathlike_name(self): 603 DIR = pathlib.Path(TEMPDIR) / "extractall" 604 with support.temp_dir(DIR), \ 605 tarfile.open(tarname, encoding="iso8859-1") as tar: 606 directories = [t for t in tar if t.isdir()] 607 tar.extractall(DIR, directories) 608 for tarinfo in directories: 609 path = DIR / tarinfo.name 610 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 611 612 def test_extract_pathlike_name(self): 613 dirtype = "ustar/dirtype" 614 DIR = pathlib.Path(TEMPDIR) / "extractall" 615 with support.temp_dir(DIR), \ 616 tarfile.open(tarname, 
encoding="iso8859-1") as tar: 617 tarinfo = tar.getmember(dirtype) 618 tar.extract(tarinfo, path=DIR) 619 extracted = DIR / dirtype 620 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 621 622 def test_init_close_fobj(self): 623 # Issue #7341: Close the internal file object in the TarFile 624 # constructor in case of an error. For the test we rely on 625 # the fact that opening an empty file raises a ReadError. 626 empty = os.path.join(TEMPDIR, "empty") 627 with open(empty, "wb") as fobj: 628 fobj.write(b"") 629 630 try: 631 tar = object.__new__(tarfile.TarFile) 632 try: 633 tar.__init__(empty) 634 except tarfile.ReadError: 635 self.assertTrue(tar.fileobj.closed) 636 else: 637 self.fail("ReadError not raised") 638 finally: 639 support.unlink(empty) 640 641 def test_parallel_iteration(self): 642 # Issue #16601: Restarting iteration over tarfile continued 643 # from where it left off. 644 with tarfile.open(self.tarname) as tar: 645 for m1, m2 in zip(tar, tar): 646 self.assertEqual(m1.offset, m2.offset) 647 self.assertEqual(m1.get_info(), m2.get_info()) 648 649class MiscReadTest(MiscReadTestBase, unittest.TestCase): 650 test_fail_comp = None 651 652class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 653 pass 654 655class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 656 def requires_name_attribute(self): 657 self.skipTest("BZ2File have no name attribute") 658 659class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 660 def requires_name_attribute(self): 661 self.skipTest("LZMAFile have no name attribute") 662 663 664class StreamReadTest(CommonReadTest, unittest.TestCase): 665 666 prefix="r|" 667 668 def test_read_through(self): 669 # Issue #11224: A poorly designed _FileInFile.read() method 670 # caused seeking errors with stream tar files. 
671 for tarinfo in self.tar: 672 if not tarinfo.isreg(): 673 continue 674 with self.tar.extractfile(tarinfo) as fobj: 675 while True: 676 try: 677 buf = fobj.read(512) 678 except tarfile.StreamError: 679 self.fail("simple read-through using " 680 "TarFile.extractfile() failed") 681 if not buf: 682 break 683 684 def test_fileobj_regular_file(self): 685 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 686 with self.tar.extractfile(tarinfo) as fobj: 687 data = fobj.read() 688 self.assertEqual(len(data), tarinfo.size, 689 "regular file extraction failed") 690 self.assertEqual(md5sum(data), md5_regtype, 691 "regular file extraction failed") 692 693 def test_provoke_stream_error(self): 694 tarinfos = self.tar.getmembers() 695 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 696 self.assertRaises(tarfile.StreamError, f.read) 697 698 def test_compare_members(self): 699 tar1 = tarfile.open(tarname, encoding="iso8859-1") 700 try: 701 tar2 = self.tar 702 703 while True: 704 t1 = tar1.next() 705 t2 = tar2.next() 706 if t1 is None: 707 break 708 self.assertIsNotNone(t2, "stream.next() failed.") 709 710 if t2.islnk() or t2.issym(): 711 with self.assertRaises(tarfile.StreamError): 712 tar2.extractfile(t2) 713 continue 714 715 v1 = tar1.extractfile(t1) 716 v2 = tar2.extractfile(t2) 717 if v1 is None: 718 continue 719 self.assertIsNotNone(v2, "stream.extractfile() failed") 720 self.assertEqual(v1.read(), v2.read(), 721 "stream extraction failed") 722 finally: 723 tar1.close() 724 725class GzipStreamReadTest(GzipTest, StreamReadTest): 726 pass 727 728class Bz2StreamReadTest(Bz2Test, StreamReadTest): 729 pass 730 731class LzmaStreamReadTest(LzmaTest, StreamReadTest): 732 pass 733 734 735class DetectReadTest(TarTest, unittest.TestCase): 736 def _testfunc_file(self, name, mode): 737 try: 738 tar = tarfile.open(name, mode) 739 except tarfile.ReadError as e: 740 self.fail() 741 else: 742 tar.close() 743 744 def _testfunc_fileobj(self, name, mode): 745 
try: 746 with open(name, "rb") as f: 747 tar = tarfile.open(name, mode, fileobj=f) 748 except tarfile.ReadError as e: 749 self.fail() 750 else: 751 tar.close() 752 753 def _test_modes(self, testfunc): 754 if self.suffix: 755 with self.assertRaises(tarfile.ReadError): 756 tarfile.open(tarname, mode="r:" + self.suffix) 757 with self.assertRaises(tarfile.ReadError): 758 tarfile.open(tarname, mode="r|" + self.suffix) 759 with self.assertRaises(tarfile.ReadError): 760 tarfile.open(self.tarname, mode="r:") 761 with self.assertRaises(tarfile.ReadError): 762 tarfile.open(self.tarname, mode="r|") 763 testfunc(self.tarname, "r") 764 testfunc(self.tarname, "r:" + self.suffix) 765 testfunc(self.tarname, "r:*") 766 testfunc(self.tarname, "r|" + self.suffix) 767 testfunc(self.tarname, "r|*") 768 769 def test_detect_file(self): 770 self._test_modes(self._testfunc_file) 771 772 def test_detect_fileobj(self): 773 self._test_modes(self._testfunc_fileobj) 774 775class GzipDetectReadTest(GzipTest, DetectReadTest): 776 pass 777 778class Bz2DetectReadTest(Bz2Test, DetectReadTest): 779 def test_detect_stream_bz2(self): 780 # Originally, tarfile's stream detection looked for the string 781 # "BZh91" at the start of the file. This is incorrect because 782 # the '9' represents the blocksize (900,000 bytes). If the file was 783 # compressed using another blocksize autodetection fails. 784 with open(tarname, "rb") as fobj: 785 data = fobj.read() 786 787 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
788 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 789 fobj.write(data) 790 791 self._testfunc_file(tmpname, "r|*") 792 793class LzmaDetectReadTest(LzmaTest, DetectReadTest): 794 pass 795 796 797class MemberReadTest(ReadTest, unittest.TestCase): 798 799 def _test_member(self, tarinfo, chksum=None, **kwargs): 800 if chksum is not None: 801 with self.tar.extractfile(tarinfo) as f: 802 self.assertEqual(md5sum(f.read()), chksum, 803 "wrong md5sum for %s" % tarinfo.name) 804 805 kwargs["mtime"] = 0o7606136617 806 kwargs["uid"] = 1000 807 kwargs["gid"] = 100 808 if "old-v7" not in tarinfo.name: 809 # V7 tar can't handle alphabetic owners. 810 kwargs["uname"] = "tarfile" 811 kwargs["gname"] = "tarfile" 812 for k, v in kwargs.items(): 813 self.assertEqual(getattr(tarinfo, k), v, 814 "wrong value in %s field of %s" % (k, tarinfo.name)) 815 816 def test_find_regtype(self): 817 tarinfo = self.tar.getmember("ustar/regtype") 818 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 819 820 def test_find_conttype(self): 821 tarinfo = self.tar.getmember("ustar/conttype") 822 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 823 824 def test_find_dirtype(self): 825 tarinfo = self.tar.getmember("ustar/dirtype") 826 self._test_member(tarinfo, size=0) 827 828 def test_find_dirtype_with_size(self): 829 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 830 self._test_member(tarinfo, size=255) 831 832 def test_find_lnktype(self): 833 tarinfo = self.tar.getmember("ustar/lnktype") 834 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 835 836 def test_find_symtype(self): 837 tarinfo = self.tar.getmember("ustar/symtype") 838 self._test_member(tarinfo, size=0, linkname="regtype") 839 840 def test_find_blktype(self): 841 tarinfo = self.tar.getmember("ustar/blktype") 842 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 843 844 def test_find_chrtype(self): 845 tarinfo = self.tar.getmember("ustar/chrtype") 846 self._test_member(tarinfo, size=0, 
devmajor=1, devminor=3) 847 848 def test_find_fifotype(self): 849 tarinfo = self.tar.getmember("ustar/fifotype") 850 self._test_member(tarinfo, size=0) 851 852 def test_find_sparse(self): 853 tarinfo = self.tar.getmember("ustar/sparse") 854 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 855 856 def test_find_gnusparse(self): 857 tarinfo = self.tar.getmember("gnu/sparse") 858 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 859 860 def test_find_gnusparse_00(self): 861 tarinfo = self.tar.getmember("gnu/sparse-0.0") 862 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 863 864 def test_find_gnusparse_01(self): 865 tarinfo = self.tar.getmember("gnu/sparse-0.1") 866 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 867 868 def test_find_gnusparse_10(self): 869 tarinfo = self.tar.getmember("gnu/sparse-1.0") 870 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 871 872 def test_find_umlauts(self): 873 tarinfo = self.tar.getmember("ustar/umlauts-" 874 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 875 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 876 877 def test_find_ustar_longname(self): 878 name = "ustar/" + "12345/" * 39 + "1234567/longname" 879 self.assertIn(name, self.tar.getnames()) 880 881 def test_find_regtype_oldv7(self): 882 tarinfo = self.tar.getmember("misc/regtype-old-v7") 883 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 884 885 def test_find_pax_umlauts(self): 886 self.tar.close() 887 self.tar = tarfile.open(self.tarname, mode=self.mode, 888 encoding="iso8859-1") 889 tarinfo = self.tar.getmember("pax/umlauts-" 890 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 891 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 892 893 894class LongnameTest: 895 896 def test_read_longname(self): 897 # Test reading of longname (bug #1471427). 
898 longname = self.subdir + "/" + "123/" * 125 + "longname" 899 try: 900 tarinfo = self.tar.getmember(longname) 901 except KeyError: 902 self.fail("longname not found") 903 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 904 "read longname as dirtype") 905 906 def test_read_longlink(self): 907 longname = self.subdir + "/" + "123/" * 125 + "longname" 908 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 909 try: 910 tarinfo = self.tar.getmember(longlink) 911 except KeyError: 912 self.fail("longlink not found") 913 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 914 915 def test_truncated_longname(self): 916 longname = self.subdir + "/" + "123/" * 125 + "longname" 917 tarinfo = self.tar.getmember(longname) 918 offset = tarinfo.offset 919 self.tar.fileobj.seek(offset) 920 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 921 with self.assertRaises(tarfile.ReadError): 922 tarfile.open(name="foo.tar", fileobj=fobj) 923 924 def test_header_offset(self): 925 # Test if the start offset of the TarInfo object includes 926 # the preceding extended header. 927 longname = self.subdir + "/" + "123/" * 125 + "longname" 928 offset = self.tar.getmember(longname).offset 929 with open(tarname, "rb") as fobj: 930 fobj.seek(offset) 931 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 932 "iso8859-1", "strict") 933 self.assertEqual(tarinfo.type, self.longnametype) 934 935 936class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 937 938 subdir = "gnu" 939 longnametype = tarfile.GNUTYPE_LONGNAME 940 941 # Since 3.2 tarfile is supposed to accurately restore sparse members and 942 # produce files with holes. This is what we actually want to test here. 943 # Unfortunately, not all platforms/filesystems support sparse files, and 944 # even on platforms that do it is non-trivial to make reliable assertions 945 # about holes in files. 
def _test_sparse_file(self, name):
    # Extract member `name` into TEMPDIR and compare the md5 of the
    # extracted bytes with md5_sparse.  When _fs_supports_holes() reports
    # that holes can be observed, additionally require that the file
    # occupies fewer 512-byte blocks than its size implies.
    self.tar.extract(name, TEMPDIR)
    extracted = os.path.join(TEMPDIR, name)
    with open(extracted, "rb") as stream:
        content = stream.read()
    self.assertEqual(md5sum(content), md5_sparse,
                     "wrong md5sum for %s" % name)

    if not self._fs_supports_holes():
        return
    info = os.stat(extracted)
    self.assertLess(info.st_blocks * 512, info.st_size)
def test_pax_global_headers(self):
    # For each of pax/regtype1..3 check uname, gname and the
    # "VENDOR.umlauts" pax header value.
    umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
    # (member name, expected uname, expected gname)
    expectations = (
        ("pax/regtype1", "foo", "bar"),
        ("pax/regtype2", "", "bar"),
        ("pax/regtype3", "tarfile", "tarfile"),
    )
    with tarfile.open(tarname, encoding="iso8859-1") as tar:
        for member_name, uname, gname in expectations:
            tarinfo = tar.getmember(member_name)
            self.assertEqual(tarinfo.uname, uname)
            self.assertEqual(tarinfo.gname, gname)
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             umlauts)
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object handed to tarfile.open() must stay open after the
        # TarFile is closed and after it has been garbage collected.
        buffer = io.BytesIO()
        tar = tarfile.open(fileobj=buffer, mode=self.mode)
        tar.addfile(tarfile.TarInfo("foo"))
        tar.close()
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Snapshot the output now; dropping the TarFile must not append to it.
        snapshot = buffer.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(snapshot, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        member = tarfile.TarInfo("foo")
        member.size = payload_size
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(member, io.BytesIO(b"a" * payload_size))

        # self.open is the (de)compressing file class for the current mode.
        with self.open(tmpname, "rb") as fobj:
            archive = fobj.read()
        self.assertEqual(len(archive), tarfile.RECORDSIZE * 2)
# mock the following:
#  os.listdir: so we know that files are in the wrong order
def test_ordered_recursion(self):
    # Add a directory whose os.listdir() result is patched to come back
    # in reverse order (["2", "1"]) and check that the archive members
    # are nevertheless stored as "directory", "1", "2".
    path = os.path.join(TEMPDIR, "directory")
    entries = [os.path.join(path, name) for name in ("1", "2")]
    os.mkdir(path)
    try:
        # Create the files inside the try block so that the directory is
        # removed again even if creating one of them fails.
        for entry in entries:
            open(entry, "a").close()

        with tarfile.open(tmpname, self.mode) as tar:
            # Only the add() call runs with the patched os.listdir.
            with unittest.mock.patch('os.listdir') as mock_listdir:
                mock_listdir.return_value = ["2", "1"]
                tar.add(path)
            paths = [os.path.basename(m.name) for m in tar.getmembers()]
            self.assertEqual(paths, ["directory", "1", "2"])
    finally:
        for entry in entries:
            support.unlink(entry)
        support.rmdir(path)
# Guarantee that stored pathnames are not modified. Don't
# remove ./ or ../ or double slashes. Still make absolute
# pathnames relative.
# For details see bug #6054.
def _test_pathname(self, path, cmp_path=None, dir=False):
    # Create a tarfile with an empty member named path
    # and compare the stored name with the original.
    #
    # path:     archive name (arcname) to store the member under.
    # cmp_path: expected stored name; when omitted (or empty) the
    #           expectation is `path` with os.sep replaced by "/".
    # dir:      add an empty directory instead of an empty file.
    foo = os.path.join(TEMPDIR, "foo")
    if dir:
        os.mkdir(foo)
    else:
        support.create_empty_file(foo)

    try:
        with tarfile.open(tmpname, self.mode) as tar:
            tar.add(foo, arcname=path)

        # Read back the first (only) member of the archive.
        with tarfile.open(tmpname, "r") as tar:
            t = tar.next()
    finally:
        # Always remove the fixture: a leftover "foo" would make the next
        # call's os.mkdir()/create_empty_file() operate on stale state.
        if dir:
            support.rmdir(foo)
        else:
            support.unlink(foo)

    self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
"..", "bar")) 1335 self._test_pathname(os.path.join("..", "foo")) 1336 self._test_pathname(os.path.join("..", "foo", "..")) 1337 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1338 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1339 1340 self._test_pathname("foo" + os.sep + os.sep + "bar") 1341 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1342 1343 def test_abs_pathnames(self): 1344 if sys.platform == "win32": 1345 self._test_pathname("C:\\foo", "foo") 1346 else: 1347 self._test_pathname("/foo", "foo") 1348 self._test_pathname("///foo", "foo") 1349 1350 def test_cwd(self): 1351 # Test adding the current working directory. 1352 with support.change_cwd(TEMPDIR): 1353 tar = tarfile.open(tmpname, self.mode) 1354 try: 1355 tar.add(".") 1356 finally: 1357 tar.close() 1358 1359 tar = tarfile.open(tmpname, "r") 1360 try: 1361 for t in tar: 1362 if t.name != ".": 1363 self.assertTrue(t.name.startswith("./"), t.name) 1364 finally: 1365 tar.close() 1366 1367 def test_open_nonwritable_fileobj(self): 1368 for exctype in OSError, EOFError, RuntimeError: 1369 class BadFile(io.BytesIO): 1370 first = True 1371 def write(self, data): 1372 if self.first: 1373 self.first = False 1374 raise exctype 1375 1376 f = BadFile() 1377 with self.assertRaises(exctype): 1378 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1379 format=tarfile.PAX_FORMAT, 1380 pax_headers={'non': 'empty'}) 1381 self.assertFalse(f.closed) 1382 1383class GzipWriteTest(GzipTest, WriteTest): 1384 pass 1385 1386class Bz2WriteTest(Bz2Test, WriteTest): 1387 pass 1388 1389class LzmaWriteTest(LzmaTest, WriteTest): 1390 pass 1391 1392 1393class StreamWriteTest(WriteTestBase, unittest.TestCase): 1394 1395 prefix = "w|" 1396 decompressor = None 1397 1398 def test_stream_padding(self): 1399 # Test for bug #1543303. 
@unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                     "Missing umask implementation")
def test_file_mode(self):
    # Test for issue #8464: Create files with correct
    # permissions.
    if os.path.exists(tmpname):
        support.unlink(tmpname)

    # With umask 0o022 a newly created archive must end up with mode 0o644.
    saved_umask = os.umask(0o022)
    try:
        tarfile.open(tmpname, self.mode).close()
        permissions = os.stat(tmpname).st_mode & 0o777
        self.assertEqual(permissions, 0o644, "wrong file permissions")
    finally:
        # os.umask() is process-wide; restore the previous value.
        os.umask(saved_umask)
def _calc_size(self, name, link=None):
    # Return the number of bytes one member is expected to occupy:
    # a 512-byte header, plus, for each of `name` / `link` that exceeds
    # its header field length, one extra 512-byte header and the padded
    # value (self._length()).
    header = 512
    needs_longname = len(name) > tarfile.LENGTH_NAME
    needs_longlink = link is not None and len(link) > tarfile.LENGTH_LINK

    # Initial tar header
    size = header
    if needs_longname:
        # GNU longname extended header + longname
        size += header + self._length(name)
    if needs_longlink:
        # GNU longlink extended header + longlink
        size += header + self._length(link)
    return size
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for exclusive-creation mode ("x"): creating a new archive works,
    # creating over an existing one raises FileExistsError and leaves the
    # existing archive readable.

    prefix = "x:"

    # Small payload file added to every archive created by these tests.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as payload:
            payload.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def setUp(self):
        # Every test starts without an archive at tmpname.
        support.unlink(tmpname)

    def _check_single_member(self, tobj):
        # The archive must list exactly one member, the payload file.
        names = tobj.getnames()
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive(self):
        # Re-open tmpname with the mode-specific opener and verify it.
        with self.taropen(tmpname) as tobj:
            self._check_single_member(tobj)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        self._check_archive()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_archive()

    def test_create_pathlike_name(self):
        # pathlib.Path is accepted for both the archive and the added file;
        # TarFile.name is still reported as an absolute str.
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            self._check_single_member(tobj)

        self._check_archive()

    def test_create_taropen_pathlike_name(self):
        # Same as test_create_pathlike_name, via the mode-specific opener.
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            self._check_single_member(tobj)

        self._check_archive()
def setUp(self):
    # Create TEMPDIR/foo, hard-link it as TEMPDIR/bar and open a fresh
    # archive at tmpname that already contains foo.  The test is skipped
    # when os.link() is refused with PermissionError.
    self.foo = os.path.join(TEMPDIR, "foo")
    self.bar = os.path.join(TEMPDIR, "bar")

    # unittest does not call tearDown() when setUp() raises, which
    # includes skipTest().  Register cleanups up front so the files do
    # not leak into other tests in that case; cleanups always run.
    # NOTE(review): this relies on support.unlink() ignoring a missing
    # file, since tearDown() may already have removed it -- confirm.
    self.addCleanup(support.unlink, self.bar)
    self.addCleanup(support.unlink, self.foo)

    with open(self.foo, "wb") as fobj:
        fobj.write(b"foo")

    try:
        os.link(self.foo, self.bar)
    except PermissionError as e:
        self.skipTest('os.link(): %s' % e)

    self.tar = tarfile.open(tmpname, "w")
    # Close the archive even if the add() below fails.
    self.addCleanup(self.tar.close)
    self.tar.add(self.foo)
def test_pax_global_header(self):
    # Write an archive with global pax headers and check that they are
    # read back unchanged, both on the TarFile and on its first member.
    pax_headers = {
        "foo": "bar",
        "uid": "0",
        "mtime": "1.23",
        "test": "\xe4\xf6\xfc",
        "\xe4\xf6\xfc": "test",
    }

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers)
    try:
        writer.addfile(tarfile.TarInfo("test"))
    finally:
        writer.close()

    # Test if the global header was written correctly.
    with tarfile.open(tmpname, encoding="iso8859-1") as reader:
        self.assertEqual(reader.pax_headers, pax_headers)
        self.assertEqual(reader.getmembers()[0].pax_headers, pax_headers)
        # Test if all the fields are strings.
        for key, val in reader.pax_headers.items():
            self.assertIsNot(type(key), bytes)
            self.assertIsNot(type(val), bytes)
            convert = tarfile.PAX_NUMBER_FIELDS.get(key)
            if convert is None:
                continue
            # Number fields must be convertible by their registered type.
            try:
                convert(val)
            except (TypeError, ValueError):
                self.fail("unable to convert pax header field")
class UnicodeTest:
    # Mixin with non-ASCII name tests.  The concrete test case sets
    # `format` to the tarfile format constant used when writing.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using `encoding` and check
        # that reading with the same encoding yields the same name.
        name = "\xe4\xf6\xfc"
        writer = tarfile.open(tmpname, "w", format=self.format,
                              encoding=encoding, errors="strict")
        try:
            writer.addfile(tarfile.TarInfo(name))
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding=encoding) as reader:
            self.assertEqual(reader.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in the
        # name or uname field must raise UnicodeError on addfile().
        writer = tarfile.open(tmpname, "w", format=self.format,
                              encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                writer.addfile(tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                writer.addfile(tarinfo)
        finally:
            writer.close()

    def test_unicode_argument(self):
        # All textual fields of every member must be exactly of type str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for member in tar:
                for field in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, field)), str)

    def test_uname_unicode(self):
        non_ascii = "\xe4\xf6\xfc"
        info = tarfile.TarInfo("foo")
        info.uname = non_ascii
        info.gname = non_ascii

        writer = tarfile.open(tmpname, mode="w", format=self.format,
                              encoding="iso8859-1")
        try:
            writer.addfile(info)
        finally:
            writer.close()

        with tarfile.open(tmpname, encoding="iso8859-1") as reader:
            member = reader.getmember("foo")
            self.assertEqual(member.uname, "\xe4\xf6\xfc")
            self.assertEqual(member.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Re-read with encoding="ascii": the expected values are the
            # surrogate-escaped forms of the undecodable bytes.
            with tarfile.open(tmpname, encoding="ascii") as reader:
                member = reader.getmember("foo")
                self.assertEqual(member.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(member.gname, "\udce4\udcf6\udcfc")
def _test_ustar_name(self, name, exc=None):
    # Add a member called `name` to a new utf-8 encoded archive written
    # in self.format.  When `exc` is given, addfile() must raise it;
    # otherwise the first member read back must carry the same name.
    member = tarfile.TarInfo(name)
    with tarfile.open(tmpname, "w", format=self.format,
                      encoding="utf-8") as tar:
        if exc is not None:
            with self.assertRaises(exc):
                tar.addfile(member)
            return
        tar.addfile(member)

    with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
        # Only the first member is inspected.
        first = next(iter(tar), None)
        if first is not None:
            self.assertEqual(name, first.name)
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    # Runs the UnicodeTest cases with archives written in GNU format.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        # Maps the encoding used for reading to the member name expected
        # under that encoding.
        expected_names = {
            "utf-8": "pax/bad-pax-\udce4\udcf6\udcfc",
            "iso8859-1": "pax/bad-pax-\xe4\xf6\xfc",
        }
        for encoding, name in expected_names.items():
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
def _test(self, names=None, fileobj=None):
    # Open the archive (self.tarname, or `fileobj` when given) and check
    # that its member names equal `names`.
    #
    # names:   expected list of member names; defaults to ["bar"].
    # fileobj: optional file object to read the archive from.
    #
    # The default is built per call instead of using a list literal in
    # the signature, which would be a single object shared by all calls.
    if names is None:
        names = ["bar"]
    with tarfile.open(self.tarname, fileobj=fileobj) as tar:
        self.assertEqual(tar.getnames(), names)
self._test(names=["foo", "bar"], fileobj=fobj) 2012 2013 def test_existing(self): 2014 self._create_testtar() 2015 self._add_testfile() 2016 self._test(names=["foo", "bar"]) 2017 2018 # Append mode is supposed to fail if the tarfile to append to 2019 # does not end with a zero block. 2020 def _test_error(self, data): 2021 with open(self.tarname, "wb") as fobj: 2022 fobj.write(data) 2023 self.assertRaises(tarfile.ReadError, self._add_testfile) 2024 2025 def test_null(self): 2026 self._test_error(b"") 2027 2028 def test_incomplete(self): 2029 self._test_error(b"\0" * 13) 2030 2031 def test_premature_eof(self): 2032 data = tarfile.TarInfo("foo").tobuf() 2033 self._test_error(data) 2034 2035 def test_trailing_garbage(self): 2036 data = tarfile.TarInfo("foo").tobuf() 2037 self._test_error(data + b"\0" * 13) 2038 2039 def test_invalid(self): 2040 self._test_error(b"a" * 512) 2041 2042class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2043 pass 2044 2045class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2046 pass 2047 2048class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2049 pass 2050 2051 2052class LimitsTest(unittest.TestCase): 2053 2054 def test_ustar_limits(self): 2055 # 100 char name 2056 tarinfo = tarfile.TarInfo("0123456789" * 10) 2057 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2058 2059 # 101 char name that cannot be stored 2060 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2061 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2062 2063 # 256 char name with a slash at pos 156 2064 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2065 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2066 2067 # 256 char name that cannot be stored 2068 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2069 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2070 2071 # 512 char name 2072 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2073 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 2074 2075 # 512 char linkname 2076 tarinfo = tarfile.TarInfo("longlink") 2077 tarinfo.linkname = "123/" * 126 + "longname" 2078 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2079 2080 # uid > 8 digits 2081 tarinfo = tarfile.TarInfo("name") 2082 tarinfo.uid = 0o10000000 2083 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2084 2085 def test_gnu_limits(self): 2086 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2087 tarinfo.tobuf(tarfile.GNU_FORMAT) 2088 2089 tarinfo = tarfile.TarInfo("longlink") 2090 tarinfo.linkname = "123/" * 126 + "longname" 2091 tarinfo.tobuf(tarfile.GNU_FORMAT) 2092 2093 # uid >= 256 ** 7 2094 tarinfo = tarfile.TarInfo("name") 2095 tarinfo.uid = 0o4000000000000000000 2096 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2097 2098 def test_pax_limits(self): 2099 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2100 tarinfo.tobuf(tarfile.PAX_FORMAT) 2101 2102 tarinfo = tarfile.TarInfo("longlink") 2103 tarinfo.linkname = "123/" * 126 + "longname" 2104 tarinfo.tobuf(tarfile.PAX_FORMAT) 2105 2106 tarinfo = tarfile.TarInfo("name") 2107 tarinfo.uid = 0o4000000000000000000 2108 tarinfo.tobuf(tarfile.PAX_FORMAT) 2109 2110 2111class MiscTest(unittest.TestCase): 2112 2113 def test_char_fields(self): 2114 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2115 b"foo\0\0\0\0\0") 2116 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2117 b"foo") 2118 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2119 "foo") 2120 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2121 "foo") 2122 2123 def test_read_number_fields(self): 2124 # Issue 13158: Test if GNU tar specific base-256 number fields 2125 # are decoded correctly. 
2126 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2127 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2128 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2129 0o10000000) 2130 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2131 0xffffffff) 2132 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2133 -1) 2134 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2135 -100) 2136 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2137 -0x100000000000000) 2138 2139 # Issue 24514: Test if empty number fields are converted to zero. 2140 self.assertEqual(tarfile.nti(b"\0"), 0) 2141 self.assertEqual(tarfile.nti(b" \0"), 0) 2142 2143 def test_write_number_fields(self): 2144 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2145 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2146 self.assertEqual(tarfile.itn(0o10000000), 2147 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2148 self.assertEqual(tarfile.itn(0xffffffff), 2149 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2150 self.assertEqual(tarfile.itn(-1), 2151 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2152 self.assertEqual(tarfile.itn(-100), 2153 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2154 self.assertEqual(tarfile.itn(-0x100000000000000), 2155 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2156 2157 # Issue 32713: Test if itn() supports float values outside the 2158 # non-GNU format range 2159 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2160 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2161 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2162 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2163 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2164 2165 def test_number_field_limits(self): 2166 with self.assertRaises(ValueError): 2167 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 2168 with self.assertRaises(ValueError): 2169 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2170 with 
self.assertRaises(ValueError): 2171 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2172 with self.assertRaises(ValueError): 2173 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2174 2175 def test__all__(self): 2176 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2177 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2178 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2179 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2180 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2181 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2182 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2183 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2184 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2185 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2186 'filemode', 2187 'EmptyHeaderError', 'TruncatedHeaderError', 2188 'EOFHeaderError', 'InvalidHeaderError', 2189 'SubsequentHeaderError', 'ExFileObject', 2190 'main'} 2191 support.check__all__(self, tarfile, blacklist=blacklist) 2192 2193 2194class CommandLineTest(unittest.TestCase): 2195 2196 def tarfilecmd(self, *args, **kwargs): 2197 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2198 **kwargs) 2199 return out.replace(os.linesep.encode(), b'\n') 2200 2201 def tarfilecmd_failure(self, *args): 2202 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2203 2204 def make_simple_tarfile(self, tar_name): 2205 files = [support.findfile('tokenize_tests.txt'), 2206 support.findfile('tokenize_tests-no-coding-cookie-' 2207 'and-utf8-bom-sig-only.txt')] 2208 self.addCleanup(support.unlink, tar_name) 2209 with tarfile.open(tar_name, 'w') as tf: 2210 for tardata in files: 2211 tf.add(tardata, arcname=os.path.basename(tardata)) 2212 2213 def test_bad_use(self): 2214 rc, out, err = self.tarfilecmd_failure() 2215 self.assertEqual(out, b'') 2216 self.assertIn(b'usage', err.lower()) 2217 self.assertIn(b'error', err.lower()) 2218 self.assertIn(b'required', 
err.lower()) 2219 rc, out, err = self.tarfilecmd_failure('-l', '') 2220 self.assertEqual(out, b'') 2221 self.assertNotEqual(err.strip(), b'') 2222 2223 def test_test_command(self): 2224 for tar_name in testtarnames: 2225 for opt in '-t', '--test': 2226 out = self.tarfilecmd(opt, tar_name) 2227 self.assertEqual(out, b'') 2228 2229 def test_test_command_verbose(self): 2230 for tar_name in testtarnames: 2231 for opt in '-v', '--verbose': 2232 out = self.tarfilecmd(opt, '-t', tar_name) 2233 self.assertIn(b'is a tar archive.\n', out) 2234 2235 def test_test_command_invalid_file(self): 2236 zipname = support.findfile('zipdir.zip') 2237 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2238 self.assertIn(b' is not a tar archive.', err) 2239 self.assertEqual(out, b'') 2240 self.assertEqual(rc, 1) 2241 2242 for tar_name in testtarnames: 2243 with self.subTest(tar_name=tar_name): 2244 with open(tar_name, 'rb') as f: 2245 data = f.read() 2246 try: 2247 with open(tmpname, 'wb') as f: 2248 f.write(data[:511]) 2249 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2250 self.assertEqual(out, b'') 2251 self.assertEqual(rc, 1) 2252 finally: 2253 support.unlink(tmpname) 2254 2255 def test_list_command(self): 2256 for tar_name in testtarnames: 2257 with support.captured_stdout() as t: 2258 with tarfile.open(tar_name, 'r') as tf: 2259 tf.list(verbose=False) 2260 expected = t.getvalue().encode('ascii', 'backslashreplace') 2261 for opt in '-l', '--list': 2262 out = self.tarfilecmd(opt, tar_name, 2263 PYTHONIOENCODING='ascii') 2264 self.assertEqual(out, expected) 2265 2266 def test_list_command_verbose(self): 2267 for tar_name in testtarnames: 2268 with support.captured_stdout() as t: 2269 with tarfile.open(tar_name, 'r') as tf: 2270 tf.list(verbose=True) 2271 expected = t.getvalue().encode('ascii', 'backslashreplace') 2272 for opt in '-v', '--verbose': 2273 out = self.tarfilecmd(opt, '-l', tar_name, 2274 PYTHONIOENCODING='ascii') 2275 self.assertEqual(out, expected) 2276 2277 
def test_list_command_invalid_file(self): 2278 zipname = support.findfile('zipdir.zip') 2279 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2280 self.assertIn(b' is not a tar archive.', err) 2281 self.assertEqual(out, b'') 2282 self.assertEqual(rc, 1) 2283 2284 def test_create_command(self): 2285 files = [support.findfile('tokenize_tests.txt'), 2286 support.findfile('tokenize_tests-no-coding-cookie-' 2287 'and-utf8-bom-sig-only.txt')] 2288 for opt in '-c', '--create': 2289 try: 2290 out = self.tarfilecmd(opt, tmpname, *files) 2291 self.assertEqual(out, b'') 2292 with tarfile.open(tmpname) as tar: 2293 tar.getmembers() 2294 finally: 2295 support.unlink(tmpname) 2296 2297 def test_create_command_verbose(self): 2298 files = [support.findfile('tokenize_tests.txt'), 2299 support.findfile('tokenize_tests-no-coding-cookie-' 2300 'and-utf8-bom-sig-only.txt')] 2301 for opt in '-v', '--verbose': 2302 try: 2303 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2304 self.assertIn(b' file created.', out) 2305 with tarfile.open(tmpname) as tar: 2306 tar.getmembers() 2307 finally: 2308 support.unlink(tmpname) 2309 2310 def test_create_command_dotless_filename(self): 2311 files = [support.findfile('tokenize_tests.txt')] 2312 try: 2313 out = self.tarfilecmd('-c', dotlessname, *files) 2314 self.assertEqual(out, b'') 2315 with tarfile.open(dotlessname) as tar: 2316 tar.getmembers() 2317 finally: 2318 support.unlink(dotlessname) 2319 2320 def test_create_command_dot_started_filename(self): 2321 tar_name = os.path.join(TEMPDIR, ".testtar") 2322 files = [support.findfile('tokenize_tests.txt')] 2323 try: 2324 out = self.tarfilecmd('-c', tar_name, *files) 2325 self.assertEqual(out, b'') 2326 with tarfile.open(tar_name) as tar: 2327 tar.getmembers() 2328 finally: 2329 support.unlink(tar_name) 2330 2331 def test_create_command_compressed(self): 2332 files = [support.findfile('tokenize_tests.txt'), 2333 support.findfile('tokenize_tests-no-coding-cookie-' 2334 
'and-utf8-bom-sig-only.txt')] 2335 for filetype in (GzipTest, Bz2Test, LzmaTest): 2336 if not filetype.open: 2337 continue 2338 try: 2339 tar_name = tmpname + '.' + filetype.suffix 2340 out = self.tarfilecmd('-c', tar_name, *files) 2341 with filetype.taropen(tar_name) as tar: 2342 tar.getmembers() 2343 finally: 2344 support.unlink(tar_name) 2345 2346 def test_extract_command(self): 2347 self.make_simple_tarfile(tmpname) 2348 for opt in '-e', '--extract': 2349 try: 2350 with support.temp_cwd(tarextdir): 2351 out = self.tarfilecmd(opt, tmpname) 2352 self.assertEqual(out, b'') 2353 finally: 2354 support.rmtree(tarextdir) 2355 2356 def test_extract_command_verbose(self): 2357 self.make_simple_tarfile(tmpname) 2358 for opt in '-v', '--verbose': 2359 try: 2360 with support.temp_cwd(tarextdir): 2361 out = self.tarfilecmd(opt, '-e', tmpname) 2362 self.assertIn(b' file is extracted.', out) 2363 finally: 2364 support.rmtree(tarextdir) 2365 2366 def test_extract_command_different_directory(self): 2367 self.make_simple_tarfile(tmpname) 2368 try: 2369 with support.temp_cwd(tarextdir): 2370 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2371 self.assertEqual(out, b'') 2372 finally: 2373 support.rmtree(tarextdir) 2374 2375 def test_extract_command_invalid_file(self): 2376 zipname = support.findfile('zipdir.zip') 2377 with support.temp_cwd(tarextdir): 2378 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2379 self.assertIn(b' is not a tar archive.', err) 2380 self.assertEqual(out, b'') 2381 self.assertEqual(rc, 1) 2382 2383 2384class ContextManagerTest(unittest.TestCase): 2385 2386 def test_basic(self): 2387 with tarfile.open(tarname) as tar: 2388 self.assertFalse(tar.closed, "closed inside runtime context") 2389 self.assertTrue(tar.closed, "context manager failed") 2390 2391 def test_closed(self): 2392 # The __enter__() method is supposed to raise OSError 2393 # if the TarFile object is already closed. 
2394 tar = tarfile.open(tarname) 2395 tar.close() 2396 with self.assertRaises(OSError): 2397 with tar: 2398 pass 2399 2400 def test_exception(self): 2401 # Test if the OSError exception is passed through properly. 2402 with self.assertRaises(Exception) as exc: 2403 with tarfile.open(tarname) as tar: 2404 raise OSError 2405 self.assertIsInstance(exc.exception, OSError, 2406 "wrong exception raised in context manager") 2407 self.assertTrue(tar.closed, "context manager failed") 2408 2409 def test_no_eof(self): 2410 # __exit__() must not write end-of-archive blocks if an 2411 # exception was raised. 2412 try: 2413 with tarfile.open(tmpname, "w") as tar: 2414 raise Exception 2415 except: 2416 pass 2417 self.assertEqual(os.path.getsize(tmpname), 0, 2418 "context manager wrote an end-of-archive block") 2419 self.assertTrue(tar.closed, "context manager failed") 2420 2421 def test_eof(self): 2422 # __exit__() must write end-of-archive blocks, i.e. call 2423 # TarFile.close() if there was no error. 2424 with tarfile.open(tmpname, "w"): 2425 pass 2426 self.assertNotEqual(os.path.getsize(tmpname), 0, 2427 "context manager wrote no end-of-archive block") 2428 2429 def test_fileobj(self): 2430 # Test that __exit__() did not close the external file 2431 # object. 2432 with open(tmpname, "wb") as fobj: 2433 try: 2434 with tarfile.open(fileobj=fobj, mode="w") as tar: 2435 raise Exception 2436 except: 2437 pass 2438 self.assertFalse(fobj.closed, "external file object was closed") 2439 self.assertTrue(tar.closed, "context manager failed") 2440 2441 2442@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2443class LinkEmulationTest(ReadTest, unittest.TestCase): 2444 2445 # Test for issue #8741 regression. On platforms that do not support 2446 # symbolic or hard links tarfile tries to extract these types of members 2447 # as the regular files they point to. 
2448 def _test_link_extraction(self, name): 2449 self.tar.extract(name, TEMPDIR) 2450 with open(os.path.join(TEMPDIR, name), "rb") as f: 2451 data = f.read() 2452 self.assertEqual(md5sum(data), md5_regtype) 2453 2454 # See issues #1578269, #8879, and #17689 for some history on these skips 2455 @unittest.skipIf(hasattr(os.path, "islink"), 2456 "Skip emulation - has os.path.islink but not os.link") 2457 def test_hardlink_extraction1(self): 2458 self._test_link_extraction("ustar/lnktype") 2459 2460 @unittest.skipIf(hasattr(os.path, "islink"), 2461 "Skip emulation - has os.path.islink but not os.link") 2462 def test_hardlink_extraction2(self): 2463 self._test_link_extraction("./ustar/linktest2/lnktype") 2464 2465 @unittest.skipIf(hasattr(os, "symlink"), 2466 "Skip emulation if symlink exists") 2467 def test_symlink_extraction1(self): 2468 self._test_link_extraction("ustar/symtype") 2469 2470 @unittest.skipIf(hasattr(os, "symlink"), 2471 "Skip emulation if symlink exists") 2472 def test_symlink_extraction2(self): 2473 self._test_link_extraction("./ustar/linktest2/symtype") 2474 2475 2476class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2477 # Issue5068: The _BZ2Proxy.read() method loops forever 2478 # on an empty or partial bzipped file. 
2479 2480 def _test_partial_input(self, mode): 2481 class MyBytesIO(io.BytesIO): 2482 hit_eof = False 2483 def read(self, n): 2484 if self.hit_eof: 2485 raise AssertionError("infinite loop detected in " 2486 "tarfile.open()") 2487 self.hit_eof = self.tell() == len(self.getvalue()) 2488 return super(MyBytesIO, self).read(n) 2489 def seek(self, *args): 2490 self.hit_eof = False 2491 return super(MyBytesIO, self).seek(*args) 2492 2493 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2494 for x in range(len(data) + 1): 2495 try: 2496 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2497 except tarfile.ReadError: 2498 pass # we have no interest in ReadErrors 2499 2500 def test_partial_input(self): 2501 self._test_partial_input("r") 2502 2503 def test_partial_input_bz2(self): 2504 self._test_partial_input("r:bz2") 2505 2506 2507def root_is_uid_gid_0(): 2508 try: 2509 import pwd, grp 2510 except ImportError: 2511 return False 2512 if pwd.getpwuid(0)[0] != 'root': 2513 return False 2514 if grp.getgrgid(0)[0] != 'root': 2515 return False 2516 return True 2517 2518 2519@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2520@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2521class NumericOwnerTest(unittest.TestCase): 2522 # mock the following: 2523 # os.chown: so we can test what's being called 2524 # os.chmod: so the modes are not actually changed. if they are, we can't 2525 # delete the files/directories 2526 # os.geteuid: so we can lie and say we're root (uid = 0) 2527 2528 @staticmethod 2529 def _make_test_archive(filename_1, dirname_1, filename_2): 2530 # the file contents to write 2531 fobj = io.BytesIO(b"content") 2532 2533 # create a tar file with a file, a directory, and a file within that 2534 # directory. 
Assign various .uid/.gid values to them 2535 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2536 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2537 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2538 ] 2539 with tarfile.open(tmpname, 'w') as tarfl: 2540 for name, uid, gid, typ, contents in items: 2541 t = tarfile.TarInfo(name) 2542 t.uid = uid 2543 t.gid = gid 2544 t.uname = 'root' 2545 t.gname = 'root' 2546 t.type = typ 2547 tarfl.addfile(t, contents) 2548 2549 # return the full pathname to the tar file 2550 return tmpname 2551 2552 @staticmethod 2553 @contextmanager 2554 def _setup_test(mock_geteuid): 2555 mock_geteuid.return_value = 0 # lie and say we're root 2556 fname = 'numeric-owner-testfile' 2557 dirname = 'dir' 2558 2559 # the names we want stored in the tarfile 2560 filename_1 = fname 2561 dirname_1 = dirname 2562 filename_2 = os.path.join(dirname, fname) 2563 2564 # create the tarfile with the contents we're after 2565 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2566 dirname_1, 2567 filename_2) 2568 2569 # open the tarfile for reading. 
yield it and the names of the items 2570 # we stored into the file 2571 with tarfile.open(tar_filename) as tarfl: 2572 yield tarfl, filename_1, dirname_1, filename_2 2573 2574 @unittest.mock.patch('os.chown') 2575 @unittest.mock.patch('os.chmod') 2576 @unittest.mock.patch('os.geteuid') 2577 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2578 mock_chown): 2579 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2580 filename_2): 2581 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2582 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2583 2584 # convert to filesystem paths 2585 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2586 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2587 2588 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2589 unittest.mock.call(f_filename_2, 88, 87), 2590 ], 2591 any_order=True) 2592 2593 @unittest.mock.patch('os.chown') 2594 @unittest.mock.patch('os.chmod') 2595 @unittest.mock.patch('os.geteuid') 2596 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2597 mock_chown): 2598 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2599 filename_2): 2600 tarfl.extractall(TEMPDIR, numeric_owner=True) 2601 2602 # convert to filesystem paths 2603 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2604 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2605 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2606 2607 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2608 unittest.mock.call(f_dirname_1, 77, 76), 2609 unittest.mock.call(f_filename_2, 88, 87), 2610 ], 2611 any_order=True) 2612 2613 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2614 # because the uname and gname in the test file are 'root', and extract() 2615 # will look them up using pwd and grp to find their uid and gid, which we 2616 # test here to be 0. 
2617 @unittest.skipUnless(root_is_uid_gid_0(), 2618 'uid=0,gid=0 must be named "root"') 2619 @unittest.mock.patch('os.chown') 2620 @unittest.mock.patch('os.chmod') 2621 @unittest.mock.patch('os.geteuid') 2622 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2623 mock_chown): 2624 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2625 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2626 2627 # convert to filesystem paths 2628 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2629 2630 mock_chown.assert_called_with(f_filename_1, 0, 0) 2631 2632 @unittest.mock.patch('os.geteuid') 2633 def test_keyword_only(self, mock_geteuid): 2634 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2635 self.assertRaises(TypeError, 2636 tarfl.extract, filename_1, TEMPDIR, False, True) 2637 2638 2639def setUpModule(): 2640 support.unlink(TEMPDIR) 2641 os.makedirs(TEMPDIR) 2642 2643 global testtarnames 2644 testtarnames = [tarname] 2645 with open(tarname, "rb") as fobj: 2646 data = fobj.read() 2647 2648 # Create compressed tarfiles. 2649 for c in GzipTest, Bz2Test, LzmaTest: 2650 if c.open: 2651 support.unlink(c.tarname) 2652 testtarnames.append(c.tarname) 2653 with c.open(c.tarname, "wb") as tar: 2654 tar.write(data) 2655 2656def tearDownModule(): 2657 if os.path.exists(TEMPDIR): 2658 support.rmtree(TEMPDIR) 2659 2660if __name__ == "__main__": 2661 unittest.main() 2662