• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper, requires_hashdigest
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
32
# Scratch directory names, both derived from support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + "-extract-test"

# The reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar")

# Archive paths that live inside TEMPDIR.
gzipname, bz2name, xzname, tmpname, dotlessname = (
    os.path.join(TEMPDIR, leaf)
    for leaf in ("testtar.tar.gz", "testtar.tar.bz2", "testtar.tar.xz",
                 "tmp.tar", "testtar")
)

# Expected SHA-256 hex digest of the "regtype" member data; the tests
# compare it against sha256sum() of the extracted bytes.
sha256_regtype = \
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
# NOTE(review): presumably the digest of the sparse test members; it is
# not used in the part of the file visible here.
sha256_sparse = \
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
48
49
class TarTest:
    """Mixin describing the uncompressed variant of the test archive.

    Subclasses (or sibling mixins) override the attributes below to select
    another archive; a ``prefix`` attribute is expected to be supplied by
    the class that mixes this in.
    """

    suffix = ''
    tarname = tarname
    taropen = tarfile.TarFile.taropen
    open = io.FileIO

    @property
    def mode(self):
        # The tarfile mode string is the prefix followed by the suffix.
        return "".join((self.prefix, self.suffix))
59
@support.requires_gzip
class GzipTest:
    """Mixin selecting the gzip variant of the test archive."""

    suffix = 'gz'
    tarname = gzipname
    taropen = tarfile.TarFile.gzopen
    # gzip is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
66
@support.requires_bz2
class Bz2Test:
    """Mixin selecting the bz2 variant of the test archive."""

    suffix = 'bz2'
    tarname = bz2name
    taropen = tarfile.TarFile.bz2open
    # bz2 is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
73
@support.requires_lzma
class LzmaTest:
    """Mixin selecting the xz (lzma) variant of the test archive."""

    suffix = 'xz'
    tarname = xzname
    taropen = tarfile.TarFile.xzopen
    # lzma is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
80
81
class ReadTest(TarTest):
    """Base for read tests: opens ``self.tarname`` before every test."""

    prefix = "r:"

    def setUp(self):
        # Open the archive selected by the mixins with iso8859-1 encoding.
        archive = tarfile.open(self.tarname, encoding="iso8859-1",
                               mode=self.mode)
        self.tar = archive

    def tearDown(self):
        # Release the archive opened in setUp().
        self.tar.close()
92
93
class UstarReadTest(ReadTest, unittest.TestCase):
    """Tests for the file objects returned by TarFile.extractfile()."""

    def test_fileobj_regular_file(self):
        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
            self.assertEqual(len(payload), member.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(payload), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Compare readlines() on the extracted file on disk with
        # readlines() on the in-archive file object.
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        disk_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(disk_path, "r") as disk_file:
            disk_lines = disk_file.readlines()

        with self.tar.extractfile(member) as stream:
            text_stream = io.TextIOWrapper(stream)
            member_lines = text_stream.readlines()
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.readlines() failed")
            self.assertEqual(len(member_lines), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(member_lines[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the in-archive file object must yield the same lines
        # as reading the extracted file from disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        disk_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(disk_path, "r") as disk_file:
            disk_lines = disk_file.readlines()
        with self.tar.extractfile(member) as stream:
            member_lines = list(io.TextIOWrapper(stream))
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        disk_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(disk_path, "rb") as disk_file:
            expected = disk_file.read()

        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as stream:
            # Read everything first so the seeks below start from the end.
            stream.read()

            stream.seek(0)
            self.assertEqual(0, stream.tell(),
                         "seek() to file's start failed")

            stream.seek(2048, 0)
            self.assertEqual(2048, stream.tell(),
                         "seek() to absolute position failed")

            stream.seek(-1024, 1)
            self.assertEqual(1024, stream.tell(),
                         "seek() to negative relative position failed")

            stream.seek(1024, 1)
            self.assertEqual(2048, stream.tell(),
                         "seek() to positive relative position failed")

            chunk = stream.read(10)
            self.assertEqual(chunk, expected[2048:2058],
                         "read() after seek failed")

            stream.seek(0, 2)
            self.assertEqual(member.size, stream.tell(),
                         "seek() to file's end failed")
            self.assertEqual(stream.read(), b"",
                         "read() at file's end did not return empty string")

            stream.seek(-member.size, 2)
            self.assertEqual(0, stream.tell(),
                         "relative seek() to file's end failed")

            # readlines() from the same offset twice must agree.
            stream.seek(512)
            first_pass = stream.readlines()
            stream.seek(512)
            second_pass = stream.readlines()
            self.assertEqual(first_pass, second_pass,
                         "readlines() after seek failed")

            stream.seek(0)
            line_length = len(stream.readline())
            self.assertEqual(line_length, stream.tell(),
                         "tell() after readline() failed")

            stream.seek(512)
            line_length = len(stream.readline())
            self.assertEqual(line_length + 512, stream.tell(),
                         "tell() after seek() and readline() failed")

            stream.seek(0)
            first_line = stream.readline()
            self.assertEqual(stream.read(), expected[len(first_line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as stream:
            text_stream = io.TextIOWrapper(stream)
            payload = text_stream.read().encode("iso8859-1")
            self.assertEqual(sha256sum(payload), sha256_regtype)
            try:
                text_stream.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as link_stream:
            with self.tar.extractfile(regtype) as regular_stream:
                self.assertEqual(link_stream.name, regular_stream.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
217
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest with the archive settings supplied by GzipTest."""
220
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest with the archive settings supplied by Bz2Test."""
223
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest with the archive settings supplied by LzmaTest."""
226
227
class ListTest(ReadTest, unittest.TestCase):
    """Tests for the output written by TarFile.list()."""

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _captured_listing(self, **list_kwargs):
        # Call self.tar.list(**list_kwargs) while sys.stdout is replaced by
        # an ASCII text wrapper and return the bytes that were written.
        sink = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', sink):
            self.tar.list(**list_kwargs)
        return sink.detach().getvalue()

    def test_list(self):
        out = self._captured_listing(verbose=False)

        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for name in plain_names:
            self.assertIn(name, out)

        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')

        non_ascii_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for name in non_ascii_names:
            self.assertIn(conv(name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._captured_listing(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                               br'ustar/\w+type ?\r?\n') * 2)
        # Make sure it prints the source of link with verbose flag
        long_dirs = b'/123' * 125
        link_lines = (
            b'ustar/symtype -> regtype',
            b'./ustar/linktest2/symtype -> ../linktest1/regtype',
            b'./ustar/linktest2/lnktype link to '
            b'./ustar/linktest1/regtype',
            b'gnu' + long_dirs + b'/longlink link to gnu' +
            long_dirs + b'/longname',
            b'pax' + long_dirs + b'/longlink link to pax' +
            long_dirs + b'/longname',
        )
        for line in link_lines:
            self.assertIn(line, out)

    def test_list_members(self):
        # Only members whose name contains 'reg' are passed to list().
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo

        out = self._captured_listing(verbose=False,
                                     members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
306
307
class GzipListTest(GzipTest, ListTest):
    """ListTest with the archive settings supplied by GzipTest."""
310
311
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest with the archive settings supplied by Bz2Test."""
314
315
class LzmaListTest(LzmaTest, ListTest):
    """ListTest with the archive settings supplied by LzmaTest."""
318
319
class CommonReadTest(ReadTest):
    """Read tests shared by the "r:" and "r|" (stream) test classes."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # The archive is managed by a with statement so that a ReadError
        # raised by tarfile.open() itself is reported through self.fail()
        # instead of being masked by cleanup code touching an unbound name.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                self.assertListEqual(tar.getmembers(), [])
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Truncate a one-member archive at several sizes and check that
        # iterating, extracting and reading all report the missing data.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read() fails.
                    with tar.extractfile(t) as member:
                        member.read()
399
class MiscReadTestBase(CommonReadTest):
    """Miscellaneous read tests shared by the plain and compressed suites."""

    def requires_name_attribute(self):
        # Hook called by the tests that need a file object with a ``name``
        # attribute; subclasses may override it to skip those tests.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a with statement so the TarFile is closed after the check.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening the archive must accept a pathlib.Path and
        # expose the absolute path as a str in TarFile.name.
        tarname = pathlib.Path(self.tarname)

        def check_name(tar):
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

        with tarfile.open(tarname, mode=self.mode) as tar:
            check_name(tar)
        with self.taropen(tarname) as tar:
            check_name(tar)
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            check_name(tar)
        if self.suffix == '':
            with tarfile.TarFile(tarname, mode='r') as tar:
                check_name(tar)

    def test_illegal_mode_arg(self):
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            # Both link flavours must yield the data of ustar/regtype.
            for linkname in ("ustar/lnktype", "ustar/symtype"):
                tar.extract(linkname, TEMPDIR)
                self.addCleanup(support.unlink, os.path.join(TEMPDIR, linkname))
                with open(os.path.join(TEMPDIR, linkname), "rb") as f:
                    data = f.read()
                self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            # The archive is opened only after DIR exists, and it is closed
            # before DIR is removed.
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it can
            # still be inspected after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
649
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase run with the default (uncompressed) settings."""

    # The inherited test_fail_comp is documented as being for the
    # compressed variants, so it is replaced by None here.
    test_fail_comp = None
652
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the archive settings supplied by GzipTest."""
655
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the archive settings supplied by Bz2Test."""

    def requires_name_attribute(self):
        # Skip the calling test: it relies on a ``name`` attribute.
        reason = "BZ2File have no name attribute"
        self.skipTest(reason)
659
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase with the archive settings supplied by LzmaTest."""

    def requires_name_attribute(self):
        # Skip the calling test: it relies on a ``name`` attribute.
        reason = "LZMAFile have no name attribute"
        self.skipTest(reason)
663
664
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests run with the stream mode prefix "r|"."""

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        regular_members = (m for m in self.tar if m.isreg())
        for member in regular_members:
            with self.tar.extractfile(member) as stream:
                exhausted = False
                while not exhausted:
                    try:
                        chunk = stream.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")
                    exhausted = not chunk

    def test_fileobj_regular_file(self):
        member = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
        self.assertEqual(len(payload), member.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(payload), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() has already read through the stream, so reading
        # the first member afterwards must raise StreamError.
        members = self.tar.getmembers()
        first = members[0]
        with self.tar.extractfile(first) as stream: # read the first member
            self.assertRaises(tarfile.StreamError, stream.read)

    def test_compare_members(self):
        # Walk a seekable archive and the stream archive in lockstep and
        # compare the data of every member.
        seekable = tarfile.open(tarname, encoding="iso8859-1")
        try:
            streamed = self.tar

            while True:
                expected_member = seekable.next()
                stream_member = streamed.next()
                if expected_member is None:
                    break
                self.assertIsNotNone(stream_member, "stream.next() failed.")

                is_link = stream_member.islnk() or stream_member.issym()
                if is_link:
                    with self.assertRaises(tarfile.StreamError):
                        streamed.extractfile(stream_member)
                    continue

                expected_file = seekable.extractfile(expected_member)
                stream_file = streamed.extractfile(stream_member)
                if expected_file is not None:
                    self.assertIsNotNone(stream_file,
                            "stream.extractfile() failed")
                    self.assertEqual(expected_file.read(), stream_file.read(),
                            "stream extraction failed")
        finally:
            seekable.close()
725
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest with the archive settings supplied by GzipTest."""
728
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest with the archive settings supplied by Bz2Test."""
731
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest with the archive settings supplied by LzmaTest."""
734
735
class DetectReadTest(TarTest, unittest.TestCase):
    """Checks which read modes open which variant of the test archive."""

    def _testfunc_file(self, name, mode):
        # Opening by file name must not raise ReadError.
        try:
            archive = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        archive.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening through an already open binary file object must not
        # raise ReadError.
        try:
            with open(name, "rb") as raw:
                archive = tarfile.open(name, mode, fileobj=raw)
        except tarfile.ReadError:
            self.fail()
        archive.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # Combinations of archive and mode that do not match and must
            # therefore be rejected with ReadError.
            mismatches = (
                (tarname, "r:" + self.suffix),
                (tarname, "r|" + self.suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for name, mode in mismatches:
                with self.assertRaises(tarfile.ReadError):
                    tarfile.open(name, mode=mode)

        accepted_modes = (
            "r",
            "r:" + self.suffix,
            "r:*",
            "r|" + self.suffix,
            "r|*",
        )
        for mode in accepted_modes:
            testfunc(self.tarname, mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
775
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Re-run the DetectReadTest mode checks against the gzip archive;
    # GzipTest supplies tarname and the non-empty suffix 'gz'.
    pass
778
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    # DetectReadTest mode checks for the bz2 archive, plus a regression
    # test for stream detection with a non-default block size.

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as plain:
            payload = plain.read()

        # Compress with blocksize 100,000 bytes, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(payload)

        self._testfunc_file(tmpname, "r|*")
793
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Re-run the DetectReadTest mode checks against the xz archive;
    # LzmaTest supplies tarname and the non-empty suffix 'xz'.
    pass
796
797
class MemberReadTest(ReadTest, unittest.TestCase):
    # Fetch individual members from self.tar and compare their header
    # fields (and, where a digest is given, their data) with the values
    # expected for the test archive.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Assert that *tarinfo* carries the field values given in *kwargs*.
        # When *chksum* is not None, the member's data must hash to it.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as stream:
                self.assertEqual(sha256sum(stream.read()), chksum,
                        "wrong sha256sum for %s" % tarinfo.name)

        # Fields expected on every member checked through this helper.
        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, chksum=sha256_regtype, size=7011)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, chksum=sha256_regtype, size=7011)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, chksum=sha256_sparse, size=86016)

    def test_find_gnusparse(self):
        member = self.tar.getmember("gnu/sparse")
        self._test_member(member, chksum=sha256_sparse, size=86016)

    def test_find_gnusparse_00(self):
        member = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(member, chksum=sha256_sparse, size=86016)

    def test_find_gnusparse_01(self):
        member = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(member, chksum=sha256_sparse, size=86016)

    def test_find_gnusparse_10(self):
        member = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(member, chksum=sha256_sparse, size=86016)

    def test_find_umlauts(self):
        umlaut_name = "ustar/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(umlaut_name)
        self._test_member(member, chksum=sha256_regtype, size=7011)

    def test_find_ustar_longname(self):
        long_name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(long_name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, chksum=sha256_regtype, size=7011)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit encoding before looking the
        # member up by its non-ASCII name.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        umlaut_name = "pax/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        member = self.tar.getmember(umlaut_name)
        self._test_member(member, chksum=sha256_regtype, size=7011)
893
894
class LongnameTest:
    # Mixin with tests for members whose names are several hundred
    # characters long. The concrete class provides ``subdir``,
    # ``longnametype`` and an open archive in ``self.tar``.

    def _long_path(self, leaf):
        # Build "<subdir>/123/123/.../<leaf>" with 125 "123/" components.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._long_path("longname")
        try:
            member = self.tar.getmember(self._long_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three blocks starting at the member's offset
        # and expect tarfile to reject that fragment.
        start = self.tar.getmember(self._long_path("longname")).offset
        self.tar.fileobj.seek(start)
        fragment = io.BytesIO(self.tar.fileobj.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_path("longname")).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = raw.read(512)
            parsed = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
            self.assertEqual(parsed.type, self.longnametype)
935
936
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Long-name tests for the "gnu" subdirectory of the test archive plus
    # extraction tests for the GNU sparse members.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        extracted = os.path.join(TEMPDIR, name)
        self.tar.extract(name, TEMPDIR)
        with open(extracted, "rb") as stream:
            content = stream.read()
        self.assertEqual(sha256sum(content), sha256_sparse,
                "wrong sha256sum for %s" % name)

        # Only meaningful where holes are actually stored as holes.
        if self._fs_supports_holes():
            info = os.stat(extracted)
            self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidentially has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as stream:
            # Seek to "punch a hole" of 4 KiB
            stream.seek(4096)
            stream.write(b'x' * 4096)
            stream.truncate()
        info = os.stat(probe)
        support.unlink(probe)
        return info.st_blocks * 512 < info.st_size
994
995
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Long-name tests for the "pax" subdirectory of the test archive plus
    # checks of values that come from pax extended headers.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # (member name, expected uname, expected gname); every member is
        # expected to carry the same VENDOR.umlauts header value.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                member = tar.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            member = tar.getmember("pax/regtype4")
            headers = member.pax_headers
            self.assertEqual(member.size, 7011)
            self.assertEqual(member.uid, 123)
            self.assertEqual(member.gid, 123)
            self.assertEqual(member.mtime, 1041808783.0)
            self.assertEqual(type(member.mtime), float)
            self.assertEqual(float(headers["atime"]), 1041808783.0)
            self.assertEqual(float(headers["ctime"]), 1041808783.0)
1038
1039
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        sink = io.BytesIO()
        with tarfile.open(fileobj=sink, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(sink.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        snapshot = sink.getvalue()
        # Dropping the TarFile must neither close the caller's file object
        # nor write anything more to it.
        del archive
        support.gc_collect()
        self.assertFalse(sink.closed)
        self.assertEqual(snapshot, sink.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        with tarfile.open(tmpname, self.mode) as archive:
            member = tarfile.TarInfo("foo")
            member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
            archive.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as stream:
            written = stream.read()
            self.assertEqual(len(written), tarfile.RECORDSIZE * 2)
1068
1069
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w:" modes. self.mode is prefix + suffix, so the
    # compression subclasses re-run everything with their own suffix.
    # Archives are opened with "with" throughout, matching
    # test_gettarinfo_pathlike_name and the tests in WriteTestBase.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        with tarfile.open(tmpname, self.mode) as tar:
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        with tarfile.open(tmpname, self.mode) as tar:
            # An empty file must be reported with size 0 ...
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... and a three-byte file with size 3.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # Create the two entries inside the try block so that the
            # directory is cleaned up even if creating them fails.
            open(os.path.join(path, "1"), "a").close()
            open(os.path.join(path, "2"), "a").close()
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            # Link inside the try block so that the target file is removed
            # even when the test is skipped.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def tag_or_drop(tarinfo):
                # Drop "bar"; stamp uid/uname on everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=tag_or_drop)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, tag_or_drop)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # *cmp_path* overrides the expected stored name; *dir* selects a
        # directory instead of a regular file as the member.
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            support.create_empty_file(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Always remove the fixture: a leftover "foo" would make the
            # next call that creates it again fail.
            if dir:
                support.rmdir(foo)
            else:
                support.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # tarfile.open() must propagate an error raised by the first
        # write() and must not close the caller's file object.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1379
class GzipWriteTest(GzipTest, WriteTest):
    # Re-run the WriteTest tests with the 'gz' suffix from GzipTest, so
    # self.mode becomes "w:gz".
    pass
1382
class Bz2WriteTest(Bz2Test, WriteTest):
    # Re-run the WriteTest tests with the 'bz2' suffix from Bz2Test, so
    # self.mode becomes "w:bz2".
    pass
1385
class LzmaWriteTest(LzmaTest, WriteTest):
    # Re-run the WriteTest tests with the 'xz' suffix from LzmaTest, so
    # self.mode becomes "w:xz".
    pass
1388
1389
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for the "w|" stream modes. Subclasses may set
    # ``decompressor`` to a raw decompressor class so that trailing data
    # after the compressed stream can be detected.

    prefix = "w|"
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        # An archive without members must consist of zero padding only.
        tarfile.open(tmpname, self.mode).close()
        if not self.decompressor:
            with self.open(tmpname) as stream:
                content = stream.read()
        else:
            unpacker = self.decompressor()
            with open(tmpname, "rb") as stream:
                packed = stream.read()
            content = unpacker.decompress(packed)
            self.assertFalse(unpacker.unused_data, "found trailing data")
        self.assertEqual(content.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Restore the process-wide umask whatever happened above.
            os.umask(saved_umask)
1427
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    # Re-run the StreamWriteTest tests in "w|gz" mode. No decompressor is
    # set, so test_stream_padding reads the result back through self.open.
    pass
1430
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Re-run the StreamWriteTest tests in "w|bz2" mode.
    # test_stream_padding instantiates this class to decompress the output
    # and check for trailing data; it is None when bz2 is not importable.
    decompressor = bz2.BZ2Decompressor if bz2 else None
1433
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Re-run the StreamWriteTest tests in "w|xz" mode.
    # test_stream_padding instantiates this class to decompress the output
    # and check for trailing data; it is None when lzma is not importable.
    decompressor = lzma.LZMADecompressor if lzma else None
1436
1437
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    # Shared 1016-character stems; each test appends a short tail to reach
    # a total of 1023, 1024 or 1025 characters.
    _NAME_STEM = "longnam/" * 127
    _LINK_STEM = "longlnk/" * 127

    def _length(self, s):
        # Size of *s* when stored in 512-byte blocks; there is always room
        # for at least one extra byte after the string.
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member with the given
        # name and (optional) link target.
        total = 512  # Initial tar header
        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write one member in GNU format, check the resulting offset, then
        # read the archive back and compare name and linkname.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        expected_offset = self._calc_size(name, link)
        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(written)
            self.assertEqual(expected_offset, tar.offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(written.name, member.name,
                    "unable to read longname member")
            self.assertEqual(written.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test(self._NAME_STEM + "longnam")

    def test_longname_1024(self):
        self._test(self._NAME_STEM + "longname")

    def test_longname_1025(self):
        self._test(self._NAME_STEM + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._LINK_STEM + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._LINK_STEM + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._LINK_STEM + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._NAME_STEM + "longnam",
                   self._LINK_STEM + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._NAME_STEM + "longname",
                   self._LINK_STEM + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._NAME_STEM + "longname_",
                   self._LINK_STEM + "longlink_")
1518
1519
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for exclusive creation ("x:" modes): creating a new archive
    # works, creating over an existing one raises FileExistsError.

    prefix = "x:"

    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Exclusive creation needs the target to be absent.
        support.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as payload:
            payload.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def _check_single_member(self, names):
        # *names* must hold exactly one entry mentioning the payload file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive_on_disk(self):
        # Reopen tmpname with the class's taropen and check its contents.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_single_member(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        # The failed second open must leave the first archive intact.
        self._check_archive_on_disk()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        # The failed second open must leave the first archive intact.
        self._check_archive_on_disk()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_single_member(names)

        self._check_archive_on_disk()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_single_member(names)

        self._check_archive_on_disk()
1608
1609
class GzipCreateTest(GzipTest, CreateTest):
    # Runs the CreateTest cases with the gzip settings from GzipTest
    # (suffix 'gz', TarFile.gzopen as taropen).
    pass
1612
1613
class Bz2CreateTest(Bz2Test, CreateTest):
    # Runs the CreateTest cases with the bzip2 settings from Bz2Test
    # (suffix 'bz2', TarFile.bz2open as taropen).
    pass
1616
1617
class LzmaCreateTest(LzmaTest, CreateTest):
    # Runs the CreateTest cases with the lzma settings from LzmaTest
    # (suffix 'xz', TarFile.xzopen as taropen).
    pass
1620
1621
class CreateWithXModeTest(CreateTest):
    # Repeats the CreateTest cases with the bare prefix "x" (no colon)
    # instead of "x:".

    prefix = "x"

    # Setting an inherited test method to None removes it from this class.
    # Neither of these two tests reads self.mode (both pass "x" to taropen
    # directly), so they would not exercise the changed prefix.
    test_create_taropen = None
    test_create_existing_taropen = None
1628
1629
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Fixture: a regular file "foo", a hard link "bar" pointing to it,
        # and an archive opened for writing that already contains "foo".
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Register the removals before anything is created.  unittest does
        # not call tearDown() when setUp() raises (which includes the
        # skipTest() below), but it always runs registered cleanups, so the
        # files no longer leak when the test is skipped or setUp() fails.
        # Cleanups run in reverse order: close the archive, remove foo,
        # remove bar.
        self.addCleanup(support.unlink, self.bar)
        self.addCleanup(support.unlink, self.foo)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            # Creating hard links may be forbidden even where os.link exists.
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a link to the already archived "foo", so it must be
        # reported as a hardlink member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is reported as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1671
1672
class PaxWriteTest(GNUWriteTest):
    # Write tests for PAX_FORMAT archives.  _test() is overridden so that it
    # writes with PAX_FORMAT (presumably it is driven by the tests inherited
    # from GNUWriteTest -- that class is defined elsewhere in this file).

    def _test(self, name, link=None):
        # See GNUWriteTest.
        member = tarfile.TarInfo(name)
        if link:
            member.linkname = link
            member.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as archive:
            archive.addfile(member)

        # Read the archive back and compare the field under test.
        with tarfile.open(tmpname) as archive:
            stored = archive.getmembers()[0]
            if link:
                self.assertEqual(link, stored.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, stored.name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as archive:
            archive.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            self.assertEqual(archive.pax_headers, pax_headers)
            self.assertEqual(archive.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in archive.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Number fields must be convertible by their converter.
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as archive:
            member = tarfile.TarInfo()
            member.name = "\xe4\xf6\xfc" # non-ASCII
            member.uid = 8**8 # too large
            member.pax_headers = pax_headers
            archive.addfile(member)

        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            stored = archive.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
1755
1756
class UnicodeTest:
    # Mixin with tests for non-ASCII names.  Concrete subclasses provide the
    # archive format to write through the `format` class attribute.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write one member with a non-ASCII name using `encoding`, then
        # read it back with the same encoding and compare the name.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as archive:
            archive.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as archive:
            self.assertEqual(archive.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With the ascii codec and strict error handling, a non-ASCII
        # name or user name cannot be stored.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as archive:
            member = tarfile.TarInfo()

            member.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(member)

            member.name = "foo"
            member.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(member)

    def test_unicode_argument(self):
        # All text fields of every member must be decoded to str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as archive:
            for member in archive:
                for field in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, field)), str)

    def test_uname_unicode(self):
        non_ascii = "\xe4\xf6\xfc"
        member = tarfile.TarInfo("foo")
        member.uname = non_ascii
        member.gname = non_ascii

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as archive:
            archive.addfile(member)

        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            stored = archive.getmember("foo")
            self.assertEqual(stored.uname, "\xe4\xf6\xfc")
            self.assertEqual(stored.gname, "\xe4\xf6\xfc")

        if self.format == tarfile.PAX_FORMAT:
            return

        # For the other formats, reading with the ascii codec yields
        # surrogate escapes for the undecodable bytes.
        with tarfile.open(tmpname, encoding="ascii") as archive:
            stored = archive.getmember("foo")
            self.assertEqual(stored.uname, "\udce4\udcf6\udcfc")
            self.assertEqual(stored.gname, "\udce4\udcf6\udcfc")
1836
1837
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for USTAR_FORMAT, plus checks of the ustar field
    # size limits when names are encoded as utf-8.

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        head = digits * 15
        tail = digits * 10
        self._test_ustar_name(head + "01234/" + tail)
        self._test_ustar_name(head + "0123/4" + tail, ValueError)
        self._test_ustar_name(head + "012\xff/" + tail)
        self._test_ustar_name(head + "0123\xff/" + tail, ValueError)

    def test_unicode_longname2(self):
        digits = "0123456789"
        head = digits * 15
        tail = digits * 10
        self._test_ustar_name(head + "01\xff/2" + tail, ValueError)
        self._test_ustar_name(head + "01\xff\xff/" + tail, ValueError)

    def test_unicode_longname3(self):
        digits = "0123456789"
        head = digits * 15
        self._test_ustar_name(head + "01\xff\xff/2" + digits * 10, ValueError)
        self._test_ustar_name(head + "01234/" + digits * 9 + "01234567\xff")
        self._test_ustar_name(head + "01234/" + digits * 9 + "012345678\xff",
                              ValueError)

    def test_unicode_longname4(self):
        digits = "0123456789"
        prefix = digits * 15 + "01234/" + digits * 9
        self._test_ustar_name(prefix + "012345\xff\xff")
        self._test_ustar_name(prefix + "0123456\xff\xff", ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Try to store a member called `name`.  If `exc` is given, addfile()
        # must raise it; otherwise the name must survive a round trip.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            member = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, archive.addfile, member)
            else:
                archive.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            # Only the first member (if any) is inspected.
            first = next(iter(archive), None)
            if first is not None:
                self.assertEqual(name, first.name)

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but `name` is stored as the linkname of
        # a member called "foo".
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if exc is not None:
                self.assertRaises(exc, archive.addfile, member)
            else:
                archive.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            first = next(iter(archive), None)
            if first is not None:
                self.assertEqual(name, first.linkname)
1915
1916
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for GNU_FORMAT.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        expected_names = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # getmember() raises KeyError for an unknown name; report
                # that as a test failure rather than as an error.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1933
1934
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest cases for PAX_FORMAT.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        expected_names = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # getmember() raises KeyError for an unknown name; report
                # that as a test failure rather than as an error.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1953
1954
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # All tests work on tmpname; start without a leftover file.
        self.tarname = tmpname
        if not os.path.exists(self.tarname):
            return
        support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in `mode` with a single member "foo", which
        # is a renamed copy of "ustar/regtype" from the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as source:
            member = source.getmember("ustar/regtype")
            member.name = "foo"
            with source.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, payload)

    def test_append_compressed(self):
        # Opening an archive written with the mixin's compression suffix
        # in append mode must raise ReadError.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1974
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # The compressed-archive check of the base class is disabled here:
    # setting the attribute to None removes the inherited test.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to the archive under test.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive contains exactly `names`, in order.
        # `names` defaults to ["bar"]; a None sentinel is used instead of a
        # list literal so that no mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        # setUp() removed the file, so append mode has to create it.
        self._add_testfile()
        self._test()

    def test_empty(self):
        # Appending to an archive without members.
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # Appending to a file object holding 1024 NUL bytes.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        # Appending to an in-memory copy of an archive that already
        # contains the member "foo".
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write `data` as the archive file and expect appending to fail
        # with ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        # A lone header block without any end-of-archive marker.
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2038
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the suffix 'gz'.
    pass
2041
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the suffix 'bz2'.
    pass
2044
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the suffix 'xz'.
    pass
2047
2048
class LimitsTest(unittest.TestCase):
    # Checks which name, linkname and uid values TarInfo.tobuf() accepts
    # for each of the three archive formats.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT
        digits = "0123456789"

        # 100 char name
        member = tarfile.TarInfo(digits * 10)
        member.tobuf(ustar)

        # 101 char name that cannot be stored
        member = tarfile.TarInfo(digits * 10 + "0")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 256 char name with a slash at pos 156
        member = tarfile.TarInfo("123/" * 62 + "longname")
        member.tobuf(ustar)

        # 256 char name that cannot be stored
        member = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char name
        member = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char linkname
        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # uid > 8 digits
        member = tarfile.TarInfo("name")
        member.uid = 0o10000000
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT
        long_path = "123/" * 126 + "longname"

        # 512 char names and linknames can be stored.
        member = tarfile.TarInfo(long_path)
        member.tobuf(gnu)

        member = tarfile.TarInfo("longlink")
        member.linkname = long_path
        member.tobuf(gnu)

        # uid >= 256 ** 7
        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            member.tobuf(gnu)

    def test_pax_limits(self):
        # None of the values rejected above is a problem for PAX_FORMAT.
        pax = tarfile.PAX_FORMAT
        long_path = "123/" * 126 + "longname"

        member = tarfile.TarInfo(long_path)
        member.tobuf(pax)

        member = tarfile.TarInfo("longlink")
        member.linkname = long_path
        member.tobuf(pax)

        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        member.tobuf(pax)
2106
2107
class MiscTest(unittest.TestCase):
    # Tests for the module-level field conversion helpers (stn, nts, nti,
    # itn) and for tarfile.__all__.

    def test_char_fields(self):
        stn, nts = tarfile.stn, tarfile.nts
        # stn() pads with NULs or truncates to the given length ...
        self.assertEqual(stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        # ... and nts() stops at the first NUL.
        self.assertEqual(nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        decoding_cases = (
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, expected in decoding_cases:
            self.assertEqual(tarfile.nti(field), expected)

    def test_write_number_fields(self):
        gnu = tarfile.GNU_FORMAT

        # Values that fit are written with the default arguments.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")

        gnu_cases = (
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
            # Issue 32713: Test if itn() supports float values outside the
            # non-GNU format range
            (-100.0, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (8 ** 12 + 0.0, b"\x80\x00\x00\x10\x00\x00\x00\x00"),
        )
        for value, expected in gnu_cases:
            self.assertEqual(tarfile.itn(value, format=gnu), expected)

        encoded = tarfile.itn(-0.1, format=gnu)
        self.assertEqual(tarfile.nti(encoded), 0)

    def test_number_field_limits(self):
        # (value, digits, format) combinations that itn() must reject.
        rejected = (
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        )
        for value, digits, fmt in rejected:
            with self.assertRaises(ValueError):
                tarfile.itn(value, digits, fmt)

    def test__all__(self):
        # Names handed to support.check__all__() as the set that is not
        # expected in tarfile.__all__.
        not_public = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
            'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
            'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
            'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
            'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError',
            'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=not_public)
2190
2191
class CommandLineTest(unittest.TestCase):
    # Tests for the command line interface, run as "python -m tarfile".

    def tarfilecmd(self, *args, **kwargs):
        # Run "-m tarfile" with `args` through assert_python_ok() and return
        # its stdout with os.linesep replaced by b'\n'.
        _, stdout, _ = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                      **kwargs)
        return stdout.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "-m tarfile" with `args` through assert_python_failure() and
        # return its result, which callers unpack as (rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def _sample_files(self):
        # Two data files of the test suite that serve as archive content.
        return [support.findfile('tokenize_tests.txt'),
                support.findfile('tokenize_tests-no-coding-cookie-'
                                 'and-utf8-bom-sig-only.txt')]

    def _expected_listing(self, tar_name, verbose):
        # What TarFile.list() prints for `tar_name`, encoded the way the
        # child process is told to encode it (PYTHONIOENCODING='ascii').
        with support.captured_stdout() as captured:
            with tarfile.open(tar_name, 'r') as tf:
                tf.list(verbose=verbose)
        return captured.getvalue().encode('ascii', 'backslashreplace')

    def make_simple_tarfile(self, tar_name):
        # Create an uncompressed archive holding the sample files under
        # their base names; it is removed again when the test finishes.
        files = self._sample_files()
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for path in files:
                tf.add(path, arcname=os.path.basename(path))

    def test_bad_use(self):
        # No arguments at all: usage message on stderr.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        self.assertIn(b'usage', lowered)
        self.assertIn(b'error', lowered)
        self.assertIn(b'required', lowered)
        # An empty archive name: some error text on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in ('-t', '--test'):
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in ('-v', '--verbose'):
                out = self.tarfilecmd(opt, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # A file cut off after the first 511 bytes of an archive must be
        # rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as src:
                    data = src.read()
                try:
                    with open(tmpname, 'wb') as dst:
                        dst.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, False)
            for opt in ('-l', '--list'):
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, True)
            for opt in ('-v', '--verbose'):
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = self._sample_files()
        for opt in ('-c', '--create'):
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                # The result must be readable as an archive.
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        files = self._sample_files()
        for opt in ('-v', '--verbose'):
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files)
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        # The archive name has no extension at all.
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        # The only dot of the archive name is its first character.
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        files = self._sample_files()
        # Compression types whose module is missing have open set to None.
        available = [filetype for filetype in (GzipTest, Bz2Test, LzmaTest)
                     if filetype.open]
        for filetype in available:
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                # The archive must be readable with the matching opener.
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-e', '--extract'):
            try:
                # The command runs with tarextdir as working directory.
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in ('-v', '--verbose'):
            try:
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname)
                self.assertIn(b' file is extracted.', out)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        # An extra argument ('spamdir') is passed after the archive name.
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2380
2381
class ContextManagerTest(unittest.TestCase):
    """Checks for using a TarFile object in a ``with`` statement."""

    class _Abort(Exception):
        """Raised on purpose inside a ``with`` block to simulate a failure.

        Using a dedicated type lets the tests catch exactly this exception,
        so that an unexpected error (for example from tarfile.open() itself)
        is reported instead of being silently swallowed.
        """

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        with self.assertRaises(self._Abort):
            with tarfile.open(tmpname, "w") as tar:
                raise self._Abort
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            with self.assertRaises(self._Abort):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise self._Abort
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2438
2439
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741. When the platform supports neither
    # symbolic nor hard links, tarfile extracts such members as the regular
    # files they point to.

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _skip_if_islink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _skip_if_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    def _test_link_extraction(self, name):
        # Extract the link member and compare the checksum of what landed on
        # disk with the checksum of the regular file it refers to.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_regtype)

    @_skip_if_islink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_skip_if_islink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_skip_if_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_skip_if_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2472
2473
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Feed tarfile.open() every prefix of a small bzip2-compressed tar
        # header and make sure it never keeps reading past end-of-file.
        class MyBytesIO(io.BytesIO):
            # Set once a read() has been issued with the position already
            # at the end of the data; a further read() without an
            # intervening seek() is then treated as an infinite loop.
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # Do not leave a successfully opened archive dangling.
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2503
2504
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are both named 'root'.

    Returns False when the pwd/grp modules are unavailable, when either
    database has no entry for id 0, or when either name is not 'root'.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        if pwd.getpwuid(0)[0] != 'root':
            return False
        if grp.getgrgid(0)[0] != 'root':
            return False
    except KeyError:
        # The lookups raise KeyError when there is no entry for the id.
        # This function is evaluated while the module is being imported
        # (as a skip-decorator argument), so it must not propagate that.
        return False
    return True
2515
2516
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests replace these functions with mocks:
    #  os.chown: to inspect the ownership changes that were requested
    #  os.chmod: to keep the real modes untouched; otherwise the extracted
    #             files/directories could not be deleted afterwards
    #  os.geteuid: to pretend that we are running as root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Build tmpname as an archive holding a file, a directory and a file
        # inside that directory, each with its own uid/gid pair.
        payload = io.BytesIO(b"content")

        # (name, uid, gid, member type, file object handed to addfile())
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(name)
                info.uid, info.gid = uid, gid
                info.uname = 'root'
                info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        # Hand back the full path of the archive that was written.
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Pretend to be root, create the test archive and yield it opened
        # for reading together with the three member names it contains.
        mock_geteuid.return_value = 0

        # Names as they are stored inside the archive.
        top_file = 'numeric-owner-testfile'
        directory = 'dir'
        nested_file = os.path.join(directory, top_file)

        archive_path = NumericOwnerTest._make_test_archive(top_file,
                                                           directory,
                                                           nested_file)

        with tarfile.open(archive_path) as archive:
            yield archive, top_file, directory, nested_file

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1, _, filename_2 = setup
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have been requested with the ids stored in the archive,
        # on the paths the members were extracted to.
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1, dirname_1, filename_2 = setup
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # Every member, the directory included, gets its stored uid/gid.
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # The archive members carry uname/gname 'root'. Without numeric_owner,
    #  extract() resolves those names through pwd and grp, and the test
    #  expects the result to be uid 0 / gid 0. That only holds where uid=0
    #  and gid=0 really are named 'root', hence the skip condition.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1 = setup[0], setup[1]
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() must be
        # rejected with TypeError.
        with self._setup_test(mock_geteuid) as setup:
            tarfl, filename_1 = setup[0], setup[1]
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2635
2636
def setUpModule():
    """Create the scratch directory and the compressed copies of the test tar.

    Populates the module-level ``testtarnames`` list with the plain test
    archive plus one compressed copy per available compression module.
    """
    # A previous, interrupted run may have left TEMPDIR behind as a real
    # directory. os.unlink() cannot remove directories, so remove the tree
    # in that case; anything else (a stray file or link, or nothing at all)
    # is handled by support.unlink() as before.
    if os.path.isdir(TEMPDIR) and not os.path.islink(TEMPDIR):
        support.rmtree(TEMPDIR)
    else:
        support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    global testtarnames
    testtarnames = [tarname]
    with open(tarname, "rb") as fobj:
        data = fobj.read()

    # Create compressed tarfiles.
    for c in GzipTest, Bz2Test, LzmaTest:
        # 'open' is None when the compression module is not importable.
        if c.open:
            support.unlink(c.tarname)
            testtarnames.append(c.tarname)
            with c.open(c.tarname, "wb") as tar:
                tar.write(data)
2653
def tearDownModule():
    """Remove the scratch directory if it still exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2657
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2660