• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8import shutil
9import re
10import warnings
11import stat
12
13import unittest
14import unittest.mock
15import tarfile
16
17from test import support
18from test.support import os_helper
19from test.support import script_helper
20from test.support import warnings_helper
21
22# Check for our compression modules.
23try:
24    import gzip
25except ImportError:
26    gzip = None
27try:
28    import zlib
29except ImportError:
30    zlib = None
31try:
32    import bz2
33except ImportError:
34    bz2 = None
35try:
36    import lzma
37except ImportError:
38    lzma = None
39
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    digest = sha256(data)
    return digest.hexdigest()
42
# Scratch directory that the tests extract into; the other scratch paths
# below are derived from it.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through test.support's data-file lookup.
tarname = support.findfile("testtar.tar")
# Paths of the compressed variants of the reference archive inside TEMPDIR
# (presumably created by module-level setup not visible here -- confirm).
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive that individual tests create and overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")
SPACE = b" "

# Expected SHA-256 hex digest of the contents of the "ustar/regtype" member.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# Expected SHA-256 hex digest for sparse-member data
# (NOTE(review): not referenced in the visible part of the file -- confirm).
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
59
60
class TarTest:
    # Settings for the uncompressed archive.  The compression mixins
    # (GzipTest, Bz2Test, LzmaTest) override these four attributes.
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Mode string handed to tarfile.open(): the subclass-supplied
        # prefix followed by the compression suffix.
        prefix, suffix = self.prefix, self.suffix
        return prefix + suffix
70
@support.requires_gzip()
class GzipTest:
    # Mixin that points a test class at the gzip-compressed archive.
    tarname = gzipname
    suffix = 'gz'
    # `gzip` is None when the module could not be imported.
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen
77
@support.requires_bz2()
class Bz2Test:
    # Mixin that points a test class at the bzip2-compressed archive.
    tarname = bz2name
    suffix = 'bz2'
    # `bz2` is None when the module could not be imported.
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open
84
@support.requires_lzma()
class LzmaTest:
    # Mixin that points a test class at the xz-compressed archive.
    tarname = xzname
    suffix = 'xz'
    # `lzma` is None when the module could not be imported.
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen
91
92
class ReadTest(TarTest):
    # Fixture for read tests: opens self.tarname before each test and
    # closes it afterwards.

    prefix = "r:"

    def setUp(self):
        archive = tarfile.open(
            self.tarname,
            mode=self.mode,
            encoding="iso8859-1",
        )
        self.tar = archive

    def tearDown(self):
        self.tar.close()
103
104
105class UstarReadTest(ReadTest, unittest.TestCase):
106
107    def test_fileobj_regular_file(self):
108        tarinfo = self.tar.getmember("ustar/regtype")
109        with self.tar.extractfile(tarinfo) as fobj:
110            data = fobj.read()
111            self.assertEqual(len(data), tarinfo.size,
112                    "regular file extraction failed")
113            self.assertEqual(sha256sum(data), sha256_regtype,
114                    "regular file extraction failed")
115
116    def test_fileobj_readlines(self):
117        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
118        tarinfo = self.tar.getmember("ustar/regtype")
119        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
120            lines1 = fobj1.readlines()
121
122        with self.tar.extractfile(tarinfo) as fobj:
123            fobj2 = io.TextIOWrapper(fobj)
124            lines2 = fobj2.readlines()
125            self.assertEqual(lines1, lines2,
126                    "fileobj.readlines() failed")
127            self.assertEqual(len(lines2), 114,
128                    "fileobj.readlines() failed")
129            self.assertEqual(lines2[83],
130                    "I will gladly admit that Python is not the fastest "
131                    "running scripting language.\n",
132                    "fileobj.readlines() failed")
133
134    def test_fileobj_iter(self):
135        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
136        tarinfo = self.tar.getmember("ustar/regtype")
137        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
138            lines1 = fobj1.readlines()
139        with self.tar.extractfile(tarinfo) as fobj2:
140            lines2 = list(io.TextIOWrapper(fobj2))
141            self.assertEqual(lines1, lines2,
142                    "fileobj.__iter__() failed")
143
144    def test_fileobj_seek(self):
145        self.tar.extract("ustar/regtype", TEMPDIR,
146                         filter='data')
147        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
148            data = fobj.read()
149
150        tarinfo = self.tar.getmember("ustar/regtype")
151        with self.tar.extractfile(tarinfo) as fobj:
152            text = fobj.read()
153            fobj.seek(0)
154            self.assertEqual(0, fobj.tell(),
155                         "seek() to file's start failed")
156            fobj.seek(2048, 0)
157            self.assertEqual(2048, fobj.tell(),
158                         "seek() to absolute position failed")
159            fobj.seek(-1024, 1)
160            self.assertEqual(1024, fobj.tell(),
161                         "seek() to negative relative position failed")
162            fobj.seek(1024, 1)
163            self.assertEqual(2048, fobj.tell(),
164                         "seek() to positive relative position failed")
165            s = fobj.read(10)
166            self.assertEqual(s, data[2048:2058],
167                         "read() after seek failed")
168            fobj.seek(0, 2)
169            self.assertEqual(tarinfo.size, fobj.tell(),
170                         "seek() to file's end failed")
171            self.assertEqual(fobj.read(), b"",
172                         "read() at file's end did not return empty string")
173            fobj.seek(-tarinfo.size, 2)
174            self.assertEqual(0, fobj.tell(),
175                         "relative seek() to file's end failed")
176            fobj.seek(512)
177            s1 = fobj.readlines()
178            fobj.seek(512)
179            s2 = fobj.readlines()
180            self.assertEqual(s1, s2,
181                         "readlines() after seek failed")
182            fobj.seek(0)
183            self.assertEqual(len(fobj.readline()), fobj.tell(),
184                         "tell() after readline() failed")
185            fobj.seek(512)
186            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
187                         "tell() after seek() and readline() failed")
188            fobj.seek(0)
189            line = fobj.readline()
190            self.assertEqual(fobj.read(), data[len(line):],
191                         "read() after readline() failed")
192
193    def test_fileobj_text(self):
194        with self.tar.extractfile("ustar/regtype") as fobj:
195            fobj = io.TextIOWrapper(fobj)
196            data = fobj.read().encode("iso8859-1")
197            self.assertEqual(sha256sum(data), sha256_regtype)
198            try:
199                fobj.seek(100)
200            except AttributeError:
201                # Issue #13815: seek() complained about a missing
202                # flush() method.
203                self.fail("seeking failed in text mode")
204
205    # Test if symbolic and hard links are resolved by extractfile().  The
206    # test link members each point to a regular member whose data is
207    # supposed to be exported.
208    def _test_fileobj_link(self, lnktype, regtype):
209        with self.tar.extractfile(lnktype) as a, \
210             self.tar.extractfile(regtype) as b:
211            self.assertEqual(a.name, b.name)
212
213    def test_fileobj_link1(self):
214        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
215
216    def test_fileobj_link2(self):
217        self._test_fileobj_link("./ustar/linktest2/lnktype",
218                                "ustar/linktest1/regtype")
219
220    def test_fileobj_symlink1(self):
221        self._test_fileobj_link("ustar/symtype", "ustar/regtype")
222
223    def test_fileobj_symlink2(self):
224        self._test_fileobj_link("./ustar/linktest2/symtype",
225                                "ustar/linktest1/regtype")
226
227    def test_issue14160(self):
228        self._test_fileobj_link("symtype2", "ustar/regtype")
229
230    def test_add_dir_getmember(self):
231        # bpo-21987
232        self.add_dir_and_getmember('bar')
233        self.add_dir_and_getmember('a'*101)
234
235    @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"),
236                         "Missing getuid or getgid implementation")
237    def add_dir_and_getmember(self, name):
238        def filter(tarinfo):
239            tarinfo.uid = tarinfo.gid = 100
240            return tarinfo
241
242        with os_helper.temp_cwd():
243            with tarfile.open(tmpname, 'w') as tar:
244                tar.format = tarfile.USTAR_FORMAT
245                try:
246                    os.mkdir(name)
247                    tar.add(name, filter=filter)
248                finally:
249                    os.rmdir(name)
250            with tarfile.open(tmpname) as tar:
251                self.assertEqual(
252                    tar.getmember(name),
253                    tar.getmember(name + '/')
254                )
255
class GzipUstarReadTest(GzipTest, UstarReadTest):
    # Runs the UstarReadTest cases with GzipTest's archive settings.
    pass
258
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    # Runs the UstarReadTest cases with Bz2Test's archive settings.
    pass
261
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    # Runs the UstarReadTest cases with LzmaTest's archive settings.
    pass
264
265
266class ListTest(ReadTest, unittest.TestCase):
267
268    # Override setUp to use default encoding (UTF-8)
269    def setUp(self):
270        self.tar = tarfile.open(self.tarname, mode=self.mode)
271
272    def test_list(self):
273        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
274        with support.swap_attr(sys, 'stdout', tio):
275            self.tar.list(verbose=False)
276        out = tio.detach().getvalue()
277        self.assertIn(b'ustar/conttype', out)
278        self.assertIn(b'ustar/regtype', out)
279        self.assertIn(b'ustar/lnktype', out)
280        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
281        self.assertIn(b'./ustar/linktest2/symtype', out)
282        self.assertIn(b'./ustar/linktest2/lnktype', out)
283        # Make sure it puts trailing slash for directory
284        self.assertIn(b'ustar/dirtype/', out)
285        self.assertIn(b'ustar/dirtype-with-size/', out)
286        # Make sure it is able to print unencodable characters
287        def conv(b):
288            s = b.decode(self.tar.encoding, 'surrogateescape')
289            return s.encode('ascii', 'backslashreplace')
290        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
291        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
292                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
293        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
294                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
295        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
296        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
297        # Make sure it prints files separated by one newline without any
298        # 'ls -l'-like accessories if verbose flag is not being used
299        # ...
300        # ustar/conttype
301        # ustar/regtype
302        # ...
303        self.assertRegex(out, br'ustar/conttype ?\r?\n'
304                              br'ustar/regtype ?\r?\n')
305        # Make sure it does not print the source of link without verbose flag
306        self.assertNotIn(b'link to', out)
307        self.assertNotIn(b'->', out)
308
309    def test_list_verbose(self):
310        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
311        with support.swap_attr(sys, 'stdout', tio):
312            self.tar.list(verbose=True)
313        out = tio.detach().getvalue()
314        # Make sure it prints files separated by one newline with 'ls -l'-like
315        # accessories if verbose flag is being used
316        # ...
317        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
318        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
319        # ...
320        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
321                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
322                               br'ustar/\w+type ?\r?\n') * 2)
323        # Make sure it prints the source of link with verbose flag
324        self.assertIn(b'ustar/symtype -> regtype', out)
325        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
326        self.assertIn(b'./ustar/linktest2/lnktype link to '
327                      b'./ustar/linktest1/regtype', out)
328        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
329                      (b'/123' * 125) + b'/longname', out)
330        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
331                      (b'/123' * 125) + b'/longname', out)
332
333    def test_list_members(self):
334        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
335        def members(tar):
336            for tarinfo in tar.getmembers():
337                if 'reg' in tarinfo.name:
338                    yield tarinfo
339        with support.swap_attr(sys, 'stdout', tio):
340            self.tar.list(verbose=False, members=members(self.tar))
341        out = tio.detach().getvalue()
342        self.assertIn(b'ustar/regtype', out)
343        self.assertNotIn(b'ustar/conttype', out)
344
345
class GzipListTest(GzipTest, ListTest):
    # Runs the ListTest cases with GzipTest's archive settings.
    pass
348
349
class Bz2ListTest(Bz2Test, ListTest):
    # Runs the ListTest cases with Bz2Test's archive settings.
    pass
352
353
class LzmaListTest(LzmaTest, ListTest):
    # Runs the ListTest cases with LzmaTest's archive settings.
    pass
356
357
358class CommonReadTest(ReadTest):
359
360    def test_is_tarfile_erroneous(self):
361        with open(tmpname, "wb"):
362            pass
363
364        # is_tarfile works on filenames
365        self.assertFalse(tarfile.is_tarfile(tmpname))
366
367        # is_tarfile works on path-like objects
368        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))
369
370        # is_tarfile works on file objects
371        with open(tmpname, "rb") as fobj:
372            self.assertFalse(tarfile.is_tarfile(fobj))
373
374        # is_tarfile works on file-like objects
375        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))
376
377    def test_is_tarfile_valid(self):
378        # is_tarfile works on filenames
379        self.assertTrue(tarfile.is_tarfile(self.tarname))
380
381        # is_tarfile works on path-like objects
382        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))
383
384        # is_tarfile works on file objects
385        with open(self.tarname, "rb") as fobj:
386            self.assertTrue(tarfile.is_tarfile(fobj))
387
388        # is_tarfile works on file-like objects
389        with open(self.tarname, "rb") as fobj:
390            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))
391
392    def test_is_tarfile_keeps_position(self):
393        # Test for issue44289: tarfile.is_tarfile() modifies
394        # file object's current position
395        with open(self.tarname, "rb") as fobj:
396            tarfile.is_tarfile(fobj)
397            self.assertEqual(fobj.tell(), 0)
398
399        with open(self.tarname, "rb") as fobj:
400            file_like = io.BytesIO(fobj.read())
401            tarfile.is_tarfile(file_like)
402            self.assertEqual(file_like.tell(), 0)
403
404    def test_empty_tarfile(self):
405        # Test for issue6123: Allow opening empty archives.
406        # This test checks if tarfile.open() is able to open an empty tar
407        # archive successfully. Note that an empty tar archive is not the
408        # same as an empty file!
409        with tarfile.open(tmpname, self.mode.replace("r", "w")):
410            pass
411        try:
412            tar = tarfile.open(tmpname, self.mode)
413            tar.getnames()
414        except tarfile.ReadError:
415            self.fail("tarfile.open() failed on empty archive")
416        else:
417            self.assertListEqual(tar.getmembers(), [])
418        finally:
419            tar.close()
420
421    def test_non_existent_tarfile(self):
422        # Test for issue11513: prevent non-existent gzipped tarfiles raising
423        # multiple exceptions.
424        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
425            tarfile.open("xxx", self.mode)
426
427    def test_null_tarfile(self):
428        # Test for issue6123: Allow opening empty archives.
429        # This test guarantees that tarfile.open() does not treat an empty
430        # file as an empty tar archive.
431        with open(tmpname, "wb"):
432            pass
433        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
434        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
435
436    def test_ignore_zeros(self):
437        # Test TarFile's ignore_zeros option.
438        # generate 512 pseudorandom bytes
439        data = Random(0).randbytes(512)
440        for char in (b'\0', b'a'):
441            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
442            # are ignored correctly.
443            with self.open(tmpname, "w") as fobj:
444                fobj.write(char * 1024)
445                tarinfo = tarfile.TarInfo("foo")
446                tarinfo.size = len(data)
447                fobj.write(tarinfo.tobuf())
448                fobj.write(data)
449
450            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
451            try:
452                self.assertListEqual(tar.getnames(), ["foo"],
453                    "ignore_zeros=True should have skipped the %r-blocks" %
454                    char)
455            finally:
456                tar.close()
457
458    def test_premature_end_of_archive(self):
459        for size in (512, 600, 1024, 1200):
460            with tarfile.open(tmpname, "w:") as tar:
461                t = tarfile.TarInfo("foo")
462                t.size = 1024
463                tar.addfile(t, io.BytesIO(b"a" * 1024))
464
465            with open(tmpname, "r+b") as fobj:
466                fobj.truncate(size)
467
468            with tarfile.open(tmpname) as tar:
469                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
470                    for t in tar:
471                        pass
472
473            with tarfile.open(tmpname) as tar:
474                t = tar.next()
475
476                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
477                    tar.extract(t, TEMPDIR, filter='data')
478
479                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
480                    tar.extractfile(t).read()
481
482    def test_length_zero_header(self):
483        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
484        # with an exception
485        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
486            with tarfile.open(support.findfile('recursion.tar')) as tar:
487                pass
488
489class MiscReadTestBase(CommonReadTest):
    def requires_name_attribute(self):
        # Hook called by tests that need the opened file object to expose a
        # `name`.  No-op here; Bz2MiscReadTest and LzmaMiscReadTest override
        # it to skip the calling test.
        pass
492
493    def test_no_name_argument(self):
494        self.requires_name_attribute()
495        with open(self.tarname, "rb") as fobj:
496            self.assertIsInstance(fobj.name, str)
497            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
498                self.assertIsInstance(tar.name, str)
499                self.assertEqual(tar.name, os.path.abspath(fobj.name))
500
501    def test_no_name_attribute(self):
502        with open(self.tarname, "rb") as fobj:
503            data = fobj.read()
504        fobj = io.BytesIO(data)
505        self.assertRaises(AttributeError, getattr, fobj, "name")
506        tar = tarfile.open(fileobj=fobj, mode=self.mode)
507        self.assertIsNone(tar.name)
508
509    def test_empty_name_attribute(self):
510        with open(self.tarname, "rb") as fobj:
511            data = fobj.read()
512        fobj = io.BytesIO(data)
513        fobj.name = ""
514        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
515            self.assertIsNone(tar.name)
516
517    def test_int_name_attribute(self):
518        # Issue 21044: tarfile.open() should handle fileobj with an integer
519        # 'name' attribute.
520        fd = os.open(self.tarname, os.O_RDONLY)
521        with open(fd, 'rb') as fobj:
522            self.assertIsInstance(fobj.name, int)
523            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
524                self.assertIsNone(tar.name)
525
526    def test_bytes_name_attribute(self):
527        self.requires_name_attribute()
528        tarname = os.fsencode(self.tarname)
529        with open(tarname, 'rb') as fobj:
530            self.assertIsInstance(fobj.name, bytes)
531            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
532                self.assertIsInstance(tar.name, bytes)
533                self.assertEqual(tar.name, os.path.abspath(fobj.name))
534
535    def test_pathlike_name(self):
536        tarname = pathlib.Path(self.tarname)
537        with tarfile.open(tarname, mode=self.mode) as tar:
538            self.assertIsInstance(tar.name, str)
539            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
540        with self.taropen(tarname) as tar:
541            self.assertIsInstance(tar.name, str)
542            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
543        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
544            self.assertIsInstance(tar.name, str)
545            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
546        if self.suffix == '':
547            with tarfile.TarFile(tarname, mode='r') as tar:
548                self.assertIsInstance(tar.name, str)
549                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
550
551    def test_illegal_mode_arg(self):
552        with open(tmpname, 'wb'):
553            pass
554        with self.assertRaisesRegex(ValueError, 'mode must be '):
555            tar = self.taropen(tmpname, 'q')
556        with self.assertRaisesRegex(ValueError, 'mode must be '):
557            tar = self.taropen(tmpname, 'rw')
558        with self.assertRaisesRegex(ValueError, 'mode must be '):
559            tar = self.taropen(tmpname, '')
560
561    def test_fileobj_with_offset(self):
562        # Skip the first member and store values from the second member
563        # of the testtar.
564        tar = tarfile.open(self.tarname, mode=self.mode)
565        try:
566            tar.next()
567            t = tar.next()
568            name = t.name
569            offset = t.offset
570            with tar.extractfile(t) as f:
571                data = f.read()
572        finally:
573            tar.close()
574
575        # Open the testtar and seek to the offset of the second member.
576        with self.open(self.tarname) as fobj:
577            fobj.seek(offset)
578
579            # Test if the tarfile starts with the second member.
580            with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar:
581                t = tar.next()
582                self.assertEqual(t.name, name)
583                # Read to the end of fileobj and test if seeking back to the
584                # beginning works.
585                tar.getmembers()
586                self.assertEqual(tar.extractfile(t).read(), data,
587                        "seek back did not work")
588
589    def test_fail_comp(self):
590        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
591        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
592        with open(tarname, "rb") as fobj:
593            self.assertRaises(tarfile.ReadError, tarfile.open,
594                              fileobj=fobj, mode=self.mode)
595
596    def test_v7_dirtype(self):
597        # Test old style dirtype member (bug #1336623):
598        # Old V7 tars create directory members using an AREGTYPE
599        # header with a "/" appended to the filename field.
600        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
601        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
602                "v7 dirtype failed")
603
604    def test_xstar_type(self):
605        # The xstar format stores extra atime and ctime fields inside the
606        # space reserved for the prefix field. The prefix field must be
607        # ignored in this case, otherwise it will mess up the name.
608        try:
609            self.tar.getmember("misc/regtype-xstar")
610        except KeyError:
611            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
612
613    def test_check_members(self):
614        for tarinfo in self.tar:
615            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
616                    "wrong mtime for %s" % tarinfo.name)
617            if not tarinfo.name.startswith("ustar/"):
618                continue
619            self.assertEqual(tarinfo.uname, "tarfile",
620                    "wrong uname for %s" % tarinfo.name)
621
622    def test_find_members(self):
623        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
624                "could not find all members")
625
626    @unittest.skipUnless(hasattr(os, "link"),
627                         "Missing hardlink implementation")
628    @os_helper.skip_unless_symlink
629    def test_extract_hardlink(self):
630        # Test hardlink extraction (e.g. bug #857297).
631        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
632            tar.extract("ustar/regtype", TEMPDIR, filter='data')
633            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
634
635            tar.extract("ustar/lnktype", TEMPDIR, filter='data')
636            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
637            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
638                data = f.read()
639            self.assertEqual(sha256sum(data), sha256_regtype)
640
641            tar.extract("ustar/symtype", TEMPDIR, filter='data')
642            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
643            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
644                data = f.read()
645            self.assertEqual(sha256sum(data), sha256_regtype)
646
647    @os_helper.skip_unless_working_chmod
648    def test_extractall(self):
649        # Test if extractall() correctly restores directory permissions
650        # and times (see issue1735).
651        tar = tarfile.open(tarname, encoding="iso8859-1")
652        DIR = os.path.join(TEMPDIR, "extractall")
653        os.mkdir(DIR)
654        try:
655            directories = [t for t in tar if t.isdir()]
656            tar.extractall(DIR, directories, filter='fully_trusted')
657            for tarinfo in directories:
658                path = os.path.join(DIR, tarinfo.name)
659                if sys.platform != "win32":
660                    # Win32 has no support for fine grained permissions.
661                    self.assertEqual(tarinfo.mode & 0o777,
662                                     os.stat(path).st_mode & 0o777,
663                                     tarinfo.name)
664                def format_mtime(mtime):
665                    if isinstance(mtime, float):
666                        return "{} ({})".format(mtime, mtime.hex())
667                    else:
668                        return "{!r} (int)".format(mtime)
669                file_mtime = os.path.getmtime(path)
670                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
671                    format_mtime(tarinfo.mtime),
672                    format_mtime(file_mtime),
673                    path)
674                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
675        finally:
676            tar.close()
677            os_helper.rmtree(DIR)
678
679    @os_helper.skip_unless_working_chmod
680    def test_extract_directory(self):
681        dirtype = "ustar/dirtype"
682        DIR = os.path.join(TEMPDIR, "extractdir")
683        os.mkdir(DIR)
684        try:
685            with tarfile.open(tarname, encoding="iso8859-1") as tar:
686                tarinfo = tar.getmember(dirtype)
687                tar.extract(tarinfo, path=DIR, filter='fully_trusted')
688                extracted = os.path.join(DIR, dirtype)
689                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
690                if sys.platform != "win32":
691                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
692        finally:
693            os_helper.rmtree(DIR)
694
695    def test_extractall_pathlike_name(self):
696        DIR = pathlib.Path(TEMPDIR) / "extractall"
697        with os_helper.temp_dir(DIR), \
698             tarfile.open(tarname, encoding="iso8859-1") as tar:
699            directories = [t for t in tar if t.isdir()]
700            tar.extractall(DIR, directories, filter='fully_trusted')
701            for tarinfo in directories:
702                path = DIR / tarinfo.name
703                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)
704
705    def test_extract_pathlike_name(self):
706        dirtype = "ustar/dirtype"
707        DIR = pathlib.Path(TEMPDIR) / "extractall"
708        with os_helper.temp_dir(DIR), \
709             tarfile.open(tarname, encoding="iso8859-1") as tar:
710            tarinfo = tar.getmember(dirtype)
711            tar.extract(tarinfo, path=DIR, filter='fully_trusted')
712            extracted = DIR / dirtype
713            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
714
    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance without running __init__ so that it is
            # still reachable for inspection after __init__ raises.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                # The constructor must have closed the file it opened.
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            os_helper.unlink(empty)
733
734    def test_parallel_iteration(self):
735        # Issue #16601: Restarting iteration over tarfile continued
736        # from where it left off.
737        with tarfile.open(self.tarname) as tar:
738            for m1, m2 in zip(tar, tar):
739                self.assertEqual(m1.offset, m2.offset)
740                self.assertEqual(m1.get_info(), m2.get_info())
741
742    @unittest.skipIf(zlib is None, "requires zlib")
743    def test_zlib_error_does_not_leak(self):
744        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
745        # parsing certain types of invalid data
746        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
747            mock.side_effect = zlib.error
748            with self.assertRaises(tarfile.ReadError):
749                tarfile.open(self.tarname)
750
751    def test_next_on_empty_tarfile(self):
752        fd = io.BytesIO()
753        tf = tarfile.open(fileobj=fd, mode="w")
754        tf.close()
755
756        fd.seek(0)
757        with tarfile.open(fileobj=fd, mode="r|") as tf:
758            self.assertEqual(tf.next(), None)
759
760        fd.seek(0)
761        with tarfile.open(fileobj=fd, mode="r") as tf:
762            self.assertEqual(tf.next(), None)
763
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # Uncompressed variant.  The inherited test_fail_comp is aimed at the
    # compressed variants, so the name is rebound to None here.
    test_fail_comp = None
766
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the GzipTest settings
    # (gzip-compressed test archive, "gz" mode suffix).
    pass
769
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the Bz2Test settings
    # (bz2-compressed test archive, "bz2" mode suffix).
    def requires_name_attribute(self):
        # Skips the calling test; per the skip message, BZ2File objects
        # lack the name attribute that the test depends on.
        self.skipTest("BZ2File have no name attribute")
773
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the LzmaTest mixin's settings.
    def requires_name_attribute(self):
        # Skips the calling test; per the skip message, LZMAFile objects
        # lack the name attribute that the test depends on.
        self.skipTest("LZMAFile have no name attribute")
777
778
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for archives opened with the stream mode prefix "r|".

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.  Reading every
        # regular member front to back in 512-byte chunks must work.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as stream:
                    chunk = True
                    while chunk:
                        try:
                            chunk = stream.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        # The first member is "regtype"; it is fetched with next() because
        # getmember() can't be used here.
        member = self.tar.next()
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
        failure = "regular file extraction failed"
        self.assertEqual(len(payload), member.size, failure)
        self.assertEqual(sha256sum(payload), sha256_regtype, failure)

    def test_provoke_stream_error(self):
        # After getmembers() has consumed the stream, reading the first
        # member's data must raise StreamError.
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as stream:
            with self.assertRaises(tarfile.StreamError):
                stream.read()

    def test_compare_members(self):
        # Walk the plain test archive and the stream under test in
        # lockstep and compare the data of corresponding members.
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            streamed = self.tar

            while True:
                expected = reference.next()
                actual = streamed.next()
                if expected is None:
                    break
                self.assertIsNotNone(actual, "stream.next() failed.")

                if actual.islnk() or actual.issym():
                    # Links cannot be extracted from a stream.
                    with self.assertRaises(tarfile.StreamError):
                        streamed.extractfile(actual)
                    continue

                expected_file = reference.extractfile(expected)
                actual_file = streamed.extractfile(actual)
                if expected_file is not None:
                    self.assertIsNotNone(actual_file,
                                         "stream.extractfile() failed")
                    self.assertEqual(expected_file.read(),
                                     actual_file.read(),
                                     "stream extraction failed")
839
class GzipStreamReadTest(GzipTest, StreamReadTest):
    # Runs the StreamReadTest tests with the GzipTest settings
    # (gzip-compressed test archive, "gz" mode suffix).
    pass
842
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    # Runs the StreamReadTest tests with the Bz2Test settings
    # (bz2-compressed test archive, "bz2" mode suffix).
    pass
845
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    # Runs the StreamReadTest tests with the LzmaTest mixin's settings.
    pass
848
849
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which read modes accept the archive named by self.tarname and
    # which mode/archive combinations must be rejected with ReadError.

    def _testfunc_file(self, name, mode):
        # Open the archive *name* by path using *mode*.  A ReadError fails
        # the test with a message that names the archive and the mode.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail(f"tarfile.open({name!r}, {mode!r}) "
                      f"raised ReadError: {exc}")
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but the archive is handed to
        # tarfile.open() as an already opened binary file object.  The
        # TarFile is closed while that file object is still open.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail(f"tarfile.open({name!r}, {mode!r}, fileobj=...) "
                          f"raised ReadError: {exc}")
            else:
                tar.close()

    def _test_modes(self, testfunc):
        # testfunc: one of the _testfunc_* helpers, called as
        # testfunc(archive_path, mode) for every mode that must succeed.
        if self.suffix:
            # Mismatches: the plain test archive opened with this class's
            # compression forced, and this class's archive opened with no
            # compression allowed.
            mismatches = (
                (tarname, "r:" + self.suffix),
                (tarname, "r|" + self.suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for path, mode in mismatches:
                with self.subTest(path=path, mode=mode):
                    with self.assertRaises(tarfile.ReadError):
                        tarfile.open(path, mode=mode)

        accepted_modes = (
            "r",
            "r:" + self.suffix,
            "r:*",
            "r|" + self.suffix,
            "r|*",
        )
        for mode in accepted_modes:
            with self.subTest(mode=mode):
                testfunc(self.tarname, mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
889
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Runs the DetectReadTest tests with the GzipTest settings
    # (gzip-compressed test archive, "gz" mode suffix).
    pass
892
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    # DetectReadTest for bz2 archives, plus a blocksize regression test.

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        #
        # Recompress the plain archive with blocksize 100,000 bytes, so
        # that the file starts with "BZh11", and open it as a stream.
        with open(tarname, "rb") as plain, \
             bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(plain.read())

        self._testfunc_file(tmpname, "r|*")
907
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Runs the DetectReadTest tests with the LzmaTest mixin's settings.
    pass
910
911
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of the test archive by name and checks
    # their header fields and, where a digest is given, their contents.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # Compare *tarinfo* with the expected values.
        #   chksum -- expected sha256 hex digest of the member's data, or
        #             None to skip the content check
        #   kwargs -- TarInfo attribute names mapped to expected values
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as stream:
                digest = sha256sum(stream.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        # These values are expected of every member, on top of the
        # caller's own expectations.
        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        wanted = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(wanted, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)
1007
1008
class LongnameTest:
    # Mixin with tests for members whose names exceed the classic header
    # field.  The tests read self.subdir, self.longnametype and self.format
    # from the class they are mixed into, and use self.tar.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        wanted = f"{self.subdir}/{'123/' * 125}longname"
        try:
            member = self.tar.getmember(wanted)
        except KeyError:
            self.fail("longname not found")
        else:
            self.assertNotEqual(member.type, tarfile.DIRTYPE,
                    "read longname as dirtype")

    def test_read_longlink(self):
        # The long-named link must point at the long-named file.
        prefix = f"{self.subdir}/{'123/' * 125}"
        target_name = prefix + "longname"
        link_name = prefix + "longlink"
        try:
            member = self.tar.getmember(link_name)
        except KeyError:
            self.fail("longlink not found")
        else:
            self.assertEqual(member.linkname, target_name, "linkname wrong")

    def test_truncated_longname(self):
        # Only the first three 512-byte blocks starting at the member's
        # offset are kept; opening that fragment must raise ReadError.
        wanted = f"{self.subdir}/{'123/' * 125}longname"
        member = self.tar.getmember(wanted)
        raw = self.tar.fileobj
        raw.seek(member.offset)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        wanted = f"{self.subdir}/{'123/' * 125}longname"
        start = self.tar.getmember(wanted).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = tarfile.TarInfo.frombuf(raw.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)

    def test_longname_directory(self):
        # Test reading a longlink directory. Issue #47231.
        longdir = 'a' * 101 + '/'
        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as tar:
                tar.format = self.format
                try:
                    os.mkdir(longdir)
                    tar.add(longdir)
                finally:
                    os.rmdir(longdir.rstrip("/"))
            with tarfile.open(tmpname) as tar:
                # The directory must be found with and without the
                # trailing slash.
                for lookup in (longdir, longdir.removesuffix('/')):
                    self.assertIsNotNone(tar.getmember(lookup))
1064
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME
    format = tarfile.GNU_FORMAT

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        # Extract member *name* into TEMPDIR and verify its contents.
        self.tar.extract(name, TEMPDIR, filter='data')
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as stream:
            digest = sha256sum(stream.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if self._fs_supports_holes():
            # The file must occupy fewer bytes on disk than its size.
            info = os.stat(extracted)
            self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidentially has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as stream:
            # Seek to "punch a hole" of 4 KiB
            stream.seek(4096)
            stream.write(b'x' * 4096)
            stream.truncate()
        info = os.stat(probe)
        os_helper.unlink(probe)
        return info.st_blocks * 512 < info.st_size
1123
1124
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):

    subdir = "pax"
    longnametype = tarfile.XHDTYPE
    format = tarfile.PAX_FORMAT

    def test_pax_global_headers(self):
        # Each row: member name, expected uname, expected gname.  All three
        # members must also carry the same VENDOR.umlauts pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expectations = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member_name, uname, gname in expectations:
                member = tar.getmember(member_name)
                self.assertEqual(member.uname, uname)
                self.assertEqual(member.gname, gname)
                self.assertEqual(member.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            member = tar.getmember("pax/regtype4")
            for field, value in (("size", 7011), ("uid", 123), ("gid", 123),
                                 ("mtime", 1041808783.0)):
                self.assertEqual(getattr(member, field), value)
            self.assertEqual(type(member.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(member.pax_headers[key]),
                                 1041808783.0)

    def test_pax_header_bad_formats(self):
        # Write an archive whose pax header holds the record
        # b"11 foo=bar\n", swap that record for each replacement below,
        # and check that opening the altered archive raises ReadError.
        pax_header_replacements = (
            b" foo=bar\n",
            b"0 \n",
            b"1 \n",
            b"2 \n",
            b"3 =\n",
            b"4 =a\n",
            b"1000000 foo=bar\n",
            b"0 foo=bar\n",
            b"-12 foo=bar\n",
            b"000000000000000000000000036 foo=bar\n",
        )
        pax_headers = {"foo": "bar"}
        valid_record = b"11 foo=bar\n"

        for replacement in pax_header_replacements:
            with self.subTest(header=replacement):
                member = tarfile.TarInfo()
                member.name = "pax"
                member.uid = 1
                member.pax_headers = pax_headers

                tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                                   encoding="iso8859-1")
                try:
                    tar.addfile(member)
                finally:
                    tar.close()

                with open(tmpname, "rb") as f:
                    data = f.read()
                    self.assertIn(valid_record, data)
                    data = data.replace(valid_record, replacement)

                with open(tmpname, "wb") as f:
                    f.truncate()
                    f.write(data)

                with self.assertRaisesRegex(
                        tarfile.ReadError,
                        r"method tar: ReadError\('invalid header'\)"):
                    tarfile.open(tmpname, encoding="iso8859-1")
1210
1211
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        # A file object supplied by the caller must stay open after the
        # TarFile is closed, and also after the TarFile is destroyed.
        sink = io.BytesIO()
        with tarfile.open(fileobj=sink, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(sink.closed, "external fileobjs must never closed")

        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Destroying the TarFile must not add to what close() wrote.
        snapshot = sink.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(sink.closed)
        self.assertEqual(snapshot, sink.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload_size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        member = tarfile.TarInfo("foo")
        member.size = payload_size
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(b"a" * payload_size))

        with self.open(tmpname, "rb") as fobj:
            written = fobj.read()
        self.assertEqual(len(written), tarfile.RECORDSIZE * 2)
1240
1241
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w:" mode prefix.  Fixture
    # files are created below TEMPDIR and the archive is written to tmpname.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        with tarfile.open(tmpname, self.mode) as tar:
            path = os.path.join(TEMPDIR, "file")
            # Empty file first ...
            with open(path, "wb"):
                pass
            self.assertEqual(tar.gettarinfo(path).size, 0)

            # ... then the same path holding three bytes.
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            self.assertEqual(tar.gettarinfo(path).size, 3)

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            os_helper.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # The fixture files are created inside the try block so that a
            # failure while creating them still removes the directory.
            for entry in ("1", "2"):
                os_helper.create_empty_file(os.path.join(path, entry))

            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            os_helper.rmtree(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must treat a path-like object like the equivalent
        # string path.
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            # The skip is raised inside the try block so that the target
            # file is removed on the skip path as well.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)

            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.unlink(target)
            os_helper.unlink(link)

    @os_helper.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                self.assertEqual(tar.gettarinfo(path).size, 0)
        finally:
            os_helper.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Same check with the archive's directory as the cwd.
            with os_helper.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                os_helper.create_empty_file(os.path.join(tempdir, name))

            def member_filter(tarinfo):
                # Drop "bar"; stamp every other member with uid and uname.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=member_filter)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, member_filter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            os_helper.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        #   path     -- archive name to store the member under
        #   cmp_path -- expected stored name; defaults to *path* with
        #               os.sep replaced by "/"
        #   dir      -- add a directory instead of an empty file
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            os_helper.create_empty_file(foo)

        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            # Always remove the fixture: a leftover "foo" would make the
            # next call's os.mkdir() fail.
            if dir:
                os_helper.rmdir(foo)
            else:
                os_helper.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))

    @os_helper.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir,
                                   filter='fully_trusted')
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            os_helper.unlink(temparchive)
            os_helper.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        # Absolute names are expected to be stored as relative ones.
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with os_helper.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # The first write() to the file object raises.  That exception must
        # propagate out of tarfile.open() unchanged, and the caller-supplied
        # file object must be left open.
        for exctype in OSError, EOFError, RuntimeError:
            with self.subTest(exctype=exctype):
                class BadFile(io.BytesIO):
                    first = True
                    def write(self, data):
                        if self.first:
                            self.first = False
                            raise exctype

                f = BadFile()
                with self.assertRaises(exctype):
                    tarfile.open(tmpname, self.mode, fileobj=f,
                                 format=tarfile.PAX_FORMAT,
                                 pax_headers={'non': 'empty'})
                self.assertFalse(f.closed)
1552
1553
class GzipWriteTest(GzipTest, WriteTest):
    """Run the WriteTest cases with the settings provided by GzipTest."""
1556
1557
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the WriteTest cases with the settings provided by Bz2Test."""
1560
1561
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the WriteTest cases with the settings provided by LzmaTest."""
1564
1565
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests that open the archive with a "w|" mode prefix.

    prefix = "w|"
    # Callable returning a decompressor object, or None when the archive
    # can be read back directly through self.open().
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        # Create an empty archive and verify that its (decompressed)
        # content holds exactly RECORDSIZE NUL bytes.
        with tarfile.open(tmpname, self.mode):
            pass

        if not self.decompressor:
            with self.open(tmpname) as stream:
                payload = stream.read()
        else:
            with open(tmpname, "rb") as stream:
                compressed = stream.read()
            unpacker = self.decompressor()
            payload = unpacker.decompress(compressed)
            self.assertFalse(unpacker.unused_data, "found trailing data")

        self.assertEqual(payload.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    @unittest.skipIf(
        support.is_emscripten or support.is_wasi,
        "Emscripten's/WASI's umask is a stub."
    )
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os_helper.unlink(tmpname)

        # Force a known umask for the duration of the check and put the
        # previous one back afterwards, whatever the outcome.
        saved_umask = os.umask(0o022)
        try:
            with tarfile.open(tmpname, self.mode):
                pass
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1607
1608
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the compressed file
        # can be searched as text without decoding errors.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # Use a unittest assertion rather than a bare ``assert`` so the
        # check is not compiled away when Python runs with -O.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1618
1619
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Decompressor factory read by StreamWriteTest.test_stream_padding;
    # stays None when the bz2 module could not be imported.
    decompressor = None if not bz2 else bz2.BZ2Decompressor
1622
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Decompressor factory read by StreamWriteTest.test_stream_padding;
    # stays None when the lzma module could not be imported.
    decompressor = None if not lzma else lzma.LZMADecompressor
1625
1626
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    # Shared 1016-character directory prefixes for the test names below.
    _NAME_DIRS = "longnam/" * 127
    _LINK_DIRS = "longlnk/" * 127

    def _length(self, s):
        # Number of bytes taken by *s* in 512-byte blocks, always
        # counting one block more than the whole blocks it fills
        # (presumably to leave room for a terminating NUL).
        whole_blocks = len(s) // 512
        return (whole_blocks + 1) * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding a single member.
        # Initial tar header
        total = 512

        # Every over-long field costs one GNU extended header block
        # plus the blocks holding the long value itself.
        oversized = []
        if len(name) > tarfile.LENGTH_NAME:
            oversized.append(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            oversized.append(link)
        for value in oversized:
            total += 512 + self._length(value)
        return total

    def _test(self, name, link=None):
        # Write one member with the given name/link in GNU format, check
        # the resulting archive offset, then read the member back.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w") as archive:
            archive.format = tarfile.GNU_FORMAT
            archive.addfile(written)

            expected_offset = self._calc_size(name, link)
            self.assertEqual(expected_offset, archive.offset,
                             "GNU longname/longlink creation failed")

        with tarfile.open(tmpname) as archive:
            loaded = archive.next()
            self.assertIsNotNone(loaded,
                                 "unable to read longname member")
            self.assertEqual(written.name, loaded.name,
                             "unable to read longname member")
            self.assertEqual(written.linkname, loaded.linkname,
                             "unable to read longname member")

    def test_longname_1023(self):
        self._test(self._NAME_DIRS + "longnam")

    def test_longname_1024(self):
        self._test(self._NAME_DIRS + "longname")

    def test_longname_1025(self):
        self._test(self._NAME_DIRS + "longname_")

    def test_longlink_1023(self):
        self._test("name", self._LINK_DIRS + "longlnk")

    def test_longlink_1024(self):
        self._test("name", self._LINK_DIRS + "longlink")

    def test_longlink_1025(self):
        self._test("name", self._LINK_DIRS + "longlink_")

    def test_longnamelink_1023(self):
        self._test(self._NAME_DIRS + "longnam",
                   self._LINK_DIRS + "longlnk")

    def test_longnamelink_1024(self):
        self._test(self._NAME_DIRS + "longname",
                   self._LINK_DIRS + "longlink")

    def test_longnamelink_1025(self):
        self._test(self._NAME_DIRS + "longname_",
                   self._LINK_DIRS + "longlink_")
1707
1708
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        scratch_dir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(scratch_dir)
        try:
            # One block-device member and one regular-file member.
            block_in = tarfile.TarInfo(name="my_block_device")
            block_in.type = tarfile.BLKTYPE
            regular_in = tarfile.TarInfo(name="my_regular_file")
            regular_in.type = tarfile.REGTYPE
            with tarfile.open(tmpname, self.mode) as archive:
                archive.addfile(block_in)
                archive.addfile(regular_in)

            # devmajor and devminor should be *interpreted* as 0 in both...
            with tarfile.open(tmpname, "r") as archive:
                block_out = archive.getmember("my_block_device")
                regular_out = archive.getmember("my_regular_file")
            for member in (block_out, regular_out):
                self.assertEqual(member.devmajor, 0)
                self.assertEqual(member.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as raw:
                image = raw.read()
            block_header = image[block_out.offset:block_out.offset_data]
            regular_header = image[regular_out.offset:regular_out.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            device_fields = slice(329, 329 + 16)
            self.assertEqual(block_header[device_fields], b"0000000\0" * 2)
            self.assertEqual(regular_header[device_fields], b"\0" * 16)
        finally:
            os_helper.rmtree(scratch_dir)
1753
1754
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for exclusive-creation mode: the archive is created from
    # scratch and opening an existing file must fail.

    prefix = "x:"

    # Small input file shared by every test in the class.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Each test has to start without a pre-existing archive.
        os_helper.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        pathlib.Path(cls.file_path).write_bytes(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def _archived_names(self):
        # Member names of the archive as read back through self.taropen.
        with self.taropen(tmpname) as tobj:
            return tobj.getnames()

    def _check_single_member(self, names):
        # The archive must hold exactly the one shared input file.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_single_member(self._archived_names())

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive open of the same path has to be refused...
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # ...and the first archive must still be readable.
        self._check_single_member(self._archived_names())

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_single_member(self._archived_names())

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_single_member(self._archived_names())

    def test_create_pathlike_name(self):
        # Path objects are accepted for both the archive and the member;
        # the archive name is exposed as an absolute str.
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names_while_open = tobj.getnames()
        self._check_single_member(names_while_open)

        self._check_single_member(self._archived_names())

    def test_create_taropen_pathlike_name(self):
        # Same as test_create_pathlike_name, but through self.taropen.
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names_while_open = tobj.getnames()
        self._check_single_member(names_while_open)

        self._check_single_member(self._archived_names())
1843
1844
class GzipCreateTest(GzipTest, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating the archive
        # and when opening it again for reading.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        with tarfile.open(tmpname, 'r:gz', compresslevel=1):
            pass
1852
1853
class Bz2CreateTest(Bz2Test, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted both when creating the archive
        # and when opening it again for reading.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        with tarfile.open(tmpname, 'r:bz2', compresslevel=1):
            pass
1861
1862
class LzmaCreateTest(LzmaTest, CreateTest):

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        with tarfile.open(tmpname, self.mode, preset=1) as archive:
            archive.add(self.file_path)
1870
1871
class CreateWithXModeTest(CreateTest):
    # Repeat the CreateTest cases with the bare "x" mode prefix.

    prefix = "x"

    # Rebinding the inherited taropen tests to None keeps them from
    # being collected again for this class (they pass "x" explicitly
    # instead of using self.mode).
    test_create_taropen = test_create_existing_taropen = None
1878
1879
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a file "foo" and a hard link "bar" to it, then open a
        # fresh archive that already contains "foo".
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        # Register the cleanups up front: tearDown() is not run when
        # setUp() skips or fails, but registered cleanups always are,
        # so neither file is left behind when os.link() is refused.
        self.addCleanup(os_helper.unlink, self.bar)
        self.addCleanup(os_helper.unlink, self.foo)

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, so the archive is closed before the files are
        # removed, and it is closed even if the add() below raises.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" shares its inode with the already-added "foo", so it is
        # expected to be described as a hard link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is described as a regular
        # file instead.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1921
1922
class PaxWriteTest(GNUWriteTest):
    # Re-run the long name/link cases of GNUWriteTest in pax format and
    # add pax-header specific checks.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as archive:
            archive.addfile(written)

        with tarfile.open(tmpname) as archive:
            if link:
                stored_link = archive.getmembers()[0].linkname
                self.assertEqual(link, stored_link,
                                 "PAX longlink creation failed")
            else:
                stored_name = archive.getmembers()[0].name
                self.assertEqual(name, stored_name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as archive:
            archive.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            self.assertEqual(archive.pax_headers, pax_headers)
            self.assertEqual(archive.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in archive.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Numeric fields must be convertible with their
                # registered converter.
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        member = tarfile.TarInfo()
        member.name = "\xe4\xf6\xfc" # non-ASCII
        member.uid = 8**8 # too large
        member.pax_headers = pax_headers
        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as archive:
            archive.addfile(member)

        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            loaded = archive.getmembers()[0]
            self.assertEqual(loaded.pax_headers, pax_headers)
            self.assertEqual(loaded.name, "foo")
            self.assertEqual(loaded.uid, 123)

    def test_create_pax_header(self):
        # The ustar header should contain values that can be
        # represented reasonably, even if a better (e.g. higher
        # precision) version is set in the pax header.
        # Issue #45863

        # values that should be kept
        kept = tarfile.TarInfo()
        kept.name = "foo"
        kept.mtime = 1000.1
        kept.size = 100
        kept.uid = 123
        kept.gid = 124
        info = kept.get_info()
        header = kept.create_pax_header(info, encoding="iso8859-1")
        self.assertEqual(info['name'], "foo")
        # mtime should be rounded to nearest second
        self.assertIsInstance(info['mtime'], int)
        for field, value in (('mtime', 1000), ('size', 100),
                             ('uid', 123), ('gid', 124)):
            self.assertEqual(info[field], value)
        expected = b"".join([
            b'././@PaxHeader', bytes(86),
            b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x',
            bytes(100), b'ustar\x0000', bytes(247),
            b'16 mtime=1000.1\n', bytes(496), b'foo', bytes(97),
            b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0',
            bytes(100), b'ustar\x0000', bytes(247),
        ])
        self.assertEqual(header, expected)

        # values that should be changed
        changed = tarfile.TarInfo()
        changed.name = "foo\u3374" # can't be represented in ascii
        changed.mtime = 10**10 # too big
        changed.size = 10**10 # too big
        changed.uid = 8**8 # too big
        changed.gid = 8**8+1 # too big
        info = changed.get_info()
        header = changed.create_pax_header(info, encoding="iso8859-1")
        # name is kept as-is in info but should be added to pax header
        self.assertEqual(info['name'], "foo\u3374")
        for field in ('mtime', 'size', 'uid', 'gid'):
            self.assertEqual(info[field], 0)
        expected = b"".join([
            b'././@PaxHeader', bytes(86),
            b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x',
            bytes(100), b'ustar\x0000', bytes(247),
            b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n',
            b'16 gid=16777217\n20 size=10000000000\n',
            b'21 mtime=10000000000\n', bytes(424), b'foo?', bytes(96),
            b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0',
            bytes(100), b'ustar\x0000', bytes(247),
        ])
        self.assertEqual(header, expected)
2060
2061
class UnicodeTest:
    # Mixin with encoding-related tests; the concrete test class has to
    # provide the archive format as ``self.format``.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using *encoding* and
        # check that reading with the same encoding returns the name.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as archive:
            archive.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as archive:
            self.assertEqual(archive.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With a strict ASCII encoding, non-ASCII text in either the
        # member name or the user name must be rejected.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as archive:
            member = tarfile.TarInfo()

            member.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(member)

            member.name = "foo"
            member.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                archive.addfile(member)

    def test_unicode_argument(self):
        # Every textual field of every member must come back as str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as archive:
            for member in archive:
                for field in ("name", "linkname", "uname", "gname"):
                    self.assertIs(type(getattr(member, field)), str)

    def test_uname_unicode(self):
        owner = "\xe4\xf6\xfc"
        written = tarfile.TarInfo("foo")
        written.uname = owner
        written.gname = owner

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as archive:
            archive.addfile(written)

        with tarfile.open(tmpname, encoding="iso8859-1") as archive:
            loaded = archive.getmember("foo")
            self.assertEqual(loaded.uname, "\xe4\xf6\xfc")
            self.assertEqual(loaded.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # Outside of pax format, reading the same archive as ASCII
            # is expected to yield surrogate-escaped names.
            with tarfile.open(tmpname, encoding="ascii") as archive:
                loaded = archive.getmember("foo")
                self.assertEqual(loaded.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(loaded.gname, "\udce4\udcf6\udcfc")
2141
2142
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        head = "0123456789" * 15
        tail = "0123456789" * 10
        self._test_ustar_name(head + "01234/" + tail)
        self._test_ustar_name(head + "0123/4" + tail, ValueError)
        self._test_ustar_name(head + "012\xff/" + tail)
        self._test_ustar_name(head + "0123\xff/" + tail, ValueError)

    def test_unicode_longname2(self):
        head = "0123456789" * 15
        tail = "0123456789" * 10
        self._test_ustar_name(head + "01\xff/2" + tail, ValueError)
        self._test_ustar_name(head + "01\xff\xff/" + tail, ValueError)

    def test_unicode_longname3(self):
        head = "0123456789" * 15
        self._test_ustar_name(head + "01\xff\xff/2" + "0123456789" * 10, ValueError)
        self._test_ustar_name(head + "01234/" + "0123456789" * 9 + "01234567\xff")
        self._test_ustar_name(head + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_longname4(self):
        head = "0123456789" * 15 + "01234/" + "0123456789" * 9
        self._test_ustar_name(head + "012345\xff\xff")
        self._test_ustar_name(head + "0123456\xff\xff", ValueError)

    def _check_ustar_field(self, member, field, expected, exc):
        # Add *member* to a fresh utf-8 ustar archive.  When *exc* is
        # given, adding must raise it and nothing is read back;
        # otherwise the first member read back must carry *expected* in
        # the attribute called *field*.
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as archive:
            if exc is not None:
                with self.assertRaises(exc):
                    archive.addfile(member)
                return
            archive.addfile(member)

        with tarfile.open(tmpname, "r", encoding="utf-8") as archive:
            first = next(iter(archive), None)
            if first is not None:
                self.assertEqual(expected, getattr(first, field))

    def _test_ustar_name(self, name, exc=None):
        self._check_ustar_field(tarfile.TarInfo(name), "name", name, exc)

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        member = tarfile.TarInfo("foo")
        member.linkname = name
        self._check_ustar_field(member, "linkname", name, exc)
2220
2221
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Only the lookup matters; the member itself is unused.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2238
2239
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as archive:
                # Only the lookup matters; the member itself is unused.
                try:
                    archive.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2258
2259
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an archive at self.tarname.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os_helper.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in the given mode, holding a copy of the
        # reference archive's "ustar/regtype" member renamed to "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as source:
            member = source.getmember("ustar/regtype")
            member.name = "foo"
            with source.extractfile(member) as content, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, content)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2279
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests on an uncompressed archive.

    # Disabled here: the inherited test reads self.suffix, which none of
    # this class's bases defines.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive.
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive holds exactly *names*, in order.
        # *names* defaults to ["bar"], the member _add_testfile() adds.
        # A None sentinel is used instead of a list default so that no
        # mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        # Work on an in-memory copy of the archive from here on.
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2343
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the settings from GzipTest."""
2346
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the settings from Bz2Test."""
2349
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the settings from LzmaTest."""
2352
2353
class LimitsTest(unittest.TestCase):
    # Check which names, link names and uids TarInfo.tobuf() accepts
    # for each archive format.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(ustar)

        # 101 char name that cannot be stored
        member = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(ustar)

        # 256 char name that cannot be stored
        member = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char name
        member = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # 512 char linkname
        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

        # uid > 8 digits
        member = tarfile.TarInfo("name")
        member.uid = 0o10000000
        with self.assertRaises(ValueError):
            member.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # Long names and long link names can both be stored.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(gnu)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(gnu)

        # uid >= 256 ** 7
        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            member.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values below may be rejected in pax format.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(pax)

        member = tarfile.TarInfo("longlink")
        member.linkname = "123/" * 126 + "longname"
        member.tobuf(pax)

        member = tarfile.TarInfo("name")
        member.uid = 0o4000000000000000000
        member.tobuf(pax)
2411
2412
class MiscTest(unittest.TestCase):
    # Tests for tarfile's low-level field helpers and its public namespace,
    # plus a couple of regression tests.

    def test_char_fields(self):
        # stn() NUL-pads or truncates to the requested length; nts() stops
        # decoding at the first NUL byte.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # These are the same value/encoding pairs that
        # test_read_number_fields() decodes.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(
            tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # itn() must raise ValueError for each of these value/width/format
        # combinations.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Names passed to check__all__() as deliberately not exported.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE',
            'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME',
            'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES',
            'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS',
            'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums',
            'copyfileobj', 'filemode', 'EmptyHeaderError',
            'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject', 'main',
            "fully_trusted_filter", "data_filter",
            "tar_filter", "FilterError", "AbsoluteLinkError",
            "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
            "LinkOutsideDestinationError", "LinkFallbackError",
            }
        support.check__all__(self, tarfile, not_exported=not_exported)

    def test_useful_error_message_when_modules_missing(self):
        # When opening fails, the ReadError message must include the
        # CompressionError raised by the xz open method.
        fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz')
        # Pass a single exception instance as side_effect. A 1-tuple here
        # would be treated by the mock as an iterable of results that is
        # exhausted after the first call.
        error = tarfile.CompressionError('lzma module is not available')
        with self.assertRaises(tarfile.ReadError) as excinfo:
            with unittest.mock.patch.object(tarfile.TarFile, 'xzopen',
                                            side_effect=error):
                tarfile.open(fname)

        self.assertIn(
            "\n- method xz: CompressionError('lzma module is not available')\n",
            str(excinfo.exception),
        )

    @unittest.skipUnless(os_helper.can_symlink(), 'requires symlink support')
    @unittest.skipUnless(hasattr(os, 'chmod'), "missing os.chmod")
    @unittest.mock.patch('os.chmod')
    def test_deferred_directory_attributes_update(self, mock_chmod):
        # Regression test for gh-127987: setting attributes on arbitrary files
        tempdir = os.path.join(TEMPDIR, 'test127987')

        def mock_chmod_side_effect(path, mode, **kwargs):
            # Fail the test as soon as chmod() is aimed at anything that
            # resolves to a location outside the extraction directory.
            target_path = os.path.realpath(path)
            if os.path.commonpath([target_path, tempdir]) != tempdir:
                raise Exception("should not try to chmod anything outside the destination", target_path)
        mock_chmod.side_effect = mock_chmod_side_effect

        outside_tree_dir = os.path.join(TEMPDIR, 'outside_tree_dir')
        # The member 'x' is added three times: as a symlink to '.', as a
        # directory with a mode, and as a symlink to the outside directory.
        with ArchiveMaker() as arc:
            arc.add('x', symlink_to='.')
            arc.add('x', type=tarfile.DIRTYPE, mode='?rwsrwsrwt')
            arc.add('x', symlink_to=outside_tree_dir)

        os.makedirs(outside_tree_dir)
        try:
            # Close the archive again instead of leaking the TarFile.
            with arc.open() as tar:
                tar.extractall(path=tempdir, filter='tar')
        finally:
            os_helper.rmtree(outside_tree_dir)
            os_helper.rmtree(tempdir)
2534
2535
class CommandLineTest(unittest.TestCase):
    # Tests for the command-line interface, run as "python -m tarfile".

    def tarfilecmd(self, *args, **kwargs):
        # Run "python -m tarfile" with *args* and expect it to succeed.
        # **kwargs are handed on to assert_python_ok().  Returns stdout with
        # os.linesep replaced by "\n".
        _rc, out, _err = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                        **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "python -m tarfile" with *args* and expect it to fail.
        # Returns whatever assert_python_failure() returns; callers unpack
        # it as (rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        # Write two test data files to *tar_name* under their base names.
        # The archive is unlinked again when the test finishes.
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def make_evil_tarfile(self, tar_name):
        # Write an archive with two empty members, 'benign' and '../evil',
        # to *tar_name*.  The archive is unlinked again when the test
        # finishes.
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            benign = tarfile.TarInfo('benign')
            tf.addfile(benign, fileobj=io.BytesIO(b''))
            evil = tarfile.TarInfo('../evil')
            tf.addfile(evil, fileobj=io.BytesIO(b''))

    def test_bad_use(self):
        # No arguments at all: expect a usage error on stderr.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # An empty archive name: expect some error output.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-t', tar_name,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # The first 511 bytes of each test archive must be rejected too.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    os_helper.unlink(tmpname)

    def test_list_command(self):
        # The command's output must equal what TarFile.list() prints.
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        # Same comparison as test_list_command(), with verbose output.
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                # The created archive must be readable.
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        # The archive name has no extension at all.
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        # The archive name starts with a dot.
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(tar_name)

    def test_create_command_compressed(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            if not filetype.open:
                continue
            # Bind the name before entering the try block so that the
            # cleanup in the finally clause can always refer to it.
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                # The archive must open with the matching open method.
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname,
                                          PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_filter(self):
        self.make_evil_tarfile(tmpname)
        # Make an inner directory, so the member named '../evil'
        # is still extracted into `tarextdir`
        destdir = os.path.join(tarextdir, 'dest')
        os.mkdir(tarextdir)
        try:
            with os_helper.temp_cwd(destdir):
                # Extraction must fail with the 'data' filter and succeed
                # with the 'fully_trusted' filter.
                self.tarfilecmd_failure('-e', tmpname,
                                        '-v',
                                        '--filter', 'data')
                out = self.tarfilecmd('-e', tmpname,
                                      '-v',
                                      '--filter', 'fully_trusted',
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with os_helper.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with os_helper.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2755
2756
class ContextManagerTest(unittest.TestCase):
    # Tests for using a TarFile object in a "with" statement.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here; do not
            # swallow KeyboardInterrupt or SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here; do not
                # swallow KeyboardInterrupt or SystemExit.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2813
2814
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract the member *name* and compare the checksum of the
        # resulting file with the one expected for the regular file.
        self.tar.extract(name, TEMPDIR, filter='fully_trusted')
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = sha256sum(extracted.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        member = "ustar/lnktype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        member = "./ustar/linktest2/lnktype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        member = "ustar/symtype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        member = "./ustar/linktest2/symtype"
        self._test_link_extraction(member)
2847
2848
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bz2-compressed archive with *mode*.
        # ReadError is fine; reading on and on at the end of the data
        # is not.

        class EOFGuardBytesIO(io.BytesIO):
            # Set by a read() that starts at the end of the data, cleared
            # by seek().  A read() while it is set means the caller keeps
            # reading past the end.
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                at_end = self.tell() == len(self.getvalue())
                self.hit_eof = at_end
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for length in range(len(compressed) + 1):
            truncated = EOFGuardBytesIO(compressed[:length])
            try:
                tarfile.open(fileobj=truncated, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2878
2879
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root'.

    Return False if the pwd or grp module cannot be imported, if either
    name differs from 'root', or if id 0 has no entry in the password or
    group database.
    """
    try:
        import pwd, grp
    except ImportError:
        return False
    try:
        if pwd.getpwuid(0)[0] != 'root':
            return False
        if grp.getgrgid(0)[0] != 'root':
            return False
    except KeyError:
        # getpwuid()/getgrgid() raise KeyError for an id without an entry.
        # This function is called while the module is being imported, so
        # such a system must skip the test instead of breaking the import.
        return False
    return True
2890
2891
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests replace these functions with mocks:
    #  os.chown: to inspect the calls made during extraction
    #  os.chmod: to leave the real modes alone; changed modes could keep us
    #             from deleting the files/directories afterwards
    #  os.geteuid: to pretend that we are root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write an archive holding a file, a directory and a second file
        # to tmpname.  Every member gets its own uid/gid pair, while uname
        # and gname are always 'root'.  Returns the archive's path.

        # file object handed to addfile() for the two regular members
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, file object) of each member
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Context manager that builds the test archive and yields
        # (open TarFile, file name, directory name, name of the file inside
        # the directory).
        mock_geteuid.return_value = 0  # pretend to be root

        base_name = 'numeric-owner-testfile'
        dir_name = 'dir'
        # member names in the order _make_test_archive() expects them
        stored_names = (base_name, dir_name, os.path.join(dir_name, base_name))

        archive_path = NumericOwnerTest._make_test_archive(*stored_names)

        with tarfile.open(archive_path) as archive:
            yield (archive, *stored_names)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, _, file_2):
            for member_name in (file_1, file_2):
                archive.extract(member_name, TEMPDIR, numeric_owner=True,
                                filter='fully_trusted')

        # chown() must have received the uid/gid stored in the archive
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, file_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, file_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, dir_1,
                                                file_2):
            archive.extractall(TEMPDIR, numeric_owner=True,
                               filter='fully_trusted')

        # chown() must have received the uid/gid stored in the archive
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, file_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dir_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, file_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test only works if uid=0 and gid=0 really are named 'root': the
    #  members carry 'root' as uname and gname, and extract() resolves those
    #  names through pwd and grp, so the ids handed to chown() are expected
    #  to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
            archive.extract(file_1, TEMPDIR, numeric_owner=False,
                            filter='fully_trusted')

        extracted_path = os.path.join(TEMPDIR, file_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing more than two positional arguments to extract() must be
        # rejected.
        with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
            with self.assertRaises(TypeError):
                archive.extract(file_1, TEMPDIR, False, True)
3014
3015
class ReplaceTests(ReadTest, unittest.TestCase):
    # Tests for TarInfo.replace().

    def test_replace_name(self):
        # Only the returned copy carries the new name.
        original = self.tar.getmember('ustar/regtype')
        copy = original.replace(name='misc/other')
        self.assertEqual(copy.name, 'misc/other')
        self.assertEqual(original.name, 'ustar/regtype')
        looked_up = self.tar.getmember('ustar/regtype')
        self.assertEqual(looked_up.name, 'ustar/regtype')

    def test_replace_deep(self):
        # By default, changing the copy's pax_headers leaves the
        # original's untouched.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace()
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        # With deep=False, a change made through the copy's pax_headers
        # shows up in the original as well.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace(deep=False)
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        # Each of these attributes can be replaced with None, one at a
        # time, without touching the original.
        original = self.tar.getmember('ustar/regtype')
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                override = {attr_name: None}
                copy = original.replace(**override)
                self.assertEqual(getattr(copy, attr_name), None)
                self.assertNotEqual(getattr(original, attr_name), None)

    def test_replace_internal(self):
        # 'offset' is not an accepted keyword argument.
        original = self.tar.getmember('ustar/regtype')
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
3054
3055
class NoneInfoExtractTests(ReadTest):
    # These mainly check that all kinds of members are extracted successfully
    # if some metadata is None.
    # Some of the methods do additional spot checks.

    # We also test that the default filters can deal with None.

    # Filter passed to the control extraction in setUpClass(); the concrete
    # test classes override it.
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        # Extract the unmodified test archive once.  The set of relative
        # paths this produces is what check_files_present() compares to.
        cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
        # Use the archive as a context manager so it is released even if
        # the extraction raises.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            tar.errorlevel = 0
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = set(
            p.relative_to(cls.control_dir)
            for p in pathlib.Path(cls.control_dir).glob('**/*'))

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        # Compare the tree below `directory` with the control extraction.
        got_paths = set(
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*'))
        if self.extraction_filter == 'data':
            # The 'data' filter is expected to reject special files
            for path in 'ustar/fifotype', 'ustar/blktype', 'ustar/chrtype':
                got_paths.discard(pathlib.Path(path))
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        # Set the given attributes to None on every member of self.tar,
        # extract everything to a temporary directory, check that the
        # expected files exist, then yield the directory for further checks.
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should be later than 'now' -- the mtime
        # of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Reuse the value obtained under the try block
                        # rather than calling stat() a second time outside
                        # of the OSError handling.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    # The remaining tests only rely on the checks that extract_with_none()
    # performs itself.

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
3153
# Runs the NoneInfoExtractTests checks with the control extraction done
# through the 'data' filter.
class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'data'
3156
# Runs the NoneInfoExtractTests checks with the control extraction done
# through the 'fully_trusted' filter.
class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests,
                                        unittest.TestCase):
    extraction_filter = 'fully_trusted'
3160
# Runs the NoneInfoExtractTests checks with the control extraction done
# through the 'tar' filter.
class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'tar'
3163
# Runs the NoneInfoExtractTests checks with filter=None passed to the
# control extraction.
class NoneInfoExtractTests_Default(NoneInfoExtractTests,
                                   unittest.TestCase):
    extraction_filter = None
3167
class NoneInfoTests_Misc(unittest.TestCase):
    def test_add(self):
        # addfile() must raise ValueError, naming the attribute, when it
        # is handed a TarInfo whose metadata contains None.
        none_attrs = ('mtime', 'mode', 'uid', 'gid', 'uname', 'gname')
        formats = (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                   tarfile.PAX_FORMAT)
        bio = io.BytesIO()
        for tarformat in formats:
            with self.subTest(tarformat=tarformat):
                tar = tarfile.open(fileobj=bio, mode='w', format=tarformat)
                tarinfo = tar.gettarinfo(tarname)
                try:
                    tar.addfile(tarinfo)
                except Exception:
                    if tarformat != tarfile.USTAR_FORMAT:
                        raise
                    # In the old, limited format, adding might fail for
                    # reasons like the UID being too large.  Nothing more
                    # to check for this format then.
                    continue
                for attr_name in none_attrs:
                    with self.subTest(attr_name=attr_name):
                        replaced = tarinfo.replace(**{attr_name: None})
                        with self.assertRaisesRegex(ValueError, attr_name):
                            tar.addfile(replaced)

    def test_list(self):
        # Set some metadata to None and compare the list() output before
        # and after, word by word.  list() must not raise, and only the
        # words describing the affected metadata may change.
        # (n.b.: some contents of the test archive are hardcoded.)

        def listed_words(tar):
            # Return what tar.list() prints, split into bytes words.
            stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
            with support.swap_attr(sys, 'stdout', stream):
                tar.list()
            return stream.detach().getvalue().split()

        def assert_numeric(value):
            self.assertRegex(value, b'[0-9]+')

        def assert_none(value):
            self.assertEqual(value, b'None')

        def check_owner(expected, got, *, user_changed, check_changed):
            # Both words look like b'user/group'.  The half that was not
            # touched must be identical; the other one is handed to
            # `check_changed`.
            exp_user, exp_group = expected.split(b'/')
            got_user, got_group = got.split(b'/')
            if user_changed:
                self.assertEqual(got_group, exp_group)
                check_changed(got_user)
            else:
                self.assertEqual(got_user, exp_user)
                check_changed(got_group)

        cases = ({'mtime'}, {'mode'}, {'uid'}, {'gid'},
                 {'uname'}, {'gname'},
                 {'uid', 'uname'}, {'gid', 'gname'})
        for attr_names in cases:
            with (self.subTest(attr_names=attr_names),
                  tarfile.open(tarname, encoding="iso8859-1") as tar):
                words_before = listed_words(tar)
                for member in tar.getmembers():
                    for attr_name in attr_names:
                        setattr(member, attr_name, None)
                words_after = listed_words(tar)
                for expected, got in zip(words_before, words_after):
                    if (attr_names == {'mtime'}
                            and re.match(rb'2003-01-\d\d', expected)):
                        self.assertEqual(got, b'????-??-??')
                    elif (attr_names == {'mtime'}
                            and re.match(rb'\d\d:\d\d:\d\d', expected)):
                        self.assertEqual(got, b'??:??:??')
                    elif (attr_names == {'mode'}
                            and re.match(rb'.([r-][w-][x-]){3}', expected)):
                        self.assertEqual(got, b'??????????')
                    elif (attr_names == {'uname'}
                            and expected.startswith(
                                (b'tarfile/', b'lars/', b'foo/'))):
                        check_owner(expected, got, user_changed=True,
                                    check_changed=assert_numeric)
                    elif (attr_names == {'gname'}
                            and expected.endswith(
                                (b'/tarfile', b'/users', b'/bar'))):
                        check_owner(expected, got, user_changed=False,
                                    check_changed=assert_numeric)
                    elif (attr_names == {'uid'}
                            and expected.startswith(b'1000/')):
                        check_owner(expected, got, user_changed=True,
                                    check_changed=assert_none)
                    elif (attr_names == {'gid'}
                            and expected.endswith(b'/100')):
                        check_owner(expected, got, user_changed=False,
                                    check_changed=assert_none)
                    elif (attr_names == {'uid', 'uname'}
                            and expected.startswith(
                                (b'tarfile/', b'lars/', b'foo/', b'1000/'))):
                        check_owner(expected, got, user_changed=True,
                                    check_changed=assert_none)
                    elif (attr_names == {'gname', 'gid'}
                            and expected.endswith(
                                (b'/tarfile', b'/users', b'/bar', b'/100'))):
                        check_owner(expected, got, user_changed=False,
                                    check_changed=assert_none)
                    else:
                        # In other cases the output should be the same
                        self.assertEqual(expected, got)
3261
def _filemode_to_int(mode):
    """Convert a `stat.filemode`-style string to its permission bits.

    Only the nine permission characters are interpreted; the leading
    file-type character is skipped.  Writing modes as text rather than
    as numbers keeps the later tests readable.
    """
    perm_chars = mode[1:]
    # One lookup table per character position, in string order.
    bit_tables = (
        {'r': stat.S_IRUSR, '-': 0},
        {'w': stat.S_IWUSR, '-': 0},
        {'x': stat.S_IXUSR, '-': 0,
         's': stat.S_IXUSR | stat.S_ISUID,
         'S': stat.S_ISUID},
        {'r': stat.S_IRGRP, '-': 0},
        {'w': stat.S_IWGRP, '-': 0},
        {'x': stat.S_IXGRP, '-': 0,
         's': stat.S_IXGRP | stat.S_ISGID,
         'S': stat.S_ISGID},
        {'r': stat.S_IROTH, '-': 0},
        {'w': stat.S_IWOTH, '-': 0},
        {'x': stat.S_IXOTH, '-': 0,
         't': stat.S_IXOTH | stat.S_ISVTX,
         'T': stat.S_ISVTX},
    )
    result = 0
    for position, table in enumerate(bit_tables):
        result |= table[perm_chars[position]]

    # check we did this right: rendering the bits must give back the input
    assert stat.filemode(result)[1:] == mode[1:]

    return result
3289
3290class ArchiveMaker:
3291    """Helper to create a tar file with specific contents
3292
3293    Usage:
3294
3295        with ArchiveMaker() as t:
3296            t.add('filename', ...)
3297
3298        with t.open() as tar:
3299            ... # `tar` is now a TarFile with 'filename' in it!
3300    """
3301    def __init__(self):
3302        self.bio = io.BytesIO()
3303
3304    def __enter__(self):
3305        self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio)
3306        return self
3307
3308    def __exit__(self, *exc):
3309        self.tar_w.close()
3310        self.contents = self.bio.getvalue()
3311        self.bio = None
3312
3313    def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
3314            mode=None, size=None, content=None, **kwargs):
3315        """Add a member to the test archive. Call within `with`.
3316
3317        Provides many shortcuts:
3318        - default `type` is based on symlink_to, hardlink_to, and trailing `/`
3319          in name (which is stripped)
3320        - size & content defaults are based on each other
3321        - content can be str or bytes
3322        - mode should be textual ('-rwxrwxrwx')
3323
3324        (add more! this is unstable internal test-only API)
3325        """
3326        name = str(name)
3327        tarinfo = tarfile.TarInfo(name).replace(**kwargs)
3328        if content is not None:
3329            if isinstance(content, str):
3330                content = content.encode()
3331            size = len(content)
3332        if size is not None:
3333            tarinfo.size = size
3334            if content is None:
3335                content = bytes(tarinfo.size)
3336        if mode:
3337            tarinfo.mode = _filemode_to_int(mode)
3338        if symlink_to is not None:
3339            type = tarfile.SYMTYPE
3340            tarinfo.linkname = str(symlink_to)
3341        if hardlink_to is not None:
3342            type = tarfile.LNKTYPE
3343            tarinfo.linkname = str(hardlink_to)
3344        if name.endswith('/') and type is None:
3345            type = tarfile.DIRTYPE
3346        if type is not None:
3347            tarinfo.type = type
3348        if tarinfo.isreg():
3349            fileobj = io.BytesIO(content)
3350        else:
3351            fileobj = None
3352        self.tar_w.addfile(tarinfo, fileobj)
3353
3354    def open(self, **kwargs):
3355        """Open the resulting archive as TarFile. Call after `with`."""
3356        bio = io.BytesIO(self.contents)
3357        return tarfile.open(fileobj=bio, **kwargs)
3358
# Under WASI, `os_helper.can_symlink` is False to make
# `skip_unless_symlink` skip symlink tests.
# But in the following tests we use can_symlink to *determine* which
# behavior is expected.
# Like other symlink tests, skip these on WASI for now.
3364if support.is_wasi:
3365    def symlink_test(f):
3366        return unittest.skip("WASI: Skip symlink test for now")(f)
3367else:
3368    def symlink_test(f):
3369        return f
3370
3371
3372class TestExtractionFilters(unittest.TestCase):
3373
3374    # A temporary directory for the extraction results.
3375    # All files that "escape" the destination path should still end
3376    # up in this directory.
3377    outerdir = pathlib.Path(TEMPDIR) / 'outerdir'
3378
3379    # The destination for the extraction, within `outerdir`
3380    destdir = outerdir / 'dest'
3381
3382    @contextmanager
3383    def check_context(self, tar, filter, *, check_flag=True):
3384        """Extracts `tar` to `self.destdir` and allows checking the result
3385
3386        If an error occurs, it must be checked using `expect_exception`
3387
3388        Otherwise, all resulting files must be checked using `expect_file`,
3389        except the destination directory itself and parent directories of
3390        other files.
3391        When checking directories, do so before their contents.
3392
3393        A file called 'flag' is made in outerdir (i.e. outside destdir)
3394        before extraction; it should not be altered nor should its contents
3395        be read/copied.
3396        """
3397        with os_helper.temp_dir(self.outerdir):
3398            flag_path = self.outerdir / 'flag'
3399            flag_path.write_text('capture me')
3400            try:
3401                tar.extractall(self.destdir, filter=filter)
3402            except Exception as exc:
3403                self.raised_exception = exc
3404                self.reraise_exception = True
3405                self.expected_paths = set()
3406            else:
3407                self.raised_exception = None
3408                self.reraise_exception = False
3409                self.expected_paths = set(self.outerdir.glob('**/*'))
3410                self.expected_paths.discard(self.destdir)
3411                self.expected_paths.discard(flag_path)
3412            try:
3413                yield self
3414            finally:
3415                tar.close()
3416            if self.reraise_exception:
3417                raise self.raised_exception
3418            self.assertEqual(self.expected_paths, set())
3419            if check_flag:
3420                self.assertEqual(flag_path.read_text(), 'capture me')
3421            else:
3422                assert filter == 'fully_trusted'
3423
3424    def expect_file(self, name, type=None, symlink_to=None, mode=None,
3425                    size=None, content=None):
3426        """Check a single file. See check_context."""
3427        if self.raised_exception:
3428            raise self.raised_exception
3429        # use normpath() rather than resolve() so we don't follow symlinks
3430        path = pathlib.Path(os.path.normpath(self.destdir / name))
3431        self.assertIn(path, self.expected_paths)
3432        self.expected_paths.remove(path)
3433        if mode is not None and os_helper.can_chmod():
3434            got = stat.filemode(stat.S_IMODE(path.stat().st_mode))
3435            self.assertEqual(got, mode)
3436        if type is None and isinstance(name, str) and name.endswith('/'):
3437            type = tarfile.DIRTYPE
3438        if symlink_to is not None:
3439            got = (self.destdir / name).readlink()
3440            expected = pathlib.Path(symlink_to)
3441            # The symlink might be the same (textually) as what we expect,
3442            # but some systems change the link to an equivalent path, so
3443            # we fall back to samefile().
3444            try:
3445                if expected != got:
3446                    self.assertTrue(got.samefile(expected))
3447            except Exception as e:
3448                # attach a note, so it's shown even if `samefile` fails
3449                e.add_note(f'{expected=}, {got=}')
3450                raise
3451        elif type == tarfile.REGTYPE or type is None:
3452            self.assertTrue(path.is_file())
3453        elif type == tarfile.DIRTYPE:
3454            self.assertTrue(path.is_dir())
3455        elif type == tarfile.FIFOTYPE:
3456            self.assertTrue(path.is_fifo())
3457        elif type == tarfile.SYMTYPE:
3458            self.assertTrue(path.is_symlink())
3459        else:
3460            raise NotImplementedError(type)
3461        if size is not None:
3462            self.assertEqual(path.stat().st_size, size)
3463        if content is not None:
3464            self.assertEqual(path.read_text(), content)
3465        for parent in path.parents:
3466            self.expected_paths.discard(parent)
3467
3468    def expect_any_tree(self, name):
3469        """Check a directory; forget about its contents."""
3470        tree_path = (self.destdir / name).resolve()
3471        self.expect_file(tree_path, type=tarfile.DIRTYPE)
3472        self.expected_paths = {
3473            p for p in self.expected_paths
3474            if tree_path not in p.parents
3475        }
3476
3477    def expect_exception(self, exc_type, message_re='.'):
3478        with self.assertRaisesRegex(exc_type, message_re):
3479            if self.raised_exception is not None:
3480                raise self.raised_exception
3481        self.reraise_exception = False
3482        return self.raised_exception
3483
3484    def test_benign_file(self):
3485        with ArchiveMaker() as arc:
3486            arc.add('benign.txt')
3487        for filter in 'fully_trusted', 'tar', 'data':
3488            with self.check_context(arc.open(), filter):
3489                self.expect_file('benign.txt')
3490
3491    def test_absolute(self):
3492        # Test handling a member with an absolute path
3493        # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
3494        with ArchiveMaker() as arc:
3495            arc.add(self.outerdir / 'escaped.evil')
3496
3497        with self.check_context(arc.open(), 'fully_trusted'):
3498            self.expect_file('../escaped.evil')
3499
3500        for filter in 'tar', 'data':
3501            with self.check_context(arc.open(), filter):
3502                if str(self.outerdir).startswith('/'):
3503                    # We strip leading slashes, as e.g. GNU tar does
3504                    # (without --absolute-filenames).
3505                    outerdir_stripped = str(self.outerdir).lstrip('/')
3506                    self.expect_file(f'{outerdir_stripped}/escaped.evil')
3507                else:
3508                    # On this system, absolute paths don't have leading
3509                    # slashes.
3510                    # So, there's nothing to strip. We refuse to unpack
3511                    # to an absolute path, nonetheless.
3512                    self.expect_exception(
3513                        tarfile.AbsolutePathError,
3514                        """['"].*escaped.evil['"] has an absolute path""")
3515
3516    @symlink_test
3517    def test_parent_symlink(self):
3518        # Test interplaying symlinks
3519        # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
3520        with ArchiveMaker() as arc:
3521            arc.add('current', symlink_to='.')
3522            arc.add('parent', symlink_to='current/..')
3523            arc.add('parent/evil')
3524
3525        if os_helper.can_symlink():
3526            with self.check_context(arc.open(), 'fully_trusted'):
3527                if self.raised_exception is not None:
3528                    # Windows will refuse to create a file that's a symlink to itself
3529                    # (and tarfile doesn't swallow that exception)
3530                    self.expect_exception(FileExistsError)
3531                    # The other cases will fail with this error too.
3532                    # Skip the rest of this test.
3533                    return
3534                else:
3535                    self.expect_file('current', symlink_to='.')
3536                    self.expect_file('parent', symlink_to='current/..')
3537                    self.expect_file('../evil')
3538
3539            with self.check_context(arc.open(), 'tar'):
3540                self.expect_exception(
3541                    tarfile.OutsideDestinationError,
3542                    """'parent/evil' would be extracted to ['"].*evil['"], """
3543                    + "which is outside the destination")
3544
3545            with self.check_context(arc.open(), 'data'):
3546                self.expect_exception(
3547                    tarfile.LinkOutsideDestinationError,
3548                    """'parent' would link to ['"].*outerdir['"], """
3549                    + "which is outside the destination")
3550
3551        else:
3552            # No symlink support. The symlinks are ignored.
3553            with self.check_context(arc.open(), 'fully_trusted'):
3554                self.expect_file('parent/evil')
3555            with self.check_context(arc.open(), 'tar'):
3556                self.expect_file('parent/evil')
3557            with self.check_context(arc.open(), 'data'):
3558                self.expect_file('parent/evil')
3559
3560    @symlink_test
3561    @os_helper.skip_unless_symlink
3562    def test_realpath_limit_attack(self):
3563        # (CVE-2025-4517)
3564
3565        with ArchiveMaker() as arc:
3566            # populate the symlinks and dirs that expand in os.path.realpath()
3567            # The component length is chosen so that in common cases, the unexpanded
3568            # path fits in PATH_MAX, but it overflows when the final symlink
3569            # is expanded
3570            steps = "abcdefghijklmnop"
3571            if sys.platform == 'win32':
3572                component = 'd' * 25
3573            elif 'PC_PATH_MAX' in os.pathconf_names:
3574                max_path_len = os.pathconf(self.outerdir.parent, "PC_PATH_MAX")
3575                path_sep_len = 1
3576                dest_len = len(str(self.destdir)) + path_sep_len
3577                component_len = (max_path_len - dest_len) // (len(steps) + path_sep_len)
3578                component = 'd' * component_len
3579            else:
3580                raise NotImplementedError("Need to guess component length for {sys.platform}")
3581            path = ""
3582            step_path = ""
3583            for i in steps:
3584                arc.add(os.path.join(path, component), type=tarfile.DIRTYPE,
3585                        mode='drwxrwxrwx')
3586                arc.add(os.path.join(path, i), symlink_to=component)
3587                path = os.path.join(path, component)
3588                step_path = os.path.join(step_path, i)
3589            # create the final symlink that exceeds PATH_MAX and simply points
3590            # to the top dir.
3591            # this link will never be expanded by
3592            # os.path.realpath(strict=False), nor anything after it.
3593            linkpath = os.path.join(*steps, "l"*254)
3594            parent_segments = [".."] * len(steps)
3595            arc.add(linkpath, symlink_to=os.path.join(*parent_segments))
3596            # make a symlink outside to keep the tar command happy
3597            arc.add("escape", symlink_to=os.path.join(linkpath, ".."))
3598            # use the symlinks above, that are not checked, to create a hardlink
3599            # to a file outside of the destination path
3600            arc.add("flaglink", hardlink_to=os.path.join("escape", "flag"))
3601            # now that we have the hardlink we can overwrite the file
3602            arc.add("flaglink", content='overwrite')
3603            # we can also create new files as well!
3604            arc.add("escape/newfile", content='new')
3605
3606        with (self.subTest('fully_trusted'),
3607              self.check_context(arc.open(), filter='fully_trusted',
3608                                 check_flag=False)):
3609            if sys.platform == 'win32':
3610                self.expect_exception((FileNotFoundError, FileExistsError))
3611            elif self.raised_exception:
3612                # Cannot symlink/hardlink: tarfile falls back to getmember()
3613                self.expect_exception(KeyError)
3614                # Otherwise, this block should never enter.
3615            else:
3616                self.expect_any_tree(component)
3617                self.expect_file('flaglink', content='overwrite')
3618                self.expect_file('../newfile', content='new')
3619                self.expect_file('escape', type=tarfile.SYMTYPE)
3620                self.expect_file('a', symlink_to=component)
3621
3622        for filter in 'tar', 'data':
3623            with self.subTest(filter), self.check_context(arc.open(), filter=filter):
3624                exc = self.expect_exception((OSError, KeyError))
3625                if isinstance(exc, OSError):
3626                    if sys.platform == 'win32':
3627                        # 3: ERROR_PATH_NOT_FOUND
3628                        # 5: ERROR_ACCESS_DENIED
3629                        # 206: ERROR_FILENAME_EXCED_RANGE
3630                        self.assertIn(exc.winerror, (3, 5, 206))
3631                    else:
3632                        self.assertEqual(exc.errno, errno.ENAMETOOLONG)
3633
3634    @symlink_test
3635    def test_parent_symlink2(self):
3636        # Test interplaying symlinks
3637        # Inspired by 'dirsymlink2b' in jwilk/traversal-archives
3638        with ArchiveMaker() as arc:
3639            arc.add('current', symlink_to='.')
3640            arc.add('current/parent', symlink_to='..')
3641            arc.add('parent/evil')
3642
3643        with self.check_context(arc.open(), 'fully_trusted'):
3644            if os_helper.can_symlink():
3645                self.expect_file('current', symlink_to='.')
3646                self.expect_file('parent', symlink_to='..')
3647                self.expect_file('../evil')
3648            else:
3649                self.expect_file('current/')
3650                self.expect_file('parent/evil')
3651
3652        with self.check_context(arc.open(), 'tar'):
3653            if os_helper.can_symlink():
3654                self.expect_exception(
3655                        tarfile.OutsideDestinationError,
3656                        "'parent/evil' would be extracted to "
3657                        + """['"].*evil['"], which is outside """
3658                        + "the destination")
3659            else:
3660                self.expect_file('current/')
3661                self.expect_file('parent/evil')
3662
3663        with self.check_context(arc.open(), 'data'):
3664            self.expect_exception(
3665                    tarfile.LinkOutsideDestinationError,
3666                    """'current/parent' would link to ['"].*['"], """
3667                    + "which is outside the destination")
3668
3669    @symlink_test
3670    def test_absolute_symlink(self):
3671        # Test symlink to an absolute path
3672        # Inspired by 'dirsymlink' in jwilk/traversal-archives
3673        with ArchiveMaker() as arc:
3674            arc.add('parent', symlink_to=self.outerdir)
3675            arc.add('parent/evil')
3676
3677        with self.check_context(arc.open(), 'fully_trusted'):
3678            if os_helper.can_symlink():
3679                self.expect_file('parent', symlink_to=self.outerdir)
3680                self.expect_file('../evil')
3681            else:
3682                self.expect_file('parent/evil')
3683
3684        with self.check_context(arc.open(), 'tar'):
3685            if os_helper.can_symlink():
3686                self.expect_exception(
3687                        tarfile.OutsideDestinationError,
3688                        "'parent/evil' would be extracted to "
3689                        + """['"].*evil['"], which is outside """
3690                        + "the destination")
3691            else:
3692                self.expect_file('parent/evil')
3693
3694        with self.check_context(arc.open(), 'data'):
3695            self.expect_exception(
3696                tarfile.AbsoluteLinkError,
3697                "'parent' is a symlink to an absolute path")
3698
3699    @symlink_test
    def test_sly_relative0(self):
        # Inspired by 'relative0' in jwilk/traversal-archives
        # The archive holds a single symlink member whose own name, '../moo',
        # already contains a '..' component.
        with ArchiveMaker() as arc:
            arc.add('../moo', symlink_to='..//tmp/moo')

        # For 'fully_trusted' two outcomes are accepted: the symlink is
        # extracted, or extraction fails with FileExistsError.
        try:
            with self.check_context(arc.open(), filter='fully_trusted'):
                if os_helper.can_symlink():
                    if isinstance(self.raised_exception, FileExistsError):
                        # XXX TarFile happens to fail creating a parent
                        # directory.
                        # This might be a bug, but fixing it would hurt
                        # security.
                        # Note that e.g. GNU `tar` rejects '..' components,
                        # so you could argue this is an invalid archive and we
                        # just raise a bad type of exception.
                        self.expect_exception(FileExistsError)
                    else:
                        self.expect_file('../moo', symlink_to='..//tmp/moo')
                else:
                    # The symlink can't be extracted and is ignored
                    pass
        except FileExistsError:
            # A FileExistsError that escapes the context manager is
            # tolerated as well.
            pass

        # The 'tar' and 'data' filters must both reject the member.
        for filter in 'tar', 'data':
            with self.check_context(arc.open(), filter):
                self.expect_exception(
                        tarfile.OutsideDestinationError,
                        "'../moo' would be extracted to "
                        + "'.*moo', which is outside "
                        + "the destination")
3732
3733    @symlink_test
3734    def test_sly_relative2(self):
3735        # Inspired by 'relative2' in jwilk/traversal-archives
3736        with ArchiveMaker() as arc:
3737            arc.add('tmp/')
3738            arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo')
3739
3740        with self.check_context(arc.open(), 'fully_trusted'):
3741            self.expect_file('tmp', type=tarfile.DIRTYPE)
3742            if os_helper.can_symlink():
3743                self.expect_file('../moo', symlink_to='tmp/../../tmp/moo')
3744
3745        for filter in 'tar', 'data':
3746            with self.check_context(arc.open(), filter):
3747                self.expect_exception(
3748                    tarfile.OutsideDestinationError,
3749                    "'tmp/../../moo' would be extracted to "
3750                    + """['"].*moo['"], which is outside the """
3751                    + "destination")
3752
3753    @symlink_test
3754    def test_deep_symlink(self):
3755        # Test that symlinks and hardlinks inside a directory
3756        # point to the correct file (`target` of size 3).
3757        # If links aren't supported we get a copy of the file.
3758        with ArchiveMaker() as arc:
3759            arc.add('targetdir/target', size=3)
3760            # a hardlink's linkname is relative to the archive
3761            arc.add('linkdir/hardlink', hardlink_to=os.path.join(
3762                'targetdir', 'target'))
3763            # a symlink's  linkname is relative to the link's directory
3764            arc.add('linkdir/symlink', symlink_to=os.path.join(
3765                '..', 'targetdir', 'target'))
3766
3767        for filter in 'tar', 'data', 'fully_trusted':
3768            with self.check_context(arc.open(), filter):
3769                self.expect_file('targetdir/target', size=3)
3770                self.expect_file('linkdir/hardlink', size=3)
3771                if os_helper.can_symlink():
3772                    self.expect_file('linkdir/symlink', size=3,
3773                                     symlink_to='../targetdir/target')
3774                else:
3775                    self.expect_file('linkdir/symlink', size=3)
3776
3777    @symlink_test
3778    def test_chains(self):
3779        # Test chaining of symlinks/hardlinks.
3780        # Symlinks are created before the files they point to.
3781        with ArchiveMaker() as arc:
3782            arc.add('linkdir/symlink', symlink_to='hardlink')
3783            arc.add('symlink2', symlink_to=os.path.join(
3784                'linkdir', 'hardlink2'))
3785            arc.add('targetdir/target', size=3)
3786            arc.add('linkdir/hardlink', hardlink_to=os.path.join('targetdir', 'target'))
3787            arc.add('linkdir/hardlink2', hardlink_to=os.path.join('linkdir', 'symlink'))
3788
3789        for filter in 'tar', 'data', 'fully_trusted':
3790            with self.check_context(arc.open(), filter):
3791                self.expect_file('targetdir/target', size=3)
3792                self.expect_file('linkdir/hardlink', size=3)
3793                self.expect_file('linkdir/hardlink2', size=3)
3794                if os_helper.can_symlink():
3795                    self.expect_file('linkdir/symlink', size=3,
3796                                     symlink_to='hardlink')
3797                    self.expect_file('symlink2', size=3,
3798                                     symlink_to='linkdir/hardlink2')
3799                else:
3800                    self.expect_file('linkdir/symlink', size=3)
3801                    self.expect_file('symlink2', size=3)
3802
3803    @symlink_test
3804    def test_sneaky_hardlink_fallback(self):
3805        # (CVE-2025-4330)
3806        # Test that when hardlink extraction falls back to extracting members
3807        # from the archive, the extracted member is (re-)filtered.
3808        with ArchiveMaker() as arc:
3809            # Create a directory structure so the c/escape symlink stays
3810            # inside the path
3811            arc.add("a/t/dummy")
3812            # Create b/ directory
3813            arc.add("b/")
3814            # Point "c" to the bottom of the tree in "a"
3815            arc.add("c", symlink_to=os.path.join("a", "t"))
3816            # link to non-existant location under "a"
3817            arc.add("c/escape", symlink_to=os.path.join("..", "..",
3818                                                        "link_here"))
3819            # Move "c" to point to "b" ("c/escape" no longer exists)
3820            arc.add("c", symlink_to="b")
3821            # Attempt to create a hard link to "c/escape". Since it doesn't
3822            # exist it will attempt to extract "cescape" but at "boom".
3823            arc.add("boom", hardlink_to=os.path.join("c", "escape"))
3824
3825        with self.check_context(arc.open(), 'data'):
3826            if not os_helper.can_symlink():
3827                # When 'c/escape' is extracted, 'c' is a regular
3828                # directory, and 'c/escape' *would* point outside
3829                # the destination if symlinks were allowed.
3830                self.expect_exception(
3831                    tarfile.LinkOutsideDestinationError)
3832            elif sys.platform == "win32":
3833                # On Windows, 'c/escape' points outside the destination
3834                self.expect_exception(tarfile.LinkOutsideDestinationError)
3835            else:
3836                e = self.expect_exception(
3837                    tarfile.LinkFallbackError,
3838                    "link 'boom' would be extracted as a copy of "
3839                    + "'c/escape', which was rejected")
3840                self.assertIsInstance(e.__cause__,
3841                                      tarfile.LinkOutsideDestinationError)
3842        for filter in 'tar', 'fully_trusted':
3843            with self.subTest(filter), self.check_context(arc.open(), filter):
3844                if not os_helper.can_symlink():
3845                    self.expect_file("a/t/dummy")
3846                    self.expect_file("b/")
3847                    self.expect_file("c/")
3848                else:
3849                    self.expect_file("a/t/dummy")
3850                    self.expect_file("b/")
3851                    self.expect_file("a/t/escape", symlink_to='../../link_here')
3852                    self.expect_file("boom", symlink_to='../../link_here')
3853                    self.expect_file("c", symlink_to='b')
3854
3855    @symlink_test
3856    def test_exfiltration_via_symlink(self):
3857        # (CVE-2025-4138)
3858        # Test changing symlinks that result in a symlink pointing outside
3859        # the extraction directory, unless prevented by 'data' filter's
3860        # normalization.
3861        with ArchiveMaker() as arc:
3862            arc.add("escape", symlink_to=os.path.join('link', 'link', '..', '..', 'link-here'))
3863            arc.add("link", symlink_to='./')
3864
3865        for filter in 'tar', 'data', 'fully_trusted':
3866            with self.check_context(arc.open(), filter):
3867                if os_helper.can_symlink():
3868                    self.expect_file("link", symlink_to='./')
3869                    if filter == 'data':
3870                        self.expect_file("escape", symlink_to='link-here')
3871                    else:
3872                        self.expect_file("escape",
3873                                         symlink_to='link/link/../../link-here')
3874                else:
3875                    # Nothing is extracted.
3876                    pass
3877
3878    @symlink_test
3879    def test_chmod_outside_dir(self):
3880        # (CVE-2024-12718)
3881        # Test that members used for delayed updates of directory metadata
3882        # are (re-)filtered.
3883        with ArchiveMaker() as arc:
3884            # "pwn" is a veeeery innocent symlink:
3885            arc.add("a/pwn", symlink_to='.')
3886            # But now "pwn" is also a directory, so it's scheduled to have its
3887            # metadata updated later:
3888            arc.add("a/pwn/", mode='drwxrwxrwx')
3889            # Oops, "pwn" is not so innocent any more:
3890            arc.add("a/pwn", symlink_to='x/../')
3891            # Newly created symlink points to the dest dir,
3892            # so it's OK for the "data" filter.
3893            arc.add('a/x', symlink_to=('../'))
3894            # But now "pwn" points outside the dest dir
3895
3896        for filter in 'tar', 'data', 'fully_trusted':
3897            with self.check_context(arc.open(), filter) as cc:
3898                if not os_helper.can_symlink():
3899                    self.expect_file("a/pwn/")
3900                elif filter == 'data':
3901                    self.expect_file("a/x", symlink_to='../')
3902                    self.expect_file("a/pwn", symlink_to='.')
3903                else:
3904                    self.expect_file("a/x", symlink_to='../')
3905                    self.expect_file("a/pwn", symlink_to='x/../')
3906                if sys.platform != "win32":
3907                    st_mode = cc.outerdir.stat().st_mode
3908                    self.assertNotEqual(st_mode & 0o777, 0o777)
3909
3910    def test_link_fallback_normalizes(self):
3911        # Make sure hardlink fallbacks work for non-normalized paths for all
3912        # filters
3913        with ArchiveMaker() as arc:
3914            arc.add("dir/")
3915            arc.add("dir/../afile")
3916            arc.add("link1", hardlink_to='dir/../afile')
3917            arc.add("link2", hardlink_to='dir/../dir/../afile')
3918
3919        for filter in 'tar', 'data', 'fully_trusted':
3920            with self.check_context(arc.open(), filter) as cc:
3921                self.expect_file("dir/")
3922                self.expect_file("afile")
3923                self.expect_file("link1")
3924                self.expect_file("link2")
3925
    def test_modes(self):
        # Test how file modes are extracted
        # (Note that the modes are ignored on platforms without working chmod)
        # Each member's name describes the permission bits it is stored with.
        with ArchiveMaker() as arc:
            arc.add('all_bits', mode='?rwsrwsrwt')
            arc.add('perm_bits', mode='?rwxrwxrwx')
            arc.add('exec_group_other', mode='?rw-rwxrwx')
            arc.add('read_group_only', mode='?---r-----')
            arc.add('no_bits', mode='?---------')
            arc.add('dir/', mode='?---rwsrwt')

        # On some systems, setting the sticky bit is a no-op.
        # Check if that's the case.
        # Probe with a regular file first: set S_ISVTX and read it back.
        tmp_filename = os.path.join(TEMPDIR, "tmp.file")
        with open(tmp_filename, 'w'):
            pass
        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
        have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
        os.unlink(tmp_filename)

        # Repeat the probe with a directory, reusing the same path.
        os.mkdir(tmp_filename)
        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
        have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
        os.rmdir(tmp_filename)

        # 'fully_trusted': the stored modes are expected unchanged, except
        # for a sticky bit the platform did not retain in the probes above.
        with self.check_context(arc.open(), 'fully_trusted'):
            if have_sticky_files:
                self.expect_file('all_bits', mode='?rwsrwsrwt')
            else:
                self.expect_file('all_bits', mode='?rwsrwsrwx')
            self.expect_file('perm_bits', mode='?rwxrwxrwx')
            self.expect_file('exec_group_other', mode='?rw-rwxrwx')
            self.expect_file('read_group_only', mode='?---r-----')
            self.expect_file('no_bits', mode='?---------')
            if have_sticky_dirs:
                self.expect_file('dir/', mode='?---rwsrwt')
            else:
                self.expect_file('dir/', mode='?---rwsrwx')

        # 'tar': the expected modes have no setuid/setgid/sticky bits and
        # no group/other write permission.
        with self.check_context(arc.open(), 'tar'):
            self.expect_file('all_bits', mode='?rwxr-xr-x')
            self.expect_file('perm_bits', mode='?rwxr-xr-x')
            self.expect_file('exec_group_other', mode='?rw-r-xr-x')
            self.expect_file('read_group_only', mode='?---r-----')
            self.expect_file('no_bits', mode='?---------')
            self.expect_file('dir/', mode='?---r-xr-x')

        # 'data': the extracted directory is compared against the mode of
        # self.outerdir rather than against the mode stored in the archive.
        with self.check_context(arc.open(), 'data'):
            normal_dir_mode = stat.filemode(stat.S_IMODE(
                self.outerdir.stat().st_mode))
            self.expect_file('all_bits', mode='?rwxr-xr-x')
            self.expect_file('perm_bits', mode='?rwxr-xr-x')
            self.expect_file('exec_group_other', mode='?rw-r--r--')
            self.expect_file('read_group_only', mode='?rw-r-----')
            self.expect_file('no_bits', mode='?rw-------')
            self.expect_file('dir/', mode=normal_dir_mode)
3982
3983    def test_pipe(self):
3984        # Test handling of a special file
3985        with ArchiveMaker() as arc:
3986            arc.add('foo', type=tarfile.FIFOTYPE)
3987
3988        for filter in 'fully_trusted', 'tar':
3989            with self.check_context(arc.open(), filter):
3990                if hasattr(os, 'mkfifo'):
3991                    self.expect_file('foo', type=tarfile.FIFOTYPE)
3992                else:
3993                    # The pipe can't be extracted and is skipped.
3994                    pass
3995
3996        with self.check_context(arc.open(), 'data'):
3997            self.expect_exception(
3998                tarfile.SpecialFileError,
3999                "'foo' is a special file")
4000
4001    def test_special_files(self):
4002        # Creating device files is tricky. Instead of attempting that let's
4003        # only check the filter result.
4004        for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE:
4005            tarinfo = tarfile.TarInfo('foo')
4006            tarinfo.type = special_type
4007            trusted = tarfile.fully_trusted_filter(tarinfo, '')
4008            self.assertIs(trusted, tarinfo)
4009            tar = tarfile.tar_filter(tarinfo, '')
4010            self.assertEqual(tar.type, special_type)
4011            with self.assertRaises(tarfile.SpecialFileError) as cm:
4012                tarfile.data_filter(tarinfo, '')
4013            self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo)
4014            self.assertEqual(cm.exception.tarinfo.name, 'foo')
4015
4016    def test_fully_trusted_filter(self):
4017        # The 'fully_trusted' filter returns the original TarInfo objects.
4018        with tarfile.TarFile.open(tarname) as tar:
4019            for tarinfo in tar.getmembers():
4020                filtered = tarfile.fully_trusted_filter(tarinfo, '')
4021                self.assertIs(filtered, tarinfo)
4022
4023    def test_tar_filter(self):
4024        # The 'tar' filter returns TarInfo objects with the same name/type.
4025        # (It can also fail for particularly "evil" input, but we don't have
4026        # that in the test archive.)
4027        with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
4028            for tarinfo in tar.getmembers():
4029                filtered = tarfile.tar_filter(tarinfo, '')
4030                self.assertIs(filtered.name, tarinfo.name)
4031                self.assertIs(filtered.type, tarinfo.type)
4032
4033    def test_data_filter(self):
4034        # The 'data' filter either raises, or returns TarInfo with the same
4035        # name/type.
4036        with tarfile.TarFile.open(tarname, encoding="iso8859-1") as tar:
4037            for tarinfo in tar.getmembers():
4038                try:
4039                    filtered = tarfile.data_filter(tarinfo, '')
4040                except tarfile.FilterError:
4041                    continue
4042                self.assertIs(filtered.name, tarinfo.name)
4043                self.assertIs(filtered.type, tarinfo.type)
4044
4045    def test_default_filter_warns_not(self):
4046        """Ensure the default filter does not warn (like in 3.12)"""
4047        with ArchiveMaker() as arc:
4048            arc.add('foo')
4049        with warnings_helper.check_no_warnings(self):
4050            with self.check_context(arc.open(), None):
4051                self.expect_file('foo')
4052
4053    def test_change_default_filter_on_instance(self):
4054        tar = tarfile.TarFile(tarname, 'r')
4055        def strict_filter(tarinfo, path):
4056            if tarinfo.name == 'ustar/regtype':
4057                return tarinfo
4058            else:
4059                return None
4060        tar.extraction_filter = strict_filter
4061        with self.check_context(tar, None):
4062            self.expect_file('ustar/regtype')
4063
4064    def test_change_default_filter_on_class(self):
4065        def strict_filter(tarinfo, path):
4066            if tarinfo.name == 'ustar/regtype':
4067                return tarinfo
4068            else:
4069                return None
4070        tar = tarfile.TarFile(tarname, 'r')
4071        with support.swap_attr(tarfile.TarFile, 'extraction_filter',
4072                               staticmethod(strict_filter)):
4073            with self.check_context(tar, None):
4074                self.expect_file('ustar/regtype')
4075
4076    def test_change_default_filter_on_subclass(self):
4077        class TarSubclass(tarfile.TarFile):
4078            def extraction_filter(self, tarinfo, path):
4079                if tarinfo.name == 'ustar/regtype':
4080                    return tarinfo
4081                else:
4082                    return None
4083
4084        tar = TarSubclass(tarname, 'r')
4085        with self.check_context(tar, None):
4086            self.expect_file('ustar/regtype')
4087
4088    def test_change_default_filter_to_string(self):
4089        tar = tarfile.TarFile(tarname, 'r')
4090        tar.extraction_filter = 'data'
4091        with self.check_context(tar, None):
4092            self.expect_exception(TypeError)
4093
4094    def test_custom_filter(self):
4095        def custom_filter(tarinfo, path):
4096            self.assertIs(path, self.destdir)
4097            if tarinfo.name == 'move_this':
4098                return tarinfo.replace(name='moved')
4099            if tarinfo.name == 'ignore_this':
4100                return None
4101            return tarinfo
4102
4103        with ArchiveMaker() as arc:
4104            arc.add('move_this')
4105            arc.add('ignore_this')
4106            arc.add('keep')
4107        with self.check_context(arc.open(), custom_filter):
4108            self.expect_file('moved')
4109            self.expect_file('keep')
4110
4111    def test_bad_filter_name(self):
4112        with ArchiveMaker() as arc:
4113            arc.add('foo')
4114        with self.check_context(arc.open(), 'bad filter name'):
4115            self.expect_exception(ValueError)
4116
4117    def test_stateful_filter(self):
4118        # Stateful filters should be possible.
4119        # (This doesn't really test tarfile. Rather, it demonstrates
4120        # that third parties can implement a stateful filter.)
4121        class StatefulFilter:
4122            def __enter__(self):
4123                self.num_files_processed = 0
4124                return self
4125
4126            def __call__(self, tarinfo, path):
4127                try:
4128                    tarinfo = tarfile.data_filter(tarinfo, path)
4129                except tarfile.FilterError:
4130                    return None
4131                self.num_files_processed += 1
4132                return tarinfo
4133
4134            def __exit__(self, *exc_info):
4135                self.done = True
4136
4137        with ArchiveMaker() as arc:
4138            arc.add('good')
4139            arc.add('bad', symlink_to='/')
4140            arc.add('good')
4141        with StatefulFilter() as custom_filter:
4142            with self.check_context(arc.open(), custom_filter):
4143                self.expect_file('good')
4144        self.assertEqual(custom_filter.num_files_processed, 2)
4145        self.assertEqual(custom_filter.done, True)
4146
4147    def test_errorlevel(self):
4148        def extracterror_filter(tarinfo, path):
4149            raise tarfile.ExtractError('failed with ExtractError')
4150        def filtererror_filter(tarinfo, path):
4151            raise tarfile.FilterError('failed with FilterError')
4152        def oserror_filter(tarinfo, path):
4153            raise OSError('failed with OSError')
4154        def tarerror_filter(tarinfo, path):
4155            raise tarfile.TarError('failed with base TarError')
4156        def valueerror_filter(tarinfo, path):
4157            raise ValueError('failed with ValueError')
4158
4159        with ArchiveMaker() as arc:
4160            arc.add('file')
4161
4162        # If errorlevel is 0, errors affected by errorlevel are ignored
4163
4164        with self.check_context(arc.open(errorlevel=0), extracterror_filter):
4165            pass
4166
4167        with self.check_context(arc.open(errorlevel=0), filtererror_filter):
4168            pass
4169
4170        with self.check_context(arc.open(errorlevel=0), oserror_filter):
4171            pass
4172
4173        with self.check_context(arc.open(errorlevel=0), tarerror_filter):
4174            self.expect_exception(tarfile.TarError)
4175
4176        with self.check_context(arc.open(errorlevel=0), valueerror_filter):
4177            self.expect_exception(ValueError)
4178
4179        # If 1, all fatal errors are raised
4180
4181        with self.check_context(arc.open(errorlevel=1), extracterror_filter):
4182            pass
4183
4184        with self.check_context(arc.open(errorlevel=1), filtererror_filter):
4185            self.expect_exception(tarfile.FilterError)
4186
4187        with self.check_context(arc.open(errorlevel=1), oserror_filter):
4188            self.expect_exception(OSError)
4189
4190        with self.check_context(arc.open(errorlevel=1), tarerror_filter):
4191            self.expect_exception(tarfile.TarError)
4192
4193        with self.check_context(arc.open(errorlevel=1), valueerror_filter):
4194            self.expect_exception(ValueError)
4195
4196        # If 2, all non-fatal errors are raised as well.
4197
4198        with self.check_context(arc.open(errorlevel=2), extracterror_filter):
4199            self.expect_exception(tarfile.ExtractError)
4200
4201        with self.check_context(arc.open(errorlevel=2), filtererror_filter):
4202            self.expect_exception(tarfile.FilterError)
4203
4204        with self.check_context(arc.open(errorlevel=2), oserror_filter):
4205            self.expect_exception(OSError)
4206
4207        with self.check_context(arc.open(errorlevel=2), tarerror_filter):
4208            self.expect_exception(tarfile.TarError)
4209
4210        with self.check_context(arc.open(errorlevel=2), valueerror_filter):
4211            self.expect_exception(ValueError)
4212
4213        # We only handle ExtractionError, FilterError & OSError specially.
4214
4215        with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter):
4216            self.expect_exception(TypeError)  # errorlevel is not int
4217
4218
# NOTE(review): `archiver_tests` is not among the imports visible at the top
# of this file -- confirm that `from test import archiver_tests` is present.
class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
    # Supplies the tar-specific pieces (archives, open, extractall) that the
    # shared archiver_tests.OverwriteTests mixin is combined with here.
    testdir = os.path.join(TEMPDIR, "testoverwrite")

    @classmethod
    def setUpClass(cls):
        # Archive with a single 10-byte regular file named 'test'.
        file_archive = os.path.join(TEMPDIR, 'tar-with-file.tar')
        cls.ar_with_file = file_archive
        cls.addClassCleanup(os_helper.unlink, file_archive)
        with tarfile.open(file_archive, 'w') as tar:
            member = tarfile.TarInfo('test')
            member.size = 10
            tar.addfile(member, io.BytesIO(b'newcontent'))

        # Archive with a member named 'test' built from the current
        # directory.
        dir_archive = os.path.join(TEMPDIR, 'tar-with-dir.tar')
        cls.ar_with_dir = dir_archive
        cls.addClassCleanup(os_helper.unlink, dir_archive)
        with tarfile.open(dir_archive, 'w') as tar:
            dir_member = tar.gettarinfo(os.curdir, 'test')
            tar.addfile(dir_member)

        # Archive with 'test/file' only; there is no member for 'test'.
        implicit_archive = os.path.join(TEMPDIR, 'tar-with-implicit-dir.tar')
        cls.ar_with_implicit_dir = implicit_archive
        cls.addClassCleanup(os_helper.unlink, implicit_archive)
        with tarfile.open(implicit_archive, 'w') as tar:
            member = tarfile.TarInfo('test/file')
            member.size = 10
            tar.addfile(member, io.BytesIO(b'newcontent'))

    def open(self, path):
        # Open the archive at `path` for reading.
        archive = tarfile.open(path, 'r')
        return archive

    def extractall(self, ar):
        # Extract everything into testdir with the 'fully_trusted' filter.
        ar.extractall(self.testdir, filter='fully_trusted')
4249
4250
class OffsetValidationTests(unittest.TestCase):
    # Tests for archives containing 512-byte headers whose size field is
    # filled with 0xff bytes (the "invalid_*_header" constants below).
    tarname = tmpname
    invalid_posix_header = (
        # name: 100 bytes
        tarfile.NUL * tarfile.LENGTH_NAME
        # mode, space, null terminator: 8 bytes
        + b"000755" + SPACE + tarfile.NUL
        # uid, space, null terminator: 8 bytes
        + b"000001" + SPACE + tarfile.NUL
        # gid, space, null terminator: 8 bytes
        + b"000001" + SPACE + tarfile.NUL
        # size, space: 12 bytes
        + b"\xff" * 11 + SPACE
        # mtime, space: 12 bytes
        + tarfile.NUL * 11 + SPACE
        # chksum: 8 bytes
        + b"0011407" + tarfile.NUL
        # type: 1 byte
        + tarfile.REGTYPE
        # linkname: 100 bytes
        + tarfile.NUL * tarfile.LENGTH_LINK
        # magic: 6 bytes, version: 2 bytes
        + tarfile.POSIX_MAGIC
        # uname: 32 bytes
        + tarfile.NUL * 32
        # gname: 32 bytes
        + tarfile.NUL * 32
        # devmajor, space, null terminator: 8 bytes
        + tarfile.NUL * 6 + SPACE + tarfile.NUL
        # devminor, space, null terminator: 8 bytes
        + tarfile.NUL * 6 + SPACE + tarfile.NUL
        # prefix: 155 bytes
        + tarfile.NUL * tarfile.LENGTH_PREFIX
        # padding: 12 bytes
        + tarfile.NUL * 12
    )
    invalid_gnu_header = (
        # name: 100 bytes
        tarfile.NUL * tarfile.LENGTH_NAME
        # mode, null terminator: 8 bytes
        + b"0000755" + tarfile.NUL
        # uid, null terminator: 8 bytes
        + b"0000001" + tarfile.NUL
        # gid, null terminator: 8 bytes
        + b"0000001" + tarfile.NUL
        # size, space: 12 bytes
        + b"\xff" * 11 + SPACE
        # mtime, space: 12 bytes
        + tarfile.NUL * 11 + SPACE
        # chksum: 8 bytes
        + b"0011327" + tarfile.NUL
        # type: 1 byte
        + tarfile.REGTYPE
        # linkname: 100 bytes
        + tarfile.NUL * tarfile.LENGTH_LINK
        # magic: 8 bytes
        + tarfile.GNU_MAGIC
        # uname: 32 bytes
        + tarfile.NUL * 32
        # gname: 32 bytes
        + tarfile.NUL * 32
        # devmajor: 8 bytes
        + tarfile.NUL * 8
        # devminor: 8 bytes
        + tarfile.NUL * 8
        # padding: 167 bytes
        + tarfile.NUL * 167
    )
    invalid_v7_header = (
        # name: 100 bytes
        tarfile.NUL * tarfile.LENGTH_NAME
        # mode, space, null terminator: 8 bytes
        + b"000755" + SPACE + tarfile.NUL
        # uid, space, null terminator: 8 bytes
        + b"000001" + SPACE + tarfile.NUL
        # gid, space, null terminator: 8 bytes
        + b"000001" + SPACE + tarfile.NUL
        # size, space: 12 bytes
        + b"\xff" * 11 + SPACE
        # mtime, space: 12 bytes
        + tarfile.NUL * 11 + SPACE
        # chksum: 8 bytes
        + b"0010070" + tarfile.NUL
        # type: 1 byte
        + tarfile.REGTYPE
        # linkname: 100 bytes
        + tarfile.NUL * tarfile.LENGTH_LINK
        # padding: 255 bytes
        + tarfile.NUL * 255
    )
    # A well-formed GNU header for a member called "filename".
    valid_gnu_header = tarfile.TarInfo("filename").tobuf(tarfile.GNU_FORMAT)
    # One block of 0xff bytes used as member data after an invalid header.
    data_block = b"\xff" * tarfile.BLOCKSIZE

    def _write_buffer(self, buffer):
        # Replace the contents of the scratch archive with `buffer`.
        with open(self.tarname, "wb") as f:
            f.write(buffer)

    def _get_members(self, ignore_zeros=None):
        # Open the scratch archive through a file object and return its
        # member list; `ignore_zeros` is forwarded to tarfile.open().
        with open(self.tarname, "rb") as f:
            with tarfile.open(
                mode="r", fileobj=f, ignore_zeros=ignore_zeros
            ) as tar:
                return tar.getmembers()

    def _assert_raises_read_error_exception(self):
        # Reading the scratch archive must fail with ReadError.
        with self.assertRaisesRegex(
            tarfile.ReadError, "file could not be opened successfully"
        ):
            self._get_members()

    def test_invalid_offset_header_validations(self):
        # An archive consisting of just one invalid header cannot be opened,
        # whichever of the three header layouts is used.
        for tar_format, invalid_header in (
            ("posix", self.invalid_posix_header),
            ("gnu", self.invalid_gnu_header),
            ("v7", self.invalid_v7_header),
        ):
            with self.subTest(format=tar_format):
                self._write_buffer(invalid_header)
                self._assert_raises_read_error_exception()

    def test_early_stop_at_invalid_offset_header(self):
        # Only the member before the invalid header is reported; the valid
        # header after it is not.
        buffer = self.valid_gnu_header + self.invalid_gnu_header + self.valid_gnu_header
        self._write_buffer(buffer)
        members = self._get_members()
        self.assertEqual(len(members), 1)
        self.assertEqual(members[0].name, "filename")
        self.assertEqual(members[0].offset, 0)

    def test_ignore_invalid_archive(self):
        # 3 invalid headers with their respective data
        buffer = (self.invalid_gnu_header + self.data_block) * 3
        self._write_buffer(buffer)
        members = self._get_members(ignore_zeros=True)
        self.assertEqual(len(members), 0)

    def test_ignore_invalid_offset_headers(self):
        # With ignore_zeros=True the invalid header and its data block are
        # skipped, and the valid member is found wherever it sits.
        for first_block, second_block, expected_offset in (
            (
                (self.valid_gnu_header),
                (self.invalid_gnu_header + self.data_block),
                0,
            ),
            (
                (self.invalid_gnu_header + self.data_block),
                (self.valid_gnu_header),
                1024,
            ),
        ):
            # Report each layout separately, as the sibling loop in
            # test_invalid_offset_header_validations does.
            with self.subTest(expected_offset=expected_offset):
                self._write_buffer(first_block + second_block)
                members = self._get_members(ignore_zeros=True)
                self.assertEqual(len(members), 1)
                self.assertEqual(members[0].name, "filename")
                self.assertEqual(members[0].offset, expected_offset)
4404
4405
def setUpModule():
    # Prepare TEMPDIR and the list of test archives (plain + compressed).
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        raw_tar = source.read()

    # Create compressed tarfiles.
    for test_class in (GzipTest, Bz2Test, LzmaTest):
        # A falsy `open` attribute means there is no opener for this class.
        if not test_class.open:
            continue
        compressed_name = test_class.tarname
        os_helper.unlink(compressed_name)
        testtarnames.append(compressed_name)
        with test_class.open(compressed_name, "wb") as compressed:
            compressed.write(raw_tar)
4422
def tearDownModule():
    # Remove TEMPDIR and everything in it, if it is still there.
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)
4426
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
4429