• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper, requires_hashdigest
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def sha256sum(data):
    """Return the SHA-256 digest of the bytes-like *data* as a hex string."""
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
32
# Scratch directory names, both derived from the absolute form of
# support.TESTFN.
TEMPDIR = os.path.abspath(support.TESTFN) + '-tardir'
tarextdir = TEMPDIR + "-extract-test"

# Reference archive located through support.findfile().
tarname = support.findfile('testtar.tar')

# Paths inside TEMPDIR.  NOTE(review): the compressed copies are presumably
# created by module-level setup that is not visible here -- confirm.
gzipname = os.path.join(TEMPDIR, 'testtar.tar.gz')
bz2name = os.path.join(TEMPDIR, 'testtar.tar.bz2')
xzname = os.path.join(TEMPDIR, 'testtar.tar.xz')
tmpname = os.path.join(TEMPDIR, 'tmp.tar')
dotlessname = os.path.join(TEMPDIR, 'testtar')

# Expected SHA-256 hex digests that the tests compare extracted data against.
sha256_regtype = "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
sha256_sparse = "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
48
49
class TarTest:
    """Mixin holding the settings for the uncompressed archive.

    Concrete test classes are expected to provide ``prefix`` (for example
    ``"r:"``); ``mode`` joins it with ``suffix``.
    """

    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Full tarfile mode string: access prefix followed by the
        # compression suffix (empty for plain tar).
        return "".join((self.prefix, self.suffix))
59
@support.requires_gzip
class GzipTest:
    """Mixin holding the settings for the gzip-compressed archive."""

    tarname = gzipname
    suffix = 'gz'
    # The gzip import at the top of the file may have failed, leaving None.
    open = None if gzip is None else gzip.GzipFile
    taropen = tarfile.TarFile.gzopen
66
@support.requires_bz2
class Bz2Test:
    """Mixin holding the settings for the bzip2-compressed archive."""

    tarname = bz2name
    suffix = 'bz2'
    # The bz2 import at the top of the file may have failed, leaving None.
    open = None if bz2 is None else bz2.BZ2File
    taropen = tarfile.TarFile.bz2open
73
@support.requires_lzma
class LzmaTest:
    """Mixin holding the settings for the xz-compressed archive."""

    tarname = xzname
    suffix = 'xz'
    # The lzma import at the top of the file may have failed, leaving None.
    open = None if lzma is None else lzma.LZMAFile
    taropen = tarfile.TarFile.xzopen
80
81
class ReadTest(TarTest):
    """Base for read tests: opens ``self.tarname`` before each test."""

    prefix = "r:"

    def setUp(self):
        # Open the archive with this class's mode, decoding names as
        # iso8859-1, and keep it on the instance for the test methods.
        archive = tarfile.open(name=self.tarname, mode=self.mode,
                               encoding="iso8859-1")
        self.tar = archive

    def tearDown(self):
        # Close the archive opened in setUp().
        self.tar.close()
92
93
class UstarReadTest(ReadTest, unittest.TestCase):

    def test_fileobj_regular_file(self):
        # The extracted bytes must match the member's recorded size and the
        # known digest.
        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as fobj:
            payload = fobj.read()
            self.assertEqual(len(payload), member.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(payload), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the extracted file object must agree with
        # readlines() on the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "r") as disk_file:
            expected = disk_file.readlines()

        with self.tar.extractfile(member) as fobj:
            wrapper = io.TextIOWrapper(fobj)
            actual = wrapper.readlines()
            self.assertEqual(expected, actual,
                    "fileobj.readlines() failed")
            self.assertEqual(len(actual), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(actual[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the extracted file object must yield the same lines as
        # the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        member = self.tar.getmember("ustar/regtype")
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "r") as disk_file:
            expected = disk_file.readlines()
        with self.tar.extractfile(member) as fobj:
            actual = list(io.TextIOWrapper(fobj))
            self.assertEqual(expected, actual,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Exercise seek()/tell()/read()/readline() combinations against the
        # bytes of the copy extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "rb") as disk_file:
            expected = disk_file.read()

        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as fobj:
            # Consume the whole file first so the seeks start from the end.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            chunk = fobj.read(10)
            self.assertEqual(chunk, expected[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(member.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-member.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            first_pass = fobj.readlines()
            fobj.seek(512)
            second_pass = fobj.readlines()
            self.assertEqual(first_pass, second_pass,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            first_line = fobj.readline()
            self.assertEqual(fobj.read(), expected[len(first_line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as raw:
            text_file = io.TextIOWrapper(raw)
            payload = text_file.read().encode("iso8859-1")
            self.assertEqual(sha256sum(payload), sha256_regtype)
            try:
                text_file.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as link_file:
            with self.tar.extractfile(regtype) as target_file:
                self.assertEqual(link_file.name, target_file.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link(lnktype="ustar/lnktype",
                                regtype="ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link(lnktype="./ustar/linktest2/lnktype",
                                regtype="ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link(lnktype="ustar/symtype",
                                regtype="ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link(lnktype="./ustar/linktest2/symtype",
                                regtype="ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link(lnktype="symtype2",
                                regtype="ustar/regtype")
217
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest cases run with the GzipTest archive settings."""
220
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest cases run with the Bz2Test archive settings."""
223
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest cases run with the LzmaTest archive settings."""
226
227
class ListTest(ReadTest, unittest.TestCase):

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _list_output(self, **list_kwargs):
        # Call self.tar.list() while sys.stdout is swapped for an ASCII
        # text wrapper, and return the raw bytes that were written.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_kwargs)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._list_output(verbose=False)
        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for expected_name in plain_names:
            self.assertIn(expected_name, out)

        # Make sure it is able to print unencodable characters
        def conv(b):
            decoded = b.decode(self.tar.encoding, 'surrogateescape')
            return decoded.encode('ascii', 'backslashreplace')
        unencodable_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw_name in unencodable_names:
            self.assertIn(conv(raw_name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for link_marker in (b'link to', b'->'):
            self.assertNotIn(link_marker, out)

    def test_list_verbose(self):
        out = self._list_output(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        entry_pattern = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                         br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                         br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, entry_pattern * 2)
        # Make sure it prints the source of link with verbose flag
        long_dirs = b'/123' * 125
        self.assertIn(b'ustar/symtype -> regtype', out)
        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
        self.assertIn(b'./ustar/linktest2/lnktype link to '
                      b'./ustar/linktest1/regtype', out)
        self.assertIn(b'gnu' + long_dirs + b'/longlink link to gnu' +
                      long_dirs + b'/longname', out)
        self.assertIn(b'pax' + long_dirs + b'/longlink link to pax' +
                      long_dirs + b'/longname', out)

    def test_list_members(self):
        # Only members whose name contains 'reg' are passed to list().
        def members(tar):
            return (tarinfo for tarinfo in tar.getmembers()
                    if 'reg' in tarinfo.name)
        out = self._list_output(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
306
307
class GzipListTest(GzipTest, ListTest):
    """ListTest cases run with the GzipTest archive settings."""
310
311
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest cases run with the Bz2Test archive settings."""
314
315
class LzmaListTest(LzmaTest, ListTest):
    """ListTest cases run with the LzmaTest archive settings."""
318
319
class CommonReadTest(ReadTest):
    """Read tests shared by the seekable and stream-mode test classes."""

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # Start from None so the finally clause cannot raise
        # UnboundLocalError (and hide the real failure) when open() itself
        # raises.
        tar = None
        try:
            tar = tarfile.open(tmpname, self.mode)
            tar.getnames()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        else:
            self.assertListEqual(tar.getmembers(), [])
        finally:
            if tar is not None:
                tar.close()

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Write an archive holding one 1024-byte member, cut the file to
        # each size in turn, and check that every way of reading it reports
        # the truncation.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extractfile(t).read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar')) as tar:
                pass
406
class MiscReadTestBase(CommonReadTest):
    """Miscellaneous read tests, reused for each compression variant."""

    def requires_name_attribute(self):
        # Hook called by tests that need a file object exposing ``name``;
        # the Bz2/Lzma subclasses override it to skip those tests.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Use a context manager so the TarFile is closed even when the
        # assertion fails (it was previously never closed).
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening the archive must accept a path-like name and
        # expose it as an absolute str.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        if self.suffix == '':
            # The plain constructor is only exercised for the uncompressed
            # variant.
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        with open(tmpname, 'wb'):
            pass
        # Each call is expected to raise, so its result is not bound.
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'q')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, 'rw')
        with self.assertRaisesRegex(ValueError, 'mode must be '):
            self.taropen(tmpname, '')

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.
            # Call the module-level open() rather than going through the
            # already-closed TarFile instance from above.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            # The owner name is only checked for members under ustar/.
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(sha256sum(data), sha256_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render an mtime for the failure message; floats also show
            # their exact hex form.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        tar = tarfile.open(tarname, encoding="iso8859-1")
        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = os.path.join(DIR, tarinfo.name)
                if sys.platform != "win32":
                    # Win32 has no support for fine grained permissions.
                    self.assertEqual(tarinfo.mode & 0o777,
                                     os.stat(path).st_mode & 0o777)
                file_mtime = os.path.getmtime(path)
                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                    format_mtime(tarinfo.mtime),
                    format_mtime(file_mtime),
                    path)
                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            tar.close()
            support.rmtree(DIR)

    def test_extract_directory(self):
        # Extracting a single directory member must restore its mtime and,
        # except on win32, its 0o755 permission bits.
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        # extractall() must accept a pathlib.Path destination.
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        # extract() must accept a pathlib.Path destination.
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Create the instance without running __init__ so that it is
            # still reachable after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
656
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Setting the inherited test to None hides it from this class: it
    # expects opening the plain archive with self.mode to fail, which does
    # not apply to the uncompressed variant.
    test_fail_comp = None
659
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the GzipTest archive settings."""
662
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the Bz2Test archive settings."""

    def requires_name_attribute(self):
        # Skip every test that calls this hook.
        self.skipTest("BZ2File have no name attribute")
666
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the LzmaTest archive settings."""

    def requires_name_attribute(self):
        # Skip every test that calls this hook.
        self.skipTest("LZMAFile have no name attribute")
670
671
class StreamReadTest(CommonReadTest, unittest.TestCase):

    # Open the archive in stream mode instead of the default "r:".
    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if not member.isreg():
                continue
            with self.tar.extractfile(member) as fobj:
                try:
                    # Drain the member in 512-byte reads until EOF.
                    while fobj.read(512):
                        pass
                except tarfile.StreamError:
                    self.fail("simple read-through using "
                              "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        member = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(member) as fobj:
            payload = fobj.read()
        self.assertEqual(len(payload), member.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(payload), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # After getmembers() has walked the whole stream, reading the first
        # member must raise StreamError.
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as f: # read the first member
            with self.assertRaises(tarfile.StreamError):
                f.read()

    def test_compare_members(self):
        # Walk a seekable open of the plain archive and the stream archive
        # in lockstep and compare the extracted data member by member.
        with tarfile.open(tarname, encoding="iso8859-1") as tar1:
            tar2 = self.tar

            while True:
                t1 = tar1.next()
                t2 = tar2.next()
                if t1 is None:
                    break
                self.assertIsNotNone(t2, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    # Links cannot be extracted as files in stream mode.
                    with self.assertRaises(tarfile.StreamError):
                        tar2.extractfile(t2)
                    continue

                v1 = tar1.extractfile(t1)
                v2 = tar2.extractfile(t2)
                if v1 is None:
                    continue
                self.assertIsNotNone(v2, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(),
                        "stream extraction failed")
732
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """StreamReadTest cases run with the GzipTest archive settings."""
735
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """StreamReadTest cases run with the Bz2Test archive settings."""
738
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """StreamReadTest cases run with the LzmaTest archive settings."""
741
742
class DetectReadTest(TarTest, unittest.TestCase):
    """Checks which read modes accept or reject each archive variant."""

    def _testfunc_file(self, name, mode):
        # Opening by file name must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError:
            self.fail()
        tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening through an already-open binary file object must not
        # raise ReadError.
        try:
            with open(name, "rb") as raw:
                tar = tarfile.open(name, mode, fileobj=raw)
        except tarfile.ReadError:
            self.fail()
        tar.close()

    def _test_modes(self, testfunc):
        if self.suffix:
            # Mismatches must be rejected: the plain archive opened with an
            # explicit compression mode, and this class's archive opened
            # with an explicit uncompressed mode.
            rejected = (
                (tarname, "r:" + self.suffix),
                (tarname, "r|" + self.suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for path, bad_mode in rejected:
                with self.assertRaises(tarfile.ReadError):
                    tarfile.open(path, mode=bad_mode)
        accepted = (
            "r",
            "r:" + self.suffix,
            "r:*",
            "r|" + self.suffix,
            "r|*",
        )
        for good_mode in accepted:
            testfunc(self.tarname, good_mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
782
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """DetectReadTest cases run with the gzip settings from GzipTest."""
785
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """DetectReadTest cases for bzip2 plus a block size detection check."""

    def test_detect_stream_bz2(self):
        # tarfile's stream detection originally looked for the literal
        # string "BZh91" at the start of the file.  That was wrong: the
        # '9' only encodes the blocksize (900,000 bytes), so a file that
        # was compressed with a different blocksize was not detected.
        with open(tarname, "rb") as src:
            payload = src.read()

        # compresslevel=1 selects a blocksize of 100,000 bytes, which
        # makes the file start with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as dst:
            dst.write(payload)

        self._testfunc_file(tmpname, "r|*")
800
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """DetectReadTest cases run with the xz settings from LzmaTest."""
803
804
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up known members of the test archive and check their header
    fields and, where a digest is given, their contents."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Compare *tarinfo* with the expected values.

        *chksum*, if not None, is the expected SHA-256 hex digest of the
        member's data.  Each keyword argument names a TarInfo attribute
        and the value it must have.  mtime, uid and gid are always
        checked, uname and gname for every member whose name does not
        contain "old-v7".
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as payload:
                digest = sha256sum(payload.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        member = self.tar.getmember("ustar/regtype")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        member = self.tar.getmember("ustar/conttype")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        member = self.tar.getmember("ustar/dirtype")
        self._test_member(member, size=0)

    def test_find_dirtype_with_size(self):
        member = self.tar.getmember("ustar/dirtype-with-size")
        self._test_member(member, size=255)

    def test_find_lnktype(self):
        member = self.tar.getmember("ustar/lnktype")
        self._test_member(member, size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        member = self.tar.getmember("ustar/symtype")
        self._test_member(member, size=0, linkname="regtype")

    def test_find_blktype(self):
        member = self.tar.getmember("ustar/blktype")
        self._test_member(member, size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        member = self.tar.getmember("ustar/chrtype")
        self._test_member(member, size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        member = self.tar.getmember("ustar/fifotype")
        self._test_member(member, size=0)

    def test_find_sparse(self):
        member = self.tar.getmember("ustar/sparse")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        member = self.tar.getmember("gnu/sparse")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        member = self.tar.getmember("gnu/sparse-0.0")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        member = self.tar.getmember("gnu/sparse-0.1")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        member = self.tar.getmember("gnu/sparse-1.0")
        self._test_member(member, size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        member = self.tar.getmember("misc/regtype-old-v7")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking up the member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)
900
901
class LongnameTest:
    """Mixin with tests for members stored under very long path names.

    The classes using it provide ``subdir`` (the archive directory that
    holds the members) and ``longnametype`` (the header type expected at
    the member's offset), and ``self.tar``, the archive opened for
    reading.
    """

    def _long_path(self, leaf):
        """Return the archive path of the deeply nested member *leaf*."""
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._long_path("longname")
        try:
            member = self.tar.getmember(self._long_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Keep only the first three blocks starting at the member's
        # offset; opening such a cut-off archive must raise ReadError.
        member = self.tar.getmember(self._long_path("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        truncated = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_path("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = tarfile.TarInfo.frombuf(fobj.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)
942
943
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Read tests for the GNU members of the test archive."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        """Extract the sparse member *name* and verify the result."""
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if self._fs_supports_holes():
            # A file with holes occupies fewer bytes on disk than its
            # apparent size.
            st = os.stat(extracted)
            self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        support.unlink(probe)
        return st.st_blocks * 512 < st.st_size
1001
1002
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Read tests for the pax members of the test archive."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        # All three members must report the same VENDOR.umlauts pax
        # header, while the owner names differ from member to member.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expected_owners = (
            # (member name, uname, gname)
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for name, uname, gname in expected_owners:
                tarinfo = tar.getmember(name)
                self.assertEqual(tarinfo.uname, uname, name)
                self.assertEqual(tarinfo.gname, gname, name)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts, name)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
1045
1046
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object handed in by the caller has to stay open after
        # the archive is closed, and also after the TarFile object has
        # been garbage collected.
        fobj = io.BytesIO()
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            tar.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(fobj.closed,
                         "external fileobjs must never be closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        data = fobj.getvalue()
        # Drop the last reference and force a collection: nothing may be
        # written to or closed on fobj by the finalization.
        del tar
        support.gc_collect()
        self.assertFalse(fobj.closed,
                         "external fileobjs must never be closed")
        self.assertEqual(data, fobj.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        with tarfile.open(tmpname, self.mode) as tar:
            t = tarfile.TarInfo("foo")
            # One header block plus the data fill exactly one record.
            t.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
            tar.addfile(t, io.BytesIO(b"a" * t.size))

        # self.open is the (de)compressing file class of the current mode.
        with self.open(tmpname, "rb") as fobj:
            self.assertEqual(len(fobj.read()), tarfile.RECORDSIZE * 2)
1075
1076
class WriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the seekable "w:" modes.

    The compressed variants are created by additionally mixing in
    GzipTest, Bz2Test or LzmaTest.
    """

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        # Do not leave the scratch file behind for the following tests.
        self.addCleanup(support.unlink, path)
        with tarfile.open(tmpname, self.mode) as tar:
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        path = os.path.join(TEMPDIR, "file")
        self.addCleanup(support.unlink, path)
        with tarfile.open(tmpname, self.mode) as tar:
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # Create the files inside the try block so that the directory
            # is cleaned up even if creating one of them fails.
            for child in ("1", "2"):
                support.create_empty_file(os.path.join(path, child))
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must accept a path-like object and return the
        # same str name as for the equivalent string path.
        path = pathlib.Path(TEMPDIR) / "file"
        self.addCleanup(support.unlink, os.fspath(path))
        with tarfile.open(tmpname, self.mode) as tar:
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        try:
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            try:
                os.link(target, link)
            except PermissionError as e:
                # The finally clause below still removes the target.
                self.skipTest('os.link(): %s' % e)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                support.create_empty_file(os.path.join(tempdir, name))

            def exclude_bar(tarinfo):
                # Returning None drops the member from the archive.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=exclude_bar)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, exclude_bar)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                # The directory itself plus "foo" and "baz".
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        """Archive an empty member under the name *path* and check it.

        The stored name must equal *cmp_path* or, if that is not given,
        *path* with os.sep replaced by "/".  If *dir* is true the member
        is a directory, otherwise a regular file.
        """
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            support.create_empty_file(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Always remove the scratch entry; a leftover would make the
            # next call fail when it tries to create it again.
            if dir:
                support.rmdir(foo)
            else:
                support.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file)
                tar.add(target_file)
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir)
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        relative_paths = (
            ("foo",),
            ("foo", ".", "bar"),
            ("foo", "..", "bar"),
            (".", "foo"),
            (".", "foo", "."),
            (".", "foo", ".", "bar"),
            (".", "foo", "..", "bar"),
            ("..", "foo"),
            ("..", "foo", ".."),
            ("..", "foo", ".", "bar"),
            ("..", "foo", "..", "bar"),
        )
        for parts in relative_paths:
            self._test_pathname(os.path.join(*parts))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # If the very first write to the file object fails, the exception
        # has to reach the caller and the file object must stay open.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                # The non-empty pax_headers are passed so that opening
                # the archive already writes to f.
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1386
class GzipWriteTest(GzipTest, WriteTest):
    """WriteTest cases run with the gzip settings from GzipTest."""
1389
class Bz2WriteTest(Bz2Test, WriteTest):
    """WriteTest cases run with the bzip2 settings from Bz2Test."""
1392
class LzmaWriteTest(LzmaTest, WriteTest):
    """WriteTest cases run with the xz settings from LzmaTest."""
1395
1396
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the non-seekable "w|" stream modes."""

    prefix = "w|"
    # Callable returning a decompressor object for the written file, or
    # None if the file is read back through self.open instead.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            with open(tmpname, "rb") as fobj:
                compressed = fobj.read()
            dec = self.decompressor()
            data = dec.decompress(compressed)
            self.assertFalse(dec.unused_data, "found trailing data")

        # An empty archive has to consist of exactly one record of zeros.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Restore the umask that was in effect before the test.
            os.umask(saved_umask)
1434
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """StreamWriteTest cases run with the gzip settings from GzipTest."""
1437
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """StreamWriteTest cases run with the bzip2 settings from Bz2Test."""

    # bz2 is None when the module could not be imported; the attribute
    # is None in that case as well.
    decompressor = getattr(bz2, "BZ2Decompressor", None)
1440
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """StreamWriteTest cases run with the xz settings from LzmaTest."""

    # lzma is None when the module could not be imported; the attribute
    # is None in that case as well.
    decompressor = getattr(lzma, "LZMADecompressor", None)
1443
1444
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        """Return the size in bytes of the blocks needed to store *s*.

        One block more than ``len(s) // BLOCKSIZE`` is counted, so a
        string that fills its last block exactly still gets another
        block (presumably for the terminating NUL of the stored name).
        """
        blocks = len(s) // tarfile.BLOCKSIZE + 1
        return blocks * tarfile.BLOCKSIZE

    def _calc_size(self, name, link=None):
        """Return the expected archive offset after adding one member.

        *name* is the member name and *link* its optional link name.
        Each of them that exceeds the regular header field costs one
        extended header block plus the blocks holding the long string.
        """
        # Initial tar header
        count = tarfile.BLOCKSIZE

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += tarfile.BLOCKSIZE
            count += self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += tarfile.BLOCKSIZE
            count += self._length(link)
        return count

    def _test(self, name, link=None):
        """Write a member called *name* (a hard link to *link*, if given)
        in GNU format, then check the archive size and read it back."""
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.GNU_FORMAT) as tar:
            tar.addfile(tarinfo)

            # tar.offset is the number of bytes written so far.
            expected = self._calc_size(name, link)
            self.assertEqual(expected, tar.offset,
                             "GNU longname/longlink creation failed")

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1525
1526
class CreateTest(WriteTestBase, unittest.TestCase):
    """Tests for the exclusive creation ("x:") modes."""

    prefix = "x:"

    # File that the tests add to the archives they create.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Exclusive creation requires that the archive does not exist.
        support.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def _check_names(self, names):
        """Assert that *names* lists exactly the one file that was added."""
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive(self):
        """Reopen the archive with self.taropen and check its members."""
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_names(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive creation must fail and must leave the
        # existing archive untouched.
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        self._check_archive()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_archive()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()
1615
1616
class GzipCreateTest(GzipTest, CreateTest):
    # Re-run the CreateTest cases with GzipTest's settings (suffix "gz",
    # taropen = TarFile.gzopen).
    pass
1619
1620
class Bz2CreateTest(Bz2Test, CreateTest):
    # Re-run the CreateTest cases with Bz2Test's settings (suffix "bz2",
    # taropen = TarFile.bz2open).
    pass
1623
1624
class LzmaCreateTest(LzmaTest, CreateTest):
    # Re-run the CreateTest cases with LzmaTest's settings (suffix "xz",
    # taropen = TarFile.xzopen).
    pass
1627
1628
class CreateWithXModeTest(CreateTest):
    # Re-run the CreateTest cases with the mode prefix "x" instead of "x:".

    prefix = "x"

    # The taropen() tests pass the literal mode "x" and never consult
    # self.mode, so they are set to None here -- presumably to avoid
    # running exact duplicates of the CreateTest versions.
    test_create_taropen = None
    test_create_existing_taropen = None
1635
1636
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a file "foo" and a hard link "bar" to it, then open a new
        # archive that already contains "foo".
        #
        # Every resource is registered with addCleanup() as soon as it
        # exists: tearDown() is not run when setUp() raises (which includes
        # the skipTest() below), whereas registered cleanups still are.
        # Cleanups run in reverse order, i.e. the archive is closed first.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")
        self.addCleanup(support.unlink, self.foo)

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        self.addCleanup(support.unlink, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" refers to the same file as the already added "foo", so it
        # must be recorded as a hard link member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is stored as a regular file.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1678
1679
class PaxWriteTest(GNUWriteTest):
    # Overrides GNUWriteTest._test() so that the inherited long name/link
    # cases write PAX_FORMAT archives, and adds pax-header specific tests.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        # Write a single member called *name* (a hard link to *link* if
        # given) and check that the name, or the link target when *link*
        # is given, survives the round trip.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            member = tar.getmembers()[0]
            if link:
                self.assertEqual(link, member.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must hold text that their converter
                    # accepts.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
1762
1763
class UnicodeTest:
    # Mixin with non-ASCII name handling tests. Concrete subclasses
    # provide the archive format under test as the class attribute
    # "format".

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using *encoding* and check
        # that reading the archive with the same encoding restores it.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII text in
        # either the member name or the user name must be rejected.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual header fields of the reference archive must be str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        # Reading with the encoding used for writing restores the names.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Reading the same archive as ASCII must yield lone surrogates
            # for the undecodable bytes. This is not checked for
            # PAX_FORMAT (presumably because the pax header does not
            # depend on the encoding argument -- confirm).
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1843
1844
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    # Runs the UnicodeTest cases for USTAR_FORMAT and adds checks for the
    # byte-length limits of the name, prefix and linkname fields.

    format = tarfile.USTAR_FORMAT

    # The 100 bytes name field limit applies to the utf-8 encoded name;
    # each '\xff' character takes 2 bytes in that encoding.
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Longer names are limited to a 155 bytes prefix + '/' + a 100 bytes
    # name, again measured on the utf-8 encoded form.
    def test_unicode_longname1(self):
        head = "0123456789" * 15
        tail = "0123456789" * 10
        self._test_ustar_name(head + "01234/" + tail)
        self._test_ustar_name(head + "0123/4" + tail, ValueError)
        self._test_ustar_name(head + "012\xff/" + tail)
        self._test_ustar_name(head + "0123\xff/" + tail, ValueError)

    def test_unicode_longname2(self):
        head = "0123456789" * 15
        tail = "0123456789" * 10
        self._test_ustar_name(head + "01\xff/2" + tail, ValueError)
        self._test_ustar_name(head + "01\xff\xff/" + tail, ValueError)

    def test_unicode_longname3(self):
        head = "0123456789" * 15
        self._test_ustar_name(head + "01\xff\xff/2" + "0123456789" * 10,
                              ValueError)
        self._test_ustar_name(head + "01234/" + "0123456789" * 9
                              + "01234567\xff")
        self._test_ustar_name(head + "01234/" + "0123456789" * 9
                              + "012345678\xff", ValueError)

    def test_unicode_longname4(self):
        head = "0123456789" * 15
        self._test_ustar_name(head + "01234/" + "0123456789" * 9
                              + "012345\xff\xff")
        self._test_ustar_name(head + "01234/" + "0123456789" * 9
                              + "0123456\xff\xff", ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called *name* to a new utf-8 ustar archive. When
        # *exc* is given, adding must raise it; otherwise the first member
        # read back must carry the same name.
        member = tarfile.TarInfo(name)
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            if exc is not None:
                self.assertRaises(exc, archive.addfile, member)
                return
            archive.addfile(member)

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            for entry in archive:
                self.assertEqual(name, entry.name)
                break

    # The 100 bytes link field is subject to the same kind of limit.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but *name* is used as the linkname of a
        # member called "foo".
        member = tarfile.TarInfo("foo")
        member.linkname = name
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as archive:
            if exc is not None:
                self.assertRaises(exc, archive.addfile, member)
                return
            archive.addfile(member)

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            for entry in archive:
                self.assertEqual(name, entry.linkname)
                break
1922
1923
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):
    # Runs the UnicodeTest cases for GNU_FORMAT.

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        # The member must be found under the name that the given encoding
        # produces with the surrogateescape error handler.
        expected_names = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1940
1941
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):
    # Runs the UnicodeTest cases for PAX_FORMAT.

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        # The member must be found under the name that the given encoding
        # produces with the surrogateescape error handler.
        expected_names = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in expected_names:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1960
1961
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test works on tmpname and starts without that file.
        target = self.tarname = tmpname
        if os.path.exists(target):
            support.unlink(target)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in *mode* with one member "foo" whose data
        # is copied from "ustar/regtype" in the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dst:
                dst.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1981
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # Disabled here: the inherited test reads self.suffix and targets
    # compressed archives.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to self.tarname (or to *fileobj*).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive lists exactly *names* (default: ["bar"]).
        # None is used as the default instead of a list literal so that no
        # mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # Two zero blocks: an archive that holds only the end-of-archive
        # marker.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2045
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with GzipTest's suffix
    # ("gz").
    pass
2048
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with Bz2Test's suffix
    # ("bz2").
    pass
2051
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with LzmaTest's suffix
    # ("xz").
    pass
2054
2055
class LimitsTest(unittest.TestCase):
    # Check which field values TarInfo.tobuf() accepts or rejects for each
    # archive format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

        # uid that needs 8 octal digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # 512 char name and linkname are both accepted.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(gnu)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values rejected above is a problem for pax.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(pax)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(pax)
2113
2114
class MiscTest(unittest.TestCase):
    # Tests for the module-level field conversion helpers and for __all__.

    def test_char_fields(self):
        # stn() yields a field of exactly the requested length (NUL-padded
        # or truncated); nts() decodes up to the first NUL.
        encode_cases = (
            ("foo", 8, b"foo\0\0\0\0\0"),
            ("foobar", 3, b"foo"),
        )
        for text, length, field in encode_cases:
            self.assertEqual(tarfile.stn(text, length, "ascii", "strict"),
                             field)

        decode_cases = (
            (b"foo\0\0\0\0\0", "foo"),
            (b"foo\0bar\0", "foo"),
        )
        for field, text in decode_cases:
            self.assertEqual(tarfile.nts(field, "ascii", "strict"), text)

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        cases = (
            (b"0000001\x00", 1),
            (b"7777777\x00", 0o7777777),
            (b"\x80\x00\x00\x00\x00\x20\x00\x00", 0o10000000),
            (b"\x80\x00\x00\x00\xff\xff\xff\xff", 0xffffffff),
            (b"\xff\xff\xff\xff\xff\xff\xff\xff", -1),
            (b"\xff\xff\xff\xff\xff\xff\xff\x9c", -100),
            (b"\xff\x00\x00\x00\x00\x00\x00\x00", -0x100000000000000),
            # Issue 24514: Test if empty number fields are converted to zero.
            (b"\0", 0),
            (b"       \0", 0),
        )
        for field, number in cases:
            self.assertEqual(tarfile.nti(field), number)

    def test_write_number_fields(self):
        # Values that fit the default octal representation.
        for number, field in ((1, b"0000001\x00"),
                              (0o7777777, b"7777777\x00")):
            self.assertEqual(tarfile.itn(number), field)

        gnu_cases = (
            (0o10000000, b"\x80\x00\x00\x00\x00\x20\x00\x00"),
            (0xffffffff, b"\x80\x00\x00\x00\xff\xff\xff\xff"),
            (-1, b"\xff\xff\xff\xff\xff\xff\xff\xff"),
            (-100, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (-0x100000000000000, b"\xff\x00\x00\x00\x00\x00\x00\x00"),
            # Issue 32713: Test if itn() supports float values outside the
            # non-GNU format range
            (-100.0, b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
            (8 ** 12 + 0.0, b"\x80\x00\x00\x10\x00\x00\x00\x00"),
        )
        for number, field in gnu_cases:
            self.assertEqual(tarfile.itn(number, format=tarfile.GNU_FORMAT),
                             field)

        # A small negative float must read back as zero.
        encoded = tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)
        self.assertEqual(tarfile.nti(encoded), 0)

    def test_number_field_limits(self):
        # Each (value, field length, format) combination is out of range.
        out_of_range = (
            (-1, 8, tarfile.USTAR_FORMAT),
            (0o10000000, 8, tarfile.USTAR_FORMAT),
            (-0x10000000001, 6, tarfile.GNU_FORMAT),
            (0x10000000000, 6, tarfile.GNU_FORMAT),
        )
        for number, digits, fmt in out_of_range:
            with self.assertRaises(ValueError):
                tarfile.itn(number, digits, fmt)

    def test__all__(self):
        # Names handed to support.check__all__() as its "blacklist"
        # argument.
        excluded = {
            'version', 'grp', 'pwd', 'symlink_exception',
            'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
            'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
            'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
            'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
            'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
            'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
            'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
            'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
            'filemode',
            'EmptyHeaderError', 'TruncatedHeaderError',
            'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject',
            'main',
        }
        support.check__all__(self, tarfile, blacklist=excluded)
2197
2198
class CommandLineTest(unittest.TestCase):
    # Test the "python -m tarfile" command line interface by running it in
    # a child interpreter through script_helper.

    def tarfilecmd(self, *args, **kwargs):
        # Run "-m tarfile *args", expecting success, and return its stdout
        # with os.linesep normalised to b'\n'. Keyword arguments are passed
        # on to script_helper.assert_python_ok() (the callers below use
        # this for PYTHONIOENCODING).
        _, out, _ = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                   **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "-m tarfile *args", expecting failure, and return the
        # (rc, out, err) result of script_helper.assert_python_failure().
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def _sample_files(self):
        # Paths of the two text fixtures used as archive payload.
        return [support.findfile('tokenize_tests.txt'),
                support.findfile('tokenize_tests-no-coding-cookie-'
                                 'and-utf8-bom-sig-only.txt')]

    def make_simple_tarfile(self, tar_name):
        # Create an uncompressed archive *tar_name* holding the sample
        # files under their base names; it is removed again on cleanup.
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in self._sample_files():
                tf.add(tardata, arcname=os.path.basename(tardata))

    def test_bad_use(self):
        # No arguments at all: a usage error is reported on stderr.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # An empty archive name: some error text is written to stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name)
                    self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-t', tar_name)
                    self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    # Only the first 511 bytes, i.e. less than one
                    # 512-byte block, are kept.
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The expected output is whatever TarFile.list() prints
            # in-process.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                with self.subTest(tar_name=tar_name, opt=opt):
                    out = self.tarfilecmd(opt, '-l', tar_name,
                                          PYTHONIOENCODING='ascii')
                    self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = self._sample_files()
        for opt in '-c', '--create':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, tmpname, *files)
                    self.assertEqual(out, b'')
                    # The result must be readable as a tar archive.
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tmpname)

    def test_create_command_verbose(self):
        files = self._sample_files()
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    out = self.tarfilecmd(opt, '-c', tmpname, *files)
                    self.assertIn(b' file created.', out)
                    with tarfile.open(tmpname) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        files = self._sample_files()
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # "open" is None when the compression module is missing.
            if not filetype.open:
                continue
            # Bound before the try block so that the cleanup in "finally"
            # can always refer to it.
            tar_name = tmpname + '.' + filetype.suffix
            with self.subTest(suffix=filetype.suffix):
                try:
                    self.tarfilecmd('-c', tar_name, *files)
                    with filetype.taropen(tar_name) as tar:
                        tar.getmembers()
                finally:
                    support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            with self.subTest(opt=opt):
                try:
                    with support.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, tmpname)
                    self.assertEqual(out, b'')
                finally:
                    support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            with self.subTest(opt=opt):
                try:
                    with support.temp_cwd(tarextdir):
                        out = self.tarfilecmd(opt, '-e', tmpname)
                    self.assertIn(b' file is extracted.', out)
                finally:
                    support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2387
2388
class ContextManagerTest(unittest.TestCase):
    # Tests for using a TarFile object in a ``with`` statement.

    def test_basic(self):
        # The archive is open inside the block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        # assertRaises is used instead of a bare ``except:`` so that
        # KeyboardInterrupt/SystemExit are not swallowed and so that the
        # exception is verified to propagate out of the with block.
        with self.assertRaises(Exception):
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            # Only the deliberately raised exception is expected here;
            # see test_no_eof for why no bare ``except:`` is used.
            with self.assertRaises(Exception):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2445
2446
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741. When the platform supports neither
    # symbolic nor hard links, tarfile falls back to extracting such
    # members as the regular files they refer to.
    def _test_link_extraction(self, name):
        # Extract the link member and check that what lands on disk has
        # the checksum of the regular file.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as stream:
            digest = sha256sum(stream.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2479
2480
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed tar header with the
        # given mode and make sure opening never keeps reading at EOF.
        class MyBytesIO(io.BytesIO):
            # True once a read() has started at the very end of the data;
            # a further read() without a seek() in between is then
            # reported as an endless loop.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)

            def seek(self, *args):
                # Repositioning makes a following read legitimate again.
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for length in range(len(compressed) + 1):
            try:
                tarfile.open(fileobj=MyBytesIO(compressed[:length]),
                             mode=mode)
            except tarfile.ReadError:
                # Truncated input is expected to be rejected; only the
                # AssertionError raised by MyBytesIO matters here.
                pass

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2510
2511
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Return False when the ``pwd`` or ``grp`` module cannot be imported,
    when either database has no entry for id 0, or when either entry has
    a name other than 'root'.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError when the requested id has
        # no entry.  This function is evaluated while the test classes
        # are being defined, so letting the error escape would break
        # importing the whole module instead of skipping one test.
        return False
    return user_name == 'root' and group_name == 'root'
2522
2523
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests replace the following functions with mocks:
    #  os.chown: to inspect the ownership changes that were requested
    #  os.chmod: to keep the real modes untouched; otherwise the extracted
    #             files/directories could not be deleted afterwards
    #  os.geteuid: to pretend that we are root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write a tar file at tmpname holding a file, a directory and a
        # file inside that directory. Every member gets its own uid/gid
        # pair, while the owner names are always 'root'.
        payload = io.BytesIO(b"content")
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(member_name)
                info.uid = uid
                info.gid = gid
                info.uname = 'root'
                info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        # hand back the full pathname of the archive
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager: build the test archive, open it for reading and
        # yield the open archive together with the three member names.
        mock_geteuid.return_value = 0  # pretend to be root

        # names as they are stored inside the archive
        top_file = 'numeric-owner-testfile'
        subdir = 'dir'
        nested_file = os.path.join(subdir, top_file)

        archive_path = NumericOwnerTest._make_test_archive(top_file,
                                                           subdir,
                                                           nested_file)

        with tarfile.open(archive_path) as archive:
            yield archive, top_file, subdir, nested_file

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have been requested with the numeric ids stored in
        # the archive, on the extracted filesystem paths
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # every member, the directory included, must have been chown'ed
        # with its numeric ids on its extracted filesystem path
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test only makes sense when uid=0 and gid=0 are really called
    #  'root': the members of the test archive carry 'root' as uname and
    #  gname, and extract() resolves these names through pwd and grp; the
    #  resulting ids are expected to be 0 here.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() must be
        # rejected with TypeError.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2642
2643
def setUpModule():
    # Create the scratch directory and one compressed copy of the test
    # archive per available compression format; the names of all usable
    # archives are collected in the module-level list testtarnames.
    global testtarnames

    support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        raw_archive = source.read()

    # Create compressed tarfiles.
    for variant in (GzipTest, Bz2Test, LzmaTest):
        if not variant.open:
            # No opener available for this format.
            continue
        support.unlink(variant.tarname)
        testtarnames.append(variant.tarname)
        with variant.open(variant.tarname, "wb") as compressed:
            compressed.write(raw_archive)
2660
def tearDownModule():
    # Remove the scratch directory, if it is still there.
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2664
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2667