1import sys 2import os 3import io 4from hashlib import sha256 5from contextlib import contextmanager 6from random import Random 7import pathlib 8 9import unittest 10import unittest.mock 11import tarfile 12 13from test import support 14from test.support import script_helper 15 16# Check for our compression modules. 17try: 18 import gzip 19except ImportError: 20 gzip = None 21try: 22 import bz2 23except ImportError: 24 bz2 = None 25try: 26 import lzma 27except ImportError: 28 lzma = None 29 30def sha256sum(data): 31 return sha256(data).hexdigest() 32 33TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir" 34tarextdir = TEMPDIR + '-extract-test' 35tarname = support.findfile("testtar.tar") 36gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 37bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 38xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 39tmpname = os.path.join(TEMPDIR, "tmp.tar") 40dotlessname = os.path.join(TEMPDIR, "testtar") 41 42sha256_regtype = ( 43 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 44) 45sha256_sparse = ( 46 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 47) 48 49 50class TarTest: 51 tarname = tarname 52 suffix = '' 53 open = io.FileIO 54 taropen = tarfile.TarFile.taropen 55 56 @property 57 def mode(self): 58 return self.prefix + self.suffix 59 60@support.requires_gzip() 61class GzipTest: 62 tarname = gzipname 63 suffix = 'gz' 64 open = gzip.GzipFile if gzip else None 65 taropen = tarfile.TarFile.gzopen 66 67@support.requires_bz2() 68class Bz2Test: 69 tarname = bz2name 70 suffix = 'bz2' 71 open = bz2.BZ2File if bz2 else None 72 taropen = tarfile.TarFile.bz2open 73 74@support.requires_lzma() 75class LzmaTest: 76 tarname = xzname 77 suffix = 'xz' 78 open = lzma.LZMAFile if lzma else None 79 taropen = tarfile.TarFile.xzopen 80 81 82class ReadTest(TarTest): 83 84 prefix = "r:" 85 86 def setUp(self): 87 self.tar = tarfile.open(self.tarname, mode=self.mode, 88 encoding="iso8859-1") 89 90 def tearDown(self): 
91 self.tar.close() 92 93 94class UstarReadTest(ReadTest, unittest.TestCase): 95 96 def test_fileobj_regular_file(self): 97 tarinfo = self.tar.getmember("ustar/regtype") 98 with self.tar.extractfile(tarinfo) as fobj: 99 data = fobj.read() 100 self.assertEqual(len(data), tarinfo.size, 101 "regular file extraction failed") 102 self.assertEqual(sha256sum(data), sha256_regtype, 103 "regular file extraction failed") 104 105 def test_fileobj_readlines(self): 106 self.tar.extract("ustar/regtype", TEMPDIR) 107 tarinfo = self.tar.getmember("ustar/regtype") 108 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 109 lines1 = fobj1.readlines() 110 111 with self.tar.extractfile(tarinfo) as fobj: 112 fobj2 = io.TextIOWrapper(fobj) 113 lines2 = fobj2.readlines() 114 self.assertEqual(lines1, lines2, 115 "fileobj.readlines() failed") 116 self.assertEqual(len(lines2), 114, 117 "fileobj.readlines() failed") 118 self.assertEqual(lines2[83], 119 "I will gladly admit that Python is not the fastest " 120 "running scripting language.\n", 121 "fileobj.readlines() failed") 122 123 def test_fileobj_iter(self): 124 self.tar.extract("ustar/regtype", TEMPDIR) 125 tarinfo = self.tar.getmember("ustar/regtype") 126 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 127 lines1 = fobj1.readlines() 128 with self.tar.extractfile(tarinfo) as fobj2: 129 lines2 = list(io.TextIOWrapper(fobj2)) 130 self.assertEqual(lines1, lines2, 131 "fileobj.__iter__() failed") 132 133 def test_fileobj_seek(self): 134 self.tar.extract("ustar/regtype", TEMPDIR) 135 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 136 data = fobj.read() 137 138 tarinfo = self.tar.getmember("ustar/regtype") 139 with self.tar.extractfile(tarinfo) as fobj: 140 text = fobj.read() 141 fobj.seek(0) 142 self.assertEqual(0, fobj.tell(), 143 "seek() to file's start failed") 144 fobj.seek(2048, 0) 145 self.assertEqual(2048, fobj.tell(), 146 "seek() to absolute position failed") 147 fobj.seek(-1024, 1) 148 
self.assertEqual(1024, fobj.tell(), 149 "seek() to negative relative position failed") 150 fobj.seek(1024, 1) 151 self.assertEqual(2048, fobj.tell(), 152 "seek() to positive relative position failed") 153 s = fobj.read(10) 154 self.assertEqual(s, data[2048:2058], 155 "read() after seek failed") 156 fobj.seek(0, 2) 157 self.assertEqual(tarinfo.size, fobj.tell(), 158 "seek() to file's end failed") 159 self.assertEqual(fobj.read(), b"", 160 "read() at file's end did not return empty string") 161 fobj.seek(-tarinfo.size, 2) 162 self.assertEqual(0, fobj.tell(), 163 "relative seek() to file's end failed") 164 fobj.seek(512) 165 s1 = fobj.readlines() 166 fobj.seek(512) 167 s2 = fobj.readlines() 168 self.assertEqual(s1, s2, 169 "readlines() after seek failed") 170 fobj.seek(0) 171 self.assertEqual(len(fobj.readline()), fobj.tell(), 172 "tell() after readline() failed") 173 fobj.seek(512) 174 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 175 "tell() after seek() and readline() failed") 176 fobj.seek(0) 177 line = fobj.readline() 178 self.assertEqual(fobj.read(), data[len(line):], 179 "read() after readline() failed") 180 181 def test_fileobj_text(self): 182 with self.tar.extractfile("ustar/regtype") as fobj: 183 fobj = io.TextIOWrapper(fobj) 184 data = fobj.read().encode("iso8859-1") 185 self.assertEqual(sha256sum(data), sha256_regtype) 186 try: 187 fobj.seek(100) 188 except AttributeError: 189 # Issue #13815: seek() complained about a missing 190 # flush() method. 191 self.fail("seeking failed in text mode") 192 193 # Test if symbolic and hard links are resolved by extractfile(). The 194 # test link members each point to a regular member whose data is 195 # supposed to be exported. 
196 def _test_fileobj_link(self, lnktype, regtype): 197 with self.tar.extractfile(lnktype) as a, \ 198 self.tar.extractfile(regtype) as b: 199 self.assertEqual(a.name, b.name) 200 201 def test_fileobj_link1(self): 202 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 203 204 def test_fileobj_link2(self): 205 self._test_fileobj_link("./ustar/linktest2/lnktype", 206 "ustar/linktest1/regtype") 207 208 def test_fileobj_symlink1(self): 209 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 210 211 def test_fileobj_symlink2(self): 212 self._test_fileobj_link("./ustar/linktest2/symtype", 213 "ustar/linktest1/regtype") 214 215 def test_issue14160(self): 216 self._test_fileobj_link("symtype2", "ustar/regtype") 217 218class GzipUstarReadTest(GzipTest, UstarReadTest): 219 pass 220 221class Bz2UstarReadTest(Bz2Test, UstarReadTest): 222 pass 223 224class LzmaUstarReadTest(LzmaTest, UstarReadTest): 225 pass 226 227 228class ListTest(ReadTest, unittest.TestCase): 229 230 # Override setUp to use default encoding (UTF-8) 231 def setUp(self): 232 self.tar = tarfile.open(self.tarname, mode=self.mode) 233 234 def test_list(self): 235 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 236 with support.swap_attr(sys, 'stdout', tio): 237 self.tar.list(verbose=False) 238 out = tio.detach().getvalue() 239 self.assertIn(b'ustar/conttype', out) 240 self.assertIn(b'ustar/regtype', out) 241 self.assertIn(b'ustar/lnktype', out) 242 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 243 self.assertIn(b'./ustar/linktest2/symtype', out) 244 self.assertIn(b'./ustar/linktest2/lnktype', out) 245 # Make sure it puts trailing slash for directory 246 self.assertIn(b'ustar/dirtype/', out) 247 self.assertIn(b'ustar/dirtype-with-size/', out) 248 # Make sure it is able to print unencodable characters 249 def conv(b): 250 s = b.decode(self.tar.encoding, 'surrogateescape') 251 return s.encode('ascii', 'backslashreplace') 252 
self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 253 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 254 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 255 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 256 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 257 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 258 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 259 # Make sure it prints files separated by one newline without any 260 # 'ls -l'-like accessories if verbose flag is not being used 261 # ... 262 # ustar/conttype 263 # ustar/regtype 264 # ... 265 self.assertRegex(out, br'ustar/conttype ?\r?\n' 266 br'ustar/regtype ?\r?\n') 267 # Make sure it does not print the source of link without verbose flag 268 self.assertNotIn(b'link to', out) 269 self.assertNotIn(b'->', out) 270 271 def test_list_verbose(self): 272 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=True) 275 out = tio.detach().getvalue() 276 # Make sure it prints files separated by one newline with 'ls -l'-like 277 # accessories if verbose flag is being used 278 # ... 279 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 280 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 281 # ... 
282 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 283 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 284 br'ustar/\w+type ?\r?\n') * 2) 285 # Make sure it prints the source of link with verbose flag 286 self.assertIn(b'ustar/symtype -> regtype', out) 287 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 288 self.assertIn(b'./ustar/linktest2/lnktype link to ' 289 b'./ustar/linktest1/regtype', out) 290 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 291 (b'/123' * 125) + b'/longname', out) 292 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 293 (b'/123' * 125) + b'/longname', out) 294 295 def test_list_members(self): 296 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 297 def members(tar): 298 for tarinfo in tar.getmembers(): 299 if 'reg' in tarinfo.name: 300 yield tarinfo 301 with support.swap_attr(sys, 'stdout', tio): 302 self.tar.list(verbose=False, members=members(self.tar)) 303 out = tio.detach().getvalue() 304 self.assertIn(b'ustar/regtype', out) 305 self.assertNotIn(b'ustar/conttype', out) 306 307 308class GzipListTest(GzipTest, ListTest): 309 pass 310 311 312class Bz2ListTest(Bz2Test, ListTest): 313 pass 314 315 316class LzmaListTest(LzmaTest, ListTest): 317 pass 318 319 320class CommonReadTest(ReadTest): 321 322 def test_is_tarfile_erroneous(self): 323 with open(tmpname, "wb"): 324 pass 325 326 # is_tarfile works on filenames 327 self.assertFalse(tarfile.is_tarfile(tmpname)) 328 329 # is_tarfile works on path-like objects 330 self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname))) 331 332 # is_tarfile works on file objects 333 with open(tmpname, "rb") as fobj: 334 self.assertFalse(tarfile.is_tarfile(fobj)) 335 336 # is_tarfile works on file-like objects 337 self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid"))) 338 339 def test_is_tarfile_valid(self): 340 # is_tarfile works on filenames 341 self.assertTrue(tarfile.is_tarfile(self.tarname)) 342 343 # is_tarfile works 
on path-like objects 344 self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname))) 345 346 # is_tarfile works on file objects 347 with open(self.tarname, "rb") as fobj: 348 self.assertTrue(tarfile.is_tarfile(fobj)) 349 350 # is_tarfile works on file-like objects 351 with open(self.tarname, "rb") as fobj: 352 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 353 354 def test_empty_tarfile(self): 355 # Test for issue6123: Allow opening empty archives. 356 # This test checks if tarfile.open() is able to open an empty tar 357 # archive successfully. Note that an empty tar archive is not the 358 # same as an empty file! 359 with tarfile.open(tmpname, self.mode.replace("r", "w")): 360 pass 361 try: 362 tar = tarfile.open(tmpname, self.mode) 363 tar.getnames() 364 except tarfile.ReadError: 365 self.fail("tarfile.open() failed on empty archive") 366 else: 367 self.assertListEqual(tar.getmembers(), []) 368 finally: 369 tar.close() 370 371 def test_non_existent_tarfile(self): 372 # Test for issue11513: prevent non-existent gzipped tarfiles raising 373 # multiple exceptions. 374 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 375 tarfile.open("xxx", self.mode) 376 377 def test_null_tarfile(self): 378 # Test for issue6123: Allow opening empty archives. 379 # This test guarantees that tarfile.open() does not treat an empty 380 # file as an empty tar archive. 381 with open(tmpname, "wb"): 382 pass 383 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 384 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 385 386 def test_ignore_zeros(self): 387 # Test TarFile's ignore_zeros option. 388 # generate 512 pseudorandom bytes 389 data = Random(0).randbytes(512) 390 for char in (b'\0', b'a'): 391 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 392 # are ignored correctly. 
393 with self.open(tmpname, "w") as fobj: 394 fobj.write(char * 1024) 395 tarinfo = tarfile.TarInfo("foo") 396 tarinfo.size = len(data) 397 fobj.write(tarinfo.tobuf()) 398 fobj.write(data) 399 400 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 401 try: 402 self.assertListEqual(tar.getnames(), ["foo"], 403 "ignore_zeros=True should have skipped the %r-blocks" % 404 char) 405 finally: 406 tar.close() 407 408 def test_premature_end_of_archive(self): 409 for size in (512, 600, 1024, 1200): 410 with tarfile.open(tmpname, "w:") as tar: 411 t = tarfile.TarInfo("foo") 412 t.size = 1024 413 tar.addfile(t, io.BytesIO(b"a" * 1024)) 414 415 with open(tmpname, "r+b") as fobj: 416 fobj.truncate(size) 417 418 with tarfile.open(tmpname) as tar: 419 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 420 for t in tar: 421 pass 422 423 with tarfile.open(tmpname) as tar: 424 t = tar.next() 425 426 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 427 tar.extract(t, TEMPDIR) 428 429 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 430 tar.extractfile(t).read() 431 432 def test_length_zero_header(self): 433 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 434 # with an exception 435 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 436 with tarfile.open(support.findfile('recursion.tar')) as tar: 437 pass 438 439class MiscReadTestBase(CommonReadTest): 440 def requires_name_attribute(self): 441 pass 442 443 def test_no_name_argument(self): 444 self.requires_name_attribute() 445 with open(self.tarname, "rb") as fobj: 446 self.assertIsInstance(fobj.name, str) 447 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 448 self.assertIsInstance(tar.name, str) 449 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 450 451 def test_no_name_attribute(self): 452 with open(self.tarname, "rb") as fobj: 453 data = fobj.read() 454 fobj = io.BytesIO(data) 
455 self.assertRaises(AttributeError, getattr, fobj, "name") 456 tar = tarfile.open(fileobj=fobj, mode=self.mode) 457 self.assertIsNone(tar.name) 458 459 def test_empty_name_attribute(self): 460 with open(self.tarname, "rb") as fobj: 461 data = fobj.read() 462 fobj = io.BytesIO(data) 463 fobj.name = "" 464 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 465 self.assertIsNone(tar.name) 466 467 def test_int_name_attribute(self): 468 # Issue 21044: tarfile.open() should handle fileobj with an integer 469 # 'name' attribute. 470 fd = os.open(self.tarname, os.O_RDONLY) 471 with open(fd, 'rb') as fobj: 472 self.assertIsInstance(fobj.name, int) 473 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 474 self.assertIsNone(tar.name) 475 476 def test_bytes_name_attribute(self): 477 self.requires_name_attribute() 478 tarname = os.fsencode(self.tarname) 479 with open(tarname, 'rb') as fobj: 480 self.assertIsInstance(fobj.name, bytes) 481 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 482 self.assertIsInstance(tar.name, bytes) 483 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 484 485 def test_pathlike_name(self): 486 tarname = pathlib.Path(self.tarname) 487 with tarfile.open(tarname, mode=self.mode) as tar: 488 self.assertIsInstance(tar.name, str) 489 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 490 with self.taropen(tarname) as tar: 491 self.assertIsInstance(tar.name, str) 492 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 493 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 494 self.assertIsInstance(tar.name, str) 495 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 496 if self.suffix == '': 497 with tarfile.TarFile(tarname, mode='r') as tar: 498 self.assertIsInstance(tar.name, str) 499 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 500 501 def test_illegal_mode_arg(self): 502 with open(tmpname, 'wb'): 503 pass 504 with self.assertRaisesRegex(ValueError, 'mode must be 
'): 505 tar = self.taropen(tmpname, 'q') 506 with self.assertRaisesRegex(ValueError, 'mode must be '): 507 tar = self.taropen(tmpname, 'rw') 508 with self.assertRaisesRegex(ValueError, 'mode must be '): 509 tar = self.taropen(tmpname, '') 510 511 def test_fileobj_with_offset(self): 512 # Skip the first member and store values from the second member 513 # of the testtar. 514 tar = tarfile.open(self.tarname, mode=self.mode) 515 try: 516 tar.next() 517 t = tar.next() 518 name = t.name 519 offset = t.offset 520 with tar.extractfile(t) as f: 521 data = f.read() 522 finally: 523 tar.close() 524 525 # Open the testtar and seek to the offset of the second member. 526 with self.open(self.tarname) as fobj: 527 fobj.seek(offset) 528 529 # Test if the tarfile starts with the second member. 530 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 531 t = tar.next() 532 self.assertEqual(t.name, name) 533 # Read to the end of fileobj and test if seeking back to the 534 # beginning works. 535 tar.getmembers() 536 self.assertEqual(tar.extractfile(t).read(), data, 537 "seek back did not work") 538 539 def test_fail_comp(self): 540 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 541 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 542 with open(tarname, "rb") as fobj: 543 self.assertRaises(tarfile.ReadError, tarfile.open, 544 fileobj=fobj, mode=self.mode) 545 546 def test_v7_dirtype(self): 547 # Test old style dirtype member (bug #1336623): 548 # Old V7 tars create directory members using an AREGTYPE 549 # header with a "/" appended to the filename field. 550 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 551 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 552 "v7 dirtype failed") 553 554 def test_xstar_type(self): 555 # The xstar format stores extra atime and ctime fields inside the 556 # space reserved for the prefix field. The prefix field must be 557 # ignored in this case, otherwise it will mess up the name. 
558 try: 559 self.tar.getmember("misc/regtype-xstar") 560 except KeyError: 561 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 562 563 def test_check_members(self): 564 for tarinfo in self.tar: 565 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 566 "wrong mtime for %s" % tarinfo.name) 567 if not tarinfo.name.startswith("ustar/"): 568 continue 569 self.assertEqual(tarinfo.uname, "tarfile", 570 "wrong uname for %s" % tarinfo.name) 571 572 def test_find_members(self): 573 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 574 "could not find all members") 575 576 @unittest.skipUnless(hasattr(os, "link"), 577 "Missing hardlink implementation") 578 @support.skip_unless_symlink 579 def test_extract_hardlink(self): 580 # Test hardlink extraction (e.g. bug #857297). 581 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 582 tar.extract("ustar/regtype", TEMPDIR) 583 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 584 585 tar.extract("ustar/lnktype", TEMPDIR) 586 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 587 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 588 data = f.read() 589 self.assertEqual(sha256sum(data), sha256_regtype) 590 591 tar.extract("ustar/symtype", TEMPDIR) 592 self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 593 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 594 data = f.read() 595 self.assertEqual(sha256sum(data), sha256_regtype) 596 597 def test_extractall(self): 598 # Test if extractall() correctly restores directory permissions 599 # and times (see issue1735). 
600 tar = tarfile.open(tarname, encoding="iso8859-1") 601 DIR = os.path.join(TEMPDIR, "extractall") 602 os.mkdir(DIR) 603 try: 604 directories = [t for t in tar if t.isdir()] 605 tar.extractall(DIR, directories) 606 for tarinfo in directories: 607 path = os.path.join(DIR, tarinfo.name) 608 if sys.platform != "win32": 609 # Win32 has no support for fine grained permissions. 610 self.assertEqual(tarinfo.mode & 0o777, 611 os.stat(path).st_mode & 0o777) 612 def format_mtime(mtime): 613 if isinstance(mtime, float): 614 return "{} ({})".format(mtime, mtime.hex()) 615 else: 616 return "{!r} (int)".format(mtime) 617 file_mtime = os.path.getmtime(path) 618 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 619 format_mtime(tarinfo.mtime), 620 format_mtime(file_mtime), 621 path) 622 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 623 finally: 624 tar.close() 625 support.rmtree(DIR) 626 627 def test_extract_directory(self): 628 dirtype = "ustar/dirtype" 629 DIR = os.path.join(TEMPDIR, "extractdir") 630 os.mkdir(DIR) 631 try: 632 with tarfile.open(tarname, encoding="iso8859-1") as tar: 633 tarinfo = tar.getmember(dirtype) 634 tar.extract(tarinfo, path=DIR) 635 extracted = os.path.join(DIR, dirtype) 636 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 637 if sys.platform != "win32": 638 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 639 finally: 640 support.rmtree(DIR) 641 642 def test_extractall_pathlike_name(self): 643 DIR = pathlib.Path(TEMPDIR) / "extractall" 644 with support.temp_dir(DIR), \ 645 tarfile.open(tarname, encoding="iso8859-1") as tar: 646 directories = [t for t in tar if t.isdir()] 647 tar.extractall(DIR, directories) 648 for tarinfo in directories: 649 path = DIR / tarinfo.name 650 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 651 652 def test_extract_pathlike_name(self): 653 dirtype = "ustar/dirtype" 654 DIR = pathlib.Path(TEMPDIR) / "extractall" 655 with support.temp_dir(DIR), \ 656 tarfile.open(tarname, 
encoding="iso8859-1") as tar: 657 tarinfo = tar.getmember(dirtype) 658 tar.extract(tarinfo, path=DIR) 659 extracted = DIR / dirtype 660 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 661 662 def test_init_close_fobj(self): 663 # Issue #7341: Close the internal file object in the TarFile 664 # constructor in case of an error. For the test we rely on 665 # the fact that opening an empty file raises a ReadError. 666 empty = os.path.join(TEMPDIR, "empty") 667 with open(empty, "wb") as fobj: 668 fobj.write(b"") 669 670 try: 671 tar = object.__new__(tarfile.TarFile) 672 try: 673 tar.__init__(empty) 674 except tarfile.ReadError: 675 self.assertTrue(tar.fileobj.closed) 676 else: 677 self.fail("ReadError not raised") 678 finally: 679 support.unlink(empty) 680 681 def test_parallel_iteration(self): 682 # Issue #16601: Restarting iteration over tarfile continued 683 # from where it left off. 684 with tarfile.open(self.tarname) as tar: 685 for m1, m2 in zip(tar, tar): 686 self.assertEqual(m1.offset, m2.offset) 687 self.assertEqual(m1.get_info(), m2.get_info()) 688 689class MiscReadTest(MiscReadTestBase, unittest.TestCase): 690 test_fail_comp = None 691 692class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 693 pass 694 695class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 696 def requires_name_attribute(self): 697 self.skipTest("BZ2File have no name attribute") 698 699class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 700 def requires_name_attribute(self): 701 self.skipTest("LZMAFile have no name attribute") 702 703 704class StreamReadTest(CommonReadTest, unittest.TestCase): 705 706 prefix="r|" 707 708 def test_read_through(self): 709 # Issue #11224: A poorly designed _FileInFile.read() method 710 # caused seeking errors with stream tar files. 
711 for tarinfo in self.tar: 712 if not tarinfo.isreg(): 713 continue 714 with self.tar.extractfile(tarinfo) as fobj: 715 while True: 716 try: 717 buf = fobj.read(512) 718 except tarfile.StreamError: 719 self.fail("simple read-through using " 720 "TarFile.extractfile() failed") 721 if not buf: 722 break 723 724 def test_fileobj_regular_file(self): 725 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 726 with self.tar.extractfile(tarinfo) as fobj: 727 data = fobj.read() 728 self.assertEqual(len(data), tarinfo.size, 729 "regular file extraction failed") 730 self.assertEqual(sha256sum(data), sha256_regtype, 731 "regular file extraction failed") 732 733 def test_provoke_stream_error(self): 734 tarinfos = self.tar.getmembers() 735 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 736 self.assertRaises(tarfile.StreamError, f.read) 737 738 def test_compare_members(self): 739 tar1 = tarfile.open(tarname, encoding="iso8859-1") 740 try: 741 tar2 = self.tar 742 743 while True: 744 t1 = tar1.next() 745 t2 = tar2.next() 746 if t1 is None: 747 break 748 self.assertIsNotNone(t2, "stream.next() failed.") 749 750 if t2.islnk() or t2.issym(): 751 with self.assertRaises(tarfile.StreamError): 752 tar2.extractfile(t2) 753 continue 754 755 v1 = tar1.extractfile(t1) 756 v2 = tar2.extractfile(t2) 757 if v1 is None: 758 continue 759 self.assertIsNotNone(v2, "stream.extractfile() failed") 760 self.assertEqual(v1.read(), v2.read(), 761 "stream extraction failed") 762 finally: 763 tar1.close() 764 765class GzipStreamReadTest(GzipTest, StreamReadTest): 766 pass 767 768class Bz2StreamReadTest(Bz2Test, StreamReadTest): 769 pass 770 771class LzmaStreamReadTest(LzmaTest, StreamReadTest): 772 pass 773 774 775class DetectReadTest(TarTest, unittest.TestCase): 776 def _testfunc_file(self, name, mode): 777 try: 778 tar = tarfile.open(name, mode) 779 except tarfile.ReadError as e: 780 self.fail() 781 else: 782 tar.close() 783 784 def _testfunc_fileobj(self, name, mode): 
785 try: 786 with open(name, "rb") as f: 787 tar = tarfile.open(name, mode, fileobj=f) 788 except tarfile.ReadError as e: 789 self.fail() 790 else: 791 tar.close() 792 793 def _test_modes(self, testfunc): 794 if self.suffix: 795 with self.assertRaises(tarfile.ReadError): 796 tarfile.open(tarname, mode="r:" + self.suffix) 797 with self.assertRaises(tarfile.ReadError): 798 tarfile.open(tarname, mode="r|" + self.suffix) 799 with self.assertRaises(tarfile.ReadError): 800 tarfile.open(self.tarname, mode="r:") 801 with self.assertRaises(tarfile.ReadError): 802 tarfile.open(self.tarname, mode="r|") 803 testfunc(self.tarname, "r") 804 testfunc(self.tarname, "r:" + self.suffix) 805 testfunc(self.tarname, "r:*") 806 testfunc(self.tarname, "r|" + self.suffix) 807 testfunc(self.tarname, "r|*") 808 809 def test_detect_file(self): 810 self._test_modes(self._testfunc_file) 811 812 def test_detect_fileobj(self): 813 self._test_modes(self._testfunc_fileobj) 814 815class GzipDetectReadTest(GzipTest, DetectReadTest): 816 pass 817 818class Bz2DetectReadTest(Bz2Test, DetectReadTest): 819 def test_detect_stream_bz2(self): 820 # Originally, tarfile's stream detection looked for the string 821 # "BZh91" at the start of the file. This is incorrect because 822 # the '9' represents the blocksize (900,000 bytes). If the file was 823 # compressed using another blocksize autodetection fails. 824 with open(tarname, "rb") as fobj: 825 data = fobj.read() 826 827 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
828 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 829 fobj.write(data) 830 831 self._testfunc_file(tmpname, "r|*") 832 833class LzmaDetectReadTest(LzmaTest, DetectReadTest): 834 pass 835 836 837class MemberReadTest(ReadTest, unittest.TestCase): 838 839 def _test_member(self, tarinfo, chksum=None, **kwargs): 840 if chksum is not None: 841 with self.tar.extractfile(tarinfo) as f: 842 self.assertEqual(sha256sum(f.read()), chksum, 843 "wrong sha256sum for %s" % tarinfo.name) 844 845 kwargs["mtime"] = 0o7606136617 846 kwargs["uid"] = 1000 847 kwargs["gid"] = 100 848 if "old-v7" not in tarinfo.name: 849 # V7 tar can't handle alphabetic owners. 850 kwargs["uname"] = "tarfile" 851 kwargs["gname"] = "tarfile" 852 for k, v in kwargs.items(): 853 self.assertEqual(getattr(tarinfo, k), v, 854 "wrong value in %s field of %s" % (k, tarinfo.name)) 855 856 def test_find_regtype(self): 857 tarinfo = self.tar.getmember("ustar/regtype") 858 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 859 860 def test_find_conttype(self): 861 tarinfo = self.tar.getmember("ustar/conttype") 862 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 863 864 def test_find_dirtype(self): 865 tarinfo = self.tar.getmember("ustar/dirtype") 866 self._test_member(tarinfo, size=0) 867 868 def test_find_dirtype_with_size(self): 869 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 870 self._test_member(tarinfo, size=255) 871 872 def test_find_lnktype(self): 873 tarinfo = self.tar.getmember("ustar/lnktype") 874 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 875 876 def test_find_symtype(self): 877 tarinfo = self.tar.getmember("ustar/symtype") 878 self._test_member(tarinfo, size=0, linkname="regtype") 879 880 def test_find_blktype(self): 881 tarinfo = self.tar.getmember("ustar/blktype") 882 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 883 884 def test_find_chrtype(self): 885 tarinfo = self.tar.getmember("ustar/chrtype") 886 
self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 887 888 def test_find_fifotype(self): 889 tarinfo = self.tar.getmember("ustar/fifotype") 890 self._test_member(tarinfo, size=0) 891 892 def test_find_sparse(self): 893 tarinfo = self.tar.getmember("ustar/sparse") 894 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 895 896 def test_find_gnusparse(self): 897 tarinfo = self.tar.getmember("gnu/sparse") 898 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 899 900 def test_find_gnusparse_00(self): 901 tarinfo = self.tar.getmember("gnu/sparse-0.0") 902 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 903 904 def test_find_gnusparse_01(self): 905 tarinfo = self.tar.getmember("gnu/sparse-0.1") 906 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 907 908 def test_find_gnusparse_10(self): 909 tarinfo = self.tar.getmember("gnu/sparse-1.0") 910 self._test_member(tarinfo, size=86016, chksum=sha256_sparse) 911 912 def test_find_umlauts(self): 913 tarinfo = self.tar.getmember("ustar/umlauts-" 914 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 915 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 916 917 def test_find_ustar_longname(self): 918 name = "ustar/" + "12345/" * 39 + "1234567/longname" 919 self.assertIn(name, self.tar.getnames()) 920 921 def test_find_regtype_oldv7(self): 922 tarinfo = self.tar.getmember("misc/regtype-old-v7") 923 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 924 925 def test_find_pax_umlauts(self): 926 self.tar.close() 927 self.tar = tarfile.open(self.tarname, mode=self.mode, 928 encoding="iso8859-1") 929 tarinfo = self.tar.getmember("pax/umlauts-" 930 "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 931 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 932 933 934class LongnameTest: 935 936 def test_read_longname(self): 937 # Test reading of longname (bug #1471427). 
938 longname = self.subdir + "/" + "123/" * 125 + "longname" 939 try: 940 tarinfo = self.tar.getmember(longname) 941 except KeyError: 942 self.fail("longname not found") 943 self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, 944 "read longname as dirtype") 945 946 def test_read_longlink(self): 947 longname = self.subdir + "/" + "123/" * 125 + "longname" 948 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 949 try: 950 tarinfo = self.tar.getmember(longlink) 951 except KeyError: 952 self.fail("longlink not found") 953 self.assertEqual(tarinfo.linkname, longname, "linkname wrong") 954 955 def test_truncated_longname(self): 956 longname = self.subdir + "/" + "123/" * 125 + "longname" 957 tarinfo = self.tar.getmember(longname) 958 offset = tarinfo.offset 959 self.tar.fileobj.seek(offset) 960 fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) 961 with self.assertRaises(tarfile.ReadError): 962 tarfile.open(name="foo.tar", fileobj=fobj) 963 964 def test_header_offset(self): 965 # Test if the start offset of the TarInfo object includes 966 # the preceding extended header. 967 longname = self.subdir + "/" + "123/" * 125 + "longname" 968 offset = self.tar.getmember(longname).offset 969 with open(tarname, "rb") as fobj: 970 fobj.seek(offset) 971 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), 972 "iso8859-1", "strict") 973 self.assertEqual(tarinfo.type, self.longnametype) 974 975 976class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): 977 978 subdir = "gnu" 979 longnametype = tarfile.GNUTYPE_LONGNAME 980 981 # Since 3.2 tarfile is supposed to accurately restore sparse members and 982 # produce files with holes. This is what we actually want to test here. 983 # Unfortunately, not all platforms/filesystems support sparse files, and 984 # even on platforms that do it is non-trivial to make reliable assertions 985 # about holes in files. 
def _test_sparse_file(self, name):
    """Extract sparse member *name* and verify the result on disk.

    The extracted content is always compared against the known sha256 of
    the sparse test file.  Only when _fs_supports_holes() reports that the
    filesystem can store holes is the file additionally required to occupy
    fewer bytes on disk (st_blocks * 512) than its logical size.
    """
    extracted = os.path.join(TEMPDIR, name)
    self.tar.extract(name, TEMPDIR)
    with open(extracted, "rb") as stream:
        content = stream.read()
    self.assertEqual(sha256sum(content), sha256_sparse,
                     "wrong sha256sum for %s" % name)

    if not self._fs_supports_holes():
        return
    info = os.stat(extracted)
    self.assertLess(info.st_blocks * 512, info.st_size)
def test_pax_global_headers(self):
    """Check uname, gname and VENDOR.umlauts of pax/regtype1..3.

    All three members are expected to carry the same VENDOR.umlauts pax
    header value while their user and group names differ.
    """
    umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
    # (member name, expected uname, expected gname)
    expectations = (
        ("pax/regtype1", "foo", "bar"),
        ("pax/regtype2", "", "bar"),
        ("pax/regtype3", "tarfile", "tarfile"),
    )
    archive = tarfile.open(tarname, encoding="iso8859-1")
    try:
        for member, uname, gname in expectations:
            info = archive.getmember(member)
            self.assertEqual(info.uname, uname)
            self.assertEqual(info.gname, gname)
            self.assertEqual(info.pax_headers.get("VENDOR.umlauts"),
                             umlauts)
    finally:
        archive.close()
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        """A caller-supplied file object must stay open after the archive closes."""
        sink = io.BytesIO()
        with tarfile.open(fileobj=sink, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(sink.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Snapshot the bytes, drop the last reference to the archive and
        # force a collection: neither step may close the buffer or change
        # what has been written to it.
        snapshot = sink.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(sink.closed)
        self.assertEqual(snapshot, sink.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        member = tarfile.TarInfo("foo")
        member.size = payload_size
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(b"a" * payload_size))

        # self.open is the (de)compressing file class of the current mode.
        with self.open(tmpname, "rb") as raw:
            written = len(raw.read())
        self.assertEqual(written, tarfile.RECORDSIZE * 2)
def test_ordered_recursion(self):
    """Check that add() archives the entries of a directory in sorted order.

    os.listdir() is mocked to report the two files in reverse order, so the
    expected member order ("directory", "1", "2") can only come out if
    add() does not depend on the order in which the directory is listed.
    """
    path = os.path.join(TEMPDIR, "directory")
    children = [os.path.join(path, name) for name in ("1", "2")]
    os.mkdir(path)
    try:
        # Create the files inside the try block so that a failure half-way
        # through still removes whatever has been created so far.
        for child in children:
            open(child, "a").close()
        with tarfile.open(tmpname, self.mode) as tar:
            with unittest.mock.patch('os.listdir') as mock_listdir:
                mock_listdir.return_value = ["2", "1"]
                tar.add(path)
            paths = [os.path.basename(m.name) for m in tar.getmembers()]
            self.assertEqual(paths, ["directory", "1", "2"])
    finally:
        for child in children:
            support.unlink(child)
        support.rmdir(path)
def test_filter(self):
    """Check the ``filter`` argument of TarFile.add().

    A directory with three empty files is added through a filter that drops
    "bar" and rewrites uid/uname on everything else.  The resulting archive
    must hold three members (the directory and the two remaining files),
    all carrying the rewritten ownership.
    """
    tempdir = os.path.join(TEMPDIR, "filter")
    os.mkdir(tempdir)
    try:
        for name in ("foo", "bar", "baz"):
            support.create_empty_file(os.path.join(tempdir, name))

        # Deliberately not called "filter" so the builtin is not shadowed.
        def drop_bar_and_chown(tarinfo):
            if os.path.basename(tarinfo.name) == "bar":
                # Returning None leaves the member out of the archive.
                return None
            tarinfo.uid = 123
            tarinfo.uname = "foo"
            return tarinfo

        with tarfile.open(tmpname, self.mode, encoding="iso8859-1") as tar:
            tar.add(tempdir, arcname="empty_dir", filter=drop_bar_and_chown)

        # Verify that filter is a keyword-only argument.  The call fails
        # while binding its arguments, so using the closed archive is fine.
        with self.assertRaises(TypeError):
            tar.add(tempdir, "empty_dir", True, None, drop_bar_and_chown)

        with tarfile.open(tmpname, "r") as tar:
            for tarinfo in tar:
                self.assertEqual(tarinfo.uid, 123)
                self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(len(tar.getmembers()), 3)
    finally:
        support.rmtree(tempdir)
1310 foo = os.path.join(TEMPDIR, "foo") 1311 if not dir: 1312 support.create_empty_file(foo) 1313 else: 1314 os.mkdir(foo) 1315 1316 tar = tarfile.open(tmpname, self.mode) 1317 try: 1318 tar.add(foo, arcname=path) 1319 finally: 1320 tar.close() 1321 1322 tar = tarfile.open(tmpname, "r") 1323 try: 1324 t = tar.next() 1325 finally: 1326 tar.close() 1327 1328 if not dir: 1329 support.unlink(foo) 1330 else: 1331 support.rmdir(foo) 1332 1333 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1334 1335 1336 @support.skip_unless_symlink 1337 def test_extractall_symlinks(self): 1338 # Test if extractall works properly when tarfile contains symlinks 1339 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1340 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1341 os.mkdir(tempdir) 1342 try: 1343 source_file = os.path.join(tempdir,'source') 1344 target_file = os.path.join(tempdir,'symlink') 1345 with open(source_file,'w') as f: 1346 f.write('something\n') 1347 os.symlink(source_file, target_file) 1348 with tarfile.open(temparchive, 'w') as tar: 1349 tar.add(source_file, arcname="source") 1350 tar.add(target_file, arcname="symlink") 1351 # Let's extract it to the location which contains the symlink 1352 with tarfile.open(temparchive, errorlevel=2) as tar: 1353 # this should not raise OSError: [Errno 17] File exists 1354 try: 1355 tar.extractall(path=tempdir) 1356 except OSError: 1357 self.fail("extractall failed with symlinked files") 1358 finally: 1359 support.unlink(temparchive) 1360 support.rmtree(tempdir) 1361 1362 def test_pathnames(self): 1363 self._test_pathname("foo") 1364 self._test_pathname(os.path.join("foo", ".", "bar")) 1365 self._test_pathname(os.path.join("foo", "..", "bar")) 1366 self._test_pathname(os.path.join(".", "foo")) 1367 self._test_pathname(os.path.join(".", "foo", ".")) 1368 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1369 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1370 
self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1371 self._test_pathname(os.path.join("..", "foo")) 1372 self._test_pathname(os.path.join("..", "foo", "..")) 1373 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1374 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1375 1376 self._test_pathname("foo" + os.sep + os.sep + "bar") 1377 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1378 1379 def test_abs_pathnames(self): 1380 if sys.platform == "win32": 1381 self._test_pathname("C:\\foo", "foo") 1382 else: 1383 self._test_pathname("/foo", "foo") 1384 self._test_pathname("///foo", "foo") 1385 1386 def test_cwd(self): 1387 # Test adding the current working directory. 1388 with support.change_cwd(TEMPDIR): 1389 tar = tarfile.open(tmpname, self.mode) 1390 try: 1391 tar.add(".") 1392 finally: 1393 tar.close() 1394 1395 tar = tarfile.open(tmpname, "r") 1396 try: 1397 for t in tar: 1398 if t.name != ".": 1399 self.assertTrue(t.name.startswith("./"), t.name) 1400 finally: 1401 tar.close() 1402 1403 def test_open_nonwritable_fileobj(self): 1404 for exctype in OSError, EOFError, RuntimeError: 1405 class BadFile(io.BytesIO): 1406 first = True 1407 def write(self, data): 1408 if self.first: 1409 self.first = False 1410 raise exctype 1411 1412 f = BadFile() 1413 with self.assertRaises(exctype): 1414 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1415 format=tarfile.PAX_FORMAT, 1416 pax_headers={'non': 'empty'}) 1417 self.assertFalse(f.closed) 1418 1419 1420class GzipWriteTest(GzipTest, WriteTest): 1421 pass 1422 1423 1424class Bz2WriteTest(Bz2Test, WriteTest): 1425 pass 1426 1427 1428class LzmaWriteTest(LzmaTest, WriteTest): 1429 pass 1430 1431 1432class StreamWriteTest(WriteTestBase, unittest.TestCase): 1433 1434 prefix = "w|" 1435 decompressor = None 1436 1437 def test_stream_padding(self): 1438 # Test for bug #1543303. 
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """Stream-mode write tests run with the gzip settings from GzipTest."""

    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the compressed file can
        # be searched as text without decoding errors.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion instead of a bare ``assert``: the latter
        # is removed when Python runs with -O, which would turn this test
        # into a no-op, and it gives no diagnostic on failure.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1492 1493 def _length(self, s): 1494 blocks = len(s) // 512 + 1 1495 return blocks * 512 1496 1497 def _calc_size(self, name, link=None): 1498 # Initial tar header 1499 count = 512 1500 1501 if len(name) > tarfile.LENGTH_NAME: 1502 # GNU longname extended header + longname 1503 count += 512 1504 count += self._length(name) 1505 if link is not None and len(link) > tarfile.LENGTH_LINK: 1506 # GNU longlink extended header + longlink 1507 count += 512 1508 count += self._length(link) 1509 return count 1510 1511 def _test(self, name, link=None): 1512 tarinfo = tarfile.TarInfo(name) 1513 if link: 1514 tarinfo.linkname = link 1515 tarinfo.type = tarfile.LNKTYPE 1516 1517 tar = tarfile.open(tmpname, "w") 1518 try: 1519 tar.format = tarfile.GNU_FORMAT 1520 tar.addfile(tarinfo) 1521 1522 v1 = self._calc_size(name, link) 1523 v2 = tar.offset 1524 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1525 finally: 1526 tar.close() 1527 1528 tar = tarfile.open(tmpname) 1529 try: 1530 member = tar.next() 1531 self.assertIsNotNone(member, 1532 "unable to read longname member") 1533 self.assertEqual(tarinfo.name, member.name, 1534 "unable to read longname member") 1535 self.assertEqual(tarinfo.linkname, member.linkname, 1536 "unable to read longname member") 1537 finally: 1538 tar.close() 1539 1540 def test_longname_1023(self): 1541 self._test(("longnam/" * 127) + "longnam") 1542 1543 def test_longname_1024(self): 1544 self._test(("longnam/" * 127) + "longname") 1545 1546 def test_longname_1025(self): 1547 self._test(("longnam/" * 127) + "longname_") 1548 1549 def test_longlink_1023(self): 1550 self._test("name", ("longlnk/" * 127) + "longlnk") 1551 1552 def test_longlink_1024(self): 1553 self._test("name", ("longlnk/" * 127) + "longlink") 1554 1555 def test_longlink_1025(self): 1556 self._test("name", ("longlnk/" * 127) + "longlink_") 1557 1558 def test_longnamelink_1023(self): 1559 self._test(("longnam/" * 127) + "longnam", 1560 ("longlnk/" * 127) + "longlnk") 1561 
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    """Check which members get the devmajor/devminor header fields written."""

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        scratch = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(scratch)
        try:
            # Write one block device member and one regular file member.
            blk_in = tarfile.TarInfo(name="my_block_device")
            blk_in.type = tarfile.BLKTYPE
            reg_in = tarfile.TarInfo(name="my_regular_file")
            reg_in.type = tarfile.REGTYPE
            writer = tarfile.open(tmpname, self.mode)
            try:
                for member in (blk_in, reg_in):
                    writer.addfile(member)
            finally:
                writer.close()

            # devmajor and devminor should be *interpreted* as 0 in both...
            reader = tarfile.open(tmpname, "r")
            try:
                blk_out = reader.getmember("my_block_device")
                reg_out = reader.getmember("my_regular_file")
            finally:
                reader.close()
            for parsed in (blk_out, reg_out):
                self.assertEqual(parsed.devmajor, 0)
                self.assertEqual(parsed.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                raw = infile.read()

            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            dev_start, dev_len = 329, 16

            def device_fields(member):
                # Raw bytes of the two device fields in the member's header.
                header = raw[member.offset:member.offset_data]
                return header[dev_start:dev_start + dev_len]

            self.assertEqual(device_fields(blk_out), b"0000000\0" * 2)
            self.assertEqual(device_fields(reg_out), b"\0" * 16)
        finally:
            support.rmtree(scratch)
def test_create_existing(self):
    """Creating over an existing archive must fail and leave it untouched.

    The archive is first created with the mode under test; opening the same
    path with that mode again has to raise FileExistsError, after which the
    original archive must still contain its single member.
    """
    with tarfile.open(tmpname, self.mode) as created:
        created.add(self.file_path)

    # Second attempt on the same path with the same mode.
    self.assertRaises(FileExistsError, tarfile.open, tmpname, self.mode)

    with self.taropen(tmpname) as reopened:
        members = reopened.getnames()
    self.assertEqual(len(members), 1)
    self.assertIn('spameggs42', members[0])
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        """Create file "foo", hard link "bar" to it and archive "foo".

        Every resource is released through addCleanup() instead of
        tearDown(): unittest does not call tearDown() when setUp() raises,
        which includes the SkipTest raised below, but registered cleanups
        always run.  Cleanups run in reverse order, so the archive is closed
        before the files are removed.
        """
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")
        self.addCleanup(support.unlink, self.foo)

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        self.addCleanup(support.unlink, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a second name for the already archived "foo", so it is
        # expected to be recorded as a hard link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                         "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is expected to be stored as a
        # regular file instead.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "dereferencing hardlink failed")
class UnicodeTest:
    """Mixin with tests for non-ASCII names, user names and group names.

    The concrete test case supplies ``format``, the tar format constant the
    archives are written with.  Archives are opened with ``with`` so they
    are closed on every path without manual try/finally bookkeeping.
    """

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        """Write a member with a non-ASCII name and read it back unchanged."""
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        """With ascii/strict, non-ASCII name or uname must raise UnicodeError."""
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        """All textual TarInfo fields read from the test archive are str."""
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        """Non-ASCII uname/gname survive a write/read round trip."""
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # For the non-pax formats, re-reading the same archive as ASCII
            # is expected to yield the three bytes as lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1942 def test_unicode_name1(self): 1943 self._test_ustar_name("0123456789" * 10) 1944 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 1945 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 1946 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 1947 1948 def test_unicode_name2(self): 1949 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 1950 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 1951 1952 # Test whether the utf-8 encoded version of a filename exceeds the 155 1953 # bytes prefix + '/' + 100 bytes name limit. 1954 def test_unicode_longname1(self): 1955 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 1956 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 1957 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 1958 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 1959 1960 def test_unicode_longname2(self): 1961 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 1962 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 1963 1964 def test_unicode_longname3(self): 1965 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 1966 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 1967 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 1968 1969 def test_unicode_longname4(self): 1970 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 1971 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 1972 1973 def _test_ustar_name(self, name, exc=None): 1974 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 1975 t = tarfile.TarInfo(name) 1976 if exc is None: 1977 tar.addfile(t) 1978 
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    """Run the shared Unicode tests with the GNU tar format."""

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        #
        # (encoding used for reading, member name expected under it)
        expectations = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for codec, member_name in expectations:
            archive = tarfile.open(tarname, encoding=codec,
                                   errors="surrogateescape")
            with archive:
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2037 test_unicode_filename_error = None 2038 2039 def test_binary_header(self): 2040 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 2041 for encoding, name in ( 2042 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2043 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2044 with tarfile.open(tarname, encoding=encoding, 2045 errors="surrogateescape") as tar: 2046 try: 2047 t = tar.getmember(name) 2048 except KeyError: 2049 self.fail("unable to read POSIX.1-2008 binary header") 2050 2051 2052class AppendTestBase: 2053 # Test append mode (cp. patch #1652681). 2054 2055 def setUp(self): 2056 self.tarname = tmpname 2057 if os.path.exists(self.tarname): 2058 support.unlink(self.tarname) 2059 2060 def _create_testtar(self, mode="w:"): 2061 with tarfile.open(tarname, encoding="iso8859-1") as src: 2062 t = src.getmember("ustar/regtype") 2063 t.name = "foo" 2064 with src.extractfile(t) as f: 2065 with tarfile.open(self.tarname, mode) as tar: 2066 tar.addfile(t, f) 2067 2068 def test_append_compressed(self): 2069 self._create_testtar("w:" + self.suffix) 2070 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2071 2072class AppendTest(AppendTestBase, unittest.TestCase): 2073 test_append_compressed = None 2074 2075 def _add_testfile(self, fileobj=None): 2076 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2077 tar.addfile(tarfile.TarInfo("bar")) 2078 2079 def _test(self, names=["bar"], fileobj=None): 2080 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2081 self.assertEqual(tar.getnames(), names) 2082 2083 def test_non_existing(self): 2084 self._add_testfile() 2085 self._test() 2086 2087 def test_empty(self): 2088 tarfile.open(self.tarname, "w:").close() 2089 self._add_testfile() 2090 self._test() 2091 2092 def test_empty_fileobj(self): 2093 fobj = io.BytesIO(b"\0" * 1024) 2094 self._add_testfile(fobj) 2095 fobj.seek(0) 2096 self._test(fileobj=fobj) 2097 2098 def test_fileobj(self): 2099 self._create_testtar() 2100 with 
open(self.tarname, "rb") as fobj: 2101 data = fobj.read() 2102 fobj = io.BytesIO(data) 2103 self._add_testfile(fobj) 2104 fobj.seek(0) 2105 self._test(names=["foo", "bar"], fileobj=fobj) 2106 2107 def test_existing(self): 2108 self._create_testtar() 2109 self._add_testfile() 2110 self._test(names=["foo", "bar"]) 2111 2112 # Append mode is supposed to fail if the tarfile to append to 2113 # does not end with a zero block. 2114 def _test_error(self, data): 2115 with open(self.tarname, "wb") as fobj: 2116 fobj.write(data) 2117 self.assertRaises(tarfile.ReadError, self._add_testfile) 2118 2119 def test_null(self): 2120 self._test_error(b"") 2121 2122 def test_incomplete(self): 2123 self._test_error(b"\0" * 13) 2124 2125 def test_premature_eof(self): 2126 data = tarfile.TarInfo("foo").tobuf() 2127 self._test_error(data) 2128 2129 def test_trailing_garbage(self): 2130 data = tarfile.TarInfo("foo").tobuf() 2131 self._test_error(data + b"\0" * 13) 2132 2133 def test_invalid(self): 2134 self._test_error(b"a" * 512) 2135 2136class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2137 pass 2138 2139class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2140 pass 2141 2142class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2143 pass 2144 2145 2146class LimitsTest(unittest.TestCase): 2147 2148 def test_ustar_limits(self): 2149 # 100 char name 2150 tarinfo = tarfile.TarInfo("0123456789" * 10) 2151 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2152 2153 # 101 char name that cannot be stored 2154 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2155 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2156 2157 # 256 char name with a slash at pos 156 2158 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2159 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2160 2161 # 256 char name that cannot be stored 2162 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2163 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2164 2165 # 
512 char name 2166 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2167 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2168 2169 # 512 char linkname 2170 tarinfo = tarfile.TarInfo("longlink") 2171 tarinfo.linkname = "123/" * 126 + "longname" 2172 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2173 2174 # uid > 8 digits 2175 tarinfo = tarfile.TarInfo("name") 2176 tarinfo.uid = 0o10000000 2177 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2178 2179 def test_gnu_limits(self): 2180 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2181 tarinfo.tobuf(tarfile.GNU_FORMAT) 2182 2183 tarinfo = tarfile.TarInfo("longlink") 2184 tarinfo.linkname = "123/" * 126 + "longname" 2185 tarinfo.tobuf(tarfile.GNU_FORMAT) 2186 2187 # uid >= 256 ** 7 2188 tarinfo = tarfile.TarInfo("name") 2189 tarinfo.uid = 0o4000000000000000000 2190 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2191 2192 def test_pax_limits(self): 2193 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2194 tarinfo.tobuf(tarfile.PAX_FORMAT) 2195 2196 tarinfo = tarfile.TarInfo("longlink") 2197 tarinfo.linkname = "123/" * 126 + "longname" 2198 tarinfo.tobuf(tarfile.PAX_FORMAT) 2199 2200 tarinfo = tarfile.TarInfo("name") 2201 tarinfo.uid = 0o4000000000000000000 2202 tarinfo.tobuf(tarfile.PAX_FORMAT) 2203 2204 2205class MiscTest(unittest.TestCase): 2206 2207 def test_char_fields(self): 2208 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2209 b"foo\0\0\0\0\0") 2210 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2211 b"foo") 2212 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2213 "foo") 2214 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2215 "foo") 2216 2217 def test_read_number_fields(self): 2218 # Issue 13158: Test if GNU tar specific base-256 number fields 2219 # are decoded correctly. 
2220 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2221 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2222 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2223 0o10000000) 2224 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2225 0xffffffff) 2226 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2227 -1) 2228 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2229 -100) 2230 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2231 -0x100000000000000) 2232 2233 # Issue 24514: Test if empty number fields are converted to zero. 2234 self.assertEqual(tarfile.nti(b"\0"), 0) 2235 self.assertEqual(tarfile.nti(b" \0"), 0) 2236 2237 def test_write_number_fields(self): 2238 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2239 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2240 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2241 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2242 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2243 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2244 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2245 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2246 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2247 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2248 self.assertEqual(tarfile.itn(-0x100000000000000, 2249 format=tarfile.GNU_FORMAT), 2250 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2251 2252 # Issue 32713: Test if itn() supports float values outside the 2253 # non-GNU format range 2254 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2255 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2256 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2257 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2258 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2259 2260 def test_number_field_limits(self): 2261 with self.assertRaises(ValueError): 2262 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2263 with self.assertRaises(ValueError): 2264 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2265 with self.assertRaises(ValueError): 2266 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2267 with self.assertRaises(ValueError): 2268 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2269 2270 def test__all__(self): 2271 blacklist = {'version', 'grp', 'pwd', 'symlink_exception', 2272 'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC', 2273 'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK', 2274 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2275 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 2276 'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 2277 'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 2278 'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES', 2279 'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS', 2280 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj', 2281 'filemode', 2282 'EmptyHeaderError', 'TruncatedHeaderError', 2283 'EOFHeaderError', 'InvalidHeaderError', 2284 'SubsequentHeaderError', 'ExFileObject', 2285 'main'} 2286 support.check__all__(self, tarfile, blacklist=blacklist) 2287 2288 2289class CommandLineTest(unittest.TestCase): 2290 2291 def tarfilecmd(self, *args, **kwargs): 2292 rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2293 **kwargs) 2294 return out.replace(os.linesep.encode(), b'\n') 2295 2296 def tarfilecmd_failure(self, *args): 2297 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2298 2299 def make_simple_tarfile(self, tar_name): 2300 files = [support.findfile('tokenize_tests.txt'), 2301 support.findfile('tokenize_tests-no-coding-cookie-' 2302 'and-utf8-bom-sig-only.txt')] 2303 self.addCleanup(support.unlink, tar_name) 2304 with tarfile.open(tar_name, 'w') as tf: 2305 for tardata in files: 2306 tf.add(tardata, arcname=os.path.basename(tardata)) 2307 2308 def test_bad_use(self): 2309 rc, out, err = self.tarfilecmd_failure() 2310 
self.assertEqual(out, b'') 2311 self.assertIn(b'usage', err.lower()) 2312 self.assertIn(b'error', err.lower()) 2313 self.assertIn(b'required', err.lower()) 2314 rc, out, err = self.tarfilecmd_failure('-l', '') 2315 self.assertEqual(out, b'') 2316 self.assertNotEqual(err.strip(), b'') 2317 2318 def test_test_command(self): 2319 for tar_name in testtarnames: 2320 for opt in '-t', '--test': 2321 out = self.tarfilecmd(opt, tar_name) 2322 self.assertEqual(out, b'') 2323 2324 def test_test_command_verbose(self): 2325 for tar_name in testtarnames: 2326 for opt in '-v', '--verbose': 2327 out = self.tarfilecmd(opt, '-t', tar_name, 2328 PYTHONIOENCODING='utf-8') 2329 self.assertIn(b'is a tar archive.\n', out) 2330 2331 def test_test_command_invalid_file(self): 2332 zipname = support.findfile('zipdir.zip') 2333 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2334 self.assertIn(b' is not a tar archive.', err) 2335 self.assertEqual(out, b'') 2336 self.assertEqual(rc, 1) 2337 2338 for tar_name in testtarnames: 2339 with self.subTest(tar_name=tar_name): 2340 with open(tar_name, 'rb') as f: 2341 data = f.read() 2342 try: 2343 with open(tmpname, 'wb') as f: 2344 f.write(data[:511]) 2345 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2346 self.assertEqual(out, b'') 2347 self.assertEqual(rc, 1) 2348 finally: 2349 support.unlink(tmpname) 2350 2351 def test_list_command(self): 2352 for tar_name in testtarnames: 2353 with support.captured_stdout() as t: 2354 with tarfile.open(tar_name, 'r') as tf: 2355 tf.list(verbose=False) 2356 expected = t.getvalue().encode('ascii', 'backslashreplace') 2357 for opt in '-l', '--list': 2358 out = self.tarfilecmd(opt, tar_name, 2359 PYTHONIOENCODING='ascii') 2360 self.assertEqual(out, expected) 2361 2362 def test_list_command_verbose(self): 2363 for tar_name in testtarnames: 2364 with support.captured_stdout() as t: 2365 with tarfile.open(tar_name, 'r') as tf: 2366 tf.list(verbose=True) 2367 expected = t.getvalue().encode('ascii', 
'backslashreplace') 2368 for opt in '-v', '--verbose': 2369 out = self.tarfilecmd(opt, '-l', tar_name, 2370 PYTHONIOENCODING='ascii') 2371 self.assertEqual(out, expected) 2372 2373 def test_list_command_invalid_file(self): 2374 zipname = support.findfile('zipdir.zip') 2375 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2376 self.assertIn(b' is not a tar archive.', err) 2377 self.assertEqual(out, b'') 2378 self.assertEqual(rc, 1) 2379 2380 def test_create_command(self): 2381 files = [support.findfile('tokenize_tests.txt'), 2382 support.findfile('tokenize_tests-no-coding-cookie-' 2383 'and-utf8-bom-sig-only.txt')] 2384 for opt in '-c', '--create': 2385 try: 2386 out = self.tarfilecmd(opt, tmpname, *files) 2387 self.assertEqual(out, b'') 2388 with tarfile.open(tmpname) as tar: 2389 tar.getmembers() 2390 finally: 2391 support.unlink(tmpname) 2392 2393 def test_create_command_verbose(self): 2394 files = [support.findfile('tokenize_tests.txt'), 2395 support.findfile('tokenize_tests-no-coding-cookie-' 2396 'and-utf8-bom-sig-only.txt')] 2397 for opt in '-v', '--verbose': 2398 try: 2399 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2400 PYTHONIOENCODING='utf-8') 2401 self.assertIn(b' file created.', out) 2402 with tarfile.open(tmpname) as tar: 2403 tar.getmembers() 2404 finally: 2405 support.unlink(tmpname) 2406 2407 def test_create_command_dotless_filename(self): 2408 files = [support.findfile('tokenize_tests.txt')] 2409 try: 2410 out = self.tarfilecmd('-c', dotlessname, *files) 2411 self.assertEqual(out, b'') 2412 with tarfile.open(dotlessname) as tar: 2413 tar.getmembers() 2414 finally: 2415 support.unlink(dotlessname) 2416 2417 def test_create_command_dot_started_filename(self): 2418 tar_name = os.path.join(TEMPDIR, ".testtar") 2419 files = [support.findfile('tokenize_tests.txt')] 2420 try: 2421 out = self.tarfilecmd('-c', tar_name, *files) 2422 self.assertEqual(out, b'') 2423 with tarfile.open(tar_name) as tar: 2424 tar.getmembers() 2425 finally: 2426 
support.unlink(tar_name) 2427 2428 def test_create_command_compressed(self): 2429 files = [support.findfile('tokenize_tests.txt'), 2430 support.findfile('tokenize_tests-no-coding-cookie-' 2431 'and-utf8-bom-sig-only.txt')] 2432 for filetype in (GzipTest, Bz2Test, LzmaTest): 2433 if not filetype.open: 2434 continue 2435 try: 2436 tar_name = tmpname + '.' + filetype.suffix 2437 out = self.tarfilecmd('-c', tar_name, *files) 2438 with filetype.taropen(tar_name) as tar: 2439 tar.getmembers() 2440 finally: 2441 support.unlink(tar_name) 2442 2443 def test_extract_command(self): 2444 self.make_simple_tarfile(tmpname) 2445 for opt in '-e', '--extract': 2446 try: 2447 with support.temp_cwd(tarextdir): 2448 out = self.tarfilecmd(opt, tmpname) 2449 self.assertEqual(out, b'') 2450 finally: 2451 support.rmtree(tarextdir) 2452 2453 def test_extract_command_verbose(self): 2454 self.make_simple_tarfile(tmpname) 2455 for opt in '-v', '--verbose': 2456 try: 2457 with support.temp_cwd(tarextdir): 2458 out = self.tarfilecmd(opt, '-e', tmpname, 2459 PYTHONIOENCODING='utf-8') 2460 self.assertIn(b' file is extracted.', out) 2461 finally: 2462 support.rmtree(tarextdir) 2463 2464 def test_extract_command_different_directory(self): 2465 self.make_simple_tarfile(tmpname) 2466 try: 2467 with support.temp_cwd(tarextdir): 2468 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2469 self.assertEqual(out, b'') 2470 finally: 2471 support.rmtree(tarextdir) 2472 2473 def test_extract_command_invalid_file(self): 2474 zipname = support.findfile('zipdir.zip') 2475 with support.temp_cwd(tarextdir): 2476 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2477 self.assertIn(b' is not a tar archive.', err) 2478 self.assertEqual(out, b'') 2479 self.assertEqual(rc, 1) 2480 2481 2482class ContextManagerTest(unittest.TestCase): 2483 2484 def test_basic(self): 2485 with tarfile.open(tarname) as tar: 2486 self.assertFalse(tar.closed, "closed inside runtime context") 2487 self.assertTrue(tar.closed, "context 
manager failed") 2488 2489 def test_closed(self): 2490 # The __enter__() method is supposed to raise OSError 2491 # if the TarFile object is already closed. 2492 tar = tarfile.open(tarname) 2493 tar.close() 2494 with self.assertRaises(OSError): 2495 with tar: 2496 pass 2497 2498 def test_exception(self): 2499 # Test if the OSError exception is passed through properly. 2500 with self.assertRaises(Exception) as exc: 2501 with tarfile.open(tarname) as tar: 2502 raise OSError 2503 self.assertIsInstance(exc.exception, OSError, 2504 "wrong exception raised in context manager") 2505 self.assertTrue(tar.closed, "context manager failed") 2506 2507 def test_no_eof(self): 2508 # __exit__() must not write end-of-archive blocks if an 2509 # exception was raised. 2510 try: 2511 with tarfile.open(tmpname, "w") as tar: 2512 raise Exception 2513 except: 2514 pass 2515 self.assertEqual(os.path.getsize(tmpname), 0, 2516 "context manager wrote an end-of-archive block") 2517 self.assertTrue(tar.closed, "context manager failed") 2518 2519 def test_eof(self): 2520 # __exit__() must write end-of-archive blocks, i.e. call 2521 # TarFile.close() if there was no error. 2522 with tarfile.open(tmpname, "w"): 2523 pass 2524 self.assertNotEqual(os.path.getsize(tmpname), 0, 2525 "context manager wrote no end-of-archive block") 2526 2527 def test_fileobj(self): 2528 # Test that __exit__() did not close the external file 2529 # object. 2530 with open(tmpname, "wb") as fobj: 2531 try: 2532 with tarfile.open(fileobj=fobj, mode="w") as tar: 2533 raise Exception 2534 except: 2535 pass 2536 self.assertFalse(fobj.closed, "external file object was closed") 2537 self.assertTrue(tar.closed, "context manager failed") 2538 2539 2540@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2541class LinkEmulationTest(ReadTest, unittest.TestCase): 2542 2543 # Test for issue #8741 regression. 
On platforms that do not support 2544 # symbolic or hard links tarfile tries to extract these types of members 2545 # as the regular files they point to. 2546 def _test_link_extraction(self, name): 2547 self.tar.extract(name, TEMPDIR) 2548 with open(os.path.join(TEMPDIR, name), "rb") as f: 2549 data = f.read() 2550 self.assertEqual(sha256sum(data), sha256_regtype) 2551 2552 # See issues #1578269, #8879, and #17689 for some history on these skips 2553 @unittest.skipIf(hasattr(os.path, "islink"), 2554 "Skip emulation - has os.path.islink but not os.link") 2555 def test_hardlink_extraction1(self): 2556 self._test_link_extraction("ustar/lnktype") 2557 2558 @unittest.skipIf(hasattr(os.path, "islink"), 2559 "Skip emulation - has os.path.islink but not os.link") 2560 def test_hardlink_extraction2(self): 2561 self._test_link_extraction("./ustar/linktest2/lnktype") 2562 2563 @unittest.skipIf(hasattr(os, "symlink"), 2564 "Skip emulation if symlink exists") 2565 def test_symlink_extraction1(self): 2566 self._test_link_extraction("ustar/symtype") 2567 2568 @unittest.skipIf(hasattr(os, "symlink"), 2569 "Skip emulation if symlink exists") 2570 def test_symlink_extraction2(self): 2571 self._test_link_extraction("./ustar/linktest2/symtype") 2572 2573 2574class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2575 # Issue5068: The _BZ2Proxy.read() method loops forever 2576 # on an empty or partial bzipped file. 
2577 2578 def _test_partial_input(self, mode): 2579 class MyBytesIO(io.BytesIO): 2580 hit_eof = False 2581 def read(self, n): 2582 if self.hit_eof: 2583 raise AssertionError("infinite loop detected in " 2584 "tarfile.open()") 2585 self.hit_eof = self.tell() == len(self.getvalue()) 2586 return super(MyBytesIO, self).read(n) 2587 def seek(self, *args): 2588 self.hit_eof = False 2589 return super(MyBytesIO, self).seek(*args) 2590 2591 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2592 for x in range(len(data) + 1): 2593 try: 2594 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2595 except tarfile.ReadError: 2596 pass # we have no interest in ReadErrors 2597 2598 def test_partial_input(self): 2599 self._test_partial_input("r") 2600 2601 def test_partial_input_bz2(self): 2602 self._test_partial_input("r:bz2") 2603 2604 2605def root_is_uid_gid_0(): 2606 try: 2607 import pwd, grp 2608 except ImportError: 2609 return False 2610 if pwd.getpwuid(0)[0] != 'root': 2611 return False 2612 if grp.getgrgid(0)[0] != 'root': 2613 return False 2614 return True 2615 2616 2617@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2618@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2619class NumericOwnerTest(unittest.TestCase): 2620 # mock the following: 2621 # os.chown: so we can test what's being called 2622 # os.chmod: so the modes are not actually changed. if they are, we can't 2623 # delete the files/directories 2624 # os.geteuid: so we can lie and say we're root (uid = 0) 2625 2626 @staticmethod 2627 def _make_test_archive(filename_1, dirname_1, filename_2): 2628 # the file contents to write 2629 fobj = io.BytesIO(b"content") 2630 2631 # create a tar file with a file, a directory, and a file within that 2632 # directory. 
Assign various .uid/.gid values to them 2633 items = [(filename_1, 99, 98, tarfile.REGTYPE, fobj), 2634 (dirname_1, 77, 76, tarfile.DIRTYPE, None), 2635 (filename_2, 88, 87, tarfile.REGTYPE, fobj), 2636 ] 2637 with tarfile.open(tmpname, 'w') as tarfl: 2638 for name, uid, gid, typ, contents in items: 2639 t = tarfile.TarInfo(name) 2640 t.uid = uid 2641 t.gid = gid 2642 t.uname = 'root' 2643 t.gname = 'root' 2644 t.type = typ 2645 tarfl.addfile(t, contents) 2646 2647 # return the full pathname to the tar file 2648 return tmpname 2649 2650 @staticmethod 2651 @contextmanager 2652 def _setup_test(mock_geteuid): 2653 mock_geteuid.return_value = 0 # lie and say we're root 2654 fname = 'numeric-owner-testfile' 2655 dirname = 'dir' 2656 2657 # the names we want stored in the tarfile 2658 filename_1 = fname 2659 dirname_1 = dirname 2660 filename_2 = os.path.join(dirname, fname) 2661 2662 # create the tarfile with the contents we're after 2663 tar_filename = NumericOwnerTest._make_test_archive(filename_1, 2664 dirname_1, 2665 filename_2) 2666 2667 # open the tarfile for reading. 
yield it and the names of the items 2668 # we stored into the file 2669 with tarfile.open(tar_filename) as tarfl: 2670 yield tarfl, filename_1, dirname_1, filename_2 2671 2672 @unittest.mock.patch('os.chown') 2673 @unittest.mock.patch('os.chmod') 2674 @unittest.mock.patch('os.geteuid') 2675 def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, 2676 mock_chown): 2677 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, 2678 filename_2): 2679 tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) 2680 tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) 2681 2682 # convert to filesystem paths 2683 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2684 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2685 2686 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2687 unittest.mock.call(f_filename_2, 88, 87), 2688 ], 2689 any_order=True) 2690 2691 @unittest.mock.patch('os.chown') 2692 @unittest.mock.patch('os.chmod') 2693 @unittest.mock.patch('os.geteuid') 2694 def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, 2695 mock_chown): 2696 with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, 2697 filename_2): 2698 tarfl.extractall(TEMPDIR, numeric_owner=True) 2699 2700 # convert to filesystem paths 2701 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2702 f_dirname_1 = os.path.join(TEMPDIR, dirname_1) 2703 f_filename_2 = os.path.join(TEMPDIR, filename_2) 2704 2705 mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98), 2706 unittest.mock.call(f_dirname_1, 77, 76), 2707 unittest.mock.call(f_filename_2, 88, 87), 2708 ], 2709 any_order=True) 2710 2711 # this test requires that uid=0 and gid=0 really be named 'root'. that's 2712 # because the uname and gname in the test file are 'root', and extract() 2713 # will look them up using pwd and grp to find their uid and gid, which we 2714 # test here to be 0. 
2715 @unittest.skipUnless(root_is_uid_gid_0(), 2716 'uid=0,gid=0 must be named "root"') 2717 @unittest.mock.patch('os.chown') 2718 @unittest.mock.patch('os.chmod') 2719 @unittest.mock.patch('os.geteuid') 2720 def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, 2721 mock_chown): 2722 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2723 tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) 2724 2725 # convert to filesystem paths 2726 f_filename_1 = os.path.join(TEMPDIR, filename_1) 2727 2728 mock_chown.assert_called_with(f_filename_1, 0, 0) 2729 2730 @unittest.mock.patch('os.geteuid') 2731 def test_keyword_only(self, mock_geteuid): 2732 with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): 2733 self.assertRaises(TypeError, 2734 tarfl.extract, filename_1, TEMPDIR, False, True) 2735 2736 2737def setUpModule(): 2738 support.unlink(TEMPDIR) 2739 os.makedirs(TEMPDIR) 2740 2741 global testtarnames 2742 testtarnames = [tarname] 2743 with open(tarname, "rb") as fobj: 2744 data = fobj.read() 2745 2746 # Create compressed tarfiles. 2747 for c in GzipTest, Bz2Test, LzmaTest: 2748 if c.open: 2749 support.unlink(c.tarname) 2750 testtarnames.append(c.tarname) 2751 with c.open(c.tarname, "wb") as tar: 2752 tar.write(data) 2753 2754def tearDownModule(): 2755 if os.path.exists(TEMPDIR): 2756 support.rmtree(TEMPDIR) 2757 2758if __name__ == "__main__": 2759 unittest.main() 2760