• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def sha256sum(data):
    """Return the SHA-256 digest of the bytes *data* as a hex string."""
    digest = sha256(data)
    return digest.hexdigest()
32
# Scratch directory used by the tests for extracted members and temporary
# archives.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
# The reference (uncompressed) test archive, located via support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR for the compressed variants of the reference archive.
# NOTE(review): nothing in this part of the file creates them; presumably
# module-level setup does -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 hex digest of the data of the "ustar/regtype" member.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# NOTE(review): presumably the expected digest of the sparse test members;
# it is not referenced in this part of the file.
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
48
49
class TarTest:
    # Mixin with the per-format settings used by the test classes; this
    # base variant describes the uncompressed archive.

    # Path of the archive the tests open.
    tarname = tarname
    # Compression part of the tarfile mode string ('' means none).
    suffix = ''
    # Callable the tests use to open the archive file in its raw format.
    open = io.FileIO
    # TarFile constructor matching this format.
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Full tarfile mode string: `prefix` (supplied by the concrete test
        # class, e.g. "r:" or "r|") followed by the compression suffix.
        return self.prefix + self.suffix
59
@support.requires_gzip()
class GzipTest:
    # Mixin overriding the TarTest settings for the gzip-compressed archive.
    tarname = gzipname
    suffix = 'gz'
    # None when the gzip module could not be imported at the top of the file.
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen
66
@support.requires_bz2()
class Bz2Test:
    # Mixin overriding the TarTest settings for the bzip2-compressed archive.
    tarname = bz2name
    suffix = 'bz2'
    # None when the bz2 module could not be imported at the top of the file.
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open
73
@support.requires_lzma()
class LzmaTest:
    # Mixin overriding the TarTest settings for the xz-compressed archive.
    tarname = xzname
    suffix = 'xz'
    # None when the lzma module could not be imported at the top of the file.
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen
80
81
class ReadTest(TarTest):
    # Fixture base for the read tests: opens self.tarname before each test
    # and closes it afterwards.

    # Mode prefix for reading; combined with `suffix` by TarTest.mode.
    prefix = "r:"

    def setUp(self):
        # The archive is opened with an explicit iso8859-1 encoding.
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        self.tar.close()
92
93
class UstarReadTest(ReadTest, unittest.TestCase):
    # Tests for the file objects returned by TarFile.extractfile(), using
    # the "ustar/..." members of the test archive.

    def test_fileobj_regular_file(self):
        # The extracted data must match the member's recorded size and the
        # known SHA-256 digest.
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(data), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extractfile() object must agree with
        # readlines() on the same member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the extractfile() object must yield the same lines as
        # reading the member extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercises seek()/tell() with all three whence values and checks
        # that read()/readline()/readlines() continue from the right place.
        # `data` (the member read from disk) is the reference content.
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            # Read everything first so the following seek(0) starts from a
            # non-zero position; the returned value itself is not used.
            text = fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            # Reading the same range twice must give the same lines.
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        # The extractfile() object must work when wrapped in a
        # TextIOWrapper, including seek().
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(sha256sum(data), sha256_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # Both file objects must report the same name, i.e. the link was
        # resolved to the regular member.
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
217
class GzipUstarReadTest(GzipTest, UstarReadTest):
    # Runs the UstarReadTest cases with the GzipTest settings.
    pass
220
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    # Runs the UstarReadTest cases with the Bz2Test settings.
    pass
223
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    # Runs the UstarReadTest cases with the LzmaTest settings.
    pass
226
227
class ListTest(ReadTest, unittest.TestCase):
    # Tests for TarFile.list(): the listing is captured from sys.stdout as
    # ASCII-encoded bytes and inspected.

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _captured_listing(self, **list_kwargs):
        # Call self.tar.list(**list_kwargs) while sys.stdout is replaced by
        # an ASCII text wrapper, and return the bytes that were written.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_kwargs)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._captured_listing(verbose=False)
        for expected in (
                b'ustar/conttype',
                b'ustar/regtype',
                b'ustar/lnktype',
                b'ustar' + (b'/12345' * 40) + b'67/longname',
                b'./ustar/linktest2/symtype',
                b'./ustar/linktest2/lnktype',
                # Make sure it puts trailing slash for directory
                b'ustar/dirtype/',
                b'ustar/dirtype-with-size/',
        ):
            self.assertIn(expected, out)

        # Make sure it is able to print unencodable characters
        def conv(raw):
            decoded = raw.decode(self.tar.encoding, 'surrogateescape')
            return decoded.encode('ascii', 'backslashreplace')

        for raw_name in (
                b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'misc/regtype-hpux-signed-chksum-'
                b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'misc/regtype-old-v7-signed-chksum-'
                b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
                b'pax/bad-pax-\xe4\xf6\xfc',
                b'pax/hdrcharset-\xe4\xf6\xfc',
        ):
            self.assertIn(conv(raw_name), out)

        # Without the verbose flag the names must be separated by a single
        # newline and carry no 'ls -l'-like accessories:
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        plain_pattern = (br'ustar/conttype ?\r?\n'
                         br'ustar/regtype ?\r?\n')
        self.assertRegex(out, plain_pattern)
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._captured_listing(verbose=True)
        # With the verbose flag every name is preceded by 'ls -l'-like
        # accessories, one entry per line:
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        entry_pattern = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                         br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                         br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, entry_pattern * 2)
        # Make sure it prints the source of link with verbose flag
        deep = b'/123' * 125
        for expected in (
                b'ustar/symtype -> regtype',
                b'./ustar/linktest2/symtype -> ../linktest1/regtype',
                b'./ustar/linktest2/lnktype link to '
                b'./ustar/linktest1/regtype',
                b'gnu' + deep + b'/longlink link to gnu' + deep + b'/longname',
                b'pax' + deep + b'/longlink link to pax' + deep + b'/longname',
        ):
            self.assertIn(expected, out)

    def test_list_members(self):
        # Only the members passed explicitly to list() may be printed.
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo

        out = self._captured_listing(verbose=False,
                                     members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
306
307
class GzipListTest(GzipTest, ListTest):
    # Runs the ListTest cases with the GzipTest settings.
    pass
310
311
class Bz2ListTest(Bz2Test, ListTest):
    # Runs the ListTest cases with the Bz2Test settings.
    pass
314
315
class LzmaListTest(LzmaTest, ListTest):
    # Runs the ListTest cases with the LzmaTest settings.
    pass
318
319
class CommonReadTest(ReadTest):
    # Read tests shared by the classes that use the "r:" prefix inherited
    # from ReadTest and the stream class that overrides it with "r|".

    def test_is_tarfile_erroneous(self):
        # An empty file (and arbitrary garbage) must not be recognised as a
        # tar archive, whatever way it is handed to is_tarfile().
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

    def test_is_tarfile_valid(self):
        # is_tarfile works on filenames
        self.assertTrue(tarfile.is_tarfile(self.tarname))

        # is_tarfile works on path-like objects
        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))

        # is_tarfile works on file objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # The archive is managed by a `with` block so that a ReadError
        # raised by tarfile.open() itself is reported through self.fail()
        # instead of being masked by closing a name that was never bound.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                members = tar.getmembers()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).randbytes(512)
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            # Two 512-byte blocks of `char` precede the single real member.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # An archive holding one 1024-byte member is truncated at several
        # points at or after the end of the header; iterating, extracting
        # and reading must all raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read() raises.
                    with tar.extractfile(t) as member:
                        member.read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')) as tar:
                pass
438
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests: TarFile.name handling, mode validation,
    # reading from an offset, and extraction of members to disk.

    def requires_name_attribute(self):
        # Hook called at the start of the tests that rely on fobj.name;
        # subclasses override it to skip those tests.
        pass

    def test_no_name_argument(self):
        # Without a name argument, TarFile.name is taken from fileobj.name.
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        # A fileobj without a name attribute results in TarFile.name None.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so the archive is closed again.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        # An empty fileobj.name is treated like a missing one.
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        # A bytes fileobj.name is kept as bytes in TarFile.name.
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening the archive must accept a path-like name and
        # expose it as an absolute str path.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        # The plain TarFile constructor is only tried for the uncompressed
        # archive.
        if self.suffix == '':
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        # Invalid mode strings must be rejected with ValueError.
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; the "ustar/" members also
        # carry the user name "tarfile".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        # "misc/eof" is expected to be the last member of the archive.
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also get
            # their exact hex form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            # The archive is opened only after the directory exists and is
            # closed by the `with` block before the directory is removed.
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member restores its mtime and, off
        # Windows, its 0o755 permissions.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() accepts a path-like destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() accepts a path-like destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still reachable for inspection after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
688
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # test_fail_comp applies only to compressed archives (see its comment),
    # so the inherited method is replaced with None for the plain archive.
    test_fail_comp = None
691
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the GzipTest settings.
    pass
694
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the Bz2Test settings.
    def requires_name_attribute(self):
        # Skips the tests that call this hook before checking fobj.name.
        self.skipTest("BZ2File have no name attribute")
698
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase cases with the LzmaTest settings.
    def requires_name_attribute(self):
        # Skips the tests that call this hook before checking fobj.name.
        self.skipTest("LZMAFile have no name attribute")
702
703
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Runs the common read tests, plus stream-specific ones, with the
    # archive opened using the "r|" mode prefix.

    prefix="r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for tarinfo in self.tar:
            if not tarinfo.isreg():
                continue
            with self.tar.extractfile(tarinfo) as fobj:
                # Read the member in 512-byte chunks until it is exhausted.
                while True:
                    try:
                        buf = fobj.read(512)
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")
                    if not buf:
                        break

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
        self.assertEqual(len(data), tarinfo.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(data), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole archive first, so reading the
        # first member afterwards is expected to raise StreamError.
        tarinfos = self.tar.getmembers()
        with self.tar.extractfile(tarinfos[0]) as f: # read the first member
            self.assertRaises(tarfile.StreamError, f.read)

    def test_compare_members(self):
        # Walk the stream archive (tar2) in lockstep with the reference
        # archive opened by name (tar1) and compare the data of each member.
        tar1 = tarfile.open(tarname, encoding="iso8859-1")
        try:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                # Link members cannot be extracted from the stream archive.
                if t2.islnk() or t2.issym():
                    with self.assertRaises(tarfile.StreamError):
                        tar2.extractfile(t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                # extractfile() returned no file object for this member, so
                # there is nothing to compare.
                if v1 is None:
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(),
                        "stream extraction failed")
        finally:
            tar1.close()
764
class GzipStreamReadTest(GzipTest, StreamReadTest):
    # Runs the StreamReadTest cases with the GzipTest settings.
    pass
767
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest cases with the settings from Bz2Test."""
770
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest cases with the settings from LzmaTest."""
773
774
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which read modes tarfile.open() accepts or rejects for the
    # archive selected by self.tarname / self.suffix.

    def _testfunc_file(self, name, mode):
        # Open the archive *name* by path using *mode* and close it again.
        # A ReadError fails the test and reports the offending arguments.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, exc))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but the archive is handed to
        # tarfile.open() as an already opened binary file object.
        try:
            with open(name, "rb") as f:
                tar = tarfile.open(name, mode, fileobj=f)
        except tarfile.ReadError as exc:
            self.fail("tarfile.open(%r, %r, fileobj=...) raised ReadError: %s"
                      % (name, mode, exc))
        else:
            tar.close()

    def _test_modes(self, testfunc):
        # *testfunc* is one of the _testfunc_* helpers above; it is called
        # with (archive name, mode) for every mode that must succeed.
        if self.suffix:
            # Forcing this class's compression on the plain reference
            # archive must be rejected ...
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            # ... and so must opening the compressed archive with an
            # explicitly uncompressed mode.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
814
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Run the DetectReadTest cases with the settings from GzipTest."""
817
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # tarfile's stream detection originally looked for the literal
        # "BZh91" at the start of the file. That is wrong: the '9' encodes
        # the blocksize (900,000 bytes), so archives compressed with any
        # other blocksize were not detected.
        with open(tarname, "rb") as plain:
            raw = plain.read()

        # compresslevel=1 selects a 100,000 byte blocksize, so the
        # resulting file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(raw)

        self._testfunc_file(tmpname, "r|*")
832
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """Run the DetectReadTest cases with the settings from LzmaTest."""
835
836
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive and checks their
    # header fields and, where a digest is given, their contents.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare *tarinfo* against the expected field values in *kwargs*
        # (plus the fields every member shares) and, when *chksum* is
        # given, against the SHA-256 hex digest of its data.
        member_name = tarinfo.name
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = sha256sum(f.read())
            self.assertEqual(digest, chksum,
                             "wrong sha256sum for %s" % member_name)

        # Field values expected for every member.
        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in member_name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                             "wrong value in %s field of %s"
                             % (field, member_name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member_name = ("ustar/umlauts-"
                       "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        all_names = self.tar.getnames()
        self.assertIn(expected, all_names)

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Replace the archive opened by the fixture with one that is
        # opened using an explicit iso8859-1 encoding.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = ("pax/umlauts-"
                       "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)
932
933
class LongnameTest:
    # Mixin with tests for members whose names or link names are long.
    # Users must provide self.subdir, self.longnametype and self.tar.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = "/".join([self.subdir, "123/" * 125 + "longname"])
        try:
            tarinfo = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        else:
            self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
                                "read longname as dirtype")

    def test_read_longlink(self):
        deep_dir = "/".join([self.subdir, "123/" * 125])
        longname = deep_dir + "longname"
        longlink = deep_dir + "longlink"
        try:
            tarinfo = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        else:
            self.assertEqual(tarinfo.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the
        # member's offset; opening that fragment must raise ReadError.
        longname = "/".join([self.subdir, "123/" * 125 + "longname"])
        start = self.tar.getmember(longname).offset
        raw = self.tar.fileobj
        raw.seek(start)
        fragment = io.BytesIO(raw.read(3 * 512))
        self.assertRaises(tarfile.ReadError,
                          tarfile.open, name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = "/".join([self.subdir, "123/" * 125 + "longname"])
        start = self.tar.getmember(longname).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = fobj.read(512)
        tarinfo = tarfile.TarInfo.frombuf(header, "iso8859-1", "strict")
        self.assertEqual(tarinfo.type, self.longnametype)
974
975
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to restore sparse members accurately,
    # i.e. to produce files with holes, and that is what these tests are
    # really after. Not every platform/filesystem supports sparse files,
    # though, and even where they are supported it is hard to make
    # reliable assertions about holes. So a basic content check that works
    # on all platforms comes first, followed by a check that only runs
    # where the filesystem proves to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                         "wrong sha256sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # With holes, fewer bytes are allocated than the file is long.
        info = os.stat(extracted)
        self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute,
        # uses st_blocks units of 512 bytes, and the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        info = os.stat(probe)
        support.unlink(probe)
        allocated = info.st_blocks * 512
        return allocated < info.st_size
1033
1034
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        # (member name, expected uname, expected gname)
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                tarinfo = tar.getmember(member_name)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                # Every one of these members carries the same
                # VENDOR.umlauts pax header value.
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        timestamp = 1041808783.0
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, timestamp)
            self.assertEqual(type(tarinfo.mtime), float)
            headers = tarinfo.pax_headers
            self.assertEqual(float(headers["atime"]), timestamp)
            self.assertEqual(float(headers["ctime"]), timestamp)
1077
1078
class WriteTestBase(TarTest):
    # Write tests that are meant to run in every possible mode
    # combination belong in this class.

    def test_fileobj_no_close(self):
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode=self.mode) as tar:
            tar.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the last reference to the TarFile and collecting garbage
        # must neither close the buffer nor change its contents.
        snapshot = buffer.getvalue()
        del tar
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(snapshot, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        member = tarfile.TarInfo("foo")
        member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            archive_length = len(fobj.read())
        self.assertEqual(archive_length, tarfile.RECORDSIZE * 2)
1107
1108
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w:" mode prefix.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        finally:
            tar.close()
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            # An empty file must be reported with size 0 ...
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # ... and a three byte file with size 3.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # The files are created inside the try block so that the
            # cleanup below also runs if creating one of them fails.
            open(os.path.join(path, "1"), "a").close()
            open(os.path.join(path, "2"), "a").close()
            tar = tarfile.open(tmpname, self.mode)
            try:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1]
                         for m in tar.getmembers()]
                # Members must be stored sorted, not in listdir() order.
                self.assertEqual(paths, ["directory", "1", "2"])
            finally:
                tar.close()
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            # A path-like name and its str form must give the same result.
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            # Linking happens inside the cleanup block so that the target
            # file is removed as well when the test is skipped.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Repeat the check with a different working directory.
            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")
        finally:
            tar.close()

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                support.create_empty_file(name)

            # Named so that it does not shadow the builtin filter().
            def drop_bar_and_tag(tarinfo):
                # Returning None excludes the member from the archive.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=drop_bar_and_tag)
            finally:
                tar.close()

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, drop_bar_and_tag)

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # The directory itself plus "foo" and "baz".
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # *cmp_path* overrides the expected stored name; *dir* selects a
        # directory instead of a regular file as the member.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            support.create_empty_file(foo)
        else:
            os.mkdir(foo)

        # Remove the scratch entry even when writing or reading the
        # archive fails; a leftover "foo" would break every later call.
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            if not dir:
                support.unlink(foo)
            else:
                support.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir,'source')
            target_file = os.path.join(tempdir,'symlink')
            with open(source_file,'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)
            finally:
                tar.close()

    def test_open_nonwritable_fileobj(self):
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                # Raises exctype on the first write() call only.
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            # The caller's file object must stay open after the failure.
            self.assertFalse(f.closed)
1418
1419
class GzipWriteTest(GzipTest, WriteTest):
    """Run the WriteTest cases with the settings from GzipTest."""
1422
1423
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the WriteTest cases with the settings from Bz2Test."""
1426
1427
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the WriteTest cases with the settings from LzmaTest."""
1430
1431
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w|" mode prefix.

    prefix = "w|"
    # Subclasses may set this to a decompressor class; when it is set,
    # test_stream_padding() uses it instead of self.open().
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        # Write an archive without any members.
        tarfile.open(tmpname, self.mode).close()
        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            dec = self.decompressor()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")
        zero_count = data.count(b"\0")
        self.assertEqual(zero_count, tarfile.RECORDSIZE,
                         "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        # os.umask() returns the previous mask, which is restored below.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1469
1470
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        # Write an archive without members and read the raw file back;
        # latin-1 maps every byte to a character, so decoding cannot fail.
        tarfile.open(tmpname, self.mode).close()
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # A unittest assertion is used instead of a bare ``assert`` so the
        # check still runs when Python is started with -O.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1480
1481
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Falls back to None when the bz2 module could not be imported.
    decompressor = None if not bz2 else bz2.BZ2Decompressor
1484
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Falls back to None when the lzma module could not be imported.
    decompressor = None if not lzma else lzma.LZMADecompressor
1487
1488
class GNUWriteTest(unittest.TestCase):
    # Verifies that GNU Longname and Longlink extended headers are
    # created correctly (cp. bug #812325).

    def _length(self, s):
        # Size of *s* rounded up to whole 512-byte blocks; a string that
        # exactly fills its blocks still gets one more block.
        full_blocks = len(s) // 512
        return (full_blocks + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member with the given
        # name and optional link name.
        # Initial tar header
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if not (link is None or len(link) <= tarfile.LENGTH_LINK):
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        # Write one member in GNU format, check the resulting offset and
        # then read the member back.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(written)

            expected_offset = self._calc_size(name, link)
            actual_offset = tar.offset
            self.assertEqual(expected_offset, actual_offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        failure = "unable to read longname member"
        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member, failure)
            self.assertEqual(written.name, member.name, failure)
            self.assertEqual(written.linkname, member.linkname, failure)

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        long_name = "longnam/" * 127 + "longnam"
        long_link = "longlnk/" * 127 + "longlnk"
        self._test(long_name, long_link)

    def test_longnamelink_1024(self):
        long_name = "longnam/" * 127 + "longname"
        long_link = "longlnk/" * 127 + "longlink"
        self._test(long_name, long_link)

    def test_longnamelink_1025(self):
        long_name = "longnam/" * 127 + "longname_"
        long_link = "longlnk/" * 127 + "longlink_"
        self._test(long_name, long_link)
1569
1570
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    # Check that the devmajor/devminor header fields are only filled in
    # for device members.

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        input_blk = tarfile.TarInfo(name="my_block_device")
        input_blk.type = tarfile.BLKTYPE
        input_reg = tarfile.TarInfo(name="my_regular_file")
        input_reg.type = tarfile.REGTYPE
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(input_blk)
            tar.addfile(input_reg)

        # devmajor and devminor should be *interpreted* as 0 in both...
        with tarfile.open(tmpname, "r") as tar:
            output_blk = tar.getmember("my_block_device")
            output_reg = tar.getmember("my_regular_file")
        self.assertEqual(output_blk.devmajor, 0)
        self.assertEqual(output_blk.devminor, 0)
        self.assertEqual(output_reg.devmajor, 0)
        self.assertEqual(output_reg.devminor, 0)

        # ...but the fields should not actually be set on regular files:
        with open(tmpname, "rb") as infile:
            buf = infile.read()
        # Raw header bytes of each member (everything before its data).
        buf_blk = buf[output_blk.offset:output_blk.offset_data]
        buf_reg = buf[output_reg.offset:output_reg.offset_data]
        # See `struct posixheader` in GNU docs for byte offsets:
        # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
        # The two device fields are checked together as one 16-byte span
        # starting at offset 329.
        device_headers = slice(329, 329 + 16)
        self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
        self.assertEqual(buf_reg[device_headers], b"\0" * 16)
1615
1616
class CreateTest(WriteTestBase, unittest.TestCase):
    # Test exclusive creation ("x" modes) through tarfile.open() and
    # through the class's taropen() constructor.

    prefix = "x:"

    # Small payload file that every test adds to the archive.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Every test creates tmpname itself, so start without it.
        support.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def _assert_single_member(self, names):
        """Assert that *names* lists exactly one member, the payload file."""
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _assert_archive_has_single_member(self):
        """Reopen tmpname for reading and check its member list."""
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._assert_single_member(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_single_member()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # Open inside a "with" so the archive is closed again should the
        # call unexpectedly succeed instead of raising.
        with self.assertRaises(FileExistsError):
            with tarfile.open(tmpname, self.mode):
                pass

        # The failed attempt must have left the first archive untouched.
        self._assert_archive_has_single_member()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._assert_archive_has_single_member()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        # The failed attempt must have left the first archive untouched.
        self._assert_archive_has_single_member()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_single_member(names)

        self._assert_archive_has_single_member()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._assert_single_member(names)

        self._assert_archive_has_single_member()
1705
1706
class GzipCreateTest(GzipTest, CreateTest):
    """Run the CreateTest cases with the gzip settings from GzipTest."""
1709
1710
class Bz2CreateTest(Bz2Test, CreateTest):
    """Run the CreateTest cases with the bz2 settings from Bz2Test."""
1713
1714
class LzmaCreateTest(LzmaTest, CreateTest):
    """Run the CreateTest cases with the xz settings from LzmaTest."""
1717
1718
class CreateWithXModeTest(CreateTest):
    # Same as CreateTest, but with the bare "x" mode string (no ":" and
    # no compression suffix).

    prefix = "x"

    # Setting an inherited test method to None removes it from this class:
    # the taropen() tests do not use the prefix and would only be repeated.
    test_create_taropen = test_create_existing_taropen = None
1725
1726
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")
        # Register the removals up front: cleanups also run when setUp()
        # skips or fails part-way, whereas tearDown() would not, leaving
        # "foo" behind whenever os.link() is not permitted.
        self.addCleanup(support.unlink, self.foo)
        self.addCleanup(support.unlink, self.bar)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Cleanups run last-in first-out, so the archive is closed before
        # the two files are removed.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a second link to the already archived "foo".
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference set, the second link is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1768
1769
class PaxWriteTest(GNUWriteTest):
    # Reuses the test methods inherited from GNUWriteTest, but writes the
    # archives in PAX_FORMAT, and adds pax header tests.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            member = tar.getmembers()[0]
        # Only the field under test is compared: the link target when one
        # was given, the member name otherwise.
        if link:
            self.assertEqual(link, member.linkname,
                             "PAX longlink creation failed")
        else:
            self.assertEqual(name, member.name,
                             "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Numeric fields must be convertible by their converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        t = tarfile.TarInfo()
        t.name = "\xe4\xf6\xfc" # non-ASCII
        t.uid = 8**8 # too large
        t.pax_headers = pax_headers
        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
1852
1853
class UnicodeTest:
    # Mixin with the non-ASCII name tests shared by all archive formats.
    # Concrete classes set `format` and also derive from unittest.TestCase.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        """Write a member with a non-ASCII name using *encoding* and check
        that the name reads back unchanged with the same encoding."""
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", a non-ASCII name or
        # user name must be rejected.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual fields of every member must be str objects.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Reading the same archive as ASCII is expected to turn the
            # undecodable bytes into lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1933
1934
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest for USTAR_FORMAT, plus checks of the ustar field size
    # limits as measured on the utf-8 encoded names.

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        d = "0123456789"
        self._test_ustar_name(d * 10)
        self._test_ustar_name(d * 10 + "0", ValueError)
        self._test_ustar_name(d * 9 + "01234567\xff")
        self._test_ustar_name(d * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        d = "0123456789"
        self._test_ustar_name(d * 9 + "012345\xff\xff")
        self._test_ustar_name(d * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        d = "0123456789"
        self._test_ustar_name(d * 15 + "01234/" + d * 10)
        self._test_ustar_name(d * 15 + "0123/4" + d * 10, ValueError)
        self._test_ustar_name(d * 15 + "012\xff/" + d * 10)
        self._test_ustar_name(d * 15 + "0123\xff/" + d * 10, ValueError)

    def test_unicode_longname2(self):
        d = "0123456789"
        self._test_ustar_name(d * 15 + "01\xff/2" + d * 10, ValueError)
        self._test_ustar_name(d * 15 + "01\xff\xff/" + d * 10, ValueError)

    def test_unicode_longname3(self):
        d = "0123456789"
        directory = d * 15 + "01234/"
        self._test_ustar_name(d * 15 + "01\xff\xff/2" + d * 10, ValueError)
        self._test_ustar_name(directory + d * 9 + "01234567\xff")
        self._test_ustar_name(directory + d * 9 + "012345678\xff", ValueError)

    def test_unicode_longname4(self):
        d = "0123456789"
        directory = d * 15 + "01234/"
        self._test_ustar_name(directory + d * 9 + "012345\xff\xff")
        self._test_ustar_name(directory + d * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_name(self, name, exc=None):
        """Add a member called *name* to a fresh utf-8 archive.

        With *exc* given, adding must raise it; otherwise the name must read
        back unchanged.
        """
        must_fail = exc is not None
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            member = tarfile.TarInfo(name)
            if must_fail:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if must_fail:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for member in tar:
                self.assertEqual(name, member.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        d = "0123456789"
        self._test_ustar_link(d * 10)
        self._test_ustar_link(d * 10 + "0", ValueError)
        self._test_ustar_link(d * 9 + "01234567\xff")
        self._test_ustar_link(d * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        d = "0123456789"
        self._test_ustar_link(d * 9 + "012345\xff\xff")
        self._test_ustar_link(d * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        """Add a member "foo" whose link target is *name* to a fresh utf-8
        archive.

        With *exc* given, adding must raise it; otherwise the link target
        must read back unchanged.
        """
        must_fail = exc is not None
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if must_fail:
                self.assertRaises(exc, tar.addfile, member)
            else:
                tar.addfile(member)

        if must_fail:
            return
        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Only the first member is of interest.
            for member in tar:
                self.assertEqual(name, member.linkname)
                break
2012
2013
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest for GNU_FORMAT.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # The member only has to be found under the expected name.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2030
2031
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    # UnicodeTest for PAX_FORMAT.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                # The member only has to be found under the expected name.
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2050
2051
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Start every test without a leftover archive at tmpname.
        self.tarname = tmpname
        if not os.path.exists(self.tarname):
            return
        support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        """Create self.tarname in *mode* holding a single member "foo",
        copied from the "ustar/regtype" member of the test archive."""
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dst:
                dst.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must be refused.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2071
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # Only meaningful for the compressed variants.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        """Append an empty member "bar" to self.tarname (or to *fileobj*)."""
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        """Check that the archive's member names equal *names*.

        *names* defaults to ["bar"]; the list is created per call instead of
        being a shared mutable default argument.
        """
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        """Write *data* as the archive file and expect appending to fail."""
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2135
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the gzip settings from GzipTest."""
2138
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the bz2 settings from Bz2Test."""
2141
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the xz settings from LzmaTest."""
2144
2145
class LimitsTest(unittest.TestCase):
    # Check which header values TarInfo.tobuf() accepts for each format.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # 100 char name
        info = tarfile.TarInfo("0123456789" * 10)
        info.tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        info = tarfile.TarInfo("123/" * 62 + "longname")
        info.tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # The uid that GNU_FORMAT rejects is accepted here.
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
2203
2204
2205class MiscTest(unittest.TestCase):
2206
2207    def test_char_fields(self):
2208        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
2209                         b"foo\0\0\0\0\0")
2210        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
2211                         b"foo")
2212        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
2213                         "foo")
2214        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
2215                         "foo")
2216
2217    def test_read_number_fields(self):
2218        # Issue 13158: Test if GNU tar specific base-256 number fields
2219        # are decoded correctly.
2220        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
2221        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
2222        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
2223                         0o10000000)
2224        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
2225                         0xffffffff)
2226        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
2227                         -1)
2228        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
2229                         -100)
2230        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
2231                         -0x100000000000000)
2232
2233        # Issue 24514: Test if empty number fields are converted to zero.
2234        self.assertEqual(tarfile.nti(b"\0"), 0)
2235        self.assertEqual(tarfile.nti(b"       \0"), 0)
2236
2237    def test_write_number_fields(self):
2238        self.assertEqual(tarfile.itn(1), b"0000001\x00")
2239        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
2240        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
2241                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
2242        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
2243                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
2244        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
2245                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
2246        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
2247                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
2248        self.assertEqual(tarfile.itn(-0x100000000000000,
2249                                     format=tarfile.GNU_FORMAT),
2250                         b"\xff\x00\x00\x00\x00\x00\x00\x00")
2251
2252        # Issue 32713: Test if itn() supports float values outside the
2253        # non-GNU format range
2254        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
2255                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
2256        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
2257                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
2258        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)
2259
2260    def test_number_field_limits(self):
2261        with self.assertRaises(ValueError):
2262            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
2263        with self.assertRaises(ValueError):
2264            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
2265        with self.assertRaises(ValueError):
2266            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
2267        with self.assertRaises(ValueError):
2268            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
2269
2270    def test__all__(self):
2271        blacklist = {'version', 'grp', 'pwd', 'symlink_exception',
2272                     'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
2273                     'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
2274                     'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
2275                     'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
2276                     'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
2277                     'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
2278                     'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
2279                     'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
2280                     'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
2281                     'filemode',
2282                     'EmptyHeaderError', 'TruncatedHeaderError',
2283                     'EOFHeaderError', 'InvalidHeaderError',
2284                     'SubsequentHeaderError', 'ExFileObject',
2285                     'main'}
2286        support.check__all__(self, tarfile, blacklist=blacklist)
2287
2288
2289class CommandLineTest(unittest.TestCase):
2290
2291    def tarfilecmd(self, *args, **kwargs):
2292        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
2293                                                      **kwargs)
2294        return out.replace(os.linesep.encode(), b'\n')
2295
2296    def tarfilecmd_failure(self, *args):
2297        return script_helper.assert_python_failure('-m', 'tarfile', *args)
2298
2299    def make_simple_tarfile(self, tar_name):
2300        files = [support.findfile('tokenize_tests.txt'),
2301                 support.findfile('tokenize_tests-no-coding-cookie-'
2302                                  'and-utf8-bom-sig-only.txt')]
2303        self.addCleanup(support.unlink, tar_name)
2304        with tarfile.open(tar_name, 'w') as tf:
2305            for tardata in files:
2306                tf.add(tardata, arcname=os.path.basename(tardata))
2307
2308    def test_bad_use(self):
2309        rc, out, err = self.tarfilecmd_failure()
2310        self.assertEqual(out, b'')
2311        self.assertIn(b'usage', err.lower())
2312        self.assertIn(b'error', err.lower())
2313        self.assertIn(b'required', err.lower())
2314        rc, out, err = self.tarfilecmd_failure('-l', '')
2315        self.assertEqual(out, b'')
2316        self.assertNotEqual(err.strip(), b'')
2317
2318    def test_test_command(self):
2319        for tar_name in testtarnames:
2320            for opt in '-t', '--test':
2321                out = self.tarfilecmd(opt, tar_name)
2322                self.assertEqual(out, b'')
2323
2324    def test_test_command_verbose(self):
2325        for tar_name in testtarnames:
2326            for opt in '-v', '--verbose':
2327                out = self.tarfilecmd(opt, '-t', tar_name,
2328                                      PYTHONIOENCODING='utf-8')
2329                self.assertIn(b'is a tar archive.\n', out)
2330
2331    def test_test_command_invalid_file(self):
2332        zipname = support.findfile('zipdir.zip')
2333        rc, out, err = self.tarfilecmd_failure('-t', zipname)
2334        self.assertIn(b' is not a tar archive.', err)
2335        self.assertEqual(out, b'')
2336        self.assertEqual(rc, 1)
2337
2338        for tar_name in testtarnames:
2339            with self.subTest(tar_name=tar_name):
2340                with open(tar_name, 'rb') as f:
2341                    data = f.read()
2342                try:
2343                    with open(tmpname, 'wb') as f:
2344                        f.write(data[:511])
2345                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
2346                    self.assertEqual(out, b'')
2347                    self.assertEqual(rc, 1)
2348                finally:
2349                    support.unlink(tmpname)
2350
2351    def test_list_command(self):
2352        for tar_name in testtarnames:
2353            with support.captured_stdout() as t:
2354                with tarfile.open(tar_name, 'r') as tf:
2355                    tf.list(verbose=False)
2356            expected = t.getvalue().encode('ascii', 'backslashreplace')
2357            for opt in '-l', '--list':
2358                out = self.tarfilecmd(opt, tar_name,
2359                                      PYTHONIOENCODING='ascii')
2360                self.assertEqual(out, expected)
2361
2362    def test_list_command_verbose(self):
2363        for tar_name in testtarnames:
2364            with support.captured_stdout() as t:
2365                with tarfile.open(tar_name, 'r') as tf:
2366                    tf.list(verbose=True)
2367            expected = t.getvalue().encode('ascii', 'backslashreplace')
2368            for opt in '-v', '--verbose':
2369                out = self.tarfilecmd(opt, '-l', tar_name,
2370                                      PYTHONIOENCODING='ascii')
2371                self.assertEqual(out, expected)
2372
2373    def test_list_command_invalid_file(self):
2374        zipname = support.findfile('zipdir.zip')
2375        rc, out, err = self.tarfilecmd_failure('-l', zipname)
2376        self.assertIn(b' is not a tar archive.', err)
2377        self.assertEqual(out, b'')
2378        self.assertEqual(rc, 1)
2379
2380    def test_create_command(self):
2381        files = [support.findfile('tokenize_tests.txt'),
2382                 support.findfile('tokenize_tests-no-coding-cookie-'
2383                                  'and-utf8-bom-sig-only.txt')]
2384        for opt in '-c', '--create':
2385            try:
2386                out = self.tarfilecmd(opt, tmpname, *files)
2387                self.assertEqual(out, b'')
2388                with tarfile.open(tmpname) as tar:
2389                    tar.getmembers()
2390            finally:
2391                support.unlink(tmpname)
2392
2393    def test_create_command_verbose(self):
2394        files = [support.findfile('tokenize_tests.txt'),
2395                 support.findfile('tokenize_tests-no-coding-cookie-'
2396                                  'and-utf8-bom-sig-only.txt')]
2397        for opt in '-v', '--verbose':
2398            try:
2399                out = self.tarfilecmd(opt, '-c', tmpname, *files,
2400                                      PYTHONIOENCODING='utf-8')
2401                self.assertIn(b' file created.', out)
2402                with tarfile.open(tmpname) as tar:
2403                    tar.getmembers()
2404            finally:
2405                support.unlink(tmpname)
2406
2407    def test_create_command_dotless_filename(self):
2408        files = [support.findfile('tokenize_tests.txt')]
2409        try:
2410            out = self.tarfilecmd('-c', dotlessname, *files)
2411            self.assertEqual(out, b'')
2412            with tarfile.open(dotlessname) as tar:
2413                tar.getmembers()
2414        finally:
2415            support.unlink(dotlessname)
2416
2417    def test_create_command_dot_started_filename(self):
2418        tar_name = os.path.join(TEMPDIR, ".testtar")
2419        files = [support.findfile('tokenize_tests.txt')]
2420        try:
2421            out = self.tarfilecmd('-c', tar_name, *files)
2422            self.assertEqual(out, b'')
2423            with tarfile.open(tar_name) as tar:
2424                tar.getmembers()
2425        finally:
2426            support.unlink(tar_name)
2427
2428    def test_create_command_compressed(self):
2429        files = [support.findfile('tokenize_tests.txt'),
2430                 support.findfile('tokenize_tests-no-coding-cookie-'
2431                                  'and-utf8-bom-sig-only.txt')]
2432        for filetype in (GzipTest, Bz2Test, LzmaTest):
2433            if not filetype.open:
2434                continue
2435            try:
2436                tar_name = tmpname + '.' + filetype.suffix
2437                out = self.tarfilecmd('-c', tar_name, *files)
2438                with filetype.taropen(tar_name) as tar:
2439                    tar.getmembers()
2440            finally:
2441                support.unlink(tar_name)
2442
2443    def test_extract_command(self):
2444        self.make_simple_tarfile(tmpname)
2445        for opt in '-e', '--extract':
2446            try:
2447                with support.temp_cwd(tarextdir):
2448                    out = self.tarfilecmd(opt, tmpname)
2449                self.assertEqual(out, b'')
2450            finally:
2451                support.rmtree(tarextdir)
2452
2453    def test_extract_command_verbose(self):
2454        self.make_simple_tarfile(tmpname)
2455        for opt in '-v', '--verbose':
2456            try:
2457                with support.temp_cwd(tarextdir):
2458                    out = self.tarfilecmd(opt, '-e', tmpname,
2459                                          PYTHONIOENCODING='utf-8')
2460                self.assertIn(b' file is extracted.', out)
2461            finally:
2462                support.rmtree(tarextdir)
2463
2464    def test_extract_command_different_directory(self):
2465        self.make_simple_tarfile(tmpname)
2466        try:
2467            with support.temp_cwd(tarextdir):
2468                out = self.tarfilecmd('-e', tmpname, 'spamdir')
2469            self.assertEqual(out, b'')
2470        finally:
2471            support.rmtree(tarextdir)
2472
2473    def test_extract_command_invalid_file(self):
2474        zipname = support.findfile('zipdir.zip')
2475        with support.temp_cwd(tarextdir):
2476            rc, out, err = self.tarfilecmd_failure('-e', zipname)
2477        self.assertIn(b' is not a tar archive.', err)
2478        self.assertEqual(out, b'')
2479        self.assertEqual(rc, 1)
2480
2481
class ContextManagerTest(unittest.TestCase):
    # Checks of TarFile used as a context manager: the closed state on
    # entry and exit, exception pass-through, and whether end-of-archive
    # blocks are written.

    def test_basic(self):
        # The archive is open inside the with block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        # assertRaises (rather than a bare "except: pass") keeps
        # KeyboardInterrupt/SystemExit from being swallowed and also
        # verifies that the exception leaves the with block.
        with self.assertRaises(Exception):
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            # See test_no_eof for why assertRaises replaces a bare except.
            with self.assertRaises(Exception):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2538
2539
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the member called *name* into TEMPDIR and compare the
        # digest of the resulting file with sha256_regtype.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            payload = extracted.read()
        self.assertEqual(sha256sum(payload), sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    _skip_hardlink = unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link")
    _skip_symlink = unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists")

    @_skip_hardlink
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @_skip_hardlink
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @_skip_symlink
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @_skip_symlink
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2572
2573
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Feed tarfile.open() every prefix of a small bz2-compressed tar
        # header, using *mode* as the open mode.  ReadError is expected
        # and ignored; what must never happen is another read() after
        # end-of-data was reached without an intervening seek().
        class EOFGuardBytesIO(io.BytesIO):
            # True once a read() started with the position at the end.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                # Repositioning legitimately allows further reads.
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for length in range(len(compressed) + 1):
            prefix = EOFGuardBytesIO(compressed[:length])
            try:
                tarfile.open(fileobj=prefix, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2603
2604
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root'.

    Returns False when the pwd or grp module cannot be imported, when
    either id has no entry in its database, or when either name differs
    from 'root'.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    # getpwuid()/getgrgid() raise KeyError when the id has no entry.
    # This function is evaluated inside a decorator while the module is
    # being imported, so an uncaught KeyError would prevent the whole
    # test module from loading.
    try:
        if pwd.getpwuid(0)[0] != 'root':
            return False
        if grp.getgrgid(0)[0] != 'root':
            return False
    except KeyError:
        return False
    return True
2615
2616
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # Checks which uid/gid values extract()/extractall() hand to
    # os.chown, with and without numeric_owner.
    #
    # mock the following:
    #  os.chown: so we can test what's being called
    #  os.chmod: so the modes are not actually changed. if they are, we can't
    #             delete the files/directories
    #  os.geteuid: so we can lie and say we're root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write tmpname as a tar file holding a regular file, a directory
        # and a regular file inside that directory, then return tmpname.

        # the file contents to write
        payload = b"content"

        # create a tar file with a file, a directory, and a file within that
        #  directory. Assign various .uid/.gid values to them
        items = [(filename_1, 99, 98, tarfile.REGTYPE, payload),
                 (dirname_1,  77, 76, tarfile.DIRTYPE, None),
                 (filename_2, 88, 87, tarfile.REGTYPE, payload),
                 ]
        with tarfile.open(tmpname, 'w') as tarfl:
            for name, uid, gid, typ, contents in items:
                t = tarfile.TarInfo(name)
                t.uid = uid
                t.gid = gid
                t.uname = 'root'
                t.gname = 'root'
                t.type = typ
                if contents is None:
                    tarfl.addfile(t)
                else:
                    # addfile() copies exactly t.size bytes from the file
                    # object, so the size has to be set for the payload to
                    # be stored, and every member needs its own stream
                    # positioned at the start.
                    t.size = len(contents)
                    tarfl.addfile(t, io.BytesIO(contents))

        # return the full pathname to the tar file
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager: make the mocked geteuid report 0, build the
        # test archive and yield it opened for reading together with the
        # three member names stored in it.
        mock_geteuid.return_value = 0  # lie and say we're root
        fname = 'numeric-owner-testfile'
        dirname = 'dir'

        # the names we want stored in the tarfile
        filename_1 = fname
        dirname_1 = dirname
        filename_2 = os.path.join(dirname, fname)

        # create the tarfile with the contents we're after
        tar_filename = NumericOwnerTest._make_test_archive(filename_1,
                                                           dirname_1,
                                                           filename_2)

        # open the tarfile for reading. yield it and the names of the items
        #  we stored into the file
        with tarfile.open(tar_filename) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=True)
            tarfl.extract(filename_2, TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        # with numeric_owner=True the uid/gid stored in the archive are
        #  expected verbatim
        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)
        f_dirname_1  = os.path.join(TEMPDIR, dirname_1)
        f_filename_2 = os.path.join(TEMPDIR, filename_2)

        mock_chown.assert_has_calls([unittest.mock.call(f_filename_1, 99, 98),
                                     unittest.mock.call(f_dirname_1, 77, 76),
                                     unittest.mock.call(f_filename_2, 88, 87),
                                     ],
                                    any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        # convert to filesystem paths
        f_filename_1 = os.path.join(TEMPDIR, filename_1)

        mock_chown.assert_called_with(f_filename_1, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # passing numeric_owner positionally (the fourth positional
        #  argument here) must be rejected with TypeError
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            self.assertRaises(TypeError,
                              tarfl.extract, filename_1, TEMPDIR, False, True)
2735
2736
def setUpModule():
    """Create TEMPDIR and a compressed copy of the test archive per codec.

    Also (re)binds the module-level list ``testtarnames`` to the plain
    archive followed by every compressed copy that was written.
    """
    global testtarnames

    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as plain:
        payload = plain.read()

    # Create compressed tarfiles.
    for codec in (GzipTest, Bz2Test, LzmaTest):
        if not codec.open:
            # The compression module is unavailable; nothing to write.
            continue
        support.unlink(codec.tarname)
        testtarnames.append(codec.tarname)
        with codec.open(codec.tarname, "wb") as compressed:
            compressed.write(payload)
2753
def tearDownModule():
    """Remove TEMPDIR and everything beneath it, if it still exists."""
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2757
2758if __name__ == "__main__":
2759    unittest.main()
2760