1import sys 2import os 3import shutil 4import StringIO 5from binascii import unhexlify 6from hashlib import md5 7from random import Random 8import errno 9 10import unittest 11import tarfile 12 13from test import test_support 14from test import test_support as support 15 16# Check for our compression modules. 17try: 18 import gzip 19 gzip.GzipFile 20except (ImportError, AttributeError): 21 gzip = None 22try: 23 import bz2 24except ImportError: 25 bz2 = None 26 27def md5sum(data): 28 return md5(data).hexdigest() 29 30TEMPDIR = os.path.abspath(test_support.TESTFN) 31tarname = test_support.findfile("testtar.tar") 32gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 33bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 34tmpname = os.path.join(TEMPDIR, "tmp.tar") 35 36md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" 37md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" 38 39 40class ReadTest(unittest.TestCase): 41 42 tarname = tarname 43 mode = "r:" 44 45 def setUp(self): 46 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 47 48 def tearDown(self): 49 self.tar.close() 50 51 52class UstarReadTest(ReadTest): 53 54 def test_fileobj_regular_file(self): 55 tarinfo = self.tar.getmember("ustar/regtype") 56 fobj = self.tar.extractfile(tarinfo) 57 data = fobj.read() 58 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 59 "regular file extraction failed") 60 61 def test_fileobj_readlines(self): 62 self.tar.extract("ustar/regtype", TEMPDIR) 63 tarinfo = self.tar.getmember("ustar/regtype") 64 fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") 65 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: 66 lines1 = fobj1.readlines() 67 fobj2 = self.tar.extractfile(tarinfo) 68 69 lines2 = fobj2.readlines() 70 self.assertTrue(lines1 == lines2, 71 "fileobj.readlines() failed") 72 self.assertTrue(len(lines2) == 114, 73 "fileobj.readlines() failed") 74 self.assertTrue(lines2[83] == 75 "I will gladly admit that Python is not the fastest running scripting language.\n", 76 "fileobj.readlines() failed") 77 78 def test_fileobj_iter(self): 79 self.tar.extract("ustar/regtype", TEMPDIR) 80 tarinfo = self.tar.getmember("ustar/regtype") 81 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1: 82 lines1 = fobj1.readlines() 83 fobj2 = self.tar.extractfile(tarinfo) 84 lines2 = [line for line in fobj2] 85 self.assertTrue(lines1 == lines2, 86 "fileobj.__iter__() failed") 87 88 def test_fileobj_seek(self): 89 self.tar.extract("ustar/regtype", TEMPDIR) 90 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 91 data = fobj.read() 92 93 tarinfo = self.tar.getmember("ustar/regtype") 94 fobj = self.tar.extractfile(tarinfo) 95 96 text = fobj.read() 97 fobj.seek(0) 98 self.assertTrue(0 == fobj.tell(), 99 "seek() to file's start failed") 100 fobj.seek(2048, 0) 101 self.assertTrue(2048 == fobj.tell(), 102 "seek() to absolute position failed") 103 fobj.seek(-1024, 1) 104 self.assertTrue(1024 == fobj.tell(), 105 "seek() to negative relative position failed") 106 fobj.seek(1024, 1) 107 self.assertTrue(2048 == fobj.tell(), 108 "seek() to positive relative position failed") 109 s = fobj.read(10) 110 self.assertTrue(s == data[2048:2058], 111 "read() after seek failed") 112 fobj.seek(0, 2) 113 self.assertTrue(tarinfo.size == fobj.tell(), 114 "seek() to file's end failed") 115 self.assertTrue(fobj.read() == "", 116 "read() at file's end did not return empty string") 117 fobj.seek(-tarinfo.size, 2) 118 self.assertTrue(0 == fobj.tell(), 119 "relative seek() to file's start failed") 120 fobj.seek(512) 121 s1 = fobj.readlines() 122 fobj.seek(512) 123 s2 = fobj.readlines() 124 self.assertTrue(s1 == s2, 125 "readlines() after seek failed") 126 fobj.seek(0) 127 self.assertTrue(len(fobj.readline()) == fobj.tell(), 128 "tell() after readline() failed") 129 fobj.seek(512) 130 self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), 131 "tell() after seek() and readline() failed") 132 fobj.seek(0) 133 line = fobj.readline() 134 self.assertTrue(fobj.read() == data[len(line):], 135 "read() after readline() failed") 136 fobj.close() 137 138 # Test if symbolic and hard links are resolved by extractfile(). The 139 # test link members each point to a regular member whose data is 140 # supposed to be exported. 141 def _test_fileobj_link(self, lnktype, regtype): 142 a = self.tar.extractfile(lnktype) 143 b = self.tar.extractfile(regtype) 144 self.assertEqual(a.name, b.name) 145 146 def test_fileobj_link1(self): 147 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 148 149 def test_fileobj_link2(self): 150 self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") 151 152 def test_fileobj_symlink1(self): 153 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 154 155 def test_fileobj_symlink2(self): 156 self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") 157 158 def test_issue14160(self): 159 self._test_fileobj_link("symtype2", "ustar/regtype") 160 161 162class ListTest(ReadTest, unittest.TestCase): 163 164 # Override setUp to use default encoding (UTF-8) 165 def setUp(self): 166 self.tar = tarfile.open(self.tarname, mode=self.mode) 167 168 def test_list(self): 169 with test_support.captured_stdout() as t: 170 self.tar.list(verbose=False) 171 out = t.getvalue() 172 self.assertIn('ustar/conttype', out) 173 self.assertIn('ustar/regtype', out) 174 self.assertIn('ustar/lnktype', out) 175 self.assertIn('ustar' + ('/12345' * 40) + '67/longname', out) 176 self.assertIn('./ustar/linktest2/symtype', out) 177 self.assertIn('./ustar/linktest2/lnktype', out) 178 # Make sure it puts trailing slash for directory 179 self.assertIn('ustar/dirtype/', out) 180 self.assertIn('ustar/dirtype-with-size/', out) 181 # Make sure it is able to print non-ASCII characters 182 self.assertIn('ustar/umlauts-' 183 '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out) 184 self.assertIn('misc/regtype-hpux-signed-chksum-' 185 '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out) 186 self.assertIn('misc/regtype-old-v7-signed-chksum-' 187 '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out) 188 # Make sure it prints files separated by one newline without any 189 # 'ls -l'-like accessories if verbose flag is not being used 190 # ... 191 # ustar/conttype 192 # ustar/regtype 193 # ... 194 self.assertRegexpMatches(out, r'ustar/conttype ?\r?\n' 195 r'ustar/regtype ?\r?\n') 196 # Make sure it does not print the source of link without verbose flag 197 self.assertNotIn('link to', out) 198 self.assertNotIn('->', out) 199 200 def test_list_verbose(self): 201 with test_support.captured_stdout() as t: 202 self.tar.list(verbose=True) 203 out = t.getvalue() 204 # Make sure it prints files separated by one newline with 'ls -l'-like 205 # accessories if verbose flag is being used 206 # ... 207 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 208 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 209 # ... 210 self.assertRegexpMatches(out, (r'-rw-r--r-- tarfile/tarfile\s+7011 ' 211 r'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 212 r'ustar/\w+type ?\r?\n') * 2) 213 # Make sure it prints the source of link with verbose flag 214 self.assertIn('ustar/symtype -> regtype', out) 215 self.assertIn('./ustar/linktest2/symtype -> ../linktest1/regtype', out) 216 self.assertIn('./ustar/linktest2/lnktype link to ' 217 './ustar/linktest1/regtype', out) 218 self.assertIn('gnu' + ('/123' * 125) + '/longlink link to gnu' + 219 ('/123' * 125) + '/longname', out) 220 self.assertIn('pax' + ('/123' * 125) + '/longlink link to pax' + 221 ('/123' * 125) + '/longname', out) 222 223 224class GzipListTest(ListTest): 225 tarname = gzipname 226 mode = "r:gz" 227 taropen = tarfile.TarFile.gzopen 228 229 230class Bz2ListTest(ListTest): 231 tarname = bz2name 232 mode = "r:bz2" 233 taropen = tarfile.TarFile.bz2open 234 235 236class CommonReadTest(ReadTest): 237 238 def test_empty_tarfile(self): 239 # Test for issue6123: Allow opening empty archives. 240 # This test checks if tarfile.open() is able to open an empty tar 241 # archive successfully. Note that an empty tar archive is not the 242 # same as an empty file! 243 with tarfile.open(tmpname, self.mode.replace("r", "w")): 244 pass 245 try: 246 tar = tarfile.open(tmpname, self.mode) 247 tar.getnames() 248 except tarfile.ReadError: 249 self.fail("tarfile.open() failed on empty archive") 250 else: 251 self.assertListEqual(tar.getmembers(), []) 252 finally: 253 tar.close() 254 255 def test_null_tarfile(self): 256 # Test for issue6123: Allow opening empty archives. 257 # This test guarantees that tarfile.open() does not treat an empty 258 # file as an empty tar archive. 259 with open(tmpname, "wb"): 260 pass 261 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 262 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 263 264 def test_non_existent_tarfile(self): 265 # Test for issue11513: prevent non-existent gzipped tarfiles raising 266 # multiple exceptions. 267 exctype = OSError if '|' in self.mode else IOError 268 with self.assertRaisesRegexp(exctype, "xxx") as ex: 269 tarfile.open("xxx", self.mode) 270 self.assertEqual(ex.exception.errno, errno.ENOENT) 271 272 def test_ignore_zeros(self): 273 # Test TarFile's ignore_zeros option. 274 if self.mode.endswith(":gz"): 275 _open = gzip.GzipFile 276 elif self.mode.endswith(":bz2"): 277 _open = bz2.BZ2File 278 else: 279 _open = open 280 281 # generate 512 pseudorandom bytes 282 data = unhexlify('%1024x' % Random(0).getrandbits(512*8)) 283 for char in ('\0', 'a'): 284 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 285 # are ignored correctly. 286 with _open(tmpname, "wb") as fobj: 287 fobj.write(char * 1024) 288 tarinfo = tarfile.TarInfo("foo") 289 tarinfo.size = len(data) 290 fobj.write(tarinfo.tobuf()) 291 fobj.write(data) 292 293 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 294 try: 295 self.assertListEqual(tar.getnames(), ["foo"], 296 "ignore_zeros=True should have skipped the %r-blocks" % char) 297 finally: 298 tar.close() 299 300 def test_premature_end_of_archive(self): 301 for size in (512, 600, 1024, 1200): 302 with tarfile.open(tmpname, "w:") as tar: 303 t = tarfile.TarInfo("foo") 304 t.size = 1024 305 tar.addfile(t, StringIO.StringIO("a" * 1024)) 306 307 with open(tmpname, "r+b") as fobj: 308 fobj.truncate(size) 309 310 with tarfile.open(tmpname) as tar: 311 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): 312 for t in tar: 313 pass 314 315 with tarfile.open(tmpname) as tar: 316 t = tar.next() 317 318 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): 319 tar.extract(t, TEMPDIR) 320 321 with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"): 322 tar.extractfile(t).read() 323 324 325class MiscReadTest(CommonReadTest): 326 taropen = tarfile.TarFile.taropen 327 328 def test_no_name_argument(self): 329 with open(self.tarname, "rb") as fobj: 330 tar = tarfile.open(fileobj=fobj, mode=self.mode) 331 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 332 333 def test_no_name_attribute(self): 334 with open(self.tarname, "rb") as fobj: 335 data = fobj.read() 336 fobj = StringIO.StringIO(data) 337 self.assertRaises(AttributeError, getattr, fobj, "name") 338 tar = tarfile.open(fileobj=fobj, mode=self.mode) 339 self.assertEqual(tar.name, None) 340 341 def test_empty_name_attribute(self): 342 with open(self.tarname, "rb") as fobj: 343 data = fobj.read() 344 fobj = StringIO.StringIO(data) 345 fobj.name = "" 346 tar = tarfile.open(fileobj=fobj, mode=self.mode) 347 self.assertEqual(tar.name, None) 348 349 def test_illegal_mode_arg(self): 350 with open(tmpname, 'wb'): 351 pass 352 self.addCleanup(os.unlink, tmpname) 353 with self.assertRaisesRegexp(ValueError, 'mode must be '): 354 tar = self.taropen(tmpname, 'q') 355 with self.assertRaisesRegexp(ValueError, 'mode must be '): 356 tar = self.taropen(tmpname, 'rw') 357 with self.assertRaisesRegexp(ValueError, 'mode must be '): 358 tar = self.taropen(tmpname, '') 359 360 def test_fileobj_with_offset(self): 361 # Skip the first member and store values from the second member 362 # of the testtar. 363 tar = tarfile.open(self.tarname, mode=self.mode) 364 try: 365 tar.next() 366 t = tar.next() 367 name = t.name 368 offset = t.offset 369 data = tar.extractfile(t).read() 370 finally: 371 tar.close() 372 373 # Open the testtar and seek to the offset of the second member. 374 if self.mode.endswith(":gz"): 375 _open = gzip.GzipFile 376 elif self.mode.endswith(":bz2"): 377 _open = bz2.BZ2File 378 else: 379 _open = open 380 fobj = _open(self.tarname, "rb") 381 try: 382 fobj.seek(offset) 383 384 # Test if the tarfile starts with the second member. 385 tar = tar.open(self.tarname, mode="r:", fileobj=fobj) 386 t = tar.next() 387 self.assertEqual(t.name, name) 388 # Read to the end of fileobj and test if seeking back to the 389 # beginning works. 390 tar.getmembers() 391 self.assertEqual(tar.extractfile(t).read(), data, 392 "seek back did not work") 393 tar.close() 394 finally: 395 fobj.close() 396 397 def test_fail_comp(self): 398 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 399 if self.mode == "r:": 400 self.skipTest('needs a gz or bz2 mode') 401 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 402 with open(tarname, "rb") as fobj: 403 self.assertRaises(tarfile.ReadError, tarfile.open, 404 fileobj=fobj, mode=self.mode) 405 406 def test_v7_dirtype(self): 407 # Test old style dirtype member (bug #1336623): 408 # Old V7 tars create directory members using an AREGTYPE 409 # header with a "/" appended to the filename field. 410 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 411 self.assertTrue(tarinfo.type == tarfile.DIRTYPE, 412 "v7 dirtype failed") 413 414 def test_xstar_type(self): 415 # The xstar format stores extra atime and ctime fields inside the 416 # space reserved for the prefix field. The prefix field must be 417 # ignored in this case, otherwise it will mess up the name. 418 try: 419 self.tar.getmember("misc/regtype-xstar") 420 except KeyError: 421 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 422 423 def test_check_members(self): 424 for tarinfo in self.tar: 425 self.assertTrue(int(tarinfo.mtime) == 07606136617, 426 "wrong mtime for %s" % tarinfo.name) 427 if not tarinfo.name.startswith("ustar/"): 428 continue 429 self.assertTrue(tarinfo.uname == "tarfile", 430 "wrong uname for %s" % tarinfo.name) 431 432 def test_find_members(self): 433 self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", 434 "could not find all members") 435 436 def test_extract_hardlink(self): 437 # Test hardlink extraction (e.g. bug #857297). 438 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 439 tar.extract("ustar/regtype", TEMPDIR) 440 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype")) 441 442 tar.extract("ustar/lnktype", TEMPDIR) 443 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype")) 444 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 445 data = f.read() 446 self.assertEqual(md5sum(data), md5_regtype) 447 448 tar.extract("ustar/symtype", TEMPDIR) 449 self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype")) 450 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 451 data = f.read() 452 self.assertEqual(md5sum(data), md5_regtype) 453 454 def test_extractall(self): 455 # Test if extractall() correctly restores directory permissions 456 # and times (see issue1735). 457 tar = tarfile.open(tarname, encoding="iso8859-1") 458 try: 459 directories = [t for t in tar if t.isdir()] 460 tar.extractall(TEMPDIR, directories) 461 for tarinfo in directories: 462 path = os.path.join(TEMPDIR, tarinfo.name) 463 if sys.platform != "win32": 464 # Win32 has no support for fine grained permissions. 465 self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777) 466 self.assertEqual(tarinfo.mtime, os.path.getmtime(path)) 467 finally: 468 tar.close() 469 470 def test_init_close_fobj(self): 471 # Issue #7341: Close the internal file object in the TarFile 472 # constructor in case of an error. For the test we rely on 473 # the fact that opening an empty file raises a ReadError. 474 empty = os.path.join(TEMPDIR, "empty") 475 with open(empty, "wb") as fobj: 476 fobj.write("") 477 478 try: 479 tar = object.__new__(tarfile.TarFile) 480 try: 481 tar.__init__(empty) 482 except tarfile.ReadError: 483 self.assertTrue(tar.fileobj.closed) 484 else: 485 self.fail("ReadError not raised") 486 finally: 487 support.unlink(empty) 488 489 def test_parallel_iteration(self): 490 # Issue #16601: Restarting iteration over tarfile continued 491 # from where it left off. 492 with tarfile.open(self.tarname) as tar: 493 for m1, m2 in zip(tar, tar): 494 self.assertEqual(m1.offset, m2.offset) 495 self.assertEqual(m1.name, m2.name) 496 497 498class StreamReadTest(CommonReadTest): 499 500 mode="r|" 501 502 def test_fileobj_regular_file(self): 503 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 504 fobj = self.tar.extractfile(tarinfo) 505 data = fobj.read() 506 self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), 507 "regular file extraction failed") 508 509 def test_provoke_stream_error(self): 510 tarinfos = self.tar.getmembers() 511 f = self.tar.extractfile(tarinfos[0]) # read the first member 512 self.assertRaises(tarfile.StreamError, f.read) 513 514 def test_compare_members(self): 515 tar1 = tarfile.open(tarname, encoding="iso8859-1") 516 try: 517 tar2 = self.tar 518 519 while True: 520 t1 = tar1.next() 521 t2 = tar2.next() 522 if t1 is None: 523 break 524 self.assertTrue(t2 is not None, "stream.next() failed.") 525 526 if t2.islnk() or t2.issym(): 527 self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) 528 continue 529 530 v1 = tar1.extractfile(t1) 531 v2 = tar2.extractfile(t2) 532 if v1 is None: 533 continue 534 self.assertTrue(v2 is not None, "stream.extractfile() failed") 535 self.assertTrue(v1.read() == v2.read(), "stream extraction failed") 536 finally: 537 tar1.close() 538 539 540class DetectReadTest(unittest.TestCase): 541 542 def _testfunc_file(self, name, mode): 543 try: 544 tar = tarfile.open(name, mode) 545 except tarfile.ReadError: 546 self.fail() 547 else: 548 tar.close() 549 550 def _testfunc_fileobj(self, name, mode): 551 try: 552 tar = tarfile.open(name, mode, fileobj=open(name, "rb")) 553 except tarfile.ReadError: 554 self.fail() 555 else: 556 tar.close() 557 558 def _test_modes(self, testfunc): 559 testfunc(tarname, "r") 560 testfunc(tarname, "r:") 561 testfunc(tarname, "r:*") 562 testfunc(tarname, "r|") 563 testfunc(tarname, "r|*") 564 565 if gzip: 566 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") 567 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") 568 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") 569 self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") 570 571 testfunc(gzipname, "r") 572 testfunc(gzipname, "r:*") 573 testfunc(gzipname, "r:gz") 574 testfunc(gzipname, "r|*") 575 testfunc(gzipname, "r|gz") 576 577 if bz2: 578 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") 579 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") 580 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") 581 self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") 582 583 testfunc(bz2name, "r") 584 testfunc(bz2name, "r:*") 585 testfunc(bz2name, "r:bz2") 586 testfunc(bz2name, "r|*") 587 testfunc(bz2name, "r|bz2") 588 589 def test_detect_file(self): 590 self._test_modes(self._testfunc_file) 591 592 def test_detect_fileobj(self): 593 self._test_modes(self._testfunc_fileobj) 594 595 @unittest.skipUnless(bz2, 'requires bz2') 596 def test_detect_stream_bz2(self): 597 # Originally, tarfile's stream detection looked for the string 598 # "BZh91" at the start of the file. This is incorrect because 599 # the '9' represents the blocksize (900kB). If the file was 600 # compressed using another blocksize autodetection fails. 601 with open(tarname, "rb") as fobj: 602 data = fobj.read() 603 604 # Compress with blocksize 100kB, the file starts with "BZh11". 605 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 606 fobj.write(data) 607 608 self._testfunc_file(tmpname, "r|*") 609 610 611class MemberReadTest(ReadTest): 612 613 def _test_member(self, tarinfo, chksum=None, **kwargs): 614 if chksum is not None: 615 self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, 616 "wrong md5sum for %s" % tarinfo.name) 617 618 kwargs["mtime"] = 07606136617 619 kwargs["uid"] = 1000 620 kwargs["gid"] = 100 621 if "old-v7" not in tarinfo.name: 622 # V7 tar can't handle alphabetic owners. 623 kwargs["uname"] = "tarfile" 624 kwargs["gname"] = "tarfile" 625 for k, v in kwargs.iteritems(): 626 self.assertTrue(getattr(tarinfo, k) == v, 627 "wrong value in %s field of %s" % (k, tarinfo.name)) 628 629 def test_find_regtype(self): 630 tarinfo = self.tar.getmember("ustar/regtype") 631 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 632 633 def test_find_conttype(self): 634 tarinfo = self.tar.getmember("ustar/conttype") 635 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 636 637 def test_find_dirtype(self): 638 tarinfo = self.tar.getmember("ustar/dirtype") 639 self._test_member(tarinfo, size=0) 640 641 def test_find_dirtype_with_size(self): 642 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 643 self._test_member(tarinfo, size=255) 644 645 def test_find_lnktype(self): 646 tarinfo = self.tar.getmember("ustar/lnktype") 647 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 648 649 def test_find_symtype(self): 650 tarinfo = self.tar.getmember("ustar/symtype") 651 self._test_member(tarinfo, size=0, linkname="regtype") 652 653 def test_find_blktype(self): 654 tarinfo = self.tar.getmember("ustar/blktype") 655 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 656 657 def test_find_chrtype(self): 658 tarinfo = self.tar.getmember("ustar/chrtype") 659 self._test_member(tarinfo, size=0, devmajor=1, devminor=3) 660 661 def test_find_fifotype(self): 662 tarinfo = self.tar.getmember("ustar/fifotype") 663 self._test_member(tarinfo, size=0) 664 665 def test_find_sparse(self): 666 tarinfo = self.tar.getmember("ustar/sparse") 667 self._test_member(tarinfo, size=86016, chksum=md5_sparse) 668 669 def test_find_umlauts(self): 670 tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 671 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 672 673 def test_find_ustar_longname(self): 674 name = "ustar/" + "12345/" * 39 + "1234567/longname" 675 self.assertIn(name, self.tar.getnames()) 676 677 def test_find_regtype_oldv7(self): 678 tarinfo = self.tar.getmember("misc/regtype-old-v7") 679 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 680 681 def test_find_pax_umlauts(self): 682 self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") 683 tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 684 self._test_member(tarinfo, size=7011, chksum=md5_regtype) 685 686 687class LongnameTest(ReadTest): 688 689 def test_read_longname(self): 690 # Test reading of longname (bug #1471427). 691 longname = self.subdir + "/" + "123/" * 125 + "longname" 692 try: 693 tarinfo = self.tar.getmember(longname) 694 except KeyError: 695 self.fail("longname not found") 696 self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") 697 698 def test_read_longlink(self): 699 longname = self.subdir + "/" + "123/" * 125 + "longname" 700 longlink = self.subdir + "/" + "123/" * 125 + "longlink" 701 try: 702 tarinfo = self.tar.getmember(longlink) 703 except KeyError: 704 self.fail("longlink not found") 705 self.assertTrue(tarinfo.linkname == longname, "linkname wrong") 706 707 def test_truncated_longname(self): 708 longname = self.subdir + "/" + "123/" * 125 + "longname" 709 tarinfo = self.tar.getmember(longname) 710 offset = tarinfo.offset 711 self.tar.fileobj.seek(offset) 712 fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512)) 713 self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) 714 715 def test_header_offset(self): 716 # Test if the start offset of the TarInfo object includes 717 # the preceding extended header. 718 longname = self.subdir + "/" + "123/" * 125 + "longname" 719 offset = self.tar.getmember(longname).offset 720 fobj = open(tarname) 721 fobj.seek(offset) 722 tarinfo = tarfile.TarInfo.frombuf(fobj.read(512)) 723 self.assertEqual(tarinfo.type, self.longnametype) 724 725 726class GNUReadTest(LongnameTest): 727 728 subdir = "gnu" 729 longnametype = tarfile.GNUTYPE_LONGNAME 730 731 def test_sparse_file(self): 732 tarinfo1 = self.tar.getmember("ustar/sparse") 733 fobj1 = self.tar.extractfile(tarinfo1) 734 tarinfo2 = self.tar.getmember("gnu/sparse") 735 fobj2 = self.tar.extractfile(tarinfo2) 736 self.assertTrue(fobj1.read() == fobj2.read(), 737 "sparse file extraction failed") 738 739 740class PaxReadTest(LongnameTest): 741 742 subdir = "pax" 743 longnametype = tarfile.XHDTYPE 744 745 def test_pax_global_headers(self): 746 tar = tarfile.open(tarname, encoding="iso8859-1") 747 try: 748 749 tarinfo = tar.getmember("pax/regtype1") 750 self.assertEqual(tarinfo.uname, "foo") 751 self.assertEqual(tarinfo.gname, "bar") 752 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 753 754 tarinfo = tar.getmember("pax/regtype2") 755 self.assertEqual(tarinfo.uname, "") 756 self.assertEqual(tarinfo.gname, "bar") 757 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 758 759 tarinfo = tar.getmember("pax/regtype3") 760 self.assertEqual(tarinfo.uname, "tarfile") 761 self.assertEqual(tarinfo.gname, "tarfile") 762 self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf") 763 finally: 764 tar.close() 765 766 def test_pax_number_fields(self): 767 # All following number fields are read from the pax header. 768 tar = tarfile.open(tarname, encoding="iso8859-1") 769 try: 770 tarinfo = tar.getmember("pax/regtype4") 771 self.assertEqual(tarinfo.size, 7011) 772 self.assertEqual(tarinfo.uid, 123) 773 self.assertEqual(tarinfo.gid, 123) 774 self.assertEqual(tarinfo.mtime, 1041808783.0) 775 self.assertEqual(type(tarinfo.mtime), float) 776 self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0) 777 self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0) 778 finally: 779 tar.close() 780 781 782class WriteTestBase(unittest.TestCase): 783 # Put all write tests in here that are supposed to be tested 784 # in all possible mode combinations. 785 786 def test_fileobj_no_close(self): 787 fobj = StringIO.StringIO() 788 tar = tarfile.open(fileobj=fobj, mode=self.mode) 789 tar.addfile(tarfile.TarInfo("foo")) 790 tar.close() 791 self.assertTrue(fobj.closed is False, "external fileobjs must never closed") 792 # Issue #20238: Incomplete gzip output with mode="w:gz" 793 data = fobj.getvalue() 794 del tar 795 test_support.gc_collect() 796 self.assertFalse(fobj.closed) 797 self.assertEqual(data, fobj.getvalue()) 798 799 800class WriteTest(WriteTestBase): 801 802 mode = "w:" 803 804 def test_100_char_name(self): 805 # The name field in a tar header stores strings of at most 100 chars. 806 # If a string is shorter than 100 chars it has to be padded with '\0', 807 # which implies that a string of exactly 100 chars is stored without 808 # a trailing '\0'. 809 name = "0123456789" * 10 810 tar = tarfile.open(tmpname, self.mode) 811 try: 812 t = tarfile.TarInfo(name) 813 tar.addfile(t) 814 finally: 815 tar.close() 816 817 tar = tarfile.open(tmpname) 818 try: 819 self.assertTrue(tar.getnames()[0] == name, 820 "failed to store 100 char filename") 821 finally: 822 tar.close() 823 824 def test_tar_size(self): 825 # Test for bug #1013882. 826 tar = tarfile.open(tmpname, self.mode) 827 try: 828 path = os.path.join(TEMPDIR, "file") 829 with open(path, "wb") as fobj: 830 fobj.write("aaa") 831 tar.add(path) 832 finally: 833 tar.close() 834 self.assertTrue(os.path.getsize(tmpname) > 0, 835 "tarfile is empty") 836 837 # The test_*_size tests test for bug #1167128. 838 def test_file_size(self): 839 tar = tarfile.open(tmpname, self.mode) 840 try: 841 842 path = os.path.join(TEMPDIR, "file") 843 with open(path, "wb"): 844 pass 845 tarinfo = tar.gettarinfo(path) 846 self.assertEqual(tarinfo.size, 0) 847 848 with open(path, "wb") as fobj: 849 fobj.write("aaa") 850 tarinfo = tar.gettarinfo(path) 851 self.assertEqual(tarinfo.size, 3) 852 finally: 853 tar.close() 854 855 def test_directory_size(self): 856 path = os.path.join(TEMPDIR, "directory") 857 os.mkdir(path) 858 try: 859 tar = tarfile.open(tmpname, self.mode) 860 try: 861 tarinfo = tar.gettarinfo(path) 862 self.assertEqual(tarinfo.size, 0) 863 finally: 864 tar.close() 865 finally: 866 os.rmdir(path) 867 868 def test_link_size(self): 869 if hasattr(os, "link"): 870 link = os.path.join(TEMPDIR, "link") 871 target = os.path.join(TEMPDIR, "link_target") 872 with open(target, "wb") as fobj: 873 fobj.write("aaa") 874 os.link(target, link) 875 try: 876 tar = tarfile.open(tmpname, self.mode) 877 try: 878 # Record the link target in the inodes list. 879 tar.gettarinfo(target) 880 tarinfo = tar.gettarinfo(link) 881 self.assertEqual(tarinfo.size, 0) 882 finally: 883 tar.close() 884 finally: 885 os.remove(target) 886 os.remove(link) 887 888 def test_symlink_size(self): 889 if hasattr(os, "symlink"): 890 path = os.path.join(TEMPDIR, "symlink") 891 os.symlink("link_target", path) 892 try: 893 tar = tarfile.open(tmpname, self.mode) 894 try: 895 tarinfo = tar.gettarinfo(path) 896 self.assertEqual(tarinfo.size, 0) 897 finally: 898 tar.close() 899 finally: 900 os.remove(path) 901 902 def test_add_self(self): 903 # Test for #1257255. 904 dstname = os.path.abspath(tmpname) 905 tar = tarfile.open(tmpname, self.mode) 906 try: 907 self.assertTrue(tar.name == dstname, "archive name must be absolute") 908 tar.add(dstname) 909 self.assertTrue(tar.getnames() == [], "added the archive to itself") 910 911 cwd = os.getcwd() 912 os.chdir(TEMPDIR) 913 tar.add(dstname) 914 os.chdir(cwd) 915 self.assertTrue(tar.getnames() == [], "added the archive to itself") 916 finally: 917 tar.close() 918 919 def test_exclude(self): 920 tempdir = os.path.join(TEMPDIR, "exclude") 921 os.mkdir(tempdir) 922 try: 923 for name in ("foo", "bar", "baz"): 924 name = os.path.join(tempdir, name) 925 open(name, "wb").close() 926 927 exclude = os.path.isfile 928 929 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 930 try: 931 with test_support.check_warnings(("use the filter argument", 932 DeprecationWarning)): 933 tar.add(tempdir, arcname="empty_dir", exclude=exclude) 934 finally: 935 tar.close() 936 937 tar = tarfile.open(tmpname, "r") 938 try: 939 self.assertEqual(len(tar.getmembers()), 1) 940 self.assertEqual(tar.getnames()[0], "empty_dir") 941 finally: 942 tar.close() 943 finally: 944 shutil.rmtree(tempdir) 945 946 def test_filter(self): 947 tempdir = os.path.join(TEMPDIR, "filter") 948 os.mkdir(tempdir) 949 try: 950 for name in ("foo", "bar", "baz"): 951 name = os.path.join(tempdir, name) 952 open(name, "wb").close() 953 954 def filter(tarinfo): 955 if os.path.basename(tarinfo.name) == "bar": 956 return 957 tarinfo.uid = 123 958 tarinfo.uname = "foo" 959 return tarinfo 960 961 tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1") 962 try: 963 tar.add(tempdir, arcname="empty_dir", filter=filter) 964 finally: 965 tar.close() 966 967 tar = tarfile.open(tmpname, "r") 968 try: 969 for tarinfo in tar: 970 self.assertEqual(tarinfo.uid, 123) 971 self.assertEqual(tarinfo.uname, "foo") 972 self.assertEqual(len(tar.getmembers()), 3) 973 finally: 974 tar.close() 975 finally: 976 shutil.rmtree(tempdir) 977 978 # Guarantee that stored pathnames are not modified. Don't 979 # remove ./ or ../ or double slashes. Still make absolute 980 # pathnames relative. 981 # For details see bug #6054. 982 def _test_pathname(self, path, cmp_path=None, dir=False): 983 # Create a tarfile with an empty member named path 984 # and compare the stored name with the original. 985 foo = os.path.join(TEMPDIR, "foo") 986 if not dir: 987 open(foo, "w").close() 988 else: 989 os.mkdir(foo) 990 991 tar = tarfile.open(tmpname, self.mode) 992 try: 993 tar.add(foo, arcname=path) 994 finally: 995 tar.close() 996 997 tar = tarfile.open(tmpname, "r") 998 try: 999 t = tar.next() 1000 finally: 1001 tar.close() 1002 1003 if not dir: 1004 os.remove(foo) 1005 else: 1006 os.rmdir(foo) 1007 1008 self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/")) 1009 1010 def test_pathnames(self): 1011 self._test_pathname("foo") 1012 self._test_pathname(os.path.join("foo", ".", "bar")) 1013 self._test_pathname(os.path.join("foo", "..", "bar")) 1014 self._test_pathname(os.path.join(".", "foo")) 1015 self._test_pathname(os.path.join(".", "foo", ".")) 1016 self._test_pathname(os.path.join(".", "foo", ".", "bar")) 1017 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1018 self._test_pathname(os.path.join(".", "foo", "..", "bar")) 1019 self._test_pathname(os.path.join("..", "foo")) 1020 self._test_pathname(os.path.join("..", "foo", "..")) 1021 self._test_pathname(os.path.join("..", "foo", ".", "bar")) 1022 self._test_pathname(os.path.join("..", "foo", "..", "bar")) 1023 1024 self._test_pathname("foo" + os.sep + os.sep + "bar") 1025 self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True) 1026 1027 def test_abs_pathnames(self): 1028 if sys.platform == "win32": 1029 self._test_pathname("C:\\foo", "foo") 1030 else: 1031 self._test_pathname("/foo", "foo") 1032 self._test_pathname("///foo", "foo") 1033 1034 def test_cwd(self): 1035 # Test adding the current working directory. 1036 with support.change_cwd(TEMPDIR): 1037 tar = tarfile.open(tmpname, self.mode) 1038 try: 1039 tar.add(".") 1040 finally: 1041 tar.close() 1042 1043 tar = tarfile.open(tmpname, "r") 1044 try: 1045 for t in tar: 1046 self.assertTrue(t.name == "." or t.name.startswith("./")) 1047 finally: 1048 tar.close() 1049 1050 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 1051 def test_extractall_symlinks(self): 1052 # Test if extractall works properly when tarfile contains symlinks 1053 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1054 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1055 os.mkdir(tempdir) 1056 try: 1057 source_file = os.path.join(tempdir,'source') 1058 target_file = os.path.join(tempdir,'symlink') 1059 with open(source_file,'w') as f: 1060 f.write('something\n') 1061 os.symlink(source_file, target_file) 1062 tar = tarfile.open(temparchive,'w') 1063 tar.add(source_file, arcname=os.path.basename(source_file)) 1064 tar.add(target_file, arcname=os.path.basename(target_file)) 1065 tar.close() 1066 # Let's extract it to the location which contains the symlink 1067 tar = tarfile.open(temparchive,'r') 1068 # this should not raise OSError: [Errno 17] File exists 1069 try: 1070 tar.extractall(path=tempdir) 1071 except OSError: 1072 self.fail("extractall failed with symlinked files") 1073 finally: 1074 tar.close() 1075 finally: 1076 os.unlink(temparchive) 1077 shutil.rmtree(tempdir) 1078 1079 @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink") 1080 def test_extractall_broken_symlinks(self): 1081 # Test if extractall works properly when tarfile contains broken 1082 # symlinks 1083 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1084 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1085 os.mkdir(tempdir) 1086 try: 1087 source_file = os.path.join(tempdir,'source') 1088 target_file = os.path.join(tempdir,'symlink') 1089 with open(source_file,'w') as f: 1090 f.write('something\n') 1091 os.symlink(source_file, target_file) 1092 tar = tarfile.open(temparchive,'w') 1093 tar.add(target_file, arcname=os.path.basename(target_file)) 1094 tar.close() 1095 # remove the real file 1096 os.unlink(source_file) 1097 # Let's extract it to the location which contains the symlink 1098 tar = tarfile.open(temparchive,'r') 1099 # this should not raise OSError: [Errno 17] File exists 1100 try: 1101 tar.extractall(path=tempdir) 1102 except OSError: 1103 self.fail("extractall failed with broken symlinked files") 1104 finally: 1105 tar.close() 1106 finally: 1107 os.unlink(temparchive) 1108 shutil.rmtree(tempdir) 1109 1110 @unittest.skipUnless(hasattr(os, 'link'), "needs os.link") 1111 def test_extractall_hardlinks(self): 1112 # Test if extractall works properly when tarfile contains symlinks 1113 tempdir = os.path.join(TEMPDIR, "testsymlinks") 1114 temparchive = os.path.join(TEMPDIR, "testsymlinks.tar") 1115 os.mkdir(tempdir) 1116 try: 1117 source_file = os.path.join(tempdir,'source') 1118 target_file = os.path.join(tempdir,'symlink') 1119 with open(source_file,'w') as f: 1120 f.write('something\n') 1121 os.link(source_file, target_file) 1122 tar = tarfile.open(temparchive,'w') 1123 tar.add(source_file, arcname=os.path.basename(source_file)) 1124 tar.add(target_file, arcname=os.path.basename(target_file)) 1125 tar.close() 1126 # Let's extract it to the location which contains the symlink 1127 tar = tarfile.open(temparchive,'r') 1128 # this should not raise OSError: [Errno 17] File exists 1129 try: 1130 tar.extractall(path=tempdir) 1131 except OSError: 1132 self.fail("extractall failed with linked files") 1133 finally: 1134 tar.close() 1135 finally: 1136 os.unlink(temparchive) 1137 shutil.rmtree(tempdir) 1138 1139 def test_open_nonwritable_fileobj(self): 1140 for exctype in IOError, EOFError, RuntimeError: 1141 class BadFile(StringIO.StringIO): 1142 first = True 1143 def write(self, data): 1144 if self.first: 1145 self.first = False 1146 raise exctype 1147 1148 f = BadFile() 1149 with self.assertRaises(exctype): 1150 tar = tarfile.open(tmpname, self.mode, fileobj=f, 1151 format=tarfile.PAX_FORMAT, 1152 pax_headers={'non': 'empty'}) 1153 self.assertFalse(f.closed) 1154 1155class StreamWriteTest(WriteTestBase): 1156 1157 mode = "w|" 1158 1159 def test_stream_padding(self): 1160 # Test for bug #1543303. 1161 tar = tarfile.open(tmpname, self.mode) 1162 tar.close() 1163 1164 if self.mode.endswith("gz"): 1165 with gzip.GzipFile(tmpname) as fobj: 1166 data = fobj.read() 1167 elif self.mode.endswith("bz2"): 1168 dec = bz2.BZ2Decompressor() 1169 with open(tmpname, "rb") as fobj: 1170 data = fobj.read() 1171 data = dec.decompress(data) 1172 self.assertTrue(len(dec.unused_data) == 0, 1173 "found trailing data") 1174 else: 1175 with open(tmpname, "rb") as fobj: 1176 data = fobj.read() 1177 1178 self.assertTrue(data.count("\0") == tarfile.RECORDSIZE, 1179 "incorrect zero padding") 1180 1181 @unittest.skipIf(sys.platform == 'win32', 'not appropriate for Windows') 1182 @unittest.skipUnless(hasattr(os, 'umask'), 'requires os.umask') 1183 def test_file_mode(self): 1184 # Test for issue #8464: Create files with correct 1185 # permissions. 1186 if os.path.exists(tmpname): 1187 os.remove(tmpname) 1188 1189 original_umask = os.umask(0022) 1190 try: 1191 tar = tarfile.open(tmpname, self.mode) 1192 tar.close() 1193 mode = os.stat(tmpname).st_mode & 0777 1194 self.assertEqual(mode, 0644, "wrong file permissions") 1195 finally: 1196 os.umask(original_umask) 1197 1198 def test_issue13639(self): 1199 try: 1200 with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode): 1201 pass 1202 except UnicodeDecodeError: 1203 self.fail("_Stream failed to write unicode filename") 1204 1205 1206class GNUWriteTest(unittest.TestCase): 1207 # This testcase checks for correct creation of GNU Longname 1208 # and Longlink extended headers (cp. bug #812325). 1209 1210 def _length(self, s): 1211 blocks, remainder = divmod(len(s) + 1, 512) 1212 if remainder: 1213 blocks += 1 1214 return blocks * 512 1215 1216 def _calc_size(self, name, link=None): 1217 # Initial tar header 1218 count = 512 1219 1220 if len(name) > tarfile.LENGTH_NAME: 1221 # GNU longname extended header + longname 1222 count += 512 1223 count += self._length(name) 1224 if link is not None and len(link) > tarfile.LENGTH_LINK: 1225 # GNU longlink extended header + longlink 1226 count += 512 1227 count += self._length(link) 1228 return count 1229 1230 def _test(self, name, link=None): 1231 tarinfo = tarfile.TarInfo(name) 1232 if link: 1233 tarinfo.linkname = link 1234 tarinfo.type = tarfile.LNKTYPE 1235 1236 tar = tarfile.open(tmpname, "w") 1237 try: 1238 tar.format = tarfile.GNU_FORMAT 1239 tar.addfile(tarinfo) 1240 1241 v1 = self._calc_size(name, link) 1242 v2 = tar.offset 1243 self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") 1244 finally: 1245 tar.close() 1246 1247 tar = tarfile.open(tmpname) 1248 try: 1249 member = tar.next() 1250 self.assertIsNotNone(member, 1251 "unable to read longname member") 1252 self.assertEqual(tarinfo.name, member.name, 1253 "unable to read longname member") 1254 self.assertEqual(tarinfo.linkname, member.linkname, 1255 "unable to read longname member") 1256 finally: 1257 tar.close() 1258 1259 def test_longname_1023(self): 1260 self._test(("longnam/" * 127) + "longnam") 1261 1262 def test_longname_1024(self): 1263 self._test(("longnam/" * 127) + "longname") 1264 1265 def test_longname_1025(self): 1266 self._test(("longnam/" * 127) + "longname_") 1267 1268 def test_longlink_1023(self): 1269 self._test("name", ("longlnk/" * 127) + "longlnk") 1270 1271 def test_longlink_1024(self): 1272 self._test("name", ("longlnk/" * 127) + "longlink") 1273 1274 def test_longlink_1025(self): 1275 self._test("name", ("longlnk/" * 127) + "longlink_") 1276 1277 def test_longnamelink_1023(self): 1278 self._test(("longnam/" * 127) + "longnam", 1279 ("longlnk/" * 127) + "longlnk") 1280 1281 def test_longnamelink_1024(self): 1282 self._test(("longnam/" * 127) + "longname", 1283 ("longlnk/" * 127) + "longlink") 1284 1285 def test_longnamelink_1025(self): 1286 self._test(("longnam/" * 127) + "longname_", 1287 ("longlnk/" * 127) + "longlink_") 1288 1289 1290class HardlinkTest(unittest.TestCase): 1291 # Test the creation of LNKTYPE (hardlink) members in an archive. 1292 1293 def setUp(self): 1294 self.foo = os.path.join(TEMPDIR, "foo") 1295 self.bar = os.path.join(TEMPDIR, "bar") 1296 1297 with open(self.foo, "wb") as fobj: 1298 fobj.write("foo") 1299 1300 os.link(self.foo, self.bar) 1301 1302 self.tar = tarfile.open(tmpname, "w") 1303 self.tar.add(self.foo) 1304 1305 def tearDown(self): 1306 self.tar.close() 1307 support.unlink(self.foo) 1308 support.unlink(self.bar) 1309 1310 def test_add_twice(self): 1311 # The same name will be added as a REGTYPE every 1312 # time regardless of st_nlink. 1313 tarinfo = self.tar.gettarinfo(self.foo) 1314 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1315 "add file as regular failed") 1316 1317 def test_add_hardlink(self): 1318 tarinfo = self.tar.gettarinfo(self.bar) 1319 self.assertTrue(tarinfo.type == tarfile.LNKTYPE, 1320 "add file as hardlink failed") 1321 1322 def test_dereference_hardlink(self): 1323 self.tar.dereference = True 1324 tarinfo = self.tar.gettarinfo(self.bar) 1325 self.assertTrue(tarinfo.type == tarfile.REGTYPE, 1326 "dereferencing hardlink failed") 1327 1328 1329class PaxWriteTest(GNUWriteTest): 1330 1331 def _test(self, name, link=None): 1332 # See GNUWriteTest. 1333 tarinfo = tarfile.TarInfo(name) 1334 if link: 1335 tarinfo.linkname = link 1336 tarinfo.type = tarfile.LNKTYPE 1337 1338 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) 1339 try: 1340 tar.addfile(tarinfo) 1341 finally: 1342 tar.close() 1343 1344 tar = tarfile.open(tmpname) 1345 try: 1346 if link: 1347 l = tar.getmembers()[0].linkname 1348 self.assertTrue(link == l, "PAX longlink creation failed") 1349 else: 1350 n = tar.getmembers()[0].name 1351 self.assertTrue(name == n, "PAX longname creation failed") 1352 finally: 1353 tar.close() 1354 1355 def test_pax_global_header(self): 1356 pax_headers = { 1357 u"foo": u"bar", 1358 u"uid": u"0", 1359 u"mtime": u"1.23", 1360 u"test": u"\xe4\xf6\xfc", 1361 u"\xe4\xf6\xfc": u"test"} 1362 1363 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1364 pax_headers=pax_headers) 1365 try: 1366 tar.addfile(tarfile.TarInfo("test")) 1367 finally: 1368 tar.close() 1369 1370 # Test if the global header was written correctly. 1371 tar = tarfile.open(tmpname, encoding="iso8859-1") 1372 try: 1373 self.assertEqual(tar.pax_headers, pax_headers) 1374 self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) 1375 1376 # Test if all the fields are unicode. 1377 for key, val in tar.pax_headers.iteritems(): 1378 self.assertTrue(type(key) is unicode) 1379 self.assertTrue(type(val) is unicode) 1380 if key in tarfile.PAX_NUMBER_FIELDS: 1381 try: 1382 tarfile.PAX_NUMBER_FIELDS[key](val) 1383 except (TypeError, ValueError): 1384 self.fail("unable to convert pax header field") 1385 finally: 1386 tar.close() 1387 1388 def test_pax_extended_header(self): 1389 # The fields from the pax header have priority over the 1390 # TarInfo. 1391 pax_headers = {u"path": u"foo", u"uid": u"123"} 1392 1393 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") 1394 try: 1395 t = tarfile.TarInfo() 1396 t.name = u"\xe4\xf6\xfc" # non-ASCII 1397 t.uid = 8**8 # too large 1398 t.pax_headers = pax_headers 1399 tar.addfile(t) 1400 finally: 1401 tar.close() 1402 1403 tar = tarfile.open(tmpname, encoding="iso8859-1") 1404 try: 1405 t = tar.getmembers()[0] 1406 self.assertEqual(t.pax_headers, pax_headers) 1407 self.assertEqual(t.name, "foo") 1408 self.assertEqual(t.uid, 123) 1409 finally: 1410 tar.close() 1411 1412 1413class UstarUnicodeTest(unittest.TestCase): 1414 # All *UnicodeTests FIXME 1415 1416 format = tarfile.USTAR_FORMAT 1417 1418 def test_iso8859_1_filename(self): 1419 self._test_unicode_filename("iso8859-1") 1420 1421 def test_utf7_filename(self): 1422 self._test_unicode_filename("utf7") 1423 1424 def test_utf8_filename(self): 1425 self._test_unicode_filename("utf8") 1426 1427 def _test_unicode_filename(self, encoding): 1428 tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") 1429 try: 1430 name = u"\xe4\xf6\xfc" 1431 tar.addfile(tarfile.TarInfo(name)) 1432 finally: 1433 tar.close() 1434 1435 tar = tarfile.open(tmpname, encoding=encoding) 1436 try: 1437 self.assertTrue(type(tar.getnames()[0]) is not unicode) 1438 self.assertEqual(tar.getmembers()[0].name, name.encode(encoding)) 1439 finally: 1440 tar.close() 1441 1442 def test_unicode_filename_error(self): 1443 tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") 1444 try: 1445 tarinfo = tarfile.TarInfo() 1446 1447 tarinfo.name = "\xe4\xf6\xfc" 1448 if self.format == tarfile.PAX_FORMAT: 1449 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1450 else: 1451 tar.addfile(tarinfo) 1452 1453 tarinfo.name = u"\xe4\xf6\xfc" 1454 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1455 1456 tarinfo.name = "foo" 1457 tarinfo.uname = u"\xe4\xf6\xfc" 1458 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 1459 finally: 1460 tar.close() 1461 1462 def test_unicode_argument(self): 1463 tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") 1464 try: 1465 for t in tar: 1466 self.assertTrue(type(t.name) is str) 1467 self.assertTrue(type(t.linkname) is str) 1468 self.assertTrue(type(t.uname) is str) 1469 self.assertTrue(type(t.gname) is str) 1470 finally: 1471 tar.close() 1472 1473 def test_uname_unicode(self): 1474 for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"): 1475 t = tarfile.TarInfo("foo") 1476 t.uname = name 1477 t.gname = name 1478 1479 fobj = StringIO.StringIO() 1480 tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1") 1481 try: 1482 tar.addfile(t) 1483 finally: 1484 tar.close() 1485 fobj.seek(0) 1486 1487 tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1") 1488 t = tar.getmember("foo") 1489 self.assertEqual(t.uname, "\xe4\xf6\xfc") 1490 self.assertEqual(t.gname, "\xe4\xf6\xfc") 1491 1492 1493class GNUUnicodeTest(UstarUnicodeTest): 1494 1495 format = tarfile.GNU_FORMAT 1496 1497 1498class PaxUnicodeTest(UstarUnicodeTest): 1499 1500 format = tarfile.PAX_FORMAT 1501 1502 def _create_unicode_name(self, name): 1503 tar = tarfile.open(tmpname, "w", format=self.format) 1504 t = tarfile.TarInfo() 1505 t.pax_headers["path"] = name 1506 tar.addfile(t) 1507 tar.close() 1508 1509 def test_error_handlers(self): 1510 # Test if the unicode error handlers work correctly for characters 1511 # that cannot be expressed in a given encoding. 1512 self._create_unicode_name(u"\xe4\xf6\xfc") 1513 1514 for handler, name in (("utf-8", u"\xe4\xf6\xfc".encode("utf8")), 1515 ("replace", "???"), ("ignore", "")): 1516 tar = tarfile.open(tmpname, format=self.format, encoding="ascii", 1517 errors=handler) 1518 self.assertEqual(tar.getnames()[0], name) 1519 1520 self.assertRaises(UnicodeError, tarfile.open, tmpname, 1521 encoding="ascii", errors="strict") 1522 1523 def test_error_handler_utf8(self): 1524 # Create a pathname that has one component representable using 1525 # iso8859-1 and the other only in iso8859-15. 1526 self._create_unicode_name(u"\xe4\xf6\xfc/\u20ac") 1527 1528 tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1", 1529 errors="utf-8") 1530 self.assertEqual(tar.getnames()[0], "\xe4\xf6\xfc/" + u"\u20ac".encode("utf8")) 1531 1532 1533class AppendTest(unittest.TestCase): 1534 # Test append mode (cp. patch #1652681). 1535 1536 def setUp(self): 1537 self.tarname = tmpname 1538 if os.path.exists(self.tarname): 1539 os.remove(self.tarname) 1540 1541 def _add_testfile(self, fileobj=None): 1542 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 1543 tar.addfile(tarfile.TarInfo("bar")) 1544 1545 def _create_testtar(self, mode="w:"): 1546 with tarfile.open(tarname, encoding="iso8859-1") as src: 1547 t = src.getmember("ustar/regtype") 1548 t.name = "foo" 1549 f = src.extractfile(t) 1550 with tarfile.open(self.tarname, mode) as tar: 1551 tar.addfile(t, f) 1552 1553 def _test(self, names=["bar"], fileobj=None): 1554 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 1555 self.assertEqual(tar.getnames(), names) 1556 1557 def test_non_existing(self): 1558 self._add_testfile() 1559 self._test() 1560 1561 def test_empty(self): 1562 tarfile.open(self.tarname, "w:").close() 1563 self._add_testfile() 1564 self._test() 1565 1566 def test_empty_fileobj(self): 1567 fobj = StringIO.StringIO("\0" * 1024) 1568 self._add_testfile(fobj) 1569 fobj.seek(0) 1570 self._test(fileobj=fobj) 1571 1572 def test_fileobj(self): 1573 self._create_testtar() 1574 with open(self.tarname) as fobj: 1575 data = fobj.read() 1576 fobj = StringIO.StringIO(data) 1577 self._add_testfile(fobj) 1578 fobj.seek(0) 1579 self._test(names=["foo", "bar"], fileobj=fobj) 1580 1581 def test_existing(self): 1582 self._create_testtar() 1583 self._add_testfile() 1584 self._test(names=["foo", "bar"]) 1585 1586 @unittest.skipUnless(gzip, 'requires gzip') 1587 def test_append_gz(self): 1588 self._create_testtar("w:gz") 1589 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1590 1591 @unittest.skipUnless(bz2, 'requires bz2') 1592 def test_append_bz2(self): 1593 self._create_testtar("w:bz2") 1594 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 1595 1596 # Append mode is supposed to fail if the tarfile to append to 1597 # does not end with a zero block. 1598 def _test_error(self, data): 1599 with open(self.tarname, "wb") as fobj: 1600 fobj.write(data) 1601 self.assertRaises(tarfile.ReadError, self._add_testfile) 1602 1603 def test_null(self): 1604 self._test_error("") 1605 1606 def test_incomplete(self): 1607 self._test_error("\0" * 13) 1608 1609 def test_premature_eof(self): 1610 data = tarfile.TarInfo("foo").tobuf() 1611 self._test_error(data) 1612 1613 def test_trailing_garbage(self): 1614 data = tarfile.TarInfo("foo").tobuf() 1615 self._test_error(data + "\0" * 13) 1616 1617 def test_invalid(self): 1618 self._test_error("a" * 512) 1619 1620 1621class LimitsTest(unittest.TestCase): 1622 1623 def test_ustar_limits(self): 1624 # 100 char name 1625 tarinfo = tarfile.TarInfo("0123456789" * 10) 1626 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1627 1628 # 101 char name that cannot be stored 1629 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 1630 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1631 1632 # 256 char name with a slash at pos 156 1633 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 1634 tarinfo.tobuf(tarfile.USTAR_FORMAT) 1635 1636 # 256 char name that cannot be stored 1637 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 1638 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1639 1640 # 512 char name 1641 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1642 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1643 1644 # 512 char linkname 1645 tarinfo = tarfile.TarInfo("longlink") 1646 tarinfo.linkname = "123/" * 126 + "longname" 1647 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1648 1649 # uid > 8 digits 1650 tarinfo = tarfile.TarInfo("name") 1651 tarinfo.uid = 010000000 1652 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 1653 1654 def test_gnu_limits(self): 1655 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1656 tarinfo.tobuf(tarfile.GNU_FORMAT) 1657 1658 tarinfo = tarfile.TarInfo("longlink") 1659 tarinfo.linkname = "123/" * 126 + "longname" 1660 tarinfo.tobuf(tarfile.GNU_FORMAT) 1661 1662 # uid >= 256 ** 7 1663 tarinfo = tarfile.TarInfo("name") 1664 tarinfo.uid = 04000000000000000000L 1665 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 1666 1667 def test_pax_limits(self): 1668 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 1669 tarinfo.tobuf(tarfile.PAX_FORMAT) 1670 1671 tarinfo = tarfile.TarInfo("longlink") 1672 tarinfo.linkname = "123/" * 126 + "longname" 1673 tarinfo.tobuf(tarfile.PAX_FORMAT) 1674 1675 tarinfo = tarfile.TarInfo("name") 1676 tarinfo.uid = 04000000000000000000L 1677 tarinfo.tobuf(tarfile.PAX_FORMAT) 1678 1679 1680class MiscTest(unittest.TestCase): 1681 1682 def test_read_number_fields(self): 1683 # Issue 24514: Test if empty number fields are converted to zero. 1684 self.assertEqual(tarfile.nti("\0"), 0) 1685 self.assertEqual(tarfile.nti(" \0"), 0) 1686 1687 1688class ContextManagerTest(unittest.TestCase): 1689 1690 def test_basic(self): 1691 with tarfile.open(tarname) as tar: 1692 self.assertFalse(tar.closed, "closed inside runtime context") 1693 self.assertTrue(tar.closed, "context manager failed") 1694 1695 def test_closed(self): 1696 # The __enter__() method is supposed to raise IOError 1697 # if the TarFile object is already closed. 1698 tar = tarfile.open(tarname) 1699 tar.close() 1700 with self.assertRaises(IOError): 1701 with tar: 1702 pass 1703 1704 def test_exception(self): 1705 # Test if the IOError exception is passed through properly. 1706 with self.assertRaises(Exception) as exc: 1707 with tarfile.open(tarname) as tar: 1708 raise IOError 1709 self.assertIsInstance(exc.exception, IOError, 1710 "wrong exception raised in context manager") 1711 self.assertTrue(tar.closed, "context manager failed") 1712 1713 def test_no_eof(self): 1714 # __exit__() must not write end-of-archive blocks if an 1715 # exception was raised. 1716 try: 1717 with tarfile.open(tmpname, "w") as tar: 1718 raise Exception 1719 except: 1720 pass 1721 self.assertEqual(os.path.getsize(tmpname), 0, 1722 "context manager wrote an end-of-archive block") 1723 self.assertTrue(tar.closed, "context manager failed") 1724 1725 def test_eof(self): 1726 # __exit__() must write end-of-archive blocks, i.e. call 1727 # TarFile.close() if there was no error. 1728 with tarfile.open(tmpname, "w"): 1729 pass 1730 self.assertNotEqual(os.path.getsize(tmpname), 0, 1731 "context manager wrote no end-of-archive block") 1732 1733 def test_fileobj(self): 1734 # Test that __exit__() did not close the external file 1735 # object. 1736 with open(tmpname, "wb") as fobj: 1737 try: 1738 with tarfile.open(fileobj=fobj, mode="w") as tar: 1739 raise Exception 1740 except: 1741 pass 1742 self.assertFalse(fobj.closed, "external file object was closed") 1743 self.assertTrue(tar.closed, "context manager failed") 1744 1745 1746class LinkEmulationTest(ReadTest): 1747 1748 # Test for issue #8741 regression. On platforms that do not support 1749 # symbolic or hard links tarfile tries to extract these types of members as 1750 # the regular files they point to. 1751 def _test_link_extraction(self, name): 1752 self.tar.extract(name, TEMPDIR) 1753 data = open(os.path.join(TEMPDIR, name), "rb").read() 1754 self.assertEqual(md5sum(data), md5_regtype) 1755 1756 def test_hardlink_extraction1(self): 1757 self._test_link_extraction("ustar/lnktype") 1758 1759 def test_hardlink_extraction2(self): 1760 self._test_link_extraction("./ustar/linktest2/lnktype") 1761 1762 def test_symlink_extraction1(self): 1763 self._test_link_extraction("ustar/symtype") 1764 1765 def test_symlink_extraction2(self): 1766 self._test_link_extraction("./ustar/linktest2/symtype") 1767 1768 1769class GzipMiscReadTest(MiscReadTest): 1770 tarname = gzipname 1771 mode = "r:gz" 1772 taropen = tarfile.TarFile.gzopen 1773class GzipUstarReadTest(UstarReadTest): 1774 tarname = gzipname 1775 mode = "r:gz" 1776class GzipStreamReadTest(StreamReadTest): 1777 tarname = gzipname 1778 mode = "r|gz" 1779class GzipWriteTest(WriteTest): 1780 mode = "w:gz" 1781class GzipStreamWriteTest(StreamWriteTest): 1782 mode = "w|gz" 1783 1784 1785class Bz2MiscReadTest(MiscReadTest): 1786 tarname = bz2name 1787 mode = "r:bz2" 1788 taropen = tarfile.TarFile.bz2open 1789class Bz2UstarReadTest(UstarReadTest): 1790 tarname = bz2name 1791 mode = "r:bz2" 1792class Bz2StreamReadTest(StreamReadTest): 1793 tarname = bz2name 1794 mode = "r|bz2" 1795class Bz2WriteTest(WriteTest): 1796 mode = "w:bz2" 1797class Bz2StreamWriteTest(StreamWriteTest): 1798 mode = "w|bz2" 1799 1800class Bz2PartialReadTest(unittest.TestCase): 1801 # Issue5068: The _BZ2Proxy.read() method loops forever 1802 # on an empty or partial bzipped file. 1803 1804 def _test_partial_input(self, mode): 1805 class MyStringIO(StringIO.StringIO): 1806 hit_eof = False 1807 def read(self, n): 1808 if self.hit_eof: 1809 raise AssertionError("infinite loop detected in tarfile.open()") 1810 self.hit_eof = self.pos == self.len 1811 return StringIO.StringIO.read(self, n) 1812 def seek(self, *args): 1813 self.hit_eof = False 1814 return StringIO.StringIO.seek(self, *args) 1815 1816 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 1817 for x in range(len(data) + 1): 1818 try: 1819 tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode) 1820 except tarfile.ReadError: 1821 pass # we have no interest in ReadErrors 1822 1823 def test_partial_input(self): 1824 self._test_partial_input("r") 1825 1826 def test_partial_input_bz2(self): 1827 self._test_partial_input("r:bz2") 1828 1829 1830def test_main(): 1831 support.unlink(TEMPDIR) 1832 os.makedirs(TEMPDIR) 1833 1834 tests = [ 1835 UstarReadTest, 1836 MiscReadTest, 1837 StreamReadTest, 1838 DetectReadTest, 1839 MemberReadTest, 1840 GNUReadTest, 1841 PaxReadTest, 1842 ListTest, 1843 WriteTest, 1844 StreamWriteTest, 1845 GNUWriteTest, 1846 PaxWriteTest, 1847 UstarUnicodeTest, 1848 GNUUnicodeTest, 1849 PaxUnicodeTest, 1850 AppendTest, 1851 LimitsTest, 1852 MiscTest, 1853 ContextManagerTest, 1854 ] 1855 1856 if hasattr(os, "link"): 1857 tests.append(HardlinkTest) 1858 else: 1859 tests.append(LinkEmulationTest) 1860 1861 with open(tarname, "rb") as fobj: 1862 data = fobj.read() 1863 1864 if gzip: 1865 # Create testtar.tar.gz and add gzip-specific tests. 1866 support.unlink(gzipname) 1867 with gzip.open(gzipname, "wb") as tar: 1868 tar.write(data) 1869 1870 tests += [ 1871 GzipMiscReadTest, 1872 GzipUstarReadTest, 1873 GzipStreamReadTest, 1874 GzipListTest, 1875 GzipWriteTest, 1876 GzipStreamWriteTest, 1877 ] 1878 1879 if bz2: 1880 # Create testtar.tar.bz2 and add bz2-specific tests. 1881 support.unlink(bz2name) 1882 tar = bz2.BZ2File(bz2name, "wb") 1883 try: 1884 tar.write(data) 1885 finally: 1886 tar.close() 1887 1888 tests += [ 1889 Bz2MiscReadTest, 1890 Bz2UstarReadTest, 1891 Bz2StreamReadTest, 1892 Bz2ListTest, 1893 Bz2WriteTest, 1894 Bz2StreamWriteTest, 1895 Bz2PartialReadTest, 1896 ] 1897 1898 try: 1899 test_support.run_unittest(*tests) 1900 finally: 1901 if os.path.exists(TEMPDIR): 1902 shutil.rmtree(TEMPDIR) 1903 1904if __name__ == "__main__": 1905 test_main() 1906