import sys
import os
import io
from hashlib import sha256
from contextlib import contextmanager
from random import Random
import pathlib

import unittest
import unittest.mock
import tarfile

from test import support
from test.support import script_helper, requires_hashdigest

# Check for our compression modules.
try:
    import gzip
except ImportError:
    gzip = None
try:
    import bz2
except ImportError:
    bz2 = None
try:
    import lzma
except ImportError:
    lzma = None


def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    digest = sha256(data)
    return digest.hexdigest()


# Scratch locations and the archive names used throughout the tests.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 digests of member payloads in the test archive.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)


class TarTest:
    """Mixin selecting the uncompressed archive and its opener functions."""

    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # The mode string is the subclass-supplied ``prefix`` followed by
        # the compression ``suffix`` of whichever mixin is in use.
        return self.prefix + self.suffix


@support.requires_gzip
class GzipTest:
    """Mixin selecting the gzip-compressed archive."""

    tarname = gzipname
    suffix = 'gz'
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen


@support.requires_bz2
class Bz2Test:
    """Mixin selecting the bzip2-compressed archive."""

    tarname = bz2name
    suffix = 'bz2'
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open


@support.requires_lzma
class LzmaTest:
    """Mixin selecting the xz-compressed archive."""

    tarname = xzname
    suffix = 'xz'
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen


class ReadTest(TarTest):
    """Base for read tests: opens ``self.tar`` before each test, closes after."""

    prefix = "r:"

    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()


class UstarReadTest(ReadTest, unittest.TestCase):

    def test_fileobj_regular_file(self):
        # The extracted payload must have the recorded size and digest.
        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as fobj:
            payload = fobj.read()
            self.assertEqual(len(payload), member.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(payload), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Lines read from the extracted file on disk must match the lines
        # read through the file object returned by extractfile().
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        on_disk = os.path.join(TEMPDIR, "ustar/regtype")
        with open(on_disk, "r") as disk_file:
            disk_lines = disk_file.readlines()

        with self.tar.extractfile(member) as fobj:
            wrapper = io.TextIOWrapper(fobj)
            member_lines = wrapper.readlines()
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.readlines() failed")
            self.assertEqual(len(member_lines), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(member_lines[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Same comparison as above, but via iteration instead of readlines().
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        on_disk = os.path.join(TEMPDIR, "ustar/regtype")
        with open(on_disk, "r") as disk_file:
            disk_lines = disk_file.readlines()
        with self.tar.extractfile(member) as fobj:
            member_lines = list(io.TextIOWrapper(fobj))
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as disk_file:
            expected = disk_file.read()

        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as fobj:
            # Consume the whole member once before seeking back to the start.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            chunk = fobj.read(10)
            self.assertEqual(chunk, expected[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(member.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-member.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            first_pass = fobj.readlines()
            fobj.seek(512)
            second_pass = fobj.readlines()
            self.assertEqual(first_pass, second_pass,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            first_line = fobj.readline()
            self.assertEqual(fobj.read(), expected[len(first_line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            encoded = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(encoded), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
196 def _test_fileobj_link(self, lnktype, regtype): 197 with self.tar.extractfile(lnktype) as a, \ 198 self.tar.extractfile(regtype) as b: 199 self.assertEqual(a.name, b.name) 200 201 def test_fileobj_link1(self): 202 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 203 204 def test_fileobj_link2(self): 205 self._test_fileobj_link("./ustar/linktest2/lnktype", 206 "ustar/linktest1/regtype") 207 208 def test_fileobj_symlink1(self): 209 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 210 211 def test_fileobj_symlink2(self): 212 self._test_fileobj_link("./ustar/linktest2/symtype", 213 "ustar/linktest1/regtype") 214 215 def test_issue14160(self): 216 self._test_fileobj_link("symtype2", "ustar/regtype") 217 218class GzipUstarReadTest(GzipTest, UstarReadTest): 219 pass 220 221class Bz2UstarReadTest(Bz2Test, UstarReadTest): 222 pass 223 224class LzmaUstarReadTest(LzmaTest, UstarReadTest): 225 pass 226 227 228class ListTest(ReadTest, unittest.TestCase): 229 230 # Override setUp to use default encoding (UTF-8) 231 def setUp(self): 232 self.tar = tarfile.open(self.tarname, mode=self.mode) 233 234 def test_list(self): 235 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 236 with support.swap_attr(sys, 'stdout', tio): 237 self.tar.list(verbose=False) 238 out = tio.detach().getvalue() 239 self.assertIn(b'ustar/conttype', out) 240 self.assertIn(b'ustar/regtype', out) 241 self.assertIn(b'ustar/lnktype', out) 242 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 243 self.assertIn(b'./ustar/linktest2/symtype', out) 244 self.assertIn(b'./ustar/linktest2/lnktype', out) 245 # Make sure it puts trailing slash for directory 246 self.assertIn(b'ustar/dirtype/', out) 247 self.assertIn(b'ustar/dirtype-with-size/', out) 248 # Make sure it is able to print unencodable characters 249 def conv(b): 250 s = b.decode(self.tar.encoding, 'surrogateescape') 251 return s.encode('ascii', 'backslashreplace') 252 
self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 256 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 257 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 258 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 259 # Make sure it prints files separated by one newline without any 260 # 'ls -l'-like accessories if verbose flag is not being used 261 # ... 262 # ustar/conttype 263 # ustar/regtype 264 # ... 265 self.assertRegex(out, br'ustar/conttype ?\r?\n' 266 br'ustar/regtype ?\r?\n') 267 # Make sure it does not print the source of link without verbose flag 268 self.assertNotIn(b'link to', out) 269 self.assertNotIn(b'->', out) 270 271 def test_list_verbose(self): 272 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=True) 275 out = tio.detach().getvalue() 276 # Make sure it prints files separated by one newline with 'ls -l'-like 277 # accessories if verbose flag is being used 278 # ... 279 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 280 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 281 # ... 
282 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 283 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 284 br'ustar/\w+type ?\r?\n') * 2) 285 # Make sure it prints the source of link with verbose flag 286 self.assertIn(b'ustar/symtype -> regtype', out) 287 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 288 self.assertIn(b'./ustar/linktest2/lnktype link to ' 289 b'./ustar/linktest1/regtype', out) 290 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 291 (b'/123' * 125) + b'/longname', out) 292 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 293 (b'/123' * 125) + b'/longname', out) 294 295 def test_list_members(self): 296 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 297 def members(tar): 298 for tarinfo in tar.getmembers(): 299 if 'reg' in tarinfo.name: 300 yield tarinfo 301 with support.swap_attr(sys, 'stdout', tio): 302 self.tar.list(verbose=False, members=members(self.tar)) 303 out = tio.detach().getvalue() 304 self.assertIn(b'ustar/regtype', out) 305 self.assertNotIn(b'ustar/conttype', out) 306 307 308class GzipListTest(GzipTest, ListTest): 309 pass 310 311 312class Bz2ListTest(Bz2Test, ListTest): 313 pass 314 315 316class LzmaListTest(LzmaTest, ListTest): 317 pass 318 319 320class CommonReadTest(ReadTest): 321 322 def test_empty_tarfile(self): 323 # Test for issue6123: Allow opening empty archives. 324 # This test checks if tarfile.open() is able to open an empty tar 325 # archive successfully. Note that an empty tar archive is not the 326 # same as an empty file! 
327 with tarfile.open(tmpname, self.mode.replace("r", "w")): 328 pass 329 try: 330 tar = tarfile.open(tmpname, self.mode) 331 tar.getnames() 332 except tarfile.ReadError: 333 self.fail("tarfile.open() failed on empty archive") 334 else: 335 self.assertListEqual(tar.getmembers(), []) 336 finally: 337 tar.close() 338 339 def test_non_existent_tarfile(self): 340 # Test for issue11513: prevent non-existent gzipped tarfiles raising 341 # multiple exceptions. 342 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 343 tarfile.open("xxx", self.mode) 344 345 def test_null_tarfile(self): 346 # Test for issue6123: Allow opening empty archives. 347 # This test guarantees that tarfile.open() does not treat an empty 348 # file as an empty tar archive. 349 with open(tmpname, "wb"): 350 pass 351 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 352 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 353 354 def test_ignore_zeros(self): 355 # Test TarFile's ignore_zeros option. 356 # generate 512 pseudorandom bytes 357 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 358 for char in (b'\0', b'a'): 359 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 360 # are ignored correctly. 
361 with self.open(tmpname, "w") as fobj: 362 fobj.write(char * 1024) 363 tarinfo = tarfile.TarInfo("foo") 364 tarinfo.size = len(data) 365 fobj.write(tarinfo.tobuf()) 366 fobj.write(data) 367 368 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 369 try: 370 self.assertListEqual(tar.getnames(), ["foo"], 371 "ignore_zeros=True should have skipped the %r-blocks" % 372 char) 373 finally: 374 tar.close() 375 376 def test_premature_end_of_archive(self): 377 for size in (512, 600, 1024, 1200): 378 with tarfile.open(tmpname, "w:") as tar: 379 t = tarfile.TarInfo("foo") 380 t.size = 1024 381 tar.addfile(t, io.BytesIO(b"a" * 1024)) 382 383 with open(tmpname, "r+b") as fobj: 384 fobj.truncate(size) 385 386 with tarfile.open(tmpname) as tar: 387 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 388 for t in tar: 389 pass 390 391 with tarfile.open(tmpname) as tar: 392 t = tar.next() 393 394 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 395 tar.extract(t, TEMPDIR) 396 397 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 398 tar.extractfile(t).read() 399 400class MiscReadTestBase(CommonReadTest): 401 def requires_name_attribute(self): 402 pass 403 404 def test_no_name_argument(self): 405 self.requires_name_attribute() 406 with open(self.tarname, "rb") as fobj: 407 self.assertIsInstance(fobj.name, str) 408 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 409 self.assertIsInstance(tar.name, str) 410 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 411 412 def test_no_name_attribute(self): 413 with open(self.tarname, "rb") as fobj: 414 data = fobj.read() 415 fobj = io.BytesIO(data) 416 self.assertRaises(AttributeError, getattr, fobj, "name") 417 tar = tarfile.open(fileobj=fobj, mode=self.mode) 418 self.assertIsNone(tar.name) 419 420 def test_empty_name_attribute(self): 421 with open(self.tarname, "rb") as fobj: 422 data = fobj.read() 423 fobj = io.BytesIO(data) 424 fobj.name = "" 425 
with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 426 self.assertIsNone(tar.name) 427 428 def test_int_name_attribute(self): 429 # Issue 21044: tarfile.open() should handle fileobj with an integer 430 # 'name' attribute. 431 fd = os.open(self.tarname, os.O_RDONLY) 432 with open(fd, 'rb') as fobj: 433 self.assertIsInstance(fobj.name, int) 434 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 435 self.assertIsNone(tar.name) 436 437 def test_bytes_name_attribute(self): 438 self.requires_name_attribute() 439 tarname = os.fsencode(self.tarname) 440 with open(tarname, 'rb') as fobj: 441 self.assertIsInstance(fobj.name, bytes) 442 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 443 self.assertIsInstance(tar.name, bytes) 444 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 445 446 def test_pathlike_name(self): 447 tarname = pathlib.Path(self.tarname) 448 with tarfile.open(tarname, mode=self.mode) as tar: 449 self.assertIsInstance(tar.name, str) 450 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 451 with self.taropen(tarname) as tar: 452 self.assertIsInstance(tar.name, str) 453 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 454 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 455 self.assertIsInstance(tar.name, str) 456 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 457 if self.suffix == '': 458 with tarfile.TarFile(tarname, mode='r') as tar: 459 self.assertIsInstance(tar.name, str) 460 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 461 462 def test_illegal_mode_arg(self): 463 with open(tmpname, 'wb'): 464 pass 465 with self.assertRaisesRegex(ValueError, 'mode must be '): 466 tar = self.taropen(tmpname, 'q') 467 with self.assertRaisesRegex(ValueError, 'mode must be '): 468 tar = self.taropen(tmpname, 'rw') 469 with self.assertRaisesRegex(ValueError, 'mode must be '): 470 tar = self.taropen(tmpname, '') 471 472 def test_fileobj_with_offset(self): 473 # Skip the first 
member and store values from the second member 474 # of the testtar. 475 tar = tarfile.open(self.tarname, mode=self.mode) 476 try: 477 tar.next() 478 t = tar.next() 479 name = t.name 480 offset = t.offset 481 with tar.extractfile(t) as f: 482 data = f.read() 483 finally: 484 tar.close() 485 486 # Open the testtar and seek to the offset of the second member. 487 with self.open(self.tarname) as fobj: 488 fobj.seek(offset) 489 490 # Test if the tarfile starts with the second member. 491 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 492 t = tar.next() 493 self.assertEqual(t.name, name) 494 # Read to the end of fileobj and test if seeking back to the 495 # beginning works. 496 tar.getmembers() 497 self.assertEqual(tar.extractfile(t).read(), data, 498 "seek back did not work") 499 500 def test_fail_comp(self): 501 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 502 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 503 with open(tarname, "rb") as fobj: 504 self.assertRaises(tarfile.ReadError, tarfile.open, 505 fileobj=fobj, mode=self.mode) 506 507 def test_v7_dirtype(self): 508 # Test old style dirtype member (bug #1336623): 509 # Old V7 tars create directory members using an AREGTYPE 510 # header with a "/" appended to the filename field. 511 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 512 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 513 "v7 dirtype failed") 514 515 def test_xstar_type(self): 516 # The xstar format stores extra atime and ctime fields inside the 517 # space reserved for the prefix field. The prefix field must be 518 # ignored in this case, otherwise it will mess up the name. 
519 try: 520 self.tar.getmember("misc/regtype-xstar") 521 except KeyError: 522 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 523 524 def test_check_members(self): 525 for tarinfo in self.tar: 526 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 527 "wrong mtime for %s" % tarinfo.name) 528 if not tarinfo.name.startswith("ustar/"): 529 continue 530 self.assertEqual(tarinfo.uname, "tarfile", 531 "wrong uname for %s" % tarinfo.name) 532 533 def test_find_members(self): 534 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 535 "could not find all members") 536 537 @unittest.skipUnless(hasattr(os, "link"), 538 "Missing hardlink implementation") 539 @support.skip_unless_symlink 540 def test_extract_hardlink(self): 541 # Test hardlink extraction (e.g. bug #857297). 542 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 543 tar.extract("ustar/regtype", TEMPDIR) 544 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 545 546 tar.extract("ustar/lnktype", TEMPDIR) 547 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 548 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 549 data = f.read() 550 self.assertEqual(sha256sum(data), sha256_regtype) 551 552 tar.extract("ustar/symtype", TEMPDIR) 553 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 554 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 555 data = f.read() 556 self.assertEqual(sha256sum(data), sha256_regtype) 557 558 def test_extractall(self): 559 # Test if extractall() correctly restores directory permissions 560 # and times (see issue1735). 
561 tar = tarfile.open(tarname, encoding="iso8859-1") 562 DIR = os.path.join(TEMPDIR, "extractall") 563 os.mkdir(DIR) 564 try: 565 directories = [t for t in tar if t.isdir()] 566 tar.extractall(DIR, directories) 567 for tarinfo in directories: 568 path = os.path.join(DIR, tarinfo.name) 569 if sys.platform != "win32": 570 # Win32 has no support for fine grained permissions. 571 self.assertEqual(tarinfo.mode & 0o777, 572 os.stat(path).st_mode & 0o777) 573 def format_mtime(mtime): 574 if isinstance(mtime, float): 575 return "{} ({})".format(mtime, mtime.hex()) 576 else: 577 return "{!r} (int)".format(mtime) 578 file_mtime = os.path.getmtime(path) 579 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 580 format_mtime(tarinfo.mtime), 581 format_mtime(file_mtime), 582 path) 583 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 584 finally: 585 tar.close() 586 support.rmtree(DIR) 587 588 def test_extract_directory(self): 589 dirtype = "ustar/dirtype" 590 DIR = os.path.join(TEMPDIR, "extractdir") 591 os.mkdir(DIR) 592 try: 593 with tarfile.open(tarname, encoding="iso8859-1") as tar: 594 tarinfo = tar.getmember(dirtype) 595 tar.extract(tarinfo, path=DIR) 596 extracted = os.path.join(DIR, dirtype) 597 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 598 if sys.platform != "win32": 599 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 600 finally: 601 support.rmtree(DIR) 602 603 def test_extractall_pathlike_name(self): 604 DIR = pathlib.Path(TEMPDIR) / "extractall" 605 with support.temp_dir(DIR), \ 606 tarfile.open(tarname, encoding="iso8859-1") as tar: 607 directories = [t for t in tar if t.isdir()] 608 tar.extractall(DIR, directories) 609 for tarinfo in directories: 610 path = DIR / tarinfo.name 611 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 612 613 def test_extract_pathlike_name(self): 614 dirtype = "ustar/dirtype" 615 DIR = pathlib.Path(TEMPDIR) / "extractall" 616 with support.temp_dir(DIR), \ 617 tarfile.open(tarname, 
encoding="iso8859-1") as tar: 618 tarinfo = tar.getmember(dirtype) 619 tar.extract(tarinfo, path=DIR) 620 extracted = DIR / dirtype 621 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 622 623 def test_init_close_fobj(self): 624 # Issue #7341: Close the internal file object in the TarFile 625 # constructor in case of an error. For the test we rely on 626 # the fact that opening an empty file raises a ReadError. 627 empty = os.path.join(TEMPDIR, "empty") 628 with open(empty, "wb") as fobj: 629 fobj.write(b"") 630 631 try: 632 tar = object.__new__(tarfile.TarFile) 633 try: 634 tar.__init__(empty) 635 except tarfile.ReadError: 636 self.assertTrue(tar.fileobj.closed) 637 else: 638 self.fail("ReadError not raised") 639 finally: 640 support.unlink(empty) 641 642 def test_parallel_iteration(self): 643 # Issue #16601: Restarting iteration over tarfile continued 644 # from where it left off. 645 with tarfile.open(self.tarname) as tar: 646 for m1, m2 in zip(tar, tar): 647 self.assertEqual(m1.offset, m2.offset) 648 self.assertEqual(m1.get_info(), m2.get_info()) 649 650class MiscReadTest(MiscReadTestBase, unittest.TestCase): 651 test_fail_comp = None 652 653class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 654 pass 655 656class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 657 def requires_name_attribute(self): 658 self.skipTest("BZ2File have no name attribute") 659 660class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 661 def requires_name_attribute(self): 662 self.skipTest("LZMAFile have no name attribute") 663 664 665class StreamReadTest(CommonReadTest, unittest.TestCase): 666 667 prefix="r|" 668 669 def test_read_through(self): 670 # Issue #11224: A poorly designed _FileInFile.read() method 671 # caused seeking errors with stream tar files. 
672 for tarinfo in self.tar: 673 if not tarinfo.isreg(): 674 continue 675 with self.tar.extractfile(tarinfo) as fobj: 676 while True: 677 try: 678 buf = fobj.read(512) 679 except tarfile.StreamError: 680 self.fail("simple read-through using " 681 "TarFile.extractfile() failed") 682 if not buf: 683 break 684 685 def test_fileobj_regular_file(self): 686 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 687 with self.tar.extractfile(tarinfo) as fobj: 688 data = fobj.read() 689 self.assertEqual(len(data), tarinfo.size, 690 "regular file extraction failed") 691 self.assertEqual(sha256sum(data), sha256_regtype, 692 "regular file extraction failed") 693 694 def test_provoke_stream_error(self): 695 tarinfos = self.tar.getmembers() 696 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 697 self.assertRaises(tarfile.StreamError, f.read) 698 699 def test_compare_members(self): 700 tar1 = tarfile.open(tarname, encoding="iso8859-1") 701 try: 702 tar2 = self.tar 703 704 while True: 705 t1 = tar1.next() 706 t2 = tar2.next() 707 if t1 is None: 708 break 709 self.assertIsNotNone(t2, "stream.next() failed.") 710 711 if t2.islnk() or t2.issym(): 712 with self.assertRaises(tarfile.StreamError): 713 tar2.extractfile(t2) 714 continue 715 716 v1 = tar1.extractfile(t1) 717 v2 = tar2.extractfile(t2) 718 if v1 is None: 719 continue 720 self.assertIsNotNone(v2, "stream.extractfile() failed") 721 self.assertEqual(v1.read(), v2.read(), 722 "stream extraction failed") 723 finally: 724 tar1.close() 725 726class GzipStreamReadTest(GzipTest, StreamReadTest): 727 pass 728 729class Bz2StreamReadTest(Bz2Test, StreamReadTest): 730 pass 731 732class LzmaStreamReadTest(LzmaTest, StreamReadTest): 733 pass 734 735 736class DetectReadTest(TarTest, unittest.TestCase): 737 def _testfunc_file(self, name, mode): 738 try: 739 tar = tarfile.open(name, mode) 740 except tarfile.ReadError as e: 741 self.fail() 742 else: 743 tar.close() 744 745 def _testfunc_fileobj(self, name, mode): 
746 try: 747 with open(name, "rb") as f: 748 tar = tarfile.open(name, mode, fileobj=f) 749 except tarfile.ReadError as e: 750 self.fail() 751 else: 752 tar.close() 753 754 def _test_modes(self, testfunc): 755 if self.suffix: 756 with self.assertRaises(tarfile.ReadError): 757 tarfile.open(tarname, mode="r:" + self.suffix) 758 with self.assertRaises(tarfile.ReadError): 759 tarfile.open(tarname, mode="r|" + self.suffix) 760 with self.assertRaises(tarfile.ReadError): 761 tarfile.open(self.tarname, mode="r:") 762 with self.assertRaises(tarfile.ReadError): 763 tarfile.open(self.tarname, mode="r|") 764 testfunc(self.tarname, "r") 765 testfunc(self.tarname, "r:" + self.suffix) 766 testfunc(self.tarname, "r:*") 767 testfunc(self.tarname, "r|" + self.suffix) 768 testfunc(self.tarname, "r|*") 769 770 def test_detect_file(self): 771 self._test_modes(self._testfunc_file) 772 773 def test_detect_fileobj(self): 774 self._test_modes(self._testfunc_fileobj) 775 776class GzipDetectReadTest(GzipTest, DetectReadTest): 777 pass 778 779class Bz2DetectReadTest(Bz2Test, DetectReadTest): 780 def test_detect_stream_bz2(self): 781 # Originally, tarfile's stream detection looked for the string 782 # "BZh91" at the start of the file. This is incorrect because 783 # the '9' represents the blocksize (900,000 bytes). If the file was 784 # compressed using another blocksize autodetection fails. 785 with open(tarname, "rb") as fobj: 786 data = fobj.read() 787 788 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
789 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 790 fobj.write(data) 791 792 self._testfunc_file(tmpname, "r|*") 793 794class LzmaDetectReadTest(LzmaTest, DetectReadTest): 795 pass 796 797 798class MemberReadTest(ReadTest, unittest.TestCase): 799 800 def _test_member(self, tarinfo, chksum=None, **kwargs): 801 if chksum is not None: 802 with self.tar.extractfile(tarinfo) as f: 803 self.assertEqual(sha256sum(f.read()), chksum, 804 "wrong sha256sum for %s" % tarinfo.name) 805 806 kwargs["mtime"] = 0o7606136617 807 kwargs["uid"] = 1000 808 kwargs["gid"] = 100 809 if "old-v7" not in tarinfo.name: 810 # V7 tar can't handle alphabetic owners. 811 kwargs["uname"] = "tarfile" 812 kwargs["gname"] = "tarfile" 813 for k, v in kwargs.items(): 814 self.assertEqual(getattr(tarinfo, k), v, 815 "wrong value in %s field of %s" % (k, tarinfo.name)) 816 817 def test_find_regtype(self): 818 tarinfo = self.tar.getmember("ustar/regtype") 819 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 820 821 def test_find_conttype(self): 822 tarinfo = self.tar.getmember("ustar/conttype") 823 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 824 825 def test_find_dirtype(self): 826 tarinfo = self.tar.getmember("ustar/dirtype") 827 self._test_member(tarinfo, size=0) 828 829 def test_find_dirtype_with_size(self): 830 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 831 self._test_member(tarinfo, size=255) 832 833 def test_find_lnktype(self): 834 tarinfo = self.tar.getmember("ustar/lnktype") 835 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 836 837 def test_find_symtype(self): 838 tarinfo = self.tar.getmember("ustar/symtype") 839 self._test_member(tarinfo, size=0, linkname="regtype") 840 841 def test_find_blktype(self): 842 tarinfo = self.tar.getmember("ustar/blktype") 843 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 844 845 def test_find_chrtype(self): 846 tarinfo = self.tar.getmember("ustar/chrtype") 847 
self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 848 849 def test_find_fifotype(self): 850 tarinfo = self.tar.getmember("ustar/fifotype") 851 self._test_member(tarinfo, size=0) 852 853 def test_find_sparse(self): 854 tarinfo = self.tar.getmember("ustar/sparse") 855 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 856 857 def test_find_gnusparse(self): 858 tarinfo = self.tar.getmember("gnu/sparse") 859 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 860 861 def test_find_gnusparse_00(self): 862 tarinfo = self.tar.getmember("gnu/sparse-0.0") 863 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 864 865 def test_find_gnusparse_01(self): 866 tarinfo = self.tar.getmember("gnu/sparse-0.1") 867 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 868 869 def test_find_gnusparse_10(self): 870 tarinfo = self.tar.getmember("gnu/sparse-1.0") 871 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 872 873 def test_find_umlauts(self): 874 tarinfo = self.tar.getmember("ustar/umlauts-" 875 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 876 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 877 878 def test_find_ustar_longname(self): 879 name = "ustar/" + "12345/" * 39 + "1234567/longname" 880 self.assertIn(name, self.tar.getnames()) 881 882 def test_find_regtype_oldv7(self): 883 tarinfo = self.tar.getmember("misc/regtype-old-v7") 884 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 885 886 def test_find_pax_umlauts(self): 887 self.tar.close() 888 self.tar = tarfile.open(self.tarname, mode=self.mode, 889 encoding="iso8859-1") 890 tarinfo = self.tar.getmember("pax/umlauts-" 891 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 892 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 893 894 895class LongnameTest: 896 897 def test_read_longname(self): 898 # Test reading of longname (bug #1471427). 
899 longname = self.subdir + "/" + "123/" * 125 + "longname" 900 try: 901 tarinfo = self.tar.getmember(longname) 902 except KeyError: 903 self.fail("longname not found") 904 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 905 "read longname as dirtype") 906 907 def test_read_longlink(self): 908 longname = self.subdir + "/" + "123/" * 125 + "longname" 909 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 910 try: 911 tarinfo = self.tar.getmember(longlink) 912 except KeyError: 913 self.fail("longlink not found") 914 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 915 916 def test_truncated_longname(self): 917 longname = self.subdir + "/" + "123/" * 125 + "longname" 918 tarinfo = self.tar.getmember(longname) 919 offset = tarinfo.offset 920 self.tar.fileobj.seek(offset) 921 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 922 with self.assertRaises(tarfile.ReadError): 923 tarfile.open(name="foo.tar", fileobj=fobj) 924 925 def test_header_offset(self): 926 # Test if the start offset of the TarInfo object includes 927 # the preceding extended header. 928 longname = self.subdir + "/" + "123/" * 125 + "longname" 929 offset = self.tar.getmember(longname).offset 930 with open(tarname, "rb") as fobj: 931 fobj.seek(offset) 932 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 933 "iso8859-1", "strict") 934 self.assertEqual(tarinfo.type, self.longnametype) 935 936 937class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 938 939 subdir = "gnu" 940 longnametype = tarfile.GNUTYPE_LONGNAME 941 942 # Since 3.2 tarfile is supposed to accurately restore sparse members and 943 # produce files with holes. This is what we actually want to test here. 944 # Unfortunately, not all platforms/filesystems support sparse files, and 945 # even on platforms that do it is non-trivial to make reliable assertions 946 # about holes in files. 
Therefore, we first do one basic test which works 947 # an all platforms, and after that a test that will work only on 948 # platforms/filesystems that prove to support sparse files. 949 def _test_sparse_file(self, name): 950 self.tar.extract(name, TEMPDIR) 951 filename = os.path.join(TEMPDIR, name) 952 with open(filename, "rb") as fobj: 953 data = fobj.read() 954 self.assertEqual(sha256sum(data), sha256_sparse, 955 "wrong sha256sum for %s" % name) 956 957 if self._fs_supports_holes(): 958 s = os.stat(filename) 959 self.assertLess(s.st_blocks * 512, s.st_size) 960 961 def test_sparse_file_old(self): 962 self._test_sparse_file("gnu/sparse") 963 964 def test_sparse_file_00(self): 965 self._test_sparse_file("gnu/sparse-0.0") 966 967 def test_sparse_file_01(self): 968 self._test_sparse_file("gnu/sparse-0.1") 969 970 def test_sparse_file_10(self): 971 self._test_sparse_file("gnu/sparse-1.0") 972 973 @staticmethod 974 def _fs_supports_holes(): 975 # Return True if the platform knows the st_blocks stat attribute and 976 # uses st_blocks units of 512 bytes, and if the filesystem is able to 977 # store holes of 4 KiB in files. 978 # 979 # The function returns False if page size is larger than 4 KiB. 980 # For example, ppc64 uses pages of 64 KiB. 981 if sys.platform.startswith("linux"): 982 # Linux evidentially has 512 byte st_blocks units. 
983 name = os.path.join(TEMPDIR, "sparse-test") 984 with open(name, "wb") as fobj: 985 # Seek to "punch a hole" of 4 KiB 986 fobj.seek(4096) 987 fobj.write(b'x' * 4096) 988 fobj.truncate() 989 s = os.stat(name) 990 support.unlink(name) 991 return (s.st_blocks * 512 < s.st_size) 992 else: 993 return False 994 995 996class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 997 998 subdir = "pax" 999 longnametype = tarfile.XHDTYPE 1000 1001 def test_pax_global_headers(self): 1002 tar = tarfile.open(tarname, encoding="iso8859-1") 1003 try: 1004 tarinfo = tar.getmember("pax/regtype1") 1005 self.assertEqual(tarinfo.uname, "foo") 1006 self.assertEqual(tarinfo.gname, "bar") 1007 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1008 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1009 1010 tarinfo = tar.getmember("pax/regtype2") 1011 self.assertEqual(tarinfo.uname, "") 1012 self.assertEqual(tarinfo.gname, "bar") 1013 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1014 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1015 1016 tarinfo = tar.getmember("pax/regtype3") 1017 self.assertEqual(tarinfo.uname, "tarfile") 1018 self.assertEqual(tarinfo.gname, "tarfile") 1019 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1020 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1021 finally: 1022 tar.close() 1023 1024 def test_pax_number_fields(self): 1025 # All following number fields are read from the pax header. 
1026 tar = tarfile.open(tarname, encoding="iso8859-1") 1027 try: 1028 tarinfo = tar.getmember("pax/regtype4") 1029 self.assertEqual(tarinfo.size, 7011) 1030 self.assertEqual(tarinfo.uid, 123) 1031 self.assertEqual(tarinfo.gid, 123) 1032 self.assertEqual(tarinfo.mtime, 1041808783.0) 1033 self.assertEqual(type(tarinfo.mtime), float) 1034 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1035 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1036 finally: 1037 tar.close() 1038 1039 1040class WriteTestBase(TarTest): 1041 # Put all write tests in here that are supposed to be tested 1042 # in all possible mode combinations. 1043 1044 def test_fileobj_no_close(self): 1045 fobj = io.BytesIO() 1046 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1047 tar.addfile(tarfile.TarInfo("foo")) 1048 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1049 # Issue #20238: Incomplete gzip output with mode="w:gz" 1050 data = fobj.getvalue() 1051 del tar 1052 support.gc_collect() 1053 self.assertFalse(fobj.closed) 1054 self.assertEqual(data, fobj.getvalue()) 1055 1056 def test_eof_marker(self): 1057 # Make sure an end of archive marker is written (two zero blocks). 1058 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1059 # So, we create an archive that has exactly 10240 bytes without the 1060 # marker, and has 20480 bytes once the marker is written. 1061 with tarfile.open(tmpname, self.mode) as tar: 1062 t = tarfile.TarInfo("foo") 1063 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1064 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1065 1066 with self.open(tmpname, "rb") as fobj: 1067 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1068 1069 1070class WriteTest(WriteTestBase, unittest.TestCase): 1071 1072 prefix = "w:" 1073 1074 def test_100_char_name(self): 1075 # The name field in a tar header stores strings of at most 100 chars. 
1076 # If a string is shorter than 100 chars it has to be padded with '\0', 1077 # which implies that a string of exactly 100 chars is stored without 1078 # a trailing '\0'. 1079 name = "0123456789" * 10 1080 tar = tarfile.open(tmpname, self.mode) 1081 try: 1082 t = tarfile.TarInfo(name) 1083 tar.addfile(t) 1084 finally: 1085 tar.close() 1086 1087 tar = tarfile.open(tmpname) 1088 try: 1089 self.assertEqual(tar.getnames()[0], name, 1090 "failed to store 100 char filename") 1091 finally: 1092 tar.close() 1093 1094 def test_tar_size(self): 1095 # Test for bug #1013882. 1096 tar = tarfile.open(tmpname, self.mode) 1097 try: 1098 path = os.path.join(TEMPDIR, "file") 1099 with open(path, "wb") as fobj: 1100 fobj.write(b"aaa") 1101 tar.add(path) 1102 finally: 1103 tar.close() 1104 self.assertGreater(os.path.getsize(tmpname), 0, 1105 "tarfile is empty") 1106 1107 # The test_*_size tests test for bug #1167128. 1108 def test_file_size(self): 1109 tar = tarfile.open(tmpname, self.mode) 1110 try: 1111 path = os.path.join(TEMPDIR, "file") 1112 with open(path, "wb"): 1113 pass 1114 tarinfo = tar.gettarinfo(path) 1115 self.assertEqual(tarinfo.size, 0) 1116 1117 with open(path, "wb") as fobj: 1118 fobj.write(b"aaa") 1119 tarinfo = tar.gettarinfo(path) 1120 self.assertEqual(tarinfo.size, 3) 1121 finally: 1122 tar.close() 1123 1124 def test_directory_size(self): 1125 path = os.path.join(TEMPDIR, "directory") 1126 os.mkdir(path) 1127 try: 1128 tar = tarfile.open(tmpname, self.mode) 1129 try: 1130 tarinfo = tar.gettarinfo(path) 1131 self.assertEqual(tarinfo.size, 0) 1132 finally: 1133 tar.close() 1134 finally: 1135 support.rmdir(path) 1136 1137 # mock the following: 1138 # os.listdir: so we know that files are in the wrong order 1139 def test_ordered_recursion(self): 1140 path = os.path.join(TEMPDIR, "directory") 1141 os.mkdir(path) 1142 open(os.path.join(path, "1"), "a").close() 1143 open(os.path.join(path, "2"), "a").close() 1144 try: 1145 tar = tarfile.open(tmpname, self.mode) 1146 
try: 1147 with unittest.mock.patch('os.listdir') as mock_listdir: 1148 mock_listdir.return_value = ["2", "1"] 1149 tar.add(path) 1150 paths = [] 1151 for m in tar.getmembers(): 1152 paths.append(os.path.split(m.name)[-1]) 1153 self.assertEqual(paths, ["directory", "1", "2"]); 1154 finally: 1155 tar.close() 1156 finally: 1157 support.unlink(os.path.join(path, "1")) 1158 support.unlink(os.path.join(path, "2")) 1159 support.rmdir(path) 1160 1161 def test_gettarinfo_pathlike_name(self): 1162 with tarfile.open(tmpname, self.mode) as tar: 1163 path = pathlib.Path(TEMPDIR) / "file" 1164 with open(path, "wb") as fobj: 1165 fobj.write(b"aaa") 1166 tarinfo = tar.gettarinfo(path) 1167 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1168 self.assertIsInstance(tarinfo.name, str) 1169 self.assertEqual(tarinfo.name, tarinfo2.name) 1170 self.assertEqual(tarinfo.size, 3) 1171 1172 @unittest.skipUnless(hasattr(os, "link"), 1173 "Missing hardlink implementation") 1174 def test_link_size(self): 1175 link = os.path.join(TEMPDIR, "link") 1176 target = os.path.join(TEMPDIR, "link_target") 1177 with open(target, "wb") as fobj: 1178 fobj.write(b"aaa") 1179 try: 1180 os.link(target, link) 1181 except PermissionError as e: 1182 self.skipTest('os.link(): %s' % e) 1183 try: 1184 tar = tarfile.open(tmpname, self.mode) 1185 try: 1186 # Record the link target in the inodes list. 1187 tar.gettarinfo(target) 1188 tarinfo = tar.gettarinfo(link) 1189 self.assertEqual(tarinfo.size, 0) 1190 finally: 1191 tar.close() 1192 finally: 1193 support.unlink(target) 1194 support.unlink(link) 1195 1196 @support.skip_unless_symlink 1197 def test_symlink_size(self): 1198 path = os.path.join(TEMPDIR, "symlink") 1199 os.symlink("link_target", path) 1200 try: 1201 tar = tarfile.open(tmpname, self.mode) 1202 try: 1203 tarinfo = tar.gettarinfo(path) 1204 self.assertEqual(tarinfo.size, 0) 1205 finally: 1206 tar.close() 1207 finally: 1208 support.unlink(path) 1209 1210 def test_add_self(self): 1211 # Test for #1257255. 
1212 dstname = os.path.abspath(tmpname) 1213 tar = tarfile.open(tmpname, self.mode) 1214 try: 1215 self.assertEqual(tar.name, dstname, 1216 "archive name must be absolute") 1217 tar.add(dstname) 1218 self.assertEqual(tar.getnames(), [], 1219 "added the archive to itself") 1220 1221 with support.change_cwd(TEMPDIR): 1222 tar.add(dstname) 1223 self.assertEqual(tar.getnames(), [], 1224 "added the archive to itself") 1225 finally: 1226 tar.close() 1227 1228 def test_filter(self): 1229 tempdir = os.path.join(TEMPDIR, "filter") 1230 os.mkdir(tempdir) 1231 try: 1232 for name in ("foo", "bar", "baz"): 1233 name = os.path.join(tempdir, name) 1234 support.create_empty_file(name) 1235 1236 def filter(tarinfo): 1237 if os.path.basename(tarinfo.name) == "bar": 1238 return 1239 tarinfo.uid = 123 1240 tarinfo.uname = "foo" 1241 return tarinfo 1242 1243 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1244 try: 1245 tar.add(tempdir, arcname="empty_dir", filter=filter) 1246 finally: 1247 tar.close() 1248 1249 # Verify that filter is a keyword-only argument 1250 with self.assertRaises(TypeError): 1251 tar.add(tempdir, "empty_dir", True, None, filter) 1252 1253 tar = tarfile.open(tmpname, "r") 1254 try: 1255 for tarinfo in tar: 1256 self.assertEqual(tarinfo.uid, 123) 1257 self.assertEqual(tarinfo.uname, "foo") 1258 self.assertEqual(len(tar.getmembers()), 3) 1259 finally: 1260 tar.close() 1261 finally: 1262 support.rmtree(tempdir) 1263 1264 # Guarantee that stored pathnames are not modified. Don't 1265 # remove ./ or ../ or double slashes. Still make absolute 1266 # pathnames relative. 1267 # For details see bug #6054. 1268 def _test_pathname(self, path, cmp_path=None, dir=False): 1269 # Create a tarfile with an empty member named path 1270 # and compare the stored name with the original. 
1271 foo = os.path.join(TEMPDIR, "foo") 1272 if not dir: 1273 support.create_empty_file(foo) 1274 else: 1275 os.mkdir(foo) 1276 1277 tar = tarfile.open(tmpname, self.mode) 1278 try: 1279 tar.add(foo, arcname=path) 1280 finally: 1281 tar.close() 1282 1283 tar = tarfile.open(tmpname, "r") 1284 try: 1285 t = tar.next() 1286 finally: 1287 tar.close() 1288 1289 if not dir: 1290 support.unlink(foo) 1291 else: 1292 support.rmdir(foo) 1293 1294 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1295 1296 1297 @support.skip_unless_symlink 1298 def test_extractall_symlinks(self): 1299 # Test if extractall works properly when tarfile contains symlinks 1300 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1301 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1302 os.mkdir(tempdir) 1303 try: 1304 source_file = os.path.join(tempdir,'source') 1305 target_file = os.path.join(tempdir,'symlink') 1306 with open(source_file,'w') as f: 1307 f.write('something\n') 1308 os.symlink(source_file, target_file) 1309 with tarfile.open(temparchive, 'w') as tar: 1310 tar.add(source_file) 1311 tar.add(target_file) 1312 # Let's extract it to the location which contains the symlink 1313 with tarfile.open(temparchive) as tar: 1314 # this should not raise OSError: [Errno 17] File exists 1315 try: 1316 tar.extractall(path=tempdir) 1317 except OSError: 1318 self.fail("extractall failed with symlinked files") 1319 finally: 1320 support.unlink(temparchive) 1321 support.rmtree(tempdir) 1322 1323 def test_pathnames(self): 1324 self._test_pathname("foo") 1325 self._test_pathname(os.path.join("foo", ".", "bar")) 1326 self._test_pathname(os.path.join("foo", "..", "bar")) 1327 self._test_pathname(os.path.join(".", "foo")) 1328 self._test_pathname(os.path.join(".", "foo", ".")) 1329 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1330 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1331 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1332 
self._test_pathname(os.path.join("..", "foo")) 1333 self._test_pathname(os.path.join("..", "foo", "..")) 1334 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1335 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1336 1337 self._test_pathname("foo" + os.sep + os.sep + "bar") 1338 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1339 1340 def test_abs_pathnames(self): 1341 if sys.platform == "win32": 1342 self._test_pathname("C:\\foo", "foo") 1343 else: 1344 self._test_pathname("/foo", "foo") 1345 self._test_pathname("///foo", "foo") 1346 1347 def test_cwd(self): 1348 # Test adding the current working directory. 1349 with support.change_cwd(TEMPDIR): 1350 tar = tarfile.open(tmpname, self.mode) 1351 try: 1352 tar.add(".") 1353 finally: 1354 tar.close() 1355 1356 tar = tarfile.open(tmpname, "r") 1357 try: 1358 for t in tar: 1359 if t.name != ".": 1360 self.assertTrue(t.name.startswith("./"), t.name) 1361 finally: 1362 tar.close() 1363 1364 def test_open_nonwritable_fileobj(self): 1365 for exctype in OSError, EOFError, RuntimeError: 1366 class BadFile(io.BytesIO): 1367 first = True 1368 def write(self, data): 1369 if self.first: 1370 self.first = False 1371 raise exctype 1372 1373 f = BadFile() 1374 with self.assertRaises(exctype): 1375 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1376 format=tarfile.PAX_FORMAT, 1377 pax_headers={'non': 'empty'}) 1378 self.assertFalse(f.closed) 1379 1380class GzipWriteTest(GzipTest, WriteTest): 1381 pass 1382 1383class Bz2WriteTest(Bz2Test, WriteTest): 1384 pass 1385 1386class LzmaWriteTest(LzmaTest, WriteTest): 1387 pass 1388 1389 1390class StreamWriteTest(WriteTestBase, unittest.TestCase): 1391 1392 prefix = "w|" 1393 decompressor = None 1394 1395 def test_stream_padding(self): 1396 # Test for bug #1543303. 
1397 tar = tarfile.open(tmpname, self.mode) 1398 tar.close() 1399 if self.decompressor: 1400 dec = self.decompressor() 1401 with open(tmpname, "rb") as fobj: 1402 data = fobj.read() 1403 data = dec.decompress(data) 1404 self.assertFalse(dec.unused_data, "found trailing data") 1405 else: 1406 with self.open(tmpname) as fobj: 1407 data = fobj.read() 1408 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1409 "incorrect zero padding") 1410 1411 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1412 "Missing umask implementation") 1413 def test_file_mode(self): 1414 # Test for issue #8464: Create files with correct 1415 # permissions. 1416 if os.path.exists(tmpname): 1417 support.unlink(tmpname) 1418 1419 original_umask = os.umask(0o022) 1420 try: 1421 tar = tarfile.open(tmpname, self.mode) 1422 tar.close() 1423 mode = os.stat(tmpname).st_mode & 0o777 1424 self.assertEqual(mode, 0o644, "wrong file permissions") 1425 finally: 1426 os.umask(original_umask) 1427 1428class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1429 pass 1430 1431class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1432 decompressor = bz2.BZ2Decompressor if bz2 else None 1433 1434class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1435 decompressor = lzma.LZMADecompressor if lzma else None 1436 1437 1438class GNUWriteTest(unittest.TestCase): 1439 # This testcase checks for correct creation of GNU Longname 1440 # and Longlink extended headers (cp. bug #812325). 
1441 1442 def _length(self, s): 1443 blocks = len(s) // 512 + 1 1444 return blocks * 512 1445 1446 def _calc_size(self, name, link=None): 1447 # Initial tar header 1448 count = 512 1449 1450 if len(name) > tarfile.LENGTH_NAME: 1451 # GNU longname extended header + longname 1452 count += 512 1453 count += self._length(name) 1454 if link is not None and len(link) > tarfile.LENGTH_LINK: 1455 # GNU longlink extended header + longlink 1456 count += 512 1457 count += self._length(link) 1458 return count 1459 1460 def _test(self, name, link=None): 1461 tarinfo = tarfile.TarInfo(name) 1462 if link: 1463 tarinfo.linkname = link 1464 tarinfo.type = tarfile.LNKTYPE 1465 1466 tar = tarfile.open(tmpname, "w") 1467 try: 1468 tar.format = tarfile.GNU_FORMAT 1469 tar.addfile(tarinfo) 1470 1471 v1 = self._calc_size(name, link) 1472 v2 = tar.offset 1473 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1474 finally: 1475 tar.close() 1476 1477 tar = tarfile.open(tmpname) 1478 try: 1479 member = tar.next() 1480 self.assertIsNotNone(member, 1481 "unable to read longname member") 1482 self.assertEqual(tarinfo.name, member.name, 1483 "unable to read longname member") 1484 self.assertEqual(tarinfo.linkname, member.linkname, 1485 "unable to read longname member") 1486 finally: 1487 tar.close() 1488 1489 def test_longname_1023(self): 1490 self._test(("longnam/" * 127) + "longnam") 1491 1492 def test_longname_1024(self): 1493 self._test(("longnam/" * 127) + "longname") 1494 1495 def test_longname_1025(self): 1496 self._test(("longnam/" * 127) + "longname_") 1497 1498 def test_longlink_1023(self): 1499 self._test("name", ("longlnk/" * 127) + "longlnk") 1500 1501 def test_longlink_1024(self): 1502 self._test("name", ("longlnk/" * 127) + "longlink") 1503 1504 def test_longlink_1025(self): 1505 self._test("name", ("longlnk/" * 127) + "longlink_") 1506 1507 def test_longnamelink_1023(self): 1508 self._test(("longnam/" * 127) + "longnam", 1509 ("longlnk/" * 127) + "longlnk") 1510 
1511 def test_longnamelink_1024(self): 1512 self._test(("longnam/" * 127) + "longname", 1513 ("longlnk/" * 127) + "longlink") 1514 1515 def test_longnamelink_1025(self): 1516 self._test(("longnam/" * 127) + "longname_", 1517 ("longlnk/" * 127) + "longlink_") 1518 1519 1520class CreateTest(WriteTestBase, unittest.TestCase): 1521 1522 prefix = "x:" 1523 1524 file_path = os.path.join(TEMPDIR, "spameggs42") 1525 1526 def setUp(self): 1527 support.unlink(tmpname) 1528 1529 @classmethod 1530 def setUpClass(cls): 1531 with open(cls.file_path, "wb") as fobj: 1532 fobj.write(b"aaa") 1533 1534 @classmethod 1535 def tearDownClass(cls): 1536 support.unlink(cls.file_path) 1537 1538 def test_create(self): 1539 with tarfile.open(tmpname, self.mode) as tobj: 1540 tobj.add(self.file_path) 1541 1542 with self.taropen(tmpname) as tobj: 1543 names = tobj.getnames() 1544 self.assertEqual(len(names), 1) 1545 self.assertIn('spameggs42', names[0]) 1546 1547 def test_create_existing(self): 1548 with tarfile.open(tmpname, self.mode) as tobj: 1549 tobj.add(self.file_path) 1550 1551 with self.assertRaises(FileExistsError): 1552 tobj = tarfile.open(tmpname, self.mode) 1553 1554 with self.taropen(tmpname) as tobj: 1555 names = tobj.getnames() 1556 self.assertEqual(len(names), 1) 1557 self.assertIn('spameggs42', names[0]) 1558 1559 def test_create_taropen(self): 1560 with self.taropen(tmpname, "x") as tobj: 1561 tobj.add(self.file_path) 1562 1563 with self.taropen(tmpname) as tobj: 1564 names = tobj.getnames() 1565 self.assertEqual(len(names), 1) 1566 self.assertIn('spameggs42', names[0]) 1567 1568 def test_create_existing_taropen(self): 1569 with self.taropen(tmpname, "x") as tobj: 1570 tobj.add(self.file_path) 1571 1572 with self.assertRaises(FileExistsError): 1573 with self.taropen(tmpname, "x"): 1574 pass 1575 1576 with self.taropen(tmpname) as tobj: 1577 names = tobj.getnames() 1578 self.assertEqual(len(names), 1) 1579 self.assertIn("spameggs42", names[0]) 1580 1581 def 
test_create_pathlike_name(self): 1582 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1583 self.assertIsInstance(tobj.name, str) 1584 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1585 tobj.add(pathlib.Path(self.file_path)) 1586 names = tobj.getnames() 1587 self.assertEqual(len(names), 1) 1588 self.assertIn('spameggs42', names[0]) 1589 1590 with self.taropen(tmpname) as tobj: 1591 names = tobj.getnames() 1592 self.assertEqual(len(names), 1) 1593 self.assertIn('spameggs42', names[0]) 1594 1595 def test_create_taropen_pathlike_name(self): 1596 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1597 self.assertIsInstance(tobj.name, str) 1598 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1599 tobj.add(pathlib.Path(self.file_path)) 1600 names = tobj.getnames() 1601 self.assertEqual(len(names), 1) 1602 self.assertIn('spameggs42', names[0]) 1603 1604 with self.taropen(tmpname) as tobj: 1605 names = tobj.getnames() 1606 self.assertEqual(len(names), 1) 1607 self.assertIn('spameggs42', names[0]) 1608 1609 1610class GzipCreateTest(GzipTest, CreateTest): 1611 pass 1612 1613 1614class Bz2CreateTest(Bz2Test, CreateTest): 1615 pass 1616 1617 1618class LzmaCreateTest(LzmaTest, CreateTest): 1619 pass 1620 1621 1622class CreateWithXModeTest(CreateTest): 1623 1624 prefix = "x" 1625 1626 test_create_taropen = None 1627 test_create_existing_taropen = None 1628 1629 1630@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1631class HardlinkTest(unittest.TestCase): 1632 # Test the creation of LNKTYPE (hardlink) members in an archive. 
1633 1634 def setUp(self): 1635 self.foo = os.path.join(TEMPDIR, "foo") 1636 self.bar = os.path.join(TEMPDIR, "bar") 1637 1638 with open(self.foo, "wb") as fobj: 1639 fobj.write(b"foo") 1640 1641 try: 1642 os.link(self.foo, self.bar) 1643 except PermissionError as e: 1644 self.skipTest('os.link(): %s' % e) 1645 1646 self.tar = tarfile.open(tmpname, "w") 1647 self.tar.add(self.foo) 1648 1649 def tearDown(self): 1650 self.tar.close() 1651 support.unlink(self.foo) 1652 support.unlink(self.bar) 1653 1654 def test_add_twice(self): 1655 # The same name will be added as a REGTYPE every 1656 # time regardless of st_nlink. 1657 tarinfo = self.tar.gettarinfo(self.foo) 1658 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1659 "add file as regular failed") 1660 1661 def test_add_hardlink(self): 1662 tarinfo = self.tar.gettarinfo(self.bar) 1663 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1664 "add file as hardlink failed") 1665 1666 def test_dereference_hardlink(self): 1667 self.tar.dereference = True 1668 tarinfo = self.tar.gettarinfo(self.bar) 1669 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1670 "dereferencing hardlink failed") 1671 1672 1673class PaxWriteTest(GNUWriteTest): 1674 1675 def _test(self, name, link=None): 1676 # See GNUWriteTest. 
1677 tarinfo = tarfile.TarInfo(name) 1678 if link: 1679 tarinfo.linkname = link 1680 tarinfo.type = tarfile.LNKTYPE 1681 1682 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1683 try: 1684 tar.addfile(tarinfo) 1685 finally: 1686 tar.close() 1687 1688 tar = tarfile.open(tmpname) 1689 try: 1690 if link: 1691 l = tar.getmembers()[0].linkname 1692 self.assertEqual(link, l, "PAX longlink creation failed") 1693 else: 1694 n = tar.getmembers()[0].name 1695 self.assertEqual(name, n, "PAX longname creation failed") 1696 finally: 1697 tar.close() 1698 1699 def test_pax_global_header(self): 1700 pax_headers = { 1701 "foo": "bar", 1702 "uid": "0", 1703 "mtime": "1.23", 1704 "test": "\xe4\xf6\xfc", 1705 "\xe4\xf6\xfc": "test"} 1706 1707 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1708 pax_headers=pax_headers) 1709 try: 1710 tar.addfile(tarfile.TarInfo("test")) 1711 finally: 1712 tar.close() 1713 1714 # Test if the global header was written correctly. 1715 tar = tarfile.open(tmpname, encoding="iso8859-1") 1716 try: 1717 self.assertEqual(tar.pax_headers, pax_headers) 1718 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1719 # Test if all the fields are strings. 1720 for key, val in tar.pax_headers.items(): 1721 self.assertIsNot(type(key), bytes) 1722 self.assertIsNot(type(val), bytes) 1723 if key in tarfile.PAX_NUMBER_FIELDS: 1724 try: 1725 tarfile.PAX_NUMBER_FIELDS[key](val) 1726 except (TypeError, ValueError): 1727 self.fail("unable to convert pax header field") 1728 finally: 1729 tar.close() 1730 1731 def test_pax_extended_header(self): 1732 # The fields from the pax header have priority over the 1733 # TarInfo. 
1734 pax_headers = {"path": "foo", "uid": "123"} 1735 1736 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1737 encoding="iso8859-1") 1738 try: 1739 t = tarfile.TarInfo() 1740 t.name = "\xe4\xf6\xfc" # non-ASCII 1741 t.uid = 8**8 # too large 1742 t.pax_headers = pax_headers 1743 tar.addfile(t) 1744 finally: 1745 tar.close() 1746 1747 tar = tarfile.open(tmpname, encoding="iso8859-1") 1748 try: 1749 t = tar.getmembers()[0] 1750 self.assertEqual(t.pax_headers, pax_headers) 1751 self.assertEqual(t.name, "foo") 1752 self.assertEqual(t.uid, 123) 1753 finally: 1754 tar.close() 1755 1756 1757class UnicodeTest: 1758 1759 def test_iso8859_1_filename(self): 1760 self._test_unicode_filename("iso8859-1") 1761 1762 def test_utf7_filename(self): 1763 self._test_unicode_filename("utf7") 1764 1765 def test_utf8_filename(self): 1766 self._test_unicode_filename("utf-8") 1767 1768 def _test_unicode_filename(self, encoding): 1769 tar = tarfile.open(tmpname, "w", format=self.format, 1770 encoding=encoding, errors="strict") 1771 try: 1772 name = "\xe4\xf6\xfc" 1773 tar.addfile(tarfile.TarInfo(name)) 1774 finally: 1775 tar.close() 1776 1777 tar = tarfile.open(tmpname, encoding=encoding) 1778 try: 1779 self.assertEqual(tar.getmembers()[0].name, name) 1780 finally: 1781 tar.close() 1782 1783 def test_unicode_filename_error(self): 1784 tar = tarfile.open(tmpname, "w", format=self.format, 1785 encoding="ascii", errors="strict") 1786 try: 1787 tarinfo = tarfile.TarInfo() 1788 1789 tarinfo.name = "\xe4\xf6\xfc" 1790 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1791 1792 tarinfo.name = "foo" 1793 tarinfo.uname = "\xe4\xf6\xfc" 1794 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1795 finally: 1796 tar.close() 1797 1798 def test_unicode_argument(self): 1799 tar = tarfile.open(tarname, "r", 1800 encoding="iso8859-1", errors="strict") 1801 try: 1802 for t in tar: 1803 self.assertIs(type(t.name), str) 1804 self.assertIs(type(t.linkname), str) 1805 
self.assertIs(type(t.uname), str) 1806 self.assertIs(type(t.gname), str) 1807 finally: 1808 tar.close() 1809 1810 def test_uname_unicode(self): 1811 t = tarfile.TarInfo("foo") 1812 t.uname = "\xe4\xf6\xfc" 1813 t.gname = "\xe4\xf6\xfc" 1814 1815 tar = tarfile.open(tmpname, mode="w", format=self.format, 1816 encoding="iso8859-1") 1817 try: 1818 tar.addfile(t) 1819 finally: 1820 tar.close() 1821 1822 tar = tarfile.open(tmpname, encoding="iso8859-1") 1823 try: 1824 t = tar.getmember("foo") 1825 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1826 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1827 1828 if self.format != tarfile.PAX_FORMAT: 1829 tar.close() 1830 tar = tarfile.open(tmpname, encoding="ascii") 1831 t = tar.getmember("foo") 1832 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1833 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1834 finally: 1835 tar.close() 1836 1837 1838class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1839 1840 format = tarfile.USTAR_FORMAT 1841 1842 # Test whether the utf-8 encoded version of a filename exceeds the 100 1843 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1844 # bytes). 1845 def test_unicode_name1(self): 1846 self._test_ustar_name("0123456789" * 10) 1847 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1848 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1849 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1850 1851 def test_unicode_name2(self): 1852 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1853 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1854 1855 # Test whether the utf-8 encoded version of a filename exceeds the 155 1856 # bytes prefix + '/' + 100 bytes name limit. 
1857 def test_unicode_longname1(self): 1858 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1859 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1860 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1861 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1862 1863 def test_unicode_longname2(self): 1864 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1865 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1866 1867 def test_unicode_longname3(self): 1868 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1869 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1870 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1871 1872 def test_unicode_longname4(self): 1873 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 1874 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 1875 1876 def _test_ustar_name(self, name, exc=None): 1877 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1878 t = tarfile.TarInfo(name) 1879 if exc is None: 1880 tar.addfile(t) 1881 else: 1882 self.assertRaises(exc, tar.addfile, t) 1883 1884 if exc is None: 1885 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1886 for t in tar: 1887 self.assertEqual(name, t.name) 1888 break 1889 1890 # Test the same as above for the 100 bytes link field. 
1891 def test_unicode_link1(self): 1892 self._test_ustar_link("0123456789" * 10) 1893 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 1894 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 1895 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 1896 1897 def test_unicode_link2(self): 1898 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 1899 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 1900 1901 def _test_ustar_link(self, name, exc=None): 1902 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1903 t = tarfile.TarInfo("foo") 1904 t.linkname = name 1905 if exc is None: 1906 tar.addfile(t) 1907 else: 1908 self.assertRaises(exc, tar.addfile, t) 1909 1910 if exc is None: 1911 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1912 for t in tar: 1913 self.assertEqual(name, t.linkname) 1914 break 1915 1916 1917class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 1918 1919 format = tarfile.GNU_FORMAT 1920 1921 def test_bad_pax_header(self): 1922 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1923 # without a hdrcharset=BINARY header. 1924 for encoding, name in ( 1925 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1926 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1927 with tarfile.open(tarname, encoding=encoding, 1928 errors="surrogateescape") as tar: 1929 try: 1930 t = tar.getmember(name) 1931 except KeyError: 1932 self.fail("unable to read bad GNU tar pax header") 1933 1934 1935class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 1936 1937 format = tarfile.PAX_FORMAT 1938 1939 # PAX_FORMAT ignores encoding in write mode. 1940 test_unicode_filename_error = None 1941 1942 def test_binary_header(self): 1943 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 
1944 for encoding, name in ( 1945 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1946 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1947 with tarfile.open(tarname, encoding=encoding, 1948 errors="surrogateescape") as tar: 1949 try: 1950 t = tar.getmember(name) 1951 except KeyError: 1952 self.fail("unable to read POSIX.1-2008 binary header") 1953 1954 1955class AppendTestBase: 1956 # Test append mode (cp. patch #1652681). 1957 1958 def setUp(self): 1959 self.tarname = tmpname 1960 if os.path.exists(self.tarname): 1961 support.unlink(self.tarname) 1962 1963 def _create_testtar(self, mode="w:"): 1964 with tarfile.open(tarname, encoding="iso8859-1") as src: 1965 t = src.getmember("ustar/regtype") 1966 t.name = "foo" 1967 with src.extractfile(t) as f: 1968 with tarfile.open(self.tarname, mode) as tar: 1969 tar.addfile(t, f) 1970 1971 def test_append_compressed(self): 1972 self._create_testtar("w:" + self.suffix) 1973 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1974 1975class AppendTest(AppendTestBase, unittest.TestCase): 1976 test_append_compressed = None 1977 1978 def _add_testfile(self, fileobj=None): 1979 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1980 tar.addfile(tarfile.TarInfo("bar")) 1981 1982 def _test(self, names=["bar"], fileobj=None): 1983 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1984 self.assertEqual(tar.getnames(), names) 1985 1986 def test_non_existing(self): 1987 self._add_testfile() 1988 self._test() 1989 1990 def test_empty(self): 1991 tarfile.open(self.tarname, "w:").close() 1992 self._add_testfile() 1993 self._test() 1994 1995 def test_empty_fileobj(self): 1996 fobj = io.BytesIO(b"\0" * 1024) 1997 self._add_testfile(fobj) 1998 fobj.seek(0) 1999 self._test(fileobj=fobj) 2000 2001 def test_fileobj(self): 2002 self._create_testtar() 2003 with open(self.tarname, "rb") as fobj: 2004 data = fobj.read() 2005 fobj = io.BytesIO(data) 2006 self._add_testfile(fobj) 2007 fobj.seek(0) 2008 
self._test(names=["foo", "bar"], fileobj=fobj) 2009 2010 def test_existing(self): 2011 self._create_testtar() 2012 self._add_testfile() 2013 self._test(names=["foo", "bar"]) 2014 2015 # Append mode is supposed to fail if the tarfile to append to 2016 # does not end with a zero block. 2017 def _test_error(self, data): 2018 with open(self.tarname, "wb") as fobj: 2019 fobj.write(data) 2020 self.assertRaises(tarfile.ReadError, self._add_testfile) 2021 2022 def test_null(self): 2023 self._test_error(b"") 2024 2025 def test_incomplete(self): 2026 self._test_error(b"\0" * 13) 2027 2028 def test_premature_eof(self): 2029 data = tarfile.TarInfo("foo").tobuf() 2030 self._test_error(data) 2031 2032 def test_trailing_garbage(self): 2033 data = tarfile.TarInfo("foo").tobuf() 2034 self._test_error(data + b"\0" * 13) 2035 2036 def test_invalid(self): 2037 self._test_error(b"a" * 512) 2038 2039class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2040 pass 2041 2042class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2043 pass 2044 2045class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2046 pass 2047 2048 2049class LimitsTest(unittest.TestCase): 2050 2051 def test_ustar_limits(self): 2052 # 100 char name 2053 tarinfo = tarfile.TarInfo("0123456789" * 10) 2054 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2055 2056 # 101 char name that cannot be stored 2057 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2058 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2059 2060 # 256 char name with a slash at pos 156 2061 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2062 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2063 2064 # 256 char name that cannot be stored 2065 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2066 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2067 2068 # 512 char name 2069 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2070 self.assertRaises(ValueError, tarinfo.tobuf, 
tarfile.USTAR_FORMAT) 2071 2072 # 512 char linkname 2073 tarinfo = tarfile.TarInfo("longlink") 2074 tarinfo.linkname = "123/" * 126 + "longname" 2075 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2076 2077 # uid > 8 digits 2078 tarinfo = tarfile.TarInfo("name") 2079 tarinfo.uid = 0o10000000 2080 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2081 2082 def test_gnu_limits(self): 2083 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2084 tarinfo.tobuf(tarfile.GNU_FORMAT) 2085 2086 tarinfo = tarfile.TarInfo("longlink") 2087 tarinfo.linkname = "123/" * 126 + "longname" 2088 tarinfo.tobuf(tarfile.GNU_FORMAT) 2089 2090 # uid >= 256 ** 7 2091 tarinfo = tarfile.TarInfo("name") 2092 tarinfo.uid = 0o4000000000000000000 2093 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2094 2095 def test_pax_limits(self): 2096 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2097 tarinfo.tobuf(tarfile.PAX_FORMAT) 2098 2099 tarinfo = tarfile.TarInfo("longlink") 2100 tarinfo.linkname = "123/" * 126 + "longname" 2101 tarinfo.tobuf(tarfile.PAX_FORMAT) 2102 2103 tarinfo = tarfile.TarInfo("name") 2104 tarinfo.uid = 0o4000000000000000000 2105 tarinfo.tobuf(tarfile.PAX_FORMAT) 2106 2107 2108class MiscTest(unittest.TestCase): 2109 2110 def test_char_fields(self): 2111 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2112 b"foo\0\0\0\0\0") 2113 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2114 b"foo") 2115 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2116 "foo") 2117 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2118 "foo") 2119 2120 def test_read_number_fields(self): 2121 # Issue 13158: Test if GNU tar specific base-256 number fields 2122 # are decoded correctly. 
2123 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2124 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2125 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2126 0o10000000) 2127 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2128 0xffffffff) 2129 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2130 -1) 2131 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2132 -100) 2133 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2134 -0x100000000000000) 2135 2136 # Issue 24514: Test if empty number fields are converted to zero. 2137 self.assertEqual(tarfile.nti(b"\0"), 0) 2138 self.assertEqual(tarfile.nti(b" \0"), 0) 2139 2140 def test_write_number_fields(self): 2141 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2142 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2143 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2144 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2145 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2146 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2147 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2148 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2149 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2150 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2151 self.assertEqual(tarfile.itn(-0x100000000000000, 2152 format=tarfile.GNU_FORMAT), 2153 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2154 2155 # Issue 32713: Test if itn() supports float values outside the 2156 # non-GNU format range 2157 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2158 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2159 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2160 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2161 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2162 2163 def test_number_field_limits(self): 2164 with self.assertRaises(ValueError): 2165 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2166 with self.assertRaises(ValueError): 2167 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2168 with self.assertRaises(ValueError): 2169 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2170 with self.assertRaises(ValueError): 2171 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2172 2173 def test__all__(self): 2174 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2175 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2176 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2177 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2178 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2179 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2180 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2181 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2182 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2183 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2184 'filemode', 2185 'EmptyHeaderError', 'TruncatedHeaderError', 2186 'EOFHeaderError', 'InvalidHeaderError', 2187 'SubsequentHeaderError', 'ExFileObject', 2188 'main'} 2189 support.check__all__(self, tarfile, blacklist=blacklist) 2190 2191 2192class CommandLineTest(unittest.TestCase): 2193 2194 def tarfilecmd(self, *args, **kwargs): 2195 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2196 **kwargs) 2197 return out.replace(os.linesep.encode(), b'\n') 2198 2199 def tarfilecmd_failure(self, *args): 2200 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2201 2202 def make_simple_tarfile(self, tar_name): 2203 files = [support.findfile('tokenize_tests.txt'), 2204 support.findfile('tokenize_tests-no-coding-cookie-' 2205 'and-utf8-bom-sig-only.txt')] 2206 self.addCleanup(support.unlink, tar_name) 2207 with tarfile.open(tar_name, 'w') as tf: 2208 for tardata in files: 2209 tf.add(tardata, arcname=os.path.basename(tardata)) 2210 2211 def test_bad_use(self): 2212 rc, out, err = self.tarfilecmd_failure() 2213 
self.assertEqual(out, b'') 2214 self.assertIn(b'usage', err.lower()) 2215 self.assertIn(b'error', err.lower()) 2216 self.assertIn(b'required', err.lower()) 2217 rc, out, err = self.tarfilecmd_failure('-l', '') 2218 self.assertEqual(out, b'') 2219 self.assertNotEqual(err.strip(), b'') 2220 2221 def test_test_command(self): 2222 for tar_name in testtarnames: 2223 for opt in '-t', '--test': 2224 out = self.tarfilecmd(opt, tar_name) 2225 self.assertEqual(out, b'') 2226 2227 def test_test_command_verbose(self): 2228 for tar_name in testtarnames: 2229 for opt in '-v', '--verbose': 2230 out = self.tarfilecmd(opt, '-t', tar_name) 2231 self.assertIn(b'is a tar archive.\n', out) 2232 2233 def test_test_command_invalid_file(self): 2234 zipname = support.findfile('zipdir.zip') 2235 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2236 self.assertIn(b' is not a tar archive.', err) 2237 self.assertEqual(out, b'') 2238 self.assertEqual(rc, 1) 2239 2240 for tar_name in testtarnames: 2241 with self.subTest(tar_name=tar_name): 2242 with open(tar_name, 'rb') as f: 2243 data = f.read() 2244 try: 2245 with open(tmpname, 'wb') as f: 2246 f.write(data[:511]) 2247 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2248 self.assertEqual(out, b'') 2249 self.assertEqual(rc, 1) 2250 finally: 2251 support.unlink(tmpname) 2252 2253 def test_list_command(self): 2254 for tar_name in testtarnames: 2255 with support.captured_stdout() as t: 2256 with tarfile.open(tar_name, 'r') as tf: 2257 tf.list(verbose=False) 2258 expected = t.getvalue().encode('ascii', 'backslashreplace') 2259 for opt in '-l', '--list': 2260 out = self.tarfilecmd(opt, tar_name, 2261 PYTHONIOENCODING='ascii') 2262 self.assertEqual(out, expected) 2263 2264 def test_list_command_verbose(self): 2265 for tar_name in testtarnames: 2266 with support.captured_stdout() as t: 2267 with tarfile.open(tar_name, 'r') as tf: 2268 tf.list(verbose=True) 2269 expected = t.getvalue().encode('ascii', 'backslashreplace') 2270 for opt in 
'-v', '--verbose': 2271 out = self.tarfilecmd(opt, '-l', tar_name, 2272 PYTHONIOENCODING='ascii') 2273 self.assertEqual(out, expected) 2274 2275 def test_list_command_invalid_file(self): 2276 zipname = support.findfile('zipdir.zip') 2277 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2278 self.assertIn(b' is not a tar archive.', err) 2279 self.assertEqual(out, b'') 2280 self.assertEqual(rc, 1) 2281 2282 def test_create_command(self): 2283 files = [support.findfile('tokenize_tests.txt'), 2284 support.findfile('tokenize_tests-no-coding-cookie-' 2285 'and-utf8-bom-sig-only.txt')] 2286 for opt in '-c', '--create': 2287 try: 2288 out = self.tarfilecmd(opt, tmpname, *files) 2289 self.assertEqual(out, b'') 2290 with tarfile.open(tmpname) as tar: 2291 tar.getmembers() 2292 finally: 2293 support.unlink(tmpname) 2294 2295 def test_create_command_verbose(self): 2296 files = [support.findfile('tokenize_tests.txt'), 2297 support.findfile('tokenize_tests-no-coding-cookie-' 2298 'and-utf8-bom-sig-only.txt')] 2299 for opt in '-v', '--verbose': 2300 try: 2301 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2302 self.assertIn(b' file created.', out) 2303 with tarfile.open(tmpname) as tar: 2304 tar.getmembers() 2305 finally: 2306 support.unlink(tmpname) 2307 2308 def test_create_command_dotless_filename(self): 2309 files = [support.findfile('tokenize_tests.txt')] 2310 try: 2311 out = self.tarfilecmd('-c', dotlessname, *files) 2312 self.assertEqual(out, b'') 2313 with tarfile.open(dotlessname) as tar: 2314 tar.getmembers() 2315 finally: 2316 support.unlink(dotlessname) 2317 2318 def test_create_command_dot_started_filename(self): 2319 tar_name = os.path.join(TEMPDIR, ".testtar") 2320 files = [support.findfile('tokenize_tests.txt')] 2321 try: 2322 out = self.tarfilecmd('-c', tar_name, *files) 2323 self.assertEqual(out, b'') 2324 with tarfile.open(tar_name) as tar: 2325 tar.getmembers() 2326 finally: 2327 support.unlink(tar_name) 2328 2329 def 
test_create_command_compressed(self): 2330 files = [support.findfile('tokenize_tests.txt'), 2331 support.findfile('tokenize_tests-no-coding-cookie-' 2332 'and-utf8-bom-sig-only.txt')] 2333 for filetype in (GzipTest, Bz2Test, LzmaTest): 2334 if not filetype.open: 2335 continue 2336 try: 2337 tar_name = tmpname + '.' + filetype.suffix 2338 out = self.tarfilecmd('-c', tar_name, *files) 2339 with filetype.taropen(tar_name) as tar: 2340 tar.getmembers() 2341 finally: 2342 support.unlink(tar_name) 2343 2344 def test_extract_command(self): 2345 self.make_simple_tarfile(tmpname) 2346 for opt in '-e', '--extract': 2347 try: 2348 with support.temp_cwd(tarextdir): 2349 out = self.tarfilecmd(opt, tmpname) 2350 self.assertEqual(out, b'') 2351 finally: 2352 support.rmtree(tarextdir) 2353 2354 def test_extract_command_verbose(self): 2355 self.make_simple_tarfile(tmpname) 2356 for opt in '-v', '--verbose': 2357 try: 2358 with support.temp_cwd(tarextdir): 2359 out = self.tarfilecmd(opt, '-e', tmpname) 2360 self.assertIn(b' file is extracted.', out) 2361 finally: 2362 support.rmtree(tarextdir) 2363 2364 def test_extract_command_different_directory(self): 2365 self.make_simple_tarfile(tmpname) 2366 try: 2367 with support.temp_cwd(tarextdir): 2368 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2369 self.assertEqual(out, b'') 2370 finally: 2371 support.rmtree(tarextdir) 2372 2373 def test_extract_command_invalid_file(self): 2374 zipname = support.findfile('zipdir.zip') 2375 with support.temp_cwd(tarextdir): 2376 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2377 self.assertIn(b' is not a tar archive.', err) 2378 self.assertEqual(out, b'') 2379 self.assertEqual(rc, 1) 2380 2381 2382class ContextManagerTest(unittest.TestCase): 2383 2384 def test_basic(self): 2385 with tarfile.open(tarname) as tar: 2386 self.assertFalse(tar.closed, "closed inside runtime context") 2387 self.assertTrue(tar.closed, "context manager failed") 2388 2389 def test_closed(self): 2390 # The __enter__() 
method is supposed to raise OSError 2391 # if the TarFile object is already closed. 2392 tar = tarfile.open(tarname) 2393 tar.close() 2394 with self.assertRaises(OSError): 2395 with tar: 2396 pass 2397 2398 def test_exception(self): 2399 # Test if the OSError exception is passed through properly. 2400 with self.assertRaises(Exception) as exc: 2401 with tarfile.open(tarname) as tar: 2402 raise OSError 2403 self.assertIsInstance(exc.exception, OSError, 2404 "wrong exception raised in context manager") 2405 self.assertTrue(tar.closed, "context manager failed") 2406 2407 def test_no_eof(self): 2408 # __exit__() must not write end-of-archive blocks if an 2409 # exception was raised. 2410 try: 2411 with tarfile.open(tmpname, "w") as tar: 2412 raise Exception 2413 except: 2414 pass 2415 self.assertEqual(os.path.getsize(tmpname), 0, 2416 "context manager wrote an end-of-archive block") 2417 self.assertTrue(tar.closed, "context manager failed") 2418 2419 def test_eof(self): 2420 # __exit__() must write end-of-archive blocks, i.e. call 2421 # TarFile.close() if there was no error. 2422 with tarfile.open(tmpname, "w"): 2423 pass 2424 self.assertNotEqual(os.path.getsize(tmpname), 0, 2425 "context manager wrote no end-of-archive block") 2426 2427 def test_fileobj(self): 2428 # Test that __exit__() did not close the external file 2429 # object. 2430 with open(tmpname, "wb") as fobj: 2431 try: 2432 with tarfile.open(fileobj=fobj, mode="w") as tar: 2433 raise Exception 2434 except: 2435 pass 2436 self.assertFalse(fobj.closed, "external file object was closed") 2437 self.assertTrue(tar.closed, "context manager failed") 2438 2439 2440@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2441class LinkEmulationTest(ReadTest, unittest.TestCase): 2442 2443 # Test for issue #8741 regression. On platforms that do not support 2444 # symbolic or hard links tarfile tries to extract these types of members 2445 # as the regular files they point to. 
2446 def _test_link_extraction(self, name): 2447 self.tar.extract(name, TEMPDIR) 2448 with open(os.path.join(TEMPDIR, name), "rb") as f: 2449 data = f.read() 2450 self.assertEqual(sha256sum(data), sha256_regtype) 2451 2452 # See issues #1578269, #8879, and #17689 for some history on these skips 2453 @unittest.skipIf(hasattr(os.path, "islink"), 2454 "Skip emulation - has os.path.islink but not os.link") 2455 def test_hardlink_extraction1(self): 2456 self._test_link_extraction("ustar/lnktype") 2457 2458 @unittest.skipIf(hasattr(os.path, "islink"), 2459 "Skip emulation - has os.path.islink but not os.link") 2460 def test_hardlink_extraction2(self): 2461 self._test_link_extraction("./ustar/linktest2/lnktype") 2462 2463 @unittest.skipIf(hasattr(os, "symlink"), 2464 "Skip emulation if symlink exists") 2465 def test_symlink_extraction1(self): 2466 self._test_link_extraction("ustar/symtype") 2467 2468 @unittest.skipIf(hasattr(os, "symlink"), 2469 "Skip emulation if symlink exists") 2470 def test_symlink_extraction2(self): 2471 self._test_link_extraction("./ustar/linktest2/symtype") 2472 2473 2474class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2475 # Issue5068: The _BZ2Proxy.read() method loops forever 2476 # on an empty or partial bzipped file. 
2477 2478 def _test_partial_input(self, mode): 2479 class MyBytesIO(io.BytesIO): 2480 hit_eof = False 2481 def read(self, n): 2482 if self.hit_eof: 2483 raise AssertionError("infinite loop detected in " 2484 "tarfile.open()") 2485 self.hit_eof = self.tell() == len(self.getvalue()) 2486 return super(MyBytesIO, self).read(n) 2487 def seek(self, *args): 2488 self.hit_eof = False 2489 return super(MyBytesIO, self).seek(*args) 2490 2491 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2492 for x in range(len(data) + 1): 2493 try: 2494 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2495 except tarfile.ReadError: 2496 pass # we have no interest in ReadErrors 2497 2498 def test_partial_input(self): 2499 self._test_partial_input("r") 2500 2501 def test_partial_input_bz2(self): 2502 self._test_partial_input("r:bz2") 2503 2504 2505def root_is_uid_gid_0(): 2506 try: 2507 import pwd, grp 2508 except ImportError: 2509 return False 2510 if pwd.getpwuid(0)[0] != 'root': 2511 return False 2512 if grp.getgrgid(0)[0] != 'root': 2513 return False 2514 return True 2515 2516 2517@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2518@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2519class NumericOwnerTest(unittest.TestCase): 2520 # mock the following: 2521 # os.chown: so we can test what's being called 2522 # os.chmod: so the modes are not actually changed. if they are, we can't 2523 # delete the files/directories 2524 # os.geteuid: so we can lie and say we're root (uid = 0) 2525 2526 @staticmethod 2527 def _make_test_archive(filename_1, dirname_1, filename_2): 2528 # the file contents to write 2529 fobj = io.BytesIO(b"content") 2530 2531 # create a tar file with a file, a directory, and a file within that 2532 # directory. 
Assign various .uid/.gid values to them 2533 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2534 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2535 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2536 ] 2537 with tarfile.open(tmpname, 'w') as tarfl: 2538 for name, uid, gid, typ, contents in items: 2539 t = tarfile.TarInfo(name) 2540 t.uid = uid 2541 t.gid = gid 2542 t.uname = 'root' 2543 t.gname = 'root' 2544 t.type = typ 2545 tarfl.addfile(t, contents) 2546 2547 # return the full pathname to the tar file 2548 return tmpname 2549 2550 @staticmethod 2551 @contextmanager 2552 def _setup_test(mock_geteuid): 2553 mock_geteuid.return_value = 0 # lie and say we're root 2554 fname = 'numeric-owner-testfile' 2555 dirname = 'dir' 2556 2557 # the names we want stored in the tarfile 2558 filename_1 = fname 2559 dirname_1 = dirname 2560 filename_2 = os.path.join(dirname, fname) 2561 2562 # create the tarfile with the contents we're after 2563 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2564 dirname_1, 2565 filename_2) 2566 2567 # open the tarfile for reading. 
yield it and the names of the items 2568 # we stored into the file 2569 with tarfile.open(tar_filename) as tarfl: 2570 yield tarfl, filename_1, dirname_1, filename_2 2571 2572 @unittest.mock.patch('os.chown') 2573 @unittest.mock.patch('os.chmod') 2574 @unittest.mock.patch('os.geteuid') 2575 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2576 mock_chown): 2577 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2578 filename_2): 2579 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2580 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2581 2582 # convert to filesystem paths 2583 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2584 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2585 2586 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2587 unittest.mock.call(f_filename_2, 88, 87), 2588 ], 2589 any_order=True) 2590 2591 @unittest.mock.patch('os.chown') 2592 @unittest.mock.patch('os.chmod') 2593 @unittest.mock.patch('os.geteuid') 2594 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2595 mock_chown): 2596 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2597 filename_2): 2598 tarfl.extractall(TEMPDIR, numeric_owner=True) 2599 2600 # convert to filesystem paths 2601 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2602 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2603 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2604 2605 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2606 unittest.mock.call(f_dirname_1, 77, 76), 2607 unittest.mock.call(f_filename_2, 88, 87), 2608 ], 2609 any_order=True) 2610 2611 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2612 # because the uname and gname in the test file are 'root', and extract() 2613 # will look them up using pwd and grp to find their uid and gid, which we 2614 # test here to be 0. 
2615 @unittest.skipUnless(root_is_uid_gid_0(), 2616 'uid=0,gid=0 must be named "root"') 2617 @unittest.mock.patch('os.chown') 2618 @unittest.mock.patch('os.chmod') 2619 @unittest.mock.patch('os.geteuid') 2620 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2621 mock_chown): 2622 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2623 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2624 2625 # convert to filesystem paths 2626 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2627 2628 mock_chown.assert_called_with(f_filename_1, 0, 0) 2629 2630 @unittest.mock.patch('os.geteuid') 2631 def test_keyword_only(self, mock_geteuid): 2632 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2633 self.assertRaises(TypeError, 2634 tarfl.extract, filename_1, TEMPDIR, False, True) 2635 2636 2637def setUpModule(): 2638 support.unlink(TEMPDIR) 2639 os.makedirs(TEMPDIR) 2640 2641 global testtarnames 2642 testtarnames = [tarname] 2643 with open(tarname, "rb") as fobj: 2644 data = fobj.read() 2645 2646 # Create compressed tarfiles. 2647 for c in GzipTest, Bz2Test, LzmaTest: 2648 if c.open: 2649 support.unlink(c.tarname) 2650 testtarnames.append(c.tarname) 2651 with c.open(c.tarname, "wb") as tar: 2652 tar.write(data) 2653 2654def tearDownModule(): 2655 if os.path.exists(TEMPDIR): 2656 support.rmtree(TEMPDIR) 2657 2658if __name__ == "__main__": 2659 unittest.main() 2660