1import sys 2import os 3import io 4from hashlib import sha256 5from contextlib import contextmanager 6from random import Random 7import pathlib 8 9import unittest 10import unittest.mock 11import tarfile 12 13from test import support 14from test.support import script_helper, requires_hashdigest 15 16# Check for our compression modules. 17try: 18 import gzip 19except ImportError: 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25try: 26 import lzma 27except ImportError: 28 lzma = None 29 30def sha256sum(data): 31 return sha256(data).hexdigest() 32 33TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 34tarextdir = TEMPDIR + '-extract-test' 35tarname = support.findfile("testtar.tar") 36gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 37bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 38xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 39tmpname = os.path.join(TEMPDIR, "tmp.tar") 40dotlessname = os.path.join(TEMPDIR, "testtar") 41 42sha256_regtype = ( 43 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 44) 45sha256_sparse = ( 46 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 47) 48 49 50class TarTest: 51 tarname = tarname 52 suffix = '' 53 open = io.FileIO 54 taropen = tarfile.TarFile.taropen 55 56 @property 57 def mode(self): 58 return self.prefix + self.suffix 59 60@support.requires_gzip 61class GzipTest: 62 tarname = gzipname 63 suffix = 'gz' 64 open = gzip.GzipFile if gzip else None 65 taropen = tarfile.TarFile.gzopen 66 67@support.requires_bz2 68class Bz2Test: 69 tarname = bz2name 70 suffix = 'bz2' 71 open = bz2.BZ2File if bz2 else None 72 taropen = tarfile.TarFile.bz2open 73 74@support.requires_lzma 75class LzmaTest: 76 tarname = xzname 77 suffix = 'xz' 78 open = lzma.LZMAFile if lzma else None 79 taropen = tarfile.TarFile.xzopen 80 81 82class ReadTest(TarTest): 83 84 prefix = "r:" 85 86 def setUp(self): 87 self.tar = tarfile.open(self.tarname, mode=self.mode, 88 encoding="iso8859-1") 89 90 def tearDown(self): 91 self.tar.close() 92 93 94class UstarReadTest(ReadTest, unittest.TestCase): 95 96 def test_fileobj_regular_file(self): 97 tarinfo = self.tar.getmember("ustar/regtype") 98 with self.tar.extractfile(tarinfo) as fobj: 99 data = fobj.read() 100 self.assertEqual(len(data), tarinfo.size, 101 "regular file extraction failed") 102 self.assertEqual(sha256sum(data), sha256_regtype, 103 "regular file extraction failed") 104 105 def test_fileobj_readlines(self): 106 self.tar.extract("ustar/regtype", TEMPDIR) 107 tarinfo = self.tar.getmember("ustar/regtype") 108 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 109 lines1 = fobj1.readlines() 110 111 with self.tar.extractfile(tarinfo) as fobj: 112 fobj2 = io.TextIOWrapper(fobj) 113 lines2 = fobj2.readlines() 114 self.assertEqual(lines1, lines2, 115 "fileobj.readlines() failed") 116 self.assertEqual(len(lines2), 114, 117 "fileobj.readlines() failed") 118 self.assertEqual(lines2[83], 119 "I will gladly admit that Python is not the fastest " 120 "running scripting language.\n", 121 "fileobj.readlines() failed") 122 123 def test_fileobj_iter(self): 124 self.tar.extract("ustar/regtype", TEMPDIR) 125 tarinfo = self.tar.getmember("ustar/regtype") 126 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 127 lines1 = fobj1.readlines() 128 with self.tar.extractfile(tarinfo) as fobj2: 129 lines2 = list(io.TextIOWrapper(fobj2)) 130 self.assertEqual(lines1, lines2, 131 "fileobj.__iter__() failed") 132 133 def test_fileobj_seek(self): 134 self.tar.extract("ustar/regtype", TEMPDIR) 135 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 136 data = fobj.read() 137 138 tarinfo = self.tar.getmember("ustar/regtype") 139 with self.tar.extractfile(tarinfo) as fobj: 140 text = fobj.read() 141 fobj.seek(0) 142 self.assertEqual(0, fobj.tell(), 143 "seek() to file's start failed") 144 fobj.seek(2048, 0) 145 self.assertEqual(2048, fobj.tell(), 146 "seek() to absolute position failed") 147 fobj.seek(-1024, 1) 148 self.assertEqual(1024, fobj.tell(), 149 "seek() to negative relative position failed") 150 fobj.seek(1024, 1) 151 self.assertEqual(2048, fobj.tell(), 152 "seek() to positive relative position failed") 153 s = fobj.read(10) 154 self.assertEqual(s, data[2048:2058], 155 "read() after seek failed") 156 fobj.seek(0, 2) 157 self.assertEqual(tarinfo.size, fobj.tell(), 158 "seek() to file's end failed") 159 self.assertEqual(fobj.read(), b"", 160 "read() at file's end did not return empty string") 161 fobj.seek(-tarinfo.size, 2) 162 self.assertEqual(0, fobj.tell(), 163 "relative seek() to file's end failed") 164 fobj.seek(512) 165 s1 = fobj.readlines() 166 fobj.seek(512) 167 s2 = fobj.readlines() 168 self.assertEqual(s1, s2, 169 "readlines() after seek failed") 170 fobj.seek(0) 171 self.assertEqual(len(fobj.readline()), fobj.tell(), 172 "tell() after readline() failed") 173 fobj.seek(512) 174 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 175 "tell() after seek() and readline() failed") 176 fobj.seek(0) 177 line = fobj.readline() 178 self.assertEqual(fobj.read(), data[len(line):], 179 "read() after readline() failed") 180 181 def test_fileobj_text(self): 182 with self.tar.extractfile("ustar/regtype") as fobj: 183 fobj = io.TextIOWrapper(fobj) 184 data = fobj.read().encode("iso8859-1") 185 self.assertEqual(sha256sum(data), sha256_regtype) 186 try: 187 fobj.seek(100) 188 except AttributeError: 189 # Issue #13815: seek() complained about a missing 190 # flush() method. 191 self.fail("seeking failed in text mode") 192 193 # Test if symbolic and hard links are resolved by extractfile(). The 194 # test link members each point to a regular member whose data is 195 # supposed to be exported. 196 def _test_fileobj_link(self, lnktype, regtype): 197 with self.tar.extractfile(lnktype) as a, \ 198 self.tar.extractfile(regtype) as b: 199 self.assertEqual(a.name, b.name) 200 201 def test_fileobj_link1(self): 202 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 203 204 def test_fileobj_link2(self): 205 self._test_fileobj_link("./ustar/linktest2/lnktype", 206 "ustar/linktest1/regtype") 207 208 def test_fileobj_symlink1(self): 209 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 210 211 def test_fileobj_symlink2(self): 212 self._test_fileobj_link("./ustar/linktest2/symtype", 213 "ustar/linktest1/regtype") 214 215 def test_issue14160(self): 216 self._test_fileobj_link("symtype2", "ustar/regtype") 217 218class GzipUstarReadTest(GzipTest, UstarReadTest): 219 pass 220 221class Bz2UstarReadTest(Bz2Test, UstarReadTest): 222 pass 223 224class LzmaUstarReadTest(LzmaTest, UstarReadTest): 225 pass 226 227 228class ListTest(ReadTest, unittest.TestCase): 229 230 # Override setUp to use default encoding (UTF-8) 231 def setUp(self): 232 self.tar = tarfile.open(self.tarname, mode=self.mode) 233 234 def test_list(self): 235 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 236 with support.swap_attr(sys, 'stdout', tio): 237 self.tar.list(verbose=False) 238 out = tio.detach().getvalue() 239 self.assertIn(b'ustar/conttype', out) 240 self.assertIn(b'ustar/regtype', out) 241 self.assertIn(b'ustar/lnktype', out) 242 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 243 self.assertIn(b'./ustar/linktest2/symtype', out) 244 self.assertIn(b'./ustar/linktest2/lnktype', out) 245 # Make sure it puts trailing slash for directory 246 self.assertIn(b'ustar/dirtype/', out) 247 self.assertIn(b'ustar/dirtype-with-size/', out) 248 # Make sure it is able to print unencodable characters 249 def conv(b): 250 s = b.decode(self.tar.encoding, 'surrogateescape') 251 return s.encode('ascii', 'backslashreplace') 252 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 256 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 257 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 258 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 259 # Make sure it prints files separated by one newline without any 260 # 'ls -l'-like accessories if verbose flag is not being used 261 # ... 262 # ustar/conttype 263 # ustar/regtype 264 # ... 265 self.assertRegex(out, br'ustar/conttype ?\r?\n' 266 br'ustar/regtype ?\r?\n') 267 # Make sure it does not print the source of link without verbose flag 268 self.assertNotIn(b'link to', out) 269 self.assertNotIn(b'->', out) 270 271 def test_list_verbose(self): 272 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=True) 275 out = tio.detach().getvalue() 276 # Make sure it prints files separated by one newline with 'ls -l'-like 277 # accessories if verbose flag is being used 278 # ... 279 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 280 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 281 # ... 282 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 283 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 284 br'ustar/\w+type ?\r?\n') * 2) 285 # Make sure it prints the source of link with verbose flag 286 self.assertIn(b'ustar/symtype -> regtype', out) 287 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 288 self.assertIn(b'./ustar/linktest2/lnktype link to ' 289 b'./ustar/linktest1/regtype', out) 290 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 291 (b'/123' * 125) + b'/longname', out) 292 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 293 (b'/123' * 125) + b'/longname', out) 294 295 def test_list_members(self): 296 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 297 def members(tar): 298 for tarinfo in tar.getmembers(): 299 if 'reg' in tarinfo.name: 300 yield tarinfo 301 with support.swap_attr(sys, 'stdout', tio): 302 self.tar.list(verbose=False, members=members(self.tar)) 303 out = tio.detach().getvalue() 304 self.assertIn(b'ustar/regtype', out) 305 self.assertNotIn(b'ustar/conttype', out) 306 307 308class GzipListTest(GzipTest, ListTest): 309 pass 310 311 312class Bz2ListTest(Bz2Test, ListTest): 313 pass 314 315 316class LzmaListTest(LzmaTest, ListTest): 317 pass 318 319 320class CommonReadTest(ReadTest): 321 322 def test_empty_tarfile(self): 323 # Test for issue6123: Allow opening empty archives. 324 # This test checks if tarfile.open() is able to open an empty tar 325 # archive successfully. Note that an empty tar archive is not the 326 # same as an empty file! 327 with tarfile.open(tmpname, self.mode.replace("r", "w")): 328 pass 329 try: 330 tar = tarfile.open(tmpname, self.mode) 331 tar.getnames() 332 except tarfile.ReadError: 333 self.fail("tarfile.open() failed on empty archive") 334 else: 335 self.assertListEqual(tar.getmembers(), []) 336 finally: 337 tar.close() 338 339 def test_non_existent_tarfile(self): 340 # Test for issue11513: prevent non-existent gzipped tarfiles raising 341 # multiple exceptions. 342 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 343 tarfile.open("xxx", self.mode) 344 345 def test_null_tarfile(self): 346 # Test for issue6123: Allow opening empty archives. 347 # This test guarantees that tarfile.open() does not treat an empty 348 # file as an empty tar archive. 349 with open(tmpname, "wb"): 350 pass 351 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 352 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 353 354 def test_ignore_zeros(self): 355 # Test TarFile's ignore_zeros option. 356 # generate 512 pseudorandom bytes 357 data = Random(0).getrandbits(512*8).to_bytes(512, 'big') 358 for char in (b'\0', b'a'): 359 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 360 # are ignored correctly. 361 with self.open(tmpname, "w") as fobj: 362 fobj.write(char * 1024) 363 tarinfo = tarfile.TarInfo("foo") 364 tarinfo.size = len(data) 365 fobj.write(tarinfo.tobuf()) 366 fobj.write(data) 367 368 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 369 try: 370 self.assertListEqual(tar.getnames(), ["foo"], 371 "ignore_zeros=True should have skipped the %r-blocks" % 372 char) 373 finally: 374 tar.close() 375 376 def test_premature_end_of_archive(self): 377 for size in (512, 600, 1024, 1200): 378 with tarfile.open(tmpname, "w:") as tar: 379 t = tarfile.TarInfo("foo") 380 t.size = 1024 381 tar.addfile(t, io.BytesIO(b"a" * 1024)) 382 383 with open(tmpname, "r+b") as fobj: 384 fobj.truncate(size) 385 386 with tarfile.open(tmpname) as tar: 387 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 388 for t in tar: 389 pass 390 391 with tarfile.open(tmpname) as tar: 392 t = tar.next() 393 394 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 395 tar.extract(t, TEMPDIR) 396 397 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 398 tar.extractfile(t).read() 399 400 def test_length_zero_header(self): 401 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 402 # with an exception 403 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 404 with tarfile.open(support.findfile('recursion.tar')) as tar: 405 pass 406 407class MiscReadTestBase(CommonReadTest): 408 def requires_name_attribute(self): 409 pass 410 411 def test_no_name_argument(self): 412 self.requires_name_attribute() 413 with open(self.tarname, "rb") as fobj: 414 self.assertIsInstance(fobj.name, str) 415 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 416 self.assertIsInstance(tar.name, str) 417 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 418 419 def test_no_name_attribute(self): 420 with open(self.tarname, "rb") as fobj: 421 data = fobj.read() 422 fobj = io.BytesIO(data) 423 self.assertRaises(AttributeError, getattr, fobj, "name") 424 tar = tarfile.open(fileobj=fobj, mode=self.mode) 425 self.assertIsNone(tar.name) 426 427 def test_empty_name_attribute(self): 428 with open(self.tarname, "rb") as fobj: 429 data = fobj.read() 430 fobj = io.BytesIO(data) 431 fobj.name = "" 432 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 433 self.assertIsNone(tar.name) 434 435 def test_int_name_attribute(self): 436 # Issue 21044: tarfile.open() should handle fileobj with an integer 437 # 'name' attribute. 438 fd = os.open(self.tarname, os.O_RDONLY) 439 with open(fd, 'rb') as fobj: 440 self.assertIsInstance(fobj.name, int) 441 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 442 self.assertIsNone(tar.name) 443 444 def test_bytes_name_attribute(self): 445 self.requires_name_attribute() 446 tarname = os.fsencode(self.tarname) 447 with open(tarname, 'rb') as fobj: 448 self.assertIsInstance(fobj.name, bytes) 449 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 450 self.assertIsInstance(tar.name, bytes) 451 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 452 453 def test_pathlike_name(self): 454 tarname = pathlib.Path(self.tarname) 455 with tarfile.open(tarname, mode=self.mode) as tar: 456 self.assertIsInstance(tar.name, str) 457 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 458 with self.taropen(tarname) as tar: 459 self.assertIsInstance(tar.name, str) 460 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 461 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 462 self.assertIsInstance(tar.name, str) 463 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 464 if self.suffix == '': 465 with tarfile.TarFile(tarname, mode='r') as tar: 466 self.assertIsInstance(tar.name, str) 467 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 468 469 def test_illegal_mode_arg(self): 470 with open(tmpname, 'wb'): 471 pass 472 with self.assertRaisesRegex(ValueError, 'mode must be '): 473 tar = self.taropen(tmpname, 'q') 474 with self.assertRaisesRegex(ValueError, 'mode must be '): 475 tar = self.taropen(tmpname, 'rw') 476 with self.assertRaisesRegex(ValueError, 'mode must be '): 477 tar = self.taropen(tmpname, '') 478 479 def test_fileobj_with_offset(self): 480 # Skip the first member and store values from the second member 481 # of the testtar. 482 tar = tarfile.open(self.tarname, mode=self.mode) 483 try: 484 tar.next() 485 t = tar.next() 486 name = t.name 487 offset = t.offset 488 with tar.extractfile(t) as f: 489 data = f.read() 490 finally: 491 tar.close() 492 493 # Open the testtar and seek to the offset of the second member. 494 with self.open(self.tarname) as fobj: 495 fobj.seek(offset) 496 497 # Test if the tarfile starts with the second member. 498 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 499 t = tar.next() 500 self.assertEqual(t.name, name) 501 # Read to the end of fileobj and test if seeking back to the 502 # beginning works. 503 tar.getmembers() 504 self.assertEqual(tar.extractfile(t).read(), data, 505 "seek back did not work") 506 507 def test_fail_comp(self): 508 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 509 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 510 with open(tarname, "rb") as fobj: 511 self.assertRaises(tarfile.ReadError, tarfile.open, 512 fileobj=fobj, mode=self.mode) 513 514 def test_v7_dirtype(self): 515 # Test old style dirtype member (bug #1336623): 516 # Old V7 tars create directory members using an AREGTYPE 517 # header with a "/" appended to the filename field. 518 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 519 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 520 "v7 dirtype failed") 521 522 def test_xstar_type(self): 523 # The xstar format stores extra atime and ctime fields inside the 524 # space reserved for the prefix field. The prefix field must be 525 # ignored in this case, otherwise it will mess up the name. 526 try: 527 self.tar.getmember("misc/regtype-xstar") 528 except KeyError: 529 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 530 531 def test_check_members(self): 532 for tarinfo in self.tar: 533 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 534 "wrong mtime for %s" % tarinfo.name) 535 if not tarinfo.name.startswith("ustar/"): 536 continue 537 self.assertEqual(tarinfo.uname, "tarfile", 538 "wrong uname for %s" % tarinfo.name) 539 540 def test_find_members(self): 541 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 542 "could not find all members") 543 544 @unittest.skipUnless(hasattr(os, "link"), 545 "Missing hardlink implementation") 546 @support.skip_unless_symlink 547 def test_extract_hardlink(self): 548 # Test hardlink extraction (e.g. bug #857297). 549 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 550 tar.extract("ustar/regtype", TEMPDIR) 551 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 552 553 tar.extract("ustar/lnktype", TEMPDIR) 554 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 555 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 556 data = f.read() 557 self.assertEqual(sha256sum(data), sha256_regtype) 558 559 tar.extract("ustar/symtype", TEMPDIR) 560 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 561 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 562 data = f.read() 563 self.assertEqual(sha256sum(data), sha256_regtype) 564 565 def test_extractall(self): 566 # Test if extractall() correctly restores directory permissions 567 # and times (see issue1735). 568 tar = tarfile.open(tarname, encoding="iso8859-1") 569 DIR = os.path.join(TEMPDIR, "extractall") 570 os.mkdir(DIR) 571 try: 572 directories = [t for t in tar if t.isdir()] 573 tar.extractall(DIR, directories) 574 for tarinfo in directories: 575 path = os.path.join(DIR, tarinfo.name) 576 if sys.platform != "win32": 577 # Win32 has no support for fine grained permissions. 578 self.assertEqual(tarinfo.mode & 0o777, 579 os.stat(path).st_mode & 0o777) 580 def format_mtime(mtime): 581 if isinstance(mtime, float): 582 return "{} ({})".format(mtime, mtime.hex()) 583 else: 584 return "{!r} (int)".format(mtime) 585 file_mtime = os.path.getmtime(path) 586 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 587 format_mtime(tarinfo.mtime), 588 format_mtime(file_mtime), 589 path) 590 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 591 finally: 592 tar.close() 593 support.rmtree(DIR) 594 595 def test_extract_directory(self): 596 dirtype = "ustar/dirtype" 597 DIR = os.path.join(TEMPDIR, "extractdir") 598 os.mkdir(DIR) 599 try: 600 with tarfile.open(tarname, encoding="iso8859-1") as tar: 601 tarinfo = tar.getmember(dirtype) 602 tar.extract(tarinfo, path=DIR) 603 extracted = os.path.join(DIR, dirtype) 604 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 605 if sys.platform != "win32": 606 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 607 finally: 608 support.rmtree(DIR) 609 610 def test_extractall_pathlike_name(self): 611 DIR = pathlib.Path(TEMPDIR) / "extractall" 612 with support.temp_dir(DIR), \ 613 tarfile.open(tarname, encoding="iso8859-1") as tar: 614 directories = [t for t in tar if t.isdir()] 615 tar.extractall(DIR, directories) 616 for tarinfo in directories: 617 path = DIR / tarinfo.name 618 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 619 620 def test_extract_pathlike_name(self): 621 dirtype = "ustar/dirtype" 622 DIR = pathlib.Path(TEMPDIR) / "extractall" 623 with support.temp_dir(DIR), \ 624 tarfile.open(tarname, encoding="iso8859-1") as tar: 625 tarinfo = tar.getmember(dirtype) 626 tar.extract(tarinfo, path=DIR) 627 extracted = DIR / dirtype 628 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 629 630 def test_init_close_fobj(self): 631 # Issue #7341: Close the internal file object in the TarFile 632 # constructor in case of an error. For the test we rely on 633 # the fact that opening an empty file raises a ReadError. 634 empty = os.path.join(TEMPDIR, "empty") 635 with open(empty, "wb") as fobj: 636 fobj.write(b"") 637 638 try: 639 tar = object.__new__(tarfile.TarFile) 640 try: 641 tar.__init__(empty) 642 except tarfile.ReadError: 643 self.assertTrue(tar.fileobj.closed) 644 else: 645 self.fail("ReadError not raised") 646 finally: 647 support.unlink(empty) 648 649 def test_parallel_iteration(self): 650 # Issue #16601: Restarting iteration over tarfile continued 651 # from where it left off. 652 with tarfile.open(self.tarname) as tar: 653 for m1, m2 in zip(tar, tar): 654 self.assertEqual(m1.offset, m2.offset) 655 self.assertEqual(m1.get_info(), m2.get_info()) 656 657class MiscReadTest(MiscReadTestBase, unittest.TestCase): 658 test_fail_comp = None 659 660class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 661 pass 662 663class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 664 def requires_name_attribute(self): 665 self.skipTest("BZ2File have no name attribute") 666 667class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 668 def requires_name_attribute(self): 669 self.skipTest("LZMAFile have no name attribute") 670 671 672class StreamReadTest(CommonReadTest, unittest.TestCase): 673 674 prefix="r|" 675 676 def test_read_through(self): 677 # Issue #11224: A poorly designed _FileInFile.read() method 678 # caused seeking errors with stream tar files. 679 for tarinfo in self.tar: 680 if not tarinfo.isreg(): 681 continue 682 with self.tar.extractfile(tarinfo) as fobj: 683 while True: 684 try: 685 buf = fobj.read(512) 686 except tarfile.StreamError: 687 self.fail("simple read-through using " 688 "TarFile.extractfile() failed") 689 if not buf: 690 break 691 692 def test_fileobj_regular_file(self): 693 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 694 with self.tar.extractfile(tarinfo) as fobj: 695 data = fobj.read() 696 self.assertEqual(len(data), tarinfo.size, 697 "regular file extraction failed") 698 self.assertEqual(sha256sum(data), sha256_regtype, 699 "regular file extraction failed") 700 701 def test_provoke_stream_error(self): 702 tarinfos = self.tar.getmembers() 703 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 704 self.assertRaises(tarfile.StreamError, f.read) 705 706 def test_compare_members(self): 707 tar1 = tarfile.open(tarname, encoding="iso8859-1") 708 try: 709 tar2 = self.tar 710 711 while True: 712 t1 = tar1.next() 713 t2 = tar2.next() 714 if t1 is None: 715 break 716 self.assertIsNotNone(t2, "stream.next() failed.") 717 718 if t2.islnk() or t2.issym(): 719 with self.assertRaises(tarfile.StreamError): 720 tar2.extractfile(t2) 721 continue 722 723 v1 = tar1.extractfile(t1) 724 v2 = tar2.extractfile(t2) 725 if v1 is None: 726 continue 727 self.assertIsNotNone(v2, "stream.extractfile() failed") 728 self.assertEqual(v1.read(), v2.read(), 729 "stream extraction failed") 730 finally: 731 tar1.close() 732 733class GzipStreamReadTest(GzipTest, StreamReadTest): 734 pass 735 736class Bz2StreamReadTest(Bz2Test, StreamReadTest): 737 pass 738 739class LzmaStreamReadTest(LzmaTest, StreamReadTest): 740 pass 741 742 743class DetectReadTest(TarTest, unittest.TestCase): 744 def _testfunc_file(self, name, mode): 745 try: 746 tar = tarfile.open(name, mode) 747 except tarfile.ReadError as e: 748 self.fail() 749 else: 750 tar.close() 751 752 def _testfunc_fileobj(self, name, mode): 753 try: 754 with open(name, "rb") as f: 755 tar = tarfile.open(name, mode, fileobj=f) 756 except tarfile.ReadError as e: 757 self.fail() 758 else: 759 tar.close() 760 761 def _test_modes(self, testfunc): 762 if self.suffix: 763 with self.assertRaises(tarfile.ReadError): 764 tarfile.open(tarname, mode="r:" + self.suffix) 765 with self.assertRaises(tarfile.ReadError): 766 tarfile.open(tarname, mode="r|" + self.suffix) 767 with self.assertRaises(tarfile.ReadError): 768 tarfile.open(self.tarname, mode="r:") 769 with self.assertRaises(tarfile.ReadError): 770 tarfile.open(self.tarname, mode="r|") 771 testfunc(self.tarname, "r") 772 testfunc(self.tarname, "r:" + self.suffix) 773 testfunc(self.tarname, "r:*") 774 testfunc(self.tarname, "r|" + self.suffix) 775 testfunc(self.tarname, "r|*") 776 777 def test_detect_file(self): 778 self._test_modes(self._testfunc_file) 779 780 def test_detect_fileobj(self): 781 self._test_modes(self._testfunc_fileobj) 782 783class GzipDetectReadTest(GzipTest, DetectReadTest): 784 pass 785 786class Bz2DetectReadTest(Bz2Test, DetectReadTest): 787 def test_detect_stream_bz2(self): 788 # Originally, tarfile's stream detection looked for the string 789 # "BZh91" at the start of the file. This is incorrect because 790 # the '9' represents the blocksize (900,000 bytes). If the file was 791 # compressed using another blocksize autodetection fails. 792 with open(tarname, "rb") as fobj: 793 data = fobj.read() 794 795 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 796 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 797 fobj.write(data) 798 799 self._testfunc_file(tmpname, "r|*") 800 801class LzmaDetectReadTest(LzmaTest, DetectReadTest): 802 pass 803 804 805class MemberReadTest(ReadTest, unittest.TestCase): 806 807 def _test_member(self, tarinfo, chksum=None, **kwargs): 808 if chksum is not None: 809 with self.tar.extractfile(tarinfo) as f: 810 self.assertEqual(sha256sum(f.read()), chksum, 811 "wrong sha256sum for %s" % tarinfo.name) 812 813 kwargs["mtime"] = 0o7606136617 814 kwargs["uid"] = 1000 815 kwargs["gid"] = 100 816 if "old-v7" not in tarinfo.name: 817 # V7 tar can't handle alphabetic owners. 818 kwargs["uname"] = "tarfile" 819 kwargs["gname"] = "tarfile" 820 for k, v in kwargs.items(): 821 self.assertEqual(getattr(tarinfo, k), v, 822 "wrong value in %s field of %s" % (k, tarinfo.name)) 823 824 def test_find_regtype(self): 825 tarinfo = self.tar.getmember("ustar/regtype") 826 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 827 828 def test_find_conttype(self): 829 tarinfo = self.tar.getmember("ustar/conttype") 830 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 831 832 def test_find_dirtype(self): 833 tarinfo = self.tar.getmember("ustar/dirtype") 834 self._test_member(tarinfo, size=0) 835 836 def test_find_dirtype_with_size(self): 837 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 838 self._test_member(tarinfo, size=255) 839 840 def test_find_lnktype(self): 841 tarinfo = self.tar.getmember("ustar/lnktype") 842 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 843 844 def test_find_symtype(self): 845 tarinfo = self.tar.getmember("ustar/symtype") 846 self._test_member(tarinfo, size=0, linkname="regtype") 847 848 def test_find_blktype(self): 849 tarinfo = self.tar.getmember("ustar/blktype") 850 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 851 852 def test_find_chrtype(self): 853 tarinfo = self.tar.getmember("ustar/chrtype") 854 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 855 856 def test_find_fifotype(self): 857 tarinfo = self.tar.getmember("ustar/fifotype") 858 self._test_member(tarinfo, size=0) 859 860 def test_find_sparse(self): 861 tarinfo = self.tar.getmember("ustar/sparse") 862 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 863 864 def test_find_gnusparse(self): 865 tarinfo = self.tar.getmember("gnu/sparse") 866 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 867 868 def test_find_gnusparse_00(self): 869 tarinfo = self.tar.getmember("gnu/sparse-0.0") 870 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 871 872 def test_find_gnusparse_01(self): 873 tarinfo = self.tar.getmember("gnu/sparse-0.1") 874 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 875 876 def test_find_gnusparse_10(self): 877 tarinfo = self.tar.getmember("gnu/sparse-1.0") 878 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 879 880 def test_find_umlauts(self): 881 tarinfo = self.tar.getmember("ustar/umlauts-" 882 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 883 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 884 885 def test_find_ustar_longname(self): 886 name = "ustar/" + "12345/" * 39 + "1234567/longname" 887 self.assertIn(name, self.tar.getnames()) 888 889 def test_find_regtype_oldv7(self): 890 tarinfo = self.tar.getmember("misc/regtype-old-v7") 891 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 892 893 def test_find_pax_umlauts(self): 894 self.tar.close() 895 self.tar = tarfile.open(self.tarname, mode=self.mode, 896 encoding="iso8859-1") 897 tarinfo = self.tar.getmember("pax/umlauts-" 898 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 899 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 900 901 902class LongnameTest: 903 904 def test_read_longname(self): 905 # Test reading of longname (bug #1471427). 906 longname = self.subdir + "/" + "123/" * 125 + "longname" 907 try: 908 tarinfo = self.tar.getmember(longname) 909 except KeyError: 910 self.fail("longname not found") 911 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 912 "read longname as dirtype") 913 914 def test_read_longlink(self): 915 longname = self.subdir + "/" + "123/" * 125 + "longname" 916 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 917 try: 918 tarinfo = self.tar.getmember(longlink) 919 except KeyError: 920 self.fail("longlink not found") 921 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 922 923 def test_truncated_longname(self): 924 longname = self.subdir + "/" + "123/" * 125 + "longname" 925 tarinfo = self.tar.getmember(longname) 926 offset = tarinfo.offset 927 self.tar.fileobj.seek(offset) 928 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 929 with self.assertRaises(tarfile.ReadError): 930 tarfile.open(name="foo.tar", fileobj=fobj) 931 932 def test_header_offset(self): 933 # Test if the start offset of the TarInfo object includes 934 # the preceding extended header. 935 longname = self.subdir + "/" + "123/" * 125 + "longname" 936 offset = self.tar.getmember(longname).offset 937 with open(tarname, "rb") as fobj: 938 fobj.seek(offset) 939 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 940 "iso8859-1", "strict") 941 self.assertEqual(tarinfo.type, self.longnametype) 942 943 944class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 945 946 subdir = "gnu" 947 longnametype = tarfile.GNUTYPE_LONGNAME 948 949 # Since 3.2 tarfile is supposed to accurately restore sparse members and 950 # produce files with holes. This is what we actually want to test here. 951 # Unfortunately, not all platforms/filesystems support sparse files, and 952 # even on platforms that do it is non-trivial to make reliable assertions 953 # about holes in files. Therefore, we first do one basic test which works 954 # an all platforms, and after that a test that will work only on 955 # platforms/filesystems that prove to support sparse files. 956 def _test_sparse_file(self, name): 957 self.tar.extract(name, TEMPDIR) 958 filename = os.path.join(TEMPDIR, name) 959 with open(filename, "rb") as fobj: 960 data = fobj.read() 961 self.assertEqual(sha256sum(data), sha256_sparse, 962 "wrong sha256sum for %s" % name) 963 964 if self._fs_supports_holes(): 965 s = os.stat(filename) 966 self.assertLess(s.st_blocks * 512, s.st_size) 967 968 def test_sparse_file_old(self): 969 self._test_sparse_file("gnu/sparse") 970 971 def test_sparse_file_00(self): 972 self._test_sparse_file("gnu/sparse-0.0") 973 974 def test_sparse_file_01(self): 975 self._test_sparse_file("gnu/sparse-0.1") 976 977 def test_sparse_file_10(self): 978 self._test_sparse_file("gnu/sparse-1.0") 979 980 @staticmethod 981 def _fs_supports_holes(): 982 # Return True if the platform knows the st_blocks stat attribute and 983 # uses st_blocks units of 512 bytes, and if the filesystem is able to 984 # store holes of 4 KiB in files. 985 # 986 # The function returns False if page size is larger than 4 KiB. 987 # For example, ppc64 uses pages of 64 KiB. 988 if sys.platform.startswith("linux"): 989 # Linux evidentially has 512 byte st_blocks units. 990 name = os.path.join(TEMPDIR, "sparse-test") 991 with open(name, "wb") as fobj: 992 # Seek to "punch a hole" of 4 KiB 993 fobj.seek(4096) 994 fobj.write(b'x' * 4096) 995 fobj.truncate() 996 s = os.stat(name) 997 support.unlink(name) 998 return (s.st_blocks * 512 < s.st_size) 999 else: 1000 return False 1001 1002 1003class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): 1004 1005 subdir = "pax" 1006 longnametype = tarfile.XHDTYPE 1007 1008 def test_pax_global_headers(self): 1009 tar = tarfile.open(tarname, encoding="iso8859-1") 1010 try: 1011 tarinfo = tar.getmember("pax/regtype1") 1012 self.assertEqual(tarinfo.uname, "foo") 1013 self.assertEqual(tarinfo.gname, "bar") 1014 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1015 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1016 1017 tarinfo = tar.getmember("pax/regtype2") 1018 self.assertEqual(tarinfo.uname, "") 1019 self.assertEqual(tarinfo.gname, "bar") 1020 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1021 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1022 1023 tarinfo = tar.getmember("pax/regtype3") 1024 self.assertEqual(tarinfo.uname, "tarfile") 1025 self.assertEqual(tarinfo.gname, "tarfile") 1026 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), 1027 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 1028 finally: 1029 tar.close() 1030 1031 def test_pax_number_fields(self): 1032 # All following number fields are read from the pax header. 1033 tar = tarfile.open(tarname, encoding="iso8859-1") 1034 try: 1035 tarinfo = tar.getmember("pax/regtype4") 1036 self.assertEqual(tarinfo.size, 7011) 1037 self.assertEqual(tarinfo.uid, 123) 1038 self.assertEqual(tarinfo.gid, 123) 1039 self.assertEqual(tarinfo.mtime, 1041808783.0) 1040 self.assertEqual(type(tarinfo.mtime), float) 1041 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 1042 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 1043 finally: 1044 tar.close() 1045 1046 1047class WriteTestBase(TarTest): 1048 # Put all write tests in here that are supposed to be tested 1049 # in all possible mode combinations. 1050 1051 def test_fileobj_no_close(self): 1052 fobj = io.BytesIO() 1053 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 1054 tar.addfile(tarfile.TarInfo("foo")) 1055 self.assertFalse(fobj.closed, "external fileobjs must never closed") 1056 # Issue #20238: Incomplete gzip output with mode="w:gz" 1057 data = fobj.getvalue() 1058 del tar 1059 support.gc_collect() 1060 self.assertFalse(fobj.closed) 1061 self.assertEqual(data, fobj.getvalue()) 1062 1063 def test_eof_marker(self): 1064 # Make sure an end of archive marker is written (two zero blocks). 1065 # tarfile insists on aligning archives to a 20 * 512 byte recordsize. 1066 # So, we create an archive that has exactly 10240 bytes without the 1067 # marker, and has 20480 bytes once the marker is written. 1068 with tarfile.open(tmpname, self.mode) as tar: 1069 t = tarfile.TarInfo("foo") 1070 t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE 1071 tar.addfile(t, io.BytesIO(b"a" * t.size)) 1072 1073 with self.open(tmpname, "rb") as fobj: 1074 self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2) 1075 1076 1077class WriteTest(WriteTestBase, unittest.TestCase): 1078 1079 prefix = "w:" 1080 1081 def test_100_char_name(self): 1082 # The name field in a tar header stores strings of at most 100 chars. 1083 # If a string is shorter than 100 chars it has to be padded with '\0', 1084 # which implies that a string of exactly 100 chars is stored without 1085 # a trailing '\0'. 1086 name = "0123456789" * 10 1087 tar = tarfile.open(tmpname, self.mode) 1088 try: 1089 t = tarfile.TarInfo(name) 1090 tar.addfile(t) 1091 finally: 1092 tar.close() 1093 1094 tar = tarfile.open(tmpname) 1095 try: 1096 self.assertEqual(tar.getnames()[0], name, 1097 "failed to store 100 char filename") 1098 finally: 1099 tar.close() 1100 1101 def test_tar_size(self): 1102 # Test for bug #1013882. 1103 tar = tarfile.open(tmpname, self.mode) 1104 try: 1105 path = os.path.join(TEMPDIR, "file") 1106 with open(path, "wb") as fobj: 1107 fobj.write(b"aaa") 1108 tar.add(path) 1109 finally: 1110 tar.close() 1111 self.assertGreater(os.path.getsize(tmpname), 0, 1112 "tarfile is empty") 1113 1114 # The test_*_size tests test for bug #1167128. 1115 def test_file_size(self): 1116 tar = tarfile.open(tmpname, self.mode) 1117 try: 1118 path = os.path.join(TEMPDIR, "file") 1119 with open(path, "wb"): 1120 pass 1121 tarinfo = tar.gettarinfo(path) 1122 self.assertEqual(tarinfo.size, 0) 1123 1124 with open(path, "wb") as fobj: 1125 fobj.write(b"aaa") 1126 tarinfo = tar.gettarinfo(path) 1127 self.assertEqual(tarinfo.size, 3) 1128 finally: 1129 tar.close() 1130 1131 def test_directory_size(self): 1132 path = os.path.join(TEMPDIR, "directory") 1133 os.mkdir(path) 1134 try: 1135 tar = tarfile.open(tmpname, self.mode) 1136 try: 1137 tarinfo = tar.gettarinfo(path) 1138 self.assertEqual(tarinfo.size, 0) 1139 finally: 1140 tar.close() 1141 finally: 1142 support.rmdir(path) 1143 1144 # mock the following: 1145 # os.listdir: so we know that files are in the wrong order 1146 def test_ordered_recursion(self): 1147 path = os.path.join(TEMPDIR, "directory") 1148 os.mkdir(path) 1149 open(os.path.join(path, "1"), "a").close() 1150 open(os.path.join(path, "2"), "a").close() 1151 try: 1152 tar = tarfile.open(tmpname, self.mode) 1153 try: 1154 with unittest.mock.patch('os.listdir') as mock_listdir: 1155 mock_listdir.return_value = ["2", "1"] 1156 tar.add(path) 1157 paths = [] 1158 for m in tar.getmembers(): 1159 paths.append(os.path.split(m.name)[-1]) 1160 self.assertEqual(paths, ["directory", "1", "2"]); 1161 finally: 1162 tar.close() 1163 finally: 1164 support.unlink(os.path.join(path, "1")) 1165 support.unlink(os.path.join(path, "2")) 1166 support.rmdir(path) 1167 1168 def test_gettarinfo_pathlike_name(self): 1169 with tarfile.open(tmpname, self.mode) as tar: 1170 path = pathlib.Path(TEMPDIR) / "file" 1171 with open(path, "wb") as fobj: 1172 fobj.write(b"aaa") 1173 tarinfo = tar.gettarinfo(path) 1174 tarinfo2 = tar.gettarinfo(os.fspath(path)) 1175 self.assertIsInstance(tarinfo.name, str) 1176 self.assertEqual(tarinfo.name, tarinfo2.name) 1177 self.assertEqual(tarinfo.size, 3) 1178 1179 @unittest.skipUnless(hasattr(os, "link"), 1180 "Missing hardlink implementation") 1181 def test_link_size(self): 1182 link = os.path.join(TEMPDIR, "link") 1183 target = os.path.join(TEMPDIR, "link_target") 1184 with open(target, "wb") as fobj: 1185 fobj.write(b"aaa") 1186 try: 1187 os.link(target, link) 1188 except PermissionError as e: 1189 self.skipTest('os.link(): %s' % e) 1190 try: 1191 tar = tarfile.open(tmpname, self.mode) 1192 try: 1193 # Record the link target in the inodes list. 1194 tar.gettarinfo(target) 1195 tarinfo = tar.gettarinfo(link) 1196 self.assertEqual(tarinfo.size, 0) 1197 finally: 1198 tar.close() 1199 finally: 1200 support.unlink(target) 1201 support.unlink(link) 1202 1203 @support.skip_unless_symlink 1204 def test_symlink_size(self): 1205 path = os.path.join(TEMPDIR, "symlink") 1206 os.symlink("link_target", path) 1207 try: 1208 tar = tarfile.open(tmpname, self.mode) 1209 try: 1210 tarinfo = tar.gettarinfo(path) 1211 self.assertEqual(tarinfo.size, 0) 1212 finally: 1213 tar.close() 1214 finally: 1215 support.unlink(path) 1216 1217 def test_add_self(self): 1218 # Test for #1257255. 1219 dstname = os.path.abspath(tmpname) 1220 tar = tarfile.open(tmpname, self.mode) 1221 try: 1222 self.assertEqual(tar.name, dstname, 1223 "archive name must be absolute") 1224 tar.add(dstname) 1225 self.assertEqual(tar.getnames(), [], 1226 "added the archive to itself") 1227 1228 with support.change_cwd(TEMPDIR): 1229 tar.add(dstname) 1230 self.assertEqual(tar.getnames(), [], 1231 "added the archive to itself") 1232 finally: 1233 tar.close() 1234 1235 def test_filter(self): 1236 tempdir = os.path.join(TEMPDIR, "filter") 1237 os.mkdir(tempdir) 1238 try: 1239 for name in ("foo", "bar", "baz"): 1240 name = os.path.join(tempdir, name) 1241 support.create_empty_file(name) 1242 1243 def filter(tarinfo): 1244 if os.path.basename(tarinfo.name) == "bar": 1245 return 1246 tarinfo.uid = 123 1247 tarinfo.uname = "foo" 1248 return tarinfo 1249 1250 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 1251 try: 1252 tar.add(tempdir, arcname="empty_dir", filter=filter) 1253 finally: 1254 tar.close() 1255 1256 # Verify that filter is a keyword-only argument 1257 with self.assertRaises(TypeError): 1258 tar.add(tempdir, "empty_dir", True, None, filter) 1259 1260 tar = tarfile.open(tmpname, "r") 1261 try: 1262 for tarinfo in tar: 1263 self.assertEqual(tarinfo.uid, 123) 1264 self.assertEqual(tarinfo.uname, "foo") 1265 self.assertEqual(len(tar.getmembers()), 3) 1266 finally: 1267 tar.close() 1268 finally: 1269 support.rmtree(tempdir) 1270 1271 # Guarantee that stored pathnames are not modified. Don't 1272 # remove ./ or ../ or double slashes. Still make absolute 1273 # pathnames relative. 1274 # For details see bug #6054. 1275 def _test_pathname(self, path, cmp_path=None, dir=False): 1276 # Create a tarfile with an empty member named path 1277 # and compare the stored name with the original. 1278 foo = os.path.join(TEMPDIR, "foo") 1279 if not dir: 1280 support.create_empty_file(foo) 1281 else: 1282 os.mkdir(foo) 1283 1284 tar = tarfile.open(tmpname, self.mode) 1285 try: 1286 tar.add(foo, arcname=path) 1287 finally: 1288 tar.close() 1289 1290 tar = tarfile.open(tmpname, "r") 1291 try: 1292 t = tar.next() 1293 finally: 1294 tar.close() 1295 1296 if not dir: 1297 support.unlink(foo) 1298 else: 1299 support.rmdir(foo) 1300 1301 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1302 1303 1304 @support.skip_unless_symlink 1305 def test_extractall_symlinks(self): 1306 # Test if extractall works properly when tarfile contains symlinks 1307 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1308 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1309 os.mkdir(tempdir) 1310 try: 1311 source_file = os.path.join(tempdir,'source') 1312 target_file = os.path.join(tempdir,'symlink') 1313 with open(source_file,'w') as f: 1314 f.write('something\n') 1315 os.symlink(source_file, target_file) 1316 with tarfile.open(temparchive, 'w') as tar: 1317 tar.add(source_file) 1318 tar.add(target_file) 1319 # Let's extract it to the location which contains the symlink 1320 with tarfile.open(temparchive) as tar: 1321 # this should not raise OSError: [Errno 17] File exists 1322 try: 1323 tar.extractall(path=tempdir) 1324 except OSError: 1325 self.fail("extractall failed with symlinked files") 1326 finally: 1327 support.unlink(temparchive) 1328 support.rmtree(tempdir) 1329 1330 def test_pathnames(self): 1331 self._test_pathname("foo") 1332 self._test_pathname(os.path.join("foo", ".", "bar")) 1333 self._test_pathname(os.path.join("foo", "..", "bar")) 1334 self._test_pathname(os.path.join(".", "foo")) 1335 self._test_pathname(os.path.join(".", "foo", ".")) 1336 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1337 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1338 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1339 self._test_pathname(os.path.join("..", "foo")) 1340 self._test_pathname(os.path.join("..", "foo", "..")) 1341 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1342 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1343 1344 self._test_pathname("foo" + os.sep + os.sep + "bar") 1345 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1346 1347 def test_abs_pathnames(self): 1348 if sys.platform == "win32": 1349 self._test_pathname("C:\\foo", "foo") 1350 else: 1351 self._test_pathname("/foo", "foo") 1352 self._test_pathname("///foo", "foo") 1353 1354 def test_cwd(self): 1355 # Test adding the current working directory. 1356 with support.change_cwd(TEMPDIR): 1357 tar = tarfile.open(tmpname, self.mode) 1358 try: 1359 tar.add(".") 1360 finally: 1361 tar.close() 1362 1363 tar = tarfile.open(tmpname, "r") 1364 try: 1365 for t in tar: 1366 if t.name != ".": 1367 self.assertTrue(t.name.startswith("./"), t.name) 1368 finally: 1369 tar.close() 1370 1371 def test_open_nonwritable_fileobj(self): 1372 for exctype in OSError, EOFError, RuntimeError: 1373 class BadFile(io.BytesIO): 1374 first = True 1375 def write(self, data): 1376 if self.first: 1377 self.first = False 1378 raise exctype 1379 1380 f = BadFile() 1381 with self.assertRaises(exctype): 1382 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1383 format=tarfile.PAX_FORMAT, 1384 pax_headers={'non': 'empty'}) 1385 self.assertFalse(f.closed) 1386 1387class GzipWriteTest(GzipTest, WriteTest): 1388 pass 1389 1390class Bz2WriteTest(Bz2Test, WriteTest): 1391 pass 1392 1393class LzmaWriteTest(LzmaTest, WriteTest): 1394 pass 1395 1396 1397class StreamWriteTest(WriteTestBase, unittest.TestCase): 1398 1399 prefix = "w|" 1400 decompressor = None 1401 1402 def test_stream_padding(self): 1403 # Test for bug #1543303. 1404 tar = tarfile.open(tmpname, self.mode) 1405 tar.close() 1406 if self.decompressor: 1407 dec = self.decompressor() 1408 with open(tmpname, "rb") as fobj: 1409 data = fobj.read() 1410 data = dec.decompress(data) 1411 self.assertFalse(dec.unused_data, "found trailing data") 1412 else: 1413 with self.open(tmpname) as fobj: 1414 data = fobj.read() 1415 self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, 1416 "incorrect zero padding") 1417 1418 @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), 1419 "Missing umask implementation") 1420 def test_file_mode(self): 1421 # Test for issue #8464: Create files with correct 1422 # permissions. 1423 if os.path.exists(tmpname): 1424 support.unlink(tmpname) 1425 1426 original_umask = os.umask(0o022) 1427 try: 1428 tar = tarfile.open(tmpname, self.mode) 1429 tar.close() 1430 mode = os.stat(tmpname).st_mode & 0o777 1431 self.assertEqual(mode, 0o644, "wrong file permissions") 1432 finally: 1433 os.umask(original_umask) 1434 1435class GzipStreamWriteTest(GzipTest, StreamWriteTest): 1436 pass 1437 1438class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): 1439 decompressor = bz2.BZ2Decompressor if bz2 else None 1440 1441class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): 1442 decompressor = lzma.LZMADecompressor if lzma else None 1443 1444 1445class GNUWriteTest(unittest.TestCase): 1446 # This testcase checks for correct creation of GNU Longname 1447 # and Longlink extended headers (cp. bug #812325). 1448 1449 def _length(self, s): 1450 blocks = len(s) // 512 + 1 1451 return blocks * 512 1452 1453 def _calc_size(self, name, link=None): 1454 # Initial tar header 1455 count = 512 1456 1457 if len(name) > tarfile.LENGTH_NAME: 1458 # GNU longname extended header + longname 1459 count += 512 1460 count += self._length(name) 1461 if link is not None and len(link) > tarfile.LENGTH_LINK: 1462 # GNU longlink extended header + longlink 1463 count += 512 1464 count += self._length(link) 1465 return count 1466 1467 def _test(self, name, link=None): 1468 tarinfo = tarfile.TarInfo(name) 1469 if link: 1470 tarinfo.linkname = link 1471 tarinfo.type = tarfile.LNKTYPE 1472 1473 tar = tarfile.open(tmpname, "w") 1474 try: 1475 tar.format = tarfile.GNU_FORMAT 1476 tar.addfile(tarinfo) 1477 1478 v1 = self._calc_size(name, link) 1479 v2 = tar.offset 1480 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1481 finally: 1482 tar.close() 1483 1484 tar = tarfile.open(tmpname) 1485 try: 1486 member = tar.next() 1487 self.assertIsNotNone(member, 1488 "unable to read longname member") 1489 self.assertEqual(tarinfo.name, member.name, 1490 "unable to read longname member") 1491 self.assertEqual(tarinfo.linkname, member.linkname, 1492 "unable to read longname member") 1493 finally: 1494 tar.close() 1495 1496 def test_longname_1023(self): 1497 self._test(("longnam/" * 127) + "longnam") 1498 1499 def test_longname_1024(self): 1500 self._test(("longnam/" * 127) + "longname") 1501 1502 def test_longname_1025(self): 1503 self._test(("longnam/" * 127) + "longname_") 1504 1505 def test_longlink_1023(self): 1506 self._test("name", ("longlnk/" * 127) + "longlnk") 1507 1508 def test_longlink_1024(self): 1509 self._test("name", ("longlnk/" * 127) + "longlink") 1510 1511 def test_longlink_1025(self): 1512 self._test("name", ("longlnk/" * 127) + "longlink_") 1513 1514 def test_longnamelink_1023(self): 1515 self._test(("longnam/" * 127) + "longnam", 1516 ("longlnk/" * 127) + "longlnk") 1517 1518 def test_longnamelink_1024(self): 1519 self._test(("longnam/" * 127) + "longname", 1520 ("longlnk/" * 127) + "longlink") 1521 1522 def test_longnamelink_1025(self): 1523 self._test(("longnam/" * 127) + "longname_", 1524 ("longlnk/" * 127) + "longlink_") 1525 1526 1527class CreateTest(WriteTestBase, unittest.TestCase): 1528 1529 prefix = "x:" 1530 1531 file_path = os.path.join(TEMPDIR, "spameggs42") 1532 1533 def setUp(self): 1534 support.unlink(tmpname) 1535 1536 @classmethod 1537 def setUpClass(cls): 1538 with open(cls.file_path, "wb") as fobj: 1539 fobj.write(b"aaa") 1540 1541 @classmethod 1542 def tearDownClass(cls): 1543 support.unlink(cls.file_path) 1544 1545 def test_create(self): 1546 with tarfile.open(tmpname, self.mode) as tobj: 1547 tobj.add(self.file_path) 1548 1549 with self.taropen(tmpname) as tobj: 1550 names = tobj.getnames() 1551 self.assertEqual(len(names), 1) 1552 self.assertIn('spameggs42', names[0]) 1553 1554 def test_create_existing(self): 1555 with tarfile.open(tmpname, self.mode) as tobj: 1556 tobj.add(self.file_path) 1557 1558 with self.assertRaises(FileExistsError): 1559 tobj = tarfile.open(tmpname, self.mode) 1560 1561 with self.taropen(tmpname) as tobj: 1562 names = tobj.getnames() 1563 self.assertEqual(len(names), 1) 1564 self.assertIn('spameggs42', names[0]) 1565 1566 def test_create_taropen(self): 1567 with self.taropen(tmpname, "x") as tobj: 1568 tobj.add(self.file_path) 1569 1570 with self.taropen(tmpname) as tobj: 1571 names = tobj.getnames() 1572 self.assertEqual(len(names), 1) 1573 self.assertIn('spameggs42', names[0]) 1574 1575 def test_create_existing_taropen(self): 1576 with self.taropen(tmpname, "x") as tobj: 1577 tobj.add(self.file_path) 1578 1579 with self.assertRaises(FileExistsError): 1580 with self.taropen(tmpname, "x"): 1581 pass 1582 1583 with self.taropen(tmpname) as tobj: 1584 names = tobj.getnames() 1585 self.assertEqual(len(names), 1) 1586 self.assertIn("spameggs42", names[0]) 1587 1588 def test_create_pathlike_name(self): 1589 with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj: 1590 self.assertIsInstance(tobj.name, str) 1591 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1592 tobj.add(pathlib.Path(self.file_path)) 1593 names = tobj.getnames() 1594 self.assertEqual(len(names), 1) 1595 self.assertIn('spameggs42', names[0]) 1596 1597 with self.taropen(tmpname) as tobj: 1598 names = tobj.getnames() 1599 self.assertEqual(len(names), 1) 1600 self.assertIn('spameggs42', names[0]) 1601 1602 def test_create_taropen_pathlike_name(self): 1603 with self.taropen(pathlib.Path(tmpname), "x") as tobj: 1604 self.assertIsInstance(tobj.name, str) 1605 self.assertEqual(tobj.name, os.path.abspath(tmpname)) 1606 tobj.add(pathlib.Path(self.file_path)) 1607 names = tobj.getnames() 1608 self.assertEqual(len(names), 1) 1609 self.assertIn('spameggs42', names[0]) 1610 1611 with self.taropen(tmpname) as tobj: 1612 names = tobj.getnames() 1613 self.assertEqual(len(names), 1) 1614 self.assertIn('spameggs42', names[0]) 1615 1616 1617class GzipCreateTest(GzipTest, CreateTest): 1618 pass 1619 1620 1621class Bz2CreateTest(Bz2Test, CreateTest): 1622 pass 1623 1624 1625class LzmaCreateTest(LzmaTest, CreateTest): 1626 pass 1627 1628 1629class CreateWithXModeTest(CreateTest): 1630 1631 prefix = "x" 1632 1633 test_create_taropen = None 1634 test_create_existing_taropen = None 1635 1636 1637@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") 1638class HardlinkTest(unittest.TestCase): 1639 # Test the creation of LNKTYPE (hardlink) members in an archive. 1640 1641 def setUp(self): 1642 self.foo = os.path.join(TEMPDIR, "foo") 1643 self.bar = os.path.join(TEMPDIR, "bar") 1644 1645 with open(self.foo, "wb") as fobj: 1646 fobj.write(b"foo") 1647 1648 try: 1649 os.link(self.foo, self.bar) 1650 except PermissionError as e: 1651 self.skipTest('os.link(): %s' % e) 1652 1653 self.tar = tarfile.open(tmpname, "w") 1654 self.tar.add(self.foo) 1655 1656 def tearDown(self): 1657 self.tar.close() 1658 support.unlink(self.foo) 1659 support.unlink(self.bar) 1660 1661 def test_add_twice(self): 1662 # The same name will be added as a REGTYPE every 1663 # time regardless of st_nlink. 1664 tarinfo = self.tar.gettarinfo(self.foo) 1665 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1666 "add file as regular failed") 1667 1668 def test_add_hardlink(self): 1669 tarinfo = self.tar.gettarinfo(self.bar) 1670 self.assertEqual(tarinfo.type, tarfile.LNKTYPE, 1671 "add file as hardlink failed") 1672 1673 def test_dereference_hardlink(self): 1674 self.tar.dereference = True 1675 tarinfo = self.tar.gettarinfo(self.bar) 1676 self.assertEqual(tarinfo.type, tarfile.REGTYPE, 1677 "dereferencing hardlink failed") 1678 1679 1680class PaxWriteTest(GNUWriteTest): 1681 1682 def _test(self, name, link=None): 1683 # See GNUWriteTest. 1684 tarinfo = tarfile.TarInfo(name) 1685 if link: 1686 tarinfo.linkname = link 1687 tarinfo.type = tarfile.LNKTYPE 1688 1689 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1690 try: 1691 tar.addfile(tarinfo) 1692 finally: 1693 tar.close() 1694 1695 tar = tarfile.open(tmpname) 1696 try: 1697 if link: 1698 l = tar.getmembers()[0].linkname 1699 self.assertEqual(link, l, "PAX longlink creation failed") 1700 else: 1701 n = tar.getmembers()[0].name 1702 self.assertEqual(name, n, "PAX longname creation failed") 1703 finally: 1704 tar.close() 1705 1706 def test_pax_global_header(self): 1707 pax_headers = { 1708 "foo": "bar", 1709 "uid": "0", 1710 "mtime": "1.23", 1711 "test": "\xe4\xf6\xfc", 1712 "\xe4\xf6\xfc": "test"} 1713 1714 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1715 pax_headers=pax_headers) 1716 try: 1717 tar.addfile(tarfile.TarInfo("test")) 1718 finally: 1719 tar.close() 1720 1721 # Test if the global header was written correctly. 1722 tar = tarfile.open(tmpname, encoding="iso8859-1") 1723 try: 1724 self.assertEqual(tar.pax_headers, pax_headers) 1725 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1726 # Test if all the fields are strings. 1727 for key, val in tar.pax_headers.items(): 1728 self.assertIsNot(type(key), bytes) 1729 self.assertIsNot(type(val), bytes) 1730 if key in tarfile.PAX_NUMBER_FIELDS: 1731 try: 1732 tarfile.PAX_NUMBER_FIELDS[key](val) 1733 except (TypeError, ValueError): 1734 self.fail("unable to convert pax header field") 1735 finally: 1736 tar.close() 1737 1738 def test_pax_extended_header(self): 1739 # The fields from the pax header have priority over the 1740 # TarInfo. 1741 pax_headers = {"path": "foo", "uid": "123"} 1742 1743 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1744 encoding="iso8859-1") 1745 try: 1746 t = tarfile.TarInfo() 1747 t.name = "\xe4\xf6\xfc" # non-ASCII 1748 t.uid = 8**8 # too large 1749 t.pax_headers = pax_headers 1750 tar.addfile(t) 1751 finally: 1752 tar.close() 1753 1754 tar = tarfile.open(tmpname, encoding="iso8859-1") 1755 try: 1756 t = tar.getmembers()[0] 1757 self.assertEqual(t.pax_headers, pax_headers) 1758 self.assertEqual(t.name, "foo") 1759 self.assertEqual(t.uid, 123) 1760 finally: 1761 tar.close() 1762 1763 1764class UnicodeTest: 1765 1766 def test_iso8859_1_filename(self): 1767 self._test_unicode_filename("iso8859-1") 1768 1769 def test_utf7_filename(self): 1770 self._test_unicode_filename("utf7") 1771 1772 def test_utf8_filename(self): 1773 self._test_unicode_filename("utf-8") 1774 1775 def _test_unicode_filename(self, encoding): 1776 tar = tarfile.open(tmpname, "w", format=self.format, 1777 encoding=encoding, errors="strict") 1778 try: 1779 name = "\xe4\xf6\xfc" 1780 tar.addfile(tarfile.TarInfo(name)) 1781 finally: 1782 tar.close() 1783 1784 tar = tarfile.open(tmpname, encoding=encoding) 1785 try: 1786 self.assertEqual(tar.getmembers()[0].name, name) 1787 finally: 1788 tar.close() 1789 1790 def test_unicode_filename_error(self): 1791 tar = tarfile.open(tmpname, "w", format=self.format, 1792 encoding="ascii", errors="strict") 1793 try: 1794 tarinfo = tarfile.TarInfo() 1795 1796 tarinfo.name = "\xe4\xf6\xfc" 1797 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1798 1799 tarinfo.name = "foo" 1800 tarinfo.uname = "\xe4\xf6\xfc" 1801 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1802 finally: 1803 tar.close() 1804 1805 def test_unicode_argument(self): 1806 tar = tarfile.open(tarname, "r", 1807 encoding="iso8859-1", errors="strict") 1808 try: 1809 for t in tar: 1810 self.assertIs(type(t.name), str) 1811 self.assertIs(type(t.linkname), str) 1812 self.assertIs(type(t.uname), str) 1813 self.assertIs(type(t.gname), str) 1814 finally: 1815 tar.close() 1816 1817 def test_uname_unicode(self): 1818 t = tarfile.TarInfo("foo") 1819 t.uname = "\xe4\xf6\xfc" 1820 t.gname = "\xe4\xf6\xfc" 1821 1822 tar = tarfile.open(tmpname, mode="w", format=self.format, 1823 encoding="iso8859-1") 1824 try: 1825 tar.addfile(t) 1826 finally: 1827 tar.close() 1828 1829 tar = tarfile.open(tmpname, encoding="iso8859-1") 1830 try: 1831 t = tar.getmember("foo") 1832 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1833 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1834 1835 if self.format != tarfile.PAX_FORMAT: 1836 tar.close() 1837 tar = tarfile.open(tmpname, encoding="ascii") 1838 t = tar.getmember("foo") 1839 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 1840 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 1841 finally: 1842 tar.close() 1843 1844 1845class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 1846 1847 format = tarfile.USTAR_FORMAT 1848 1849 # Test whether the utf-8 encoded version of a filename exceeds the 100 1850 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 1851 # bytes). 1852 def test_unicode_name1(self): 1853 self._test_ustar_name("0123456789" * 10) 1854 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1855 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1856 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1857 1858 def test_unicode_name2(self): 1859 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1860 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1861 1862 # Test whether the utf-8 encoded version of a filename exceeds the 155 1863 # bytes prefix + '/' + 100 bytes name limit. 1864 def test_unicode_longname1(self): 1865 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1866 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1867 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1868 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1869 1870 def test_unicode_longname2(self): 1871 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1872 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1873 1874 def test_unicode_longname3(self): 1875 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1876 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1877 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1878 1879 def test_unicode_longname4(self): 1880 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 1881 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 1882 1883 def _test_ustar_name(self, name, exc=None): 1884 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1885 t = tarfile.TarInfo(name) 1886 if exc is None: 1887 tar.addfile(t) 1888 else: 1889 self.assertRaises(exc, tar.addfile, t) 1890 1891 if exc is None: 1892 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1893 for t in tar: 1894 self.assertEqual(name, t.name) 1895 break 1896 1897 # Test the same as above for the 100 bytes link field. 1898 def test_unicode_link1(self): 1899 self._test_ustar_link("0123456789" * 10) 1900 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 1901 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 1902 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 1903 1904 def test_unicode_link2(self): 1905 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 1906 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 1907 1908 def _test_ustar_link(self, name, exc=None): 1909 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1910 t = tarfile.TarInfo("foo") 1911 t.linkname = name 1912 if exc is None: 1913 tar.addfile(t) 1914 else: 1915 self.assertRaises(exc, tar.addfile, t) 1916 1917 if exc is None: 1918 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 1919 for t in tar: 1920 self.assertEqual(name, t.linkname) 1921 break 1922 1923 1924class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 1925 1926 format = tarfile.GNU_FORMAT 1927 1928 def test_bad_pax_header(self): 1929 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 1930 # without a hdrcharset=BINARY header. 1931 for encoding, name in ( 1932 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 1933 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 1934 with tarfile.open(tarname, encoding=encoding, 1935 errors="surrogateescape") as tar: 1936 try: 1937 t = tar.getmember(name) 1938 except KeyError: 1939 self.fail("unable to read bad GNU tar pax header") 1940 1941 1942class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 1943 1944 format = tarfile.PAX_FORMAT 1945 1946 # PAX_FORMAT ignores encoding in write mode. 1947 test_unicode_filename_error = None 1948 1949 def test_binary_header(self): 1950 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 1951 for encoding, name in ( 1952 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 1953 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 1954 with tarfile.open(tarname, encoding=encoding, 1955 errors="surrogateescape") as tar: 1956 try: 1957 t = tar.getmember(name) 1958 except KeyError: 1959 self.fail("unable to read POSIX.1-2008 binary header") 1960 1961 1962class AppendTestBase: 1963 # Test append mode (cp. patch #1652681). 1964 1965 def setUp(self): 1966 self.tarname = tmpname 1967 if os.path.exists(self.tarname): 1968 support.unlink(self.tarname) 1969 1970 def _create_testtar(self, mode="w:"): 1971 with tarfile.open(tarname, encoding="iso8859-1") as src: 1972 t = src.getmember("ustar/regtype") 1973 t.name = "foo" 1974 with src.extractfile(t) as f: 1975 with tarfile.open(self.tarname, mode) as tar: 1976 tar.addfile(t, f) 1977 1978 def test_append_compressed(self): 1979 self._create_testtar("w:" + self.suffix) 1980 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1981 1982class AppendTest(AppendTestBase, unittest.TestCase): 1983 test_append_compressed = None 1984 1985 def _add_testfile(self, fileobj=None): 1986 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1987 tar.addfile(tarfile.TarInfo("bar")) 1988 1989 def _test(self, names=["bar"], fileobj=None): 1990 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1991 self.assertEqual(tar.getnames(), names) 1992 1993 def test_non_existing(self): 1994 self._add_testfile() 1995 self._test() 1996 1997 def test_empty(self): 1998 tarfile.open(self.tarname, "w:").close() 1999 self._add_testfile() 2000 self._test() 2001 2002 def test_empty_fileobj(self): 2003 fobj = io.BytesIO(b"\0" * 1024) 2004 self._add_testfile(fobj) 2005 fobj.seek(0) 2006 self._test(fileobj=fobj) 2007 2008 def test_fileobj(self): 2009 self._create_testtar() 2010 with open(self.tarname, "rb") as fobj: 2011 data = fobj.read() 2012 fobj = io.BytesIO(data) 2013 self._add_testfile(fobj) 2014 fobj.seek(0) 2015 self._test(names=["foo", "bar"], fileobj=fobj) 2016 2017 def test_existing(self): 2018 self._create_testtar() 2019 self._add_testfile() 2020 self._test(names=["foo", "bar"]) 2021 2022 # Append mode is supposed to fail if the tarfile to append to 2023 # does not end with a zero block. 2024 def _test_error(self, data): 2025 with open(self.tarname, "wb") as fobj: 2026 fobj.write(data) 2027 self.assertRaises(tarfile.ReadError, self._add_testfile) 2028 2029 def test_null(self): 2030 self._test_error(b"") 2031 2032 def test_incomplete(self): 2033 self._test_error(b"\0" * 13) 2034 2035 def test_premature_eof(self): 2036 data = tarfile.TarInfo("foo").tobuf() 2037 self._test_error(data) 2038 2039 def test_trailing_garbage(self): 2040 data = tarfile.TarInfo("foo").tobuf() 2041 self._test_error(data + b"\0" * 13) 2042 2043 def test_invalid(self): 2044 self._test_error(b"a" * 512) 2045 2046class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2047 pass 2048 2049class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2050 pass 2051 2052class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2053 pass 2054 2055 2056class LimitsTest(unittest.TestCase): 2057 2058 def test_ustar_limits(self): 2059 # 100 char name 2060 tarinfo = tarfile.TarInfo("0123456789" * 10) 2061 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2062 2063 # 101 char name that cannot be stored 2064 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2065 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2066 2067 # 256 char name with a slash at pos 156 2068 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2069 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2070 2071 # 256 char name that cannot be stored 2072 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2073 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2074 2075 # 512 char name 2076 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2077 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2078 2079 # 512 char linkname 2080 tarinfo = tarfile.TarInfo("longlink") 2081 tarinfo.linkname = "123/" * 126 + "longname" 2082 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2083 2084 # uid > 8 digits 2085 tarinfo = tarfile.TarInfo("name") 2086 tarinfo.uid = 0o10000000 2087 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2088 2089 def test_gnu_limits(self): 2090 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2091 tarinfo.tobuf(tarfile.GNU_FORMAT) 2092 2093 tarinfo = tarfile.TarInfo("longlink") 2094 tarinfo.linkname = "123/" * 126 + "longname" 2095 tarinfo.tobuf(tarfile.GNU_FORMAT) 2096 2097 # uid >= 256 ** 7 2098 tarinfo = tarfile.TarInfo("name") 2099 tarinfo.uid = 0o4000000000000000000 2100 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2101 2102 def test_pax_limits(self): 2103 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2104 tarinfo.tobuf(tarfile.PAX_FORMAT) 2105 2106 tarinfo = tarfile.TarInfo("longlink") 2107 tarinfo.linkname = "123/" * 126 + "longname" 2108 tarinfo.tobuf(tarfile.PAX_FORMAT) 2109 2110 tarinfo = tarfile.TarInfo("name") 2111 tarinfo.uid = 0o4000000000000000000 2112 tarinfo.tobuf(tarfile.PAX_FORMAT) 2113 2114 2115class MiscTest(unittest.TestCase): 2116 2117 def test_char_fields(self): 2118 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2119 b"foo\0\0\0\0\0") 2120 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2121 b"foo") 2122 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2123 "foo") 2124 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2125 "foo") 2126 2127 def test_read_number_fields(self): 2128 # Issue 13158: Test if GNU tar specific base-256 number fields 2129 # are decoded correctly. 2130 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2131 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2132 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2133 0o10000000) 2134 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2135 0xffffffff) 2136 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2137 -1) 2138 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2139 -100) 2140 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2141 -0x100000000000000) 2142 2143 # Issue 24514: Test if empty number fields are converted to zero. 2144 self.assertEqual(tarfile.nti(b"\0"), 0) 2145 self.assertEqual(tarfile.nti(b" \0"), 0) 2146 2147 def test_write_number_fields(self): 2148 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2149 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2150 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2151 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2152 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2153 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2154 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2155 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2156 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2157 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2158 self.assertEqual(tarfile.itn(-0x100000000000000, 2159 format=tarfile.GNU_FORMAT), 2160 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2161 2162 # Issue 32713: Test if itn() supports float values outside the 2163 # non-GNU format range 2164 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2165 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2166 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2167 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2168 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2169 2170 def test_number_field_limits(self): 2171 with self.assertRaises(ValueError): 2172 tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) 2173 with self.assertRaises(ValueError): 2174 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2175 with self.assertRaises(ValueError): 2176 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2177 with self.assertRaises(ValueError): 2178 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2179 2180 def test__all__(self): 2181 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2182 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2183 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2184 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2185 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2186 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2187 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2188 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2189 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2190 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2191 'filemode', 2192 'EmptyHeaderError', 'TruncatedHeaderError', 2193 'EOFHeaderError', 'InvalidHeaderError', 2194 'SubsequentHeaderError', 'ExFileObject', 2195 'main'} 2196 support.check__all__(self, tarfile, blacklist=blacklist) 2197 2198 2199class CommandLineTest(unittest.TestCase): 2200 2201 def tarfilecmd(self, *args, **kwargs): 2202 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2203 **kwargs) 2204 return out.replace(os.linesep.encode(), b'\n') 2205 2206 def tarfilecmd_failure(self, *args): 2207 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2208 2209 def make_simple_tarfile(self, tar_name): 2210 files = [support.findfile('tokenize_tests.txt'), 2211 support.findfile('tokenize_tests-no-coding-cookie-' 2212 'and-utf8-bom-sig-only.txt')] 2213 self.addCleanup(support.unlink, tar_name) 2214 with tarfile.open(tar_name, 'w') as tf: 2215 for tardata in files: 2216 tf.add(tardata, arcname=os.path.basename(tardata)) 2217 2218 def test_bad_use(self): 2219 rc, out, err = self.tarfilecmd_failure() 2220 self.assertEqual(out, b'') 2221 self.assertIn(b'usage', err.lower()) 2222 self.assertIn(b'error', err.lower()) 2223 self.assertIn(b'required', err.lower()) 2224 rc, out, err = self.tarfilecmd_failure('-l', '') 2225 self.assertEqual(out, b'') 2226 self.assertNotEqual(err.strip(), b'') 2227 2228 def test_test_command(self): 2229 for tar_name in testtarnames: 2230 for opt in '-t', '--test': 2231 out = self.tarfilecmd(opt, tar_name) 2232 self.assertEqual(out, b'') 2233 2234 def test_test_command_verbose(self): 2235 for tar_name in testtarnames: 2236 for opt in '-v', '--verbose': 2237 out = self.tarfilecmd(opt, '-t', tar_name) 2238 self.assertIn(b'is a tar archive.\n', out) 2239 2240 def test_test_command_invalid_file(self): 2241 zipname = support.findfile('zipdir.zip') 2242 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2243 self.assertIn(b' is not a tar archive.', err) 2244 self.assertEqual(out, b'') 2245 self.assertEqual(rc, 1) 2246 2247 for tar_name in testtarnames: 2248 with self.subTest(tar_name=tar_name): 2249 with open(tar_name, 'rb') as f: 2250 data = f.read() 2251 try: 2252 with open(tmpname, 'wb') as f: 2253 f.write(data[:511]) 2254 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2255 self.assertEqual(out, b'') 2256 self.assertEqual(rc, 1) 2257 finally: 2258 support.unlink(tmpname) 2259 2260 def test_list_command(self): 2261 for tar_name in testtarnames: 2262 with support.captured_stdout() as t: 2263 with tarfile.open(tar_name, 'r') as tf: 2264 tf.list(verbose=False) 2265 expected = t.getvalue().encode('ascii', 'backslashreplace') 2266 for opt in '-l', '--list': 2267 out = self.tarfilecmd(opt, tar_name, 2268 PYTHONIOENCODING='ascii') 2269 self.assertEqual(out, expected) 2270 2271 def test_list_command_verbose(self): 2272 for tar_name in testtarnames: 2273 with support.captured_stdout() as t: 2274 with tarfile.open(tar_name, 'r') as tf: 2275 tf.list(verbose=True) 2276 expected = t.getvalue().encode('ascii', 'backslashreplace') 2277 for opt in '-v', '--verbose': 2278 out = self.tarfilecmd(opt, '-l', tar_name, 2279 PYTHONIOENCODING='ascii') 2280 self.assertEqual(out, expected) 2281 2282 def test_list_command_invalid_file(self): 2283 zipname = support.findfile('zipdir.zip') 2284 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2285 self.assertIn(b' is not a tar archive.', err) 2286 self.assertEqual(out, b'') 2287 self.assertEqual(rc, 1) 2288 2289 def test_create_command(self): 2290 files = [support.findfile('tokenize_tests.txt'), 2291 support.findfile('tokenize_tests-no-coding-cookie-' 2292 'and-utf8-bom-sig-only.txt')] 2293 for opt in '-c', '--create': 2294 try: 2295 out = self.tarfilecmd(opt, tmpname, *files) 2296 self.assertEqual(out, b'') 2297 with tarfile.open(tmpname) as tar: 2298 tar.getmembers() 2299 finally: 2300 support.unlink(tmpname) 2301 2302 def test_create_command_verbose(self): 2303 files = [support.findfile('tokenize_tests.txt'), 2304 support.findfile('tokenize_tests-no-coding-cookie-' 2305 'and-utf8-bom-sig-only.txt')] 2306 for opt in '-v', '--verbose': 2307 try: 2308 out = self.tarfilecmd(opt, '-c', tmpname, *files) 2309 self.assertIn(b' file created.', out) 2310 with tarfile.open(tmpname) as tar: 2311 tar.getmembers() 2312 finally: 2313 support.unlink(tmpname) 2314 2315 def test_create_command_dotless_filename(self): 2316 files = [support.findfile('tokenize_tests.txt')] 2317 try: 2318 out = self.tarfilecmd('-c', dotlessname, *files) 2319 self.assertEqual(out, b'') 2320 with tarfile.open(dotlessname) as tar: 2321 tar.getmembers() 2322 finally: 2323 support.unlink(dotlessname) 2324 2325 def test_create_command_dot_started_filename(self): 2326 tar_name = os.path.join(TEMPDIR, ".testtar") 2327 files = [support.findfile('tokenize_tests.txt')] 2328 try: 2329 out = self.tarfilecmd('-c', tar_name, *files) 2330 self.assertEqual(out, b'') 2331 with tarfile.open(tar_name) as tar: 2332 tar.getmembers() 2333 finally: 2334 support.unlink(tar_name) 2335 2336 def test_create_command_compressed(self): 2337 files = [support.findfile('tokenize_tests.txt'), 2338 support.findfile('tokenize_tests-no-coding-cookie-' 2339 'and-utf8-bom-sig-only.txt')] 2340 for filetype in (GzipTest, Bz2Test, LzmaTest): 2341 if not filetype.open: 2342 continue 2343 try: 2344 tar_name = tmpname + '.' + filetype.suffix 2345 out = self.tarfilecmd('-c', tar_name, *files) 2346 with filetype.taropen(tar_name) as tar: 2347 tar.getmembers() 2348 finally: 2349 support.unlink(tar_name) 2350 2351 def test_extract_command(self): 2352 self.make_simple_tarfile(tmpname) 2353 for opt in '-e', '--extract': 2354 try: 2355 with support.temp_cwd(tarextdir): 2356 out = self.tarfilecmd(opt, tmpname) 2357 self.assertEqual(out, b'') 2358 finally: 2359 support.rmtree(tarextdir) 2360 2361 def test_extract_command_verbose(self): 2362 self.make_simple_tarfile(tmpname) 2363 for opt in '-v', '--verbose': 2364 try: 2365 with support.temp_cwd(tarextdir): 2366 out = self.tarfilecmd(opt, '-e', tmpname) 2367 self.assertIn(b' file is extracted.', out) 2368 finally: 2369 support.rmtree(tarextdir) 2370 2371 def test_extract_command_different_directory(self): 2372 self.make_simple_tarfile(tmpname) 2373 try: 2374 with support.temp_cwd(tarextdir): 2375 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2376 self.assertEqual(out, b'') 2377 finally: 2378 support.rmtree(tarextdir) 2379 2380 def test_extract_command_invalid_file(self): 2381 zipname = support.findfile('zipdir.zip') 2382 with support.temp_cwd(tarextdir): 2383 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2384 self.assertIn(b' is not a tar archive.', err) 2385 self.assertEqual(out, b'') 2386 self.assertEqual(rc, 1) 2387 2388 2389class ContextManagerTest(unittest.TestCase): 2390 2391 def test_basic(self): 2392 with tarfile.open(tarname) as tar: 2393 self.assertFalse(tar.closed, "closed inside runtime context") 2394 self.assertTrue(tar.closed, "context manager failed") 2395 2396 def test_closed(self): 2397 # The __enter__() method is supposed to raise OSError 2398 # if the TarFile object is already closed. 2399 tar = tarfile.open(tarname) 2400 tar.close() 2401 with self.assertRaises(OSError): 2402 with tar: 2403 pass 2404 2405 def test_exception(self): 2406 # Test if the OSError exception is passed through properly. 2407 with self.assertRaises(Exception) as exc: 2408 with tarfile.open(tarname) as tar: 2409 raise OSError 2410 self.assertIsInstance(exc.exception, OSError, 2411 "wrong exception raised in context manager") 2412 self.assertTrue(tar.closed, "context manager failed") 2413 2414 def test_no_eof(self): 2415 # __exit__() must not write end-of-archive blocks if an 2416 # exception was raised. 2417 try: 2418 with tarfile.open(tmpname, "w") as tar: 2419 raise Exception 2420 except: 2421 pass 2422 self.assertEqual(os.path.getsize(tmpname), 0, 2423 "context manager wrote an end-of-archive block") 2424 self.assertTrue(tar.closed, "context manager failed") 2425 2426 def test_eof(self): 2427 # __exit__() must write end-of-archive blocks, i.e. call 2428 # TarFile.close() if there was no error. 2429 with tarfile.open(tmpname, "w"): 2430 pass 2431 self.assertNotEqual(os.path.getsize(tmpname), 0, 2432 "context manager wrote no end-of-archive block") 2433 2434 def test_fileobj(self): 2435 # Test that __exit__() did not close the external file 2436 # object. 2437 with open(tmpname, "wb") as fobj: 2438 try: 2439 with tarfile.open(fileobj=fobj, mode="w") as tar: 2440 raise Exception 2441 except: 2442 pass 2443 self.assertFalse(fobj.closed, "external file object was closed") 2444 self.assertTrue(tar.closed, "context manager failed") 2445 2446 2447@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2448class LinkEmulationTest(ReadTest, unittest.TestCase): 2449 2450 # Test for issue #8741 regression. On platforms that do not support 2451 # symbolic or hard links tarfile tries to extract these types of members 2452 # as the regular files they point to. 2453 def _test_link_extraction(self, name): 2454 self.tar.extract(name, TEMPDIR) 2455 with open(os.path.join(TEMPDIR, name), "rb") as f: 2456 data = f.read() 2457 self.assertEqual(sha256sum(data), sha256_regtype) 2458 2459 # See issues #1578269, #8879, and #17689 for some history on these skips 2460 @unittest.skipIf(hasattr(os.path, "islink"), 2461 "Skip emulation - has os.path.islink but not os.link") 2462 def test_hardlink_extraction1(self): 2463 self._test_link_extraction("ustar/lnktype") 2464 2465 @unittest.skipIf(hasattr(os.path, "islink"), 2466 "Skip emulation - has os.path.islink but not os.link") 2467 def test_hardlink_extraction2(self): 2468 self._test_link_extraction("./ustar/linktest2/lnktype") 2469 2470 @unittest.skipIf(hasattr(os, "symlink"), 2471 "Skip emulation if symlink exists") 2472 def test_symlink_extraction1(self): 2473 self._test_link_extraction("ustar/symtype") 2474 2475 @unittest.skipIf(hasattr(os, "symlink"), 2476 "Skip emulation if symlink exists") 2477 def test_symlink_extraction2(self): 2478 self._test_link_extraction("./ustar/linktest2/symtype") 2479 2480 2481class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2482 # Issue5068: The _BZ2Proxy.read() method loops forever 2483 # on an empty or partial bzipped file. 2484 2485 def _test_partial_input(self, mode): 2486 class MyBytesIO(io.BytesIO): 2487 hit_eof = False 2488 def read(self, n): 2489 if self.hit_eof: 2490 raise AssertionError("infinite loop detected in " 2491 "tarfile.open()") 2492 self.hit_eof = self.tell() == len(self.getvalue()) 2493 return super(MyBytesIO, self).read(n) 2494 def seek(self, *args): 2495 self.hit_eof = False 2496 return super(MyBytesIO, self).seek(*args) 2497 2498 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2499 for x in range(len(data) + 1): 2500 try: 2501 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2502 except tarfile.ReadError: 2503 pass # we have no interest in ReadErrors 2504 2505 def test_partial_input(self): 2506 self._test_partial_input("r") 2507 2508 def test_partial_input_bz2(self): 2509 self._test_partial_input("r:bz2") 2510 2511 2512def root_is_uid_gid_0(): 2513 try: 2514 import pwd, grp 2515 except ImportError: 2516 return False 2517 if pwd.getpwuid(0)[0] != 'root': 2518 return False 2519 if grp.getgrgid(0)[0] != 'root': 2520 return False 2521 return True 2522 2523 2524@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2525@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2526class NumericOwnerTest(unittest.TestCase): 2527 # mock the following: 2528 # os.chown: so we can test what's being called 2529 # os.chmod: so the modes are not actually changed. if they are, we can't 2530 # delete the files/directories 2531 # os.geteuid: so we can lie and say we're root (uid = 0) 2532 2533 @staticmethod 2534 def _make_test_archive(filename_1, dirname_1, filename_2): 2535 # the file contents to write 2536 fobj = io.BytesIO(b"content") 2537 2538 # create a tar file with a file, a directory, and a file within that 2539 # directory. Assign various .uid/.gid values to them 2540 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2541 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2542 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2543 ] 2544 with tarfile.open(tmpname, 'w') as tarfl: 2545 for name, uid, gid, typ, contents in items: 2546 t = tarfile.TarInfo(name) 2547 t.uid = uid 2548 t.gid = gid 2549 t.uname = 'root' 2550 t.gname = 'root' 2551 t.type = typ 2552 tarfl.addfile(t, contents) 2553 2554 # return the full pathname to the tar file 2555 return tmpname 2556 2557 @staticmethod 2558 @contextmanager 2559 def _setup_test(mock_geteuid): 2560 mock_geteuid.return_value = 0 # lie and say we're root 2561 fname = 'numeric-owner-testfile' 2562 dirname = 'dir' 2563 2564 # the names we want stored in the tarfile 2565 filename_1 = fname 2566 dirname_1 = dirname 2567 filename_2 = os.path.join(dirname, fname) 2568 2569 # create the tarfile with the contents we're after 2570 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2571 dirname_1, 2572 filename_2) 2573 2574 # open the tarfile for reading. yield it and the names of the items 2575 # we stored into the file 2576 with tarfile.open(tar_filename) as tarfl: 2577 yield tarfl, filename_1, dirname_1, filename_2 2578 2579 @unittest.mock.patch('os.chown') 2580 @unittest.mock.patch('os.chmod') 2581 @unittest.mock.patch('os.geteuid') 2582 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2583 mock_chown): 2584 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2585 filename_2): 2586 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2587 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2588 2589 # convert to filesystem paths 2590 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2591 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2592 2593 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2594 unittest.mock.call(f_filename_2, 88, 87), 2595 ], 2596 any_order=True) 2597 2598 @unittest.mock.patch('os.chown') 2599 @unittest.mock.patch('os.chmod') 2600 @unittest.mock.patch('os.geteuid') 2601 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2602 mock_chown): 2603 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2604 filename_2): 2605 tarfl.extractall(TEMPDIR, numeric_owner=True) 2606 2607 # convert to filesystem paths 2608 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2609 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2610 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2611 2612 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2613 unittest.mock.call(f_dirname_1, 77, 76), 2614 unittest.mock.call(f_filename_2, 88, 87), 2615 ], 2616 any_order=True) 2617 2618 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2619 # because the uname and gname in the test file are 'root', and extract() 2620 # will look them up using pwd and grp to find their uid and gid, which we 2621 # test here to be 0. 2622 @unittest.skipUnless(root_is_uid_gid_0(), 2623 'uid=0,gid=0 must be named "root"') 2624 @unittest.mock.patch('os.chown') 2625 @unittest.mock.patch('os.chmod') 2626 @unittest.mock.patch('os.geteuid') 2627 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2628 mock_chown): 2629 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2630 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2631 2632 # convert to filesystem paths 2633 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2634 2635 mock_chown.assert_called_with(f_filename_1, 0, 0) 2636 2637 @unittest.mock.patch('os.geteuid') 2638 def test_keyword_only(self, mock_geteuid): 2639 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2640 self.assertRaises(TypeError, 2641 tarfl.extract, filename_1, TEMPDIR, False, True) 2642 2643 2644def setUpModule(): 2645 support.unlink(TEMPDIR) 2646 os.makedirs(TEMPDIR) 2647 2648 global testtarnames 2649 testtarnames = [tarname] 2650 with open(tarname, "rb") as fobj: 2651 data = fobj.read() 2652 2653 # Create compressed tarfiles. 2654 for c in GzipTest, Bz2Test, LzmaTest: 2655 if c.open: 2656 support.unlink(c.tarname) 2657 testtarnames.append(c.tarname) 2658 with c.open(c.tarname, "wb") as tar: 2659 tar.write(data) 2660 2661def tearDownModule(): 2662 if os.path.exists(TEMPDIR): 2663 support.rmtree(TEMPDIR) 2664 2665if __name__ == "__main__": 2666 unittest.main() 2667