• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2import sys
3import os
4import io
5from hashlib import sha256
6from contextlib import contextmanager, ExitStack
7from random import Random
8import pathlib
9import shutil
10import re
11import warnings
12import stat
13
14import unittest
15import unittest.mock
16import tarfile
17
18from test import archiver_tests
19from test import support
20from test.support import os_helper
21from test.support import script_helper
22from test.support import warnings_helper
23
24# Check for our compression modules.
25try:
26    import gzip
27except ImportError:
28    gzip = None
29try:
30    import zlib
31except ImportError:
32    zlib = None
33try:
34    import bz2
35except ImportError:
36    bz2 = None
37try:
38    import lzma
39except ImportError:
40    lzma = None
41
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes *data*."""
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
44
# Scratch locations derived from the test framework's temporary file name.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir"
tarextdir = TEMPDIR + '-extract-test'
# Reference archive, located through support.findfile().
tarname = support.findfile("testtar.tar", subdir="archivetestdata")
# Paths inside TEMPDIR for the gzip/bz2/xz archive names.  Presumably the
# files are created by module-level setup outside this view -- TODO confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests overwrite.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# SHA-256 hex digest the tests expect for the "ustar/regtype" member data.
sha256_regtype = (
    "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
)
# SHA-256 hex digest, presumably for the sparse test members (not used in
# the part of the file visible here) -- TODO confirm.
sha256_sparse = (
    "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
)
60
61
class TarTest:
    # Settings for the uncompressed archive: which file to read, how to
    # open the raw file and which TarFile constructor to use.
    tarname = tarname
    suffix = ''
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        # Complete mode string: the subclass-provided prefix followed by
        # the compression suffix.
        return "".join((self.prefix, self.suffix))
71
@support.requires_gzip()
class GzipTest:
    # Settings for the gzip archive; ``open`` is None when the gzip
    # module could not be imported.
    tarname = gzipname
    suffix = 'gz'
    open = None if gzip is None else gzip.GzipFile
    taropen = tarfile.TarFile.gzopen
78
@support.requires_bz2()
class Bz2Test:
    # Settings for the bzip2 archive; ``open`` is None when the bz2
    # module could not be imported.
    tarname = bz2name
    suffix = 'bz2'
    open = None if bz2 is None else bz2.BZ2File
    taropen = tarfile.TarFile.bz2open
85
@support.requires_lzma()
class LzmaTest:
    # Settings for the xz archive; ``open`` is None when the lzma
    # module could not be imported.
    tarname = xzname
    suffix = 'xz'
    open = None if lzma is None else lzma.LZMAFile
    taropen = tarfile.TarFile.xzopen
92
93
class ReadTest(TarTest):

    prefix = "r:"

    def setUp(self):
        # Open the archive for reading; closed again in tearDown().
        archive = tarfile.open(
            self.tarname,
            mode=self.mode,
            encoding="iso8859-1",
        )
        self.tar = archive

    def tearDown(self):
        archive = self.tar
        archive.close()
104
class StreamModeTest(ReadTest):

    # The only difference from ReadTest is that the archive is opened
    # with stream=True.
    def setUp(self):
        archive = tarfile.open(
            self.tarname,
            mode=self.mode,
            encoding="iso8859-1",
            stream=True,
        )
        self.tar = archive
113
class UstarReadTest(ReadTest, unittest.TestCase):

    def test_fileobj_regular_file(self):
        # The extracted bytes must match the recorded size and checksum.
        member = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
            self.assertEqual(len(payload), member.size,
                    "regular file extraction failed")
            self.assertEqual(sha256sum(payload), sha256_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # readlines() on the member must agree with the extracted file.
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        member = self.tar.getmember("ustar/regtype")
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "r") as on_disk:
            disk_lines = on_disk.readlines()

        with self.tar.extractfile(member) as stream:
            wrapper = io.TextIOWrapper(stream)
            member_lines = wrapper.readlines()
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.readlines() failed")
            self.assertEqual(len(member_lines), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(member_lines[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        # Iterating the member must yield the same lines as the file.
        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
        member = self.tar.getmember("ustar/regtype")
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "r") as on_disk:
            disk_lines = on_disk.readlines()
        with self.tar.extractfile(member) as stream:
            member_lines = list(io.TextIOWrapper(stream))
            self.assertEqual(disk_lines, member_lines,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        # Walk through absolute, relative and end-relative seeks and check
        # tell()/read()/readline()/readlines() after each of them.
        self.tar.extract("ustar/regtype", TEMPDIR,
                         filter='data')
        extracted_path = os.path.join(TEMPDIR, "ustar/regtype")
        with open(extracted_path, "rb") as on_disk:
            expected = on_disk.read()

        member = self.tar.getmember("ustar/regtype")
        check = self.assertEqual
        with self.tar.extractfile(member) as stream:
            # Consume the whole member first so the seeks start at EOF.
            stream.read()
            stream.seek(0)
            check(0, stream.tell(),
                  "seek() to file's start failed")
            stream.seek(2048, io.SEEK_SET)
            check(2048, stream.tell(),
                  "seek() to absolute position failed")
            stream.seek(-1024, io.SEEK_CUR)
            check(1024, stream.tell(),
                  "seek() to negative relative position failed")
            stream.seek(1024, io.SEEK_CUR)
            check(2048, stream.tell(),
                  "seek() to positive relative position failed")
            chunk = stream.read(10)
            check(chunk, expected[2048:2058],
                  "read() after seek failed")
            stream.seek(0, io.SEEK_END)
            check(member.size, stream.tell(),
                  "seek() to file's end failed")
            check(stream.read(), b"",
                  "read() at file's end did not return empty string")
            stream.seek(-member.size, io.SEEK_END)
            check(0, stream.tell(),
                  "relative seek() to file's end failed")
            stream.seek(512)
            first_pass = stream.readlines()
            stream.seek(512)
            second_pass = stream.readlines()
            check(first_pass, second_pass,
                  "readlines() after seek failed")
            stream.seek(0)
            line_length = len(stream.readline())
            check(line_length, stream.tell(),
                  "tell() after readline() failed")
            stream.seek(512)
            line_length = len(stream.readline())
            check(line_length + 512, stream.tell(),
                  "tell() after seek() and readline() failed")
            stream.seek(0)
            first_line = stream.readline()
            check(stream.read(), expected[len(first_line):],
                  "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as raw:
            text_stream = io.TextIOWrapper(raw)
            encoded = text_stream.read().encode("iso8859-1")
            self.assertEqual(sha256sum(encoded), sha256_regtype)
            try:
                text_stream.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        with self.tar.extractfile(lnktype) as via_link:
            with self.tar.extractfile(regtype) as direct:
                self.assertEqual(via_link.name, direct.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link(lnktype="ustar/lnktype",
                                regtype="ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link(lnktype="./ustar/linktest2/lnktype",
                                regtype="ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link(lnktype="ustar/symtype",
                                regtype="ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link(lnktype="./ustar/linktest2/symtype",
                                regtype="ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link(lnktype="symtype2",
                                regtype="ustar/regtype")

    def test_add_dir_getmember(self):
        # bpo-21987
        for directory in ('bar', 'a'*101):
            self.add_dir_and_getmember(directory)

    @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"),
                         "Missing getuid or getgid implementation")
    def add_dir_and_getmember(self, name):
        # Archive an empty directory and check that it can be looked up
        # both with and without a trailing slash.
        def force_owner(tarinfo):
            tarinfo.uid = tarinfo.gid = 100
            return tarinfo

        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as writer:
                writer.format = tarfile.USTAR_FORMAT
                try:
                    os.mkdir(name)
                    writer.add(name, filter=force_owner)
                finally:
                    os.rmdir(name)
            with tarfile.open(tmpname) as reader:
                plain = reader.getmember(name)
                with_slash = reader.getmember(name + '/')
                self.assertEqual(plain, with_slash)
264
class GzipUstarReadTest(GzipTest, UstarReadTest):
    # Runs the UstarReadTest cases with the GzipTest archive settings.
    pass
267
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    # Runs the UstarReadTest cases with the Bz2Test archive settings.
    pass
270
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    # Runs the UstarReadTest cases with the LzmaTest archive settings.
    pass
273
274
class ListTest(ReadTest, unittest.TestCase):

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, self.mode)

    def _listing(self, **options):
        # Run TarFile.list() with sys.stdout replaced by an ASCII text
        # stream and return the bytes that were printed.
        capture = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', capture):
            self.tar.list(**options)
        return capture.detach().getvalue()

    def test_list(self):
        out = self._listing(verbose=False)
        plain_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for expected in plain_names:
            self.assertIn(expected, out)

        # Make sure it is able to print unencodable characters
        def conv(b):
            decoded = b.decode(self.tar.encoding, 'surrogateescape')
            return decoded.encode('ascii', 'backslashreplace')

        non_ascii_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw_name in non_ascii_names:
            self.assertIn(conv(raw_name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._listing(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # -rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # drwxr-xr-x tarfile/tarfile        0 2003-01-05 15:19:43 ustar/dirtype/
        # ...
        #
        # Array of values to modify the regex below:
        #  ((file_type, file_permissions, file_length), ...)
        type_perm_lengths = (
            (br'\?', b'rw-r--r--', b'7011'), (b'-', b'rw-r--r--', b'7011'),
            (b'd', b'rwxr-xr-x', b'0'), (b'd', b'rwxr-xr-x', b'255'),
            (br'\?', b'rw-r--r--', b'0'), (b'l', b'rwxrwxrwx', b'0'),
            (b'b', b'rw-rw----', b'3,0'), (b'c', b'rw-rw-rw-', b'1,3'),
            (b'p', b'rw-r--r--', b'0'))
        # Part of every expected line that does not vary: timestamp + name.
        line_tail = (br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                     br'ustar/\w+type[/>\sa-z-]*\n')
        pattern = b''
        for kind, perms, size in type_perm_lengths:
            owner_part = br'%s tarfile/tarfile\s+%s ' % (perms, size)
            pattern += kind + owner_part + line_tail
        self.assertRegex(out, pattern)
        # Make sure it prints the source of link with verbose flag
        link_lines = (
            b'ustar/symtype -> regtype',
            b'./ustar/linktest2/symtype -> ../linktest1/regtype',
            b'./ustar/linktest2/lnktype link to '
            b'./ustar/linktest1/regtype',
            b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
            (b'/123' * 125) + b'/longname',
            b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
            (b'/123' * 125) + b'/longname',
        )
        for expected in link_lines:
            self.assertIn(expected, out)

    def test_list_members(self):
        # Only the members handed to list() may appear in the output.
        def members(tar):
            yield from (info for info in tar.getmembers()
                        if 'reg' in info.name)
        out = self._listing(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
365
366
class GzipListTest(GzipTest, ListTest):
    # Runs the ListTest cases with the GzipTest archive settings.
    pass
369
370
class Bz2ListTest(Bz2Test, ListTest):
    # Runs the ListTest cases with the Bz2Test archive settings.
    pass
373
374
class LzmaListTest(LzmaTest, ListTest):
    # Runs the ListTest cases with the LzmaTest archive settings.
    pass
377
378
class CommonReadTest(ReadTest):
    # Archive-reading checks built on the ReadTest fixture (self.tar).
    # test_extractfile_attrs reads self.is_stream, which this class does
    # not define; subclasses have to provide it.

    def test_is_tarfile_erroneous(self):
        # An empty file is not a tar archive, however it is handed over.
        with open(tmpname, "wb"):
            pass

        # is_tarfile works on filenames
        self.assertFalse(tarfile.is_tarfile(tmpname))

        # is_tarfile works on path-like objects
        self.assertFalse(tarfile.is_tarfile(os_helper.FakePath(tmpname)))

        # is_tarfile works on file objects
        with open(tmpname, "rb") as fobj:
            self.assertFalse(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))

    def test_is_tarfile_valid(self):
        # is_tarfile works on filenames
        self.assertTrue(tarfile.is_tarfile(self.tarname))

        # is_tarfile works on path-like objects
        self.assertTrue(tarfile.is_tarfile(os_helper.FakePath(self.tarname)))

        # is_tarfile works on file objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(fobj))

        # is_tarfile works on file-like objects
        with open(self.tarname, "rb") as fobj:
            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))

    def test_is_tarfile_keeps_position(self):
        # Test for issue44289: tarfile.is_tarfile() modifies
        # file object's current position
        with open(self.tarname, "rb") as fobj:
            tarfile.is_tarfile(fobj)
            self.assertEqual(fobj.tell(), 0)

        with open(self.tarname, "rb") as fobj:
            file_like = io.BytesIO(fobj.read())
            tarfile.is_tarfile(file_like)
            self.assertEqual(file_like.tell(), 0)

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        try:
            tar = tarfile.open(tmpname, self.mode)
        except tarfile.ReadError:
            # Report the failure before ``tar`` is touched: it is unbound
            # here, and a cleanup that closed it would replace this
            # message with an UnboundLocalError.
            self.fail("tarfile.open() failed on empty archive")
        with tar:
            try:
                tar.getnames()
            except tarfile.ReadError:
                self.fail("tarfile.open() failed on empty archive")
            self.assertListEqual(tar.getmembers(), [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).randbytes(512)
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                # Two blocks of filler precede the only real member.
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # Truncate an archive holding one 1024-byte member at several
        # sizes; every way of reading the member must raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR, filter='data')

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though reading
                    # from it is expected to fail.
                    with tar.extractfile(t) as member:
                        member.read()

    def test_length_zero_header(self):
        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
        # with an exception
        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
            with tarfile.open(support.findfile('recursion.tar', subdir='archivetestdata')):
                pass

    def test_extractfile_attrs(self):
        # gh-74468: TarFile.name must name a file, not a parent archive.
        def check_attrs(fobj):
            # Attributes that must read the same before and after close().
            self.assertEqual(fobj.name, 'ustar/regtype')
            self.assertRaises(AttributeError, fobj.fileno)
            self.assertEqual(fobj.mode, 'rb')
            self.assertIs(fobj.readable(), True)
            self.assertIs(fobj.writable(), False)
            if self.is_stream:
                self.assertRaises(AttributeError, fobj.seekable)
            else:
                self.assertIs(fobj.seekable(), True)

        file = self.tar.getmember('ustar/regtype')
        with self.tar.extractfile(file) as fobj:
            check_attrs(fobj)
            self.assertIs(fobj.closed, False)
        self.assertIs(fobj.closed, True)
        check_attrs(fobj)
534
535
536class MiscReadTestBase(CommonReadTest):
537    is_stream = False
538
539    def test_no_name_argument(self):
540        with open(self.tarname, "rb") as fobj:
541            self.assertIsInstance(fobj.name, str)
542            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
543                self.assertIsInstance(tar.name, str)
544                self.assertEqual(tar.name, os.path.abspath(fobj.name))
545
546    def test_no_name_attribute(self):
547        with open(self.tarname, "rb") as fobj:
548            data = fobj.read()
549        fobj = io.BytesIO(data)
550        self.assertRaises(AttributeError, getattr, fobj, "name")
551        tar = tarfile.open(fileobj=fobj, mode=self.mode)
552        self.assertIsNone(tar.name)
553
554    def test_empty_name_attribute(self):
555        with open(self.tarname, "rb") as fobj:
556            data = fobj.read()
557        fobj = io.BytesIO(data)
558        fobj.name = ""
559        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
560            self.assertIsNone(tar.name)
561
562    def test_int_name_attribute(self):
563        # Issue 21044: tarfile.open() should handle fileobj with an integer
564        # 'name' attribute.
565        fd = os.open(self.tarname, os.O_RDONLY)
566        with open(fd, 'rb') as fobj:
567            self.assertIsInstance(fobj.name, int)
568            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
569                self.assertIsNone(tar.name)
570
571    def test_bytes_name_attribute(self):
572        tarname = os.fsencode(self.tarname)
573        with open(tarname, 'rb') as fobj:
574            self.assertIsInstance(fobj.name, bytes)
575            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
576                self.assertIsInstance(tar.name, bytes)
577                self.assertEqual(tar.name, os.path.abspath(fobj.name))
578
579    def test_pathlike_name(self, tarname=None):
580        if tarname is None:
581            tarname = self.tarname
582        expected = os.path.abspath(tarname)
583        tarname = os_helper.FakePath(tarname)
584        with tarfile.open(tarname, mode=self.mode) as tar:
585            self.assertEqual(tar.name, expected)
586        with self.taropen(tarname) as tar:
587            self.assertEqual(tar.name, expected)
588        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
589            self.assertEqual(tar.name, expected)
590        if self.suffix == '':
591            with tarfile.TarFile(tarname, mode='r') as tar:
592                self.assertEqual(tar.name, expected)
593
594    def test_pathlike_bytes_name(self):
595        self.test_pathlike_name(os.fsencode(self.tarname))
596
597    def test_illegal_mode_arg(self):
598        with open(tmpname, 'wb'):
599            pass
600        with self.assertRaisesRegex(ValueError, 'mode must be '):
601            tar = self.taropen(tmpname, 'q')
602        with self.assertRaisesRegex(ValueError, 'mode must be '):
603            tar = self.taropen(tmpname, 'rw')
604        with self.assertRaisesRegex(ValueError, 'mode must be '):
605            tar = self.taropen(tmpname, '')
606
607    def test_fileobj_with_offset(self):
608        # Skip the first member and store values from the second member
609        # of the testtar.
610        tar = tarfile.open(self.tarname, mode=self.mode)
611        try:
612            tar.next()
613            t = tar.next()
614            name = t.name
615            offset = t.offset
616            with tar.extractfile(t) as f:
617                data = f.read()
618        finally:
619            tar.close()
620
621        # Open the testtar and seek to the offset of the second member.
622        with self.open(self.tarname) as fobj:
623            fobj.seek(offset)
624
625            # Test if the tarfile starts with the second member.
626            with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar:
627                t = tar.next()
628                self.assertEqual(t.name, name)
629                # Read to the end of fileobj and test if seeking back to the
630                # beginning works.
631                tar.getmembers()
632                self.assertEqual(tar.extractfile(t).read(), data,
633                        "seek back did not work")
634
635    def test_fail_comp(self):
636        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
637        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
638        with open(tarname, "rb") as fobj:
639            self.assertRaises(tarfile.ReadError, tarfile.open,
640                              fileobj=fobj, mode=self.mode)
641
642    def test_v7_dirtype(self):
643        # Test old style dirtype member (bug #1336623):
644        # Old V7 tars create directory members using an AREGTYPE
645        # header with a "/" appended to the filename field.
646        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
647        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
648                "v7 dirtype failed")
649
650    def test_xstar_type(self):
651        # The xstar format stores extra atime and ctime fields inside the
652        # space reserved for the prefix field. The prefix field must be
653        # ignored in this case, otherwise it will mess up the name.
654        try:
655            self.tar.getmember("misc/regtype-xstar")
656        except KeyError:
657            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
658
659    def test_check_members(self):
660        for tarinfo in self.tar:
661            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
662                    "wrong mtime for %s" % tarinfo.name)
663            if not tarinfo.name.startswith("ustar/"):
664                continue
665            self.assertEqual(tarinfo.uname, "tarfile",
666                    "wrong uname for %s" % tarinfo.name)
667
668    def test_find_members(self):
669        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
670                "could not find all members")
671
672    @unittest.skipUnless(hasattr(os, "link"),
673                         "Missing hardlink implementation")
674    @os_helper.skip_unless_symlink
675    def test_extract_hardlink(self):
676        # Test hardlink extraction (e.g. bug #857297).
677        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
678            tar.extract("ustar/regtype", TEMPDIR, filter='data')
679            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
680
681            tar.extract("ustar/lnktype", TEMPDIR, filter='data')
682            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
683            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
684                data = f.read()
685            self.assertEqual(sha256sum(data), sha256_regtype)
686
687            tar.extract("ustar/symtype", TEMPDIR, filter='data')
688            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
689            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
690                data = f.read()
691            self.assertEqual(sha256sum(data), sha256_regtype)
692
693    @os_helper.skip_unless_working_chmod
694    def test_extractall(self):
695        # Test if extractall() correctly restores directory permissions
696        # and times (see issue1735).
697        tar = tarfile.open(tarname, encoding="iso8859-1")
698        DIR = os.path.join(TEMPDIR, "extractall")
699        os.mkdir(DIR)
700        try:
701            directories = [t for t in tar if t.isdir()]
702            tar.extractall(DIR, directories, filter='fully_trusted')
703            for tarinfo in directories:
704                path = os.path.join(DIR, tarinfo.name)
705                if sys.platform != "win32":
706                    # Win32 has no support for fine grained permissions.
707                    self.assertEqual(tarinfo.mode & 0o777,
708                                     os.stat(path).st_mode & 0o777,
709                                     tarinfo.name)
710                def format_mtime(mtime):
711                    if isinstance(mtime, float):
712                        return "{} ({})".format(mtime, mtime.hex())
713                    else:
714                        return "{!r} (int)".format(mtime)
715                file_mtime = os.path.getmtime(path)
716                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
717                    format_mtime(tarinfo.mtime),
718                    format_mtime(file_mtime),
719                    path)
720                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
721        finally:
722            tar.close()
723            os_helper.rmtree(DIR)
724
725    @os_helper.skip_unless_working_chmod
726    def test_extract_directory(self):
727        dirtype = "ustar/dirtype"
728        DIR = os.path.join(TEMPDIR, "extractdir")
729        os.mkdir(DIR)
730        try:
731            with tarfile.open(tarname, encoding="iso8859-1") as tar:
732                tarinfo = tar.getmember(dirtype)
733                tar.extract(tarinfo, path=DIR, filter='fully_trusted')
734                extracted = os.path.join(DIR, dirtype)
735                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
736                if sys.platform != "win32":
737                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
738        finally:
739            os_helper.rmtree(DIR)
740
741    def test_deprecation_if_no_filter_passed_to_extractall(self):
742        DIR = pathlib.Path(TEMPDIR) / "extractall"
743        with (
744            os_helper.temp_dir(DIR),
745            tarfile.open(tarname, encoding="iso8859-1") as tar
746        ):
747            directories = [t for t in tar if t.isdir()]
748            with self.assertWarnsRegex(DeprecationWarning, "Use the filter argument") as cm:
749                tar.extractall(DIR, directories)
750            # check that the stacklevel of the deprecation warning is correct:
751            self.assertEqual(cm.filename, __file__)
752
753    def test_deprecation_if_no_filter_passed_to_extract(self):
754        dirtype = "ustar/dirtype"
755        DIR = pathlib.Path(TEMPDIR) / "extractall"
756        with (
757            os_helper.temp_dir(DIR),
758            tarfile.open(tarname, encoding="iso8859-1") as tar
759        ):
760            tarinfo = tar.getmember(dirtype)
761            with self.assertWarnsRegex(DeprecationWarning, "Use the filter argument") as cm:
762                tar.extract(tarinfo, path=DIR)
763            # check that the stacklevel of the deprecation warning is correct:
764            self.assertEqual(cm.filename, __file__)
765
766    def test_extractall_pathlike_dir(self):
767        DIR = os.path.join(TEMPDIR, "extractall")
768        with os_helper.temp_dir(DIR), \
769             tarfile.open(tarname, encoding="iso8859-1") as tar:
770            directories = [t for t in tar if t.isdir()]
771            tar.extractall(os_helper.FakePath(DIR), directories, filter='fully_trusted')
772            for tarinfo in directories:
773                path = os.path.join(DIR, tarinfo.name)
774                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)
775
776    def test_extract_pathlike_dir(self):
777        dirtype = "ustar/dirtype"
778        DIR = os.path.join(TEMPDIR, "extractall")
779        with os_helper.temp_dir(DIR), \
780             tarfile.open(tarname, encoding="iso8859-1") as tar:
781            tarinfo = tar.getmember(dirtype)
782            tar.extract(tarinfo, path=os_helper.FakePath(DIR), filter='fully_trusted')
783            extracted = os.path.join(DIR, dirtype)
784            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
785
786    def test_init_close_fobj(self):
787        # Issue #7341: Close the internal file object in the TarFile
788        # constructor in case of an error. For the test we rely on
789        # the fact that opening an empty file raises a ReadError.
790        empty = os.path.join(TEMPDIR, "empty")
791        with open(empty, "wb") as fobj:
792            fobj.write(b"")
793
794        try:
795            tar = object.__new__(tarfile.TarFile)
796            try:
797                tar.__init__(empty)
798            except tarfile.ReadError:
799                self.assertTrue(tar.fileobj.closed)
800            else:
801                self.fail("ReadError not raised")
802        finally:
803            os_helper.unlink(empty)
804
805    def test_parallel_iteration(self):
806        # Issue #16601: Restarting iteration over tarfile continued
807        # from where it left off.
808        with tarfile.open(self.tarname) as tar:
809            for m1, m2 in zip(tar, tar):
810                self.assertEqual(m1.offset, m2.offset)
811                self.assertEqual(m1.get_info(), m2.get_info())
812
813    @unittest.skipIf(zlib is None, "requires zlib")
814    def test_zlib_error_does_not_leak(self):
815        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
816        # parsing certain types of invalid data
817        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
818            mock.side_effect = zlib.error
819            with self.assertRaises(tarfile.ReadError):
820                tarfile.open(self.tarname)
821
822    def test_next_on_empty_tarfile(self):
823        fd = io.BytesIO()
824        tf = tarfile.open(fileobj=fd, mode="w")
825        tf.close()
826
827        fd.seek(0)
828        with tarfile.open(fileobj=fd, mode="r|") as tf:
829            self.assertEqual(tf.next(), None)
830
831        fd.seek(0)
832        with tarfile.open(fileobj=fd, mode="r") as tf:
833            self.assertEqual(tf.next(), None)
834
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase tests without a compression mixin."""

    # Binding the name to None overrides any inherited test_fail_comp, so
    # it is not collected as a test for this class.
    test_fail_comp = None
837
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase tests with the GzipTest mixin."""
840
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase tests with the Bz2Test mixin."""
843
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase tests with the LzmaTest mixin."""
846
847
class StreamReadTest(CommonReadTest, unittest.TestCase):
    """Read tests for archives opened in stream mode (mode prefix "r|")."""

    prefix="r|"
    is_stream = True

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as stream:
                    # Drain the member in 512-byte chunks; any StreamError
                    # on the way is a test failure.
                    try:
                        while stream.read(512):
                            pass
                    except tarfile.StreamError:
                        self.fail("simple read-through using "
                                  "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        # The first member is "regtype"; getmember() can't be used on
        # a stream, so fetch it with next().
        member = self.tar.next()
        with self.tar.extractfile(member) as stream:
            content = stream.read()
        self.assertEqual(len(content), member.size,
                "regular file extraction failed")
        self.assertEqual(sha256sum(content), sha256_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        # getmembers() reads through the whole stream, so reading the
        # first member afterwards must raise StreamError.
        first = self.tar.getmembers()[0]
        with self.tar.extractfile(first) as stream:
            with self.assertRaises(tarfile.StreamError):
                stream.read()

    def test_compare_members(self):
        # Walk a regular (seekable) archive and the stream archive in
        # lockstep and compare the content of every member.
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            stream = self.tar

            while True:
                ref_member = reference.next()
                stream_member = stream.next()
                if ref_member is None:
                    break
                self.assertIsNotNone(stream_member, "stream.next() failed.")

                if stream_member.islnk() or stream_member.issym():
                    # Links cannot be followed in a stream.
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(stream_member)
                    continue

                ref_file = reference.extractfile(ref_member)
                stream_file = stream.extractfile(stream_member)
                if ref_file is None:
                    continue
                self.assertIsNotNone(stream_file, "stream.extractfile() failed")
                self.assertEqual(ref_file.read(), stream_file.read(),
                        "stream extraction failed")
909
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """Run the StreamReadTest tests with the GzipTest mixin."""
912
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest tests with the Bz2Test mixin."""
915
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest tests with the LzmaTest mixin."""
918
class TarStreamModeReadTest(StreamModeTest, unittest.TestCase):
    """Check that iterating self.tar leaves its member list empty."""

    def test_stream_mode_no_cache(self):
        # Walk the whole archive; only the effect on tar.members matters.
        for _member in self.tar:
            continue
        self.assertEqual(self.tar.members, [])
925
class GzipStreamModeReadTest(GzipTest, TarStreamModeReadTest):
    """Run the TarStreamModeReadTest tests with the GzipTest mixin."""
928
class Bz2StreamModeReadTest(Bz2Test, TarStreamModeReadTest):
    """Run the TarStreamModeReadTest tests with the Bz2Test mixin."""
931
class LzmaStreamModeReadTest(LzmaTest, TarStreamModeReadTest):
    """Run the TarStreamModeReadTest tests with the LzmaTest mixin."""
934
class DetectReadTest(TarTest, unittest.TestCase):
    """Check which read modes tarfile.open() accepts for an archive."""

    def _testfunc_file(self, name, mode):
        """Open *name* by path using *mode*; fail the test on ReadError."""
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail(f"tarfile.open({name!r}, {mode!r}) raised "
                      f"ReadError: {exc}")
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        """Open *name* through a file object using *mode*; fail the test
        on ReadError."""
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail(f"tarfile.open({name!r}, {mode!r}, fileobj=...) "
                          f"raised ReadError: {exc}")
            # Close the TarFile while its underlying file is still open.
            tar.close()

    def _test_modes(self, testfunc):
        """Exercise the accepted and rejected modes via *testfunc*.

        *testfunc* is called as testfunc(name, mode) and is expected to
        fail the test when the archive cannot be opened.
        """
        if self.suffix:
            # With a compression suffix set, a mismatch between the file
            # and the explicitly requested compression must be rejected:
            # the module-level archive with this suffix, and this class's
            # archive with no compression.
            mismatches = (
                (tarname, "r:" + self.suffix),
                (tarname, "r|" + self.suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for name, mode in mismatches:
                with self.assertRaises(tarfile.ReadError):
                    tarfile.open(name, mode=mode)
        accepted = ("r", "r:" + self.suffix, "r:*", "r|" + self.suffix, "r|*")
        for mode in accepted:
            testfunc(self.tarname, mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
974
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Run the DetectReadTest tests with the GzipTest mixin."""
977
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    """Run the DetectReadTest tests with the Bz2Test mixin, plus a
    bz2-specific stream detection check."""

    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as plain:
            payload = plain.read()

        # Compress with blocksize 100,000 bytes, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as packed:
            packed.write(payload)

        # Autodetection in stream mode must still recognise the archive.
        self._testfunc_file(tmpname, "r|*")
992
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """Run the DetectReadTest tests with the LzmaTest mixin."""
995
996
class GzipBrokenHeaderCorrectException(GzipTest, unittest.TestCase):
    """
    See: https://github.com/python/cpython/issues/107396
    """
    def runTest(self):
        # Opening this hand-built, corrupt gzip stream in stream mode
        # must raise tarfile.ReadError.
        corrupt_stream = (
            b'\x1f\x8b'  # header
            b'\x08'  # compression method
            b'\x04'  # flags
            b'\0\0\0\0\0\0'  # timestamp, compression data, OS ID
            b'\0\x01'  # size
            b'\0\0\0\0\0'  # corrupt data (zeros)
        )
        fileobj = io.BytesIO(corrupt_stream)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(fileobj=fileobj, mode='r|gz')
1012
1013
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up individual members of self.tar and verify their metadata
    (and, where a checksum is given, their content)."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check *tarinfo* against the expected field values.

        If *chksum* is given, the member's content must hash to it.
        *kwargs* maps TarInfo attribute names to expected values; the
        mtime/uid/gid (and, except for old-v7 members, uname/gname)
        expectations are added to it here.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                self.assertEqual(sha256sum(f.read()), chksum,
                        "wrong sha256sum for %s" % tarinfo.name)

        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member = self.tar.getmember("ustar/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        # Only membership in the name list is checked for this entry.
        expected_name = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected_name, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking up the pax member with the non-ASCII name.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member = self.tar.getmember("pax/umlauts-"
                                    "\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
        self._test_member(member, size=7011, chksum=sha256_regtype)
1109
1110
class LongnameTest:
    # Mixin with tests for very long member names and link names.
    # The methods read self.subdir, self.longnametype, self.format and
    # self.tar, none of which are defined here; they must be supplied by
    # the classes this mixin is combined with.

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        longname = f"{self.subdir}/{'123/' * 125}longname"
        try:
            member = self.tar.getmember(longname)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        # The long link member must point at the long name member.
        deep_dir = f"{self.subdir}/{'123/' * 125}"
        longname = deep_dir + "longname"
        longlink = deep_dir + "longlink"
        try:
            member = self.tar.getmember(longlink)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, longname, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512-byte blocks starting at the
        # member's offset; opening that fragment must raise ReadError.
        longname = f"{self.subdir}/{'123/' * 125}longname"
        start = self.tar.getmember(longname).offset
        self.tar.fileobj.seek(start)
        fragment = io.BytesIO(self.tar.fileobj.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        longname = f"{self.subdir}/{'123/' * 125}longname"
        start = self.tar.getmember(longname).offset
        with open(tarname, "rb") as raw:
            raw.seek(start)
            header = tarfile.TarInfo.frombuf(raw.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)

    def test_longname_directory(self):
        # Test reading a longlink directory. Issue #47231.
        dirname = 'a' * 101
        longdir = dirname + '/'
        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as writer:
                writer.format = self.format
                try:
                    os.mkdir(longdir)
                    writer.add(longdir)
                finally:
                    os.rmdir(dirname)
            with tarfile.open(tmpname) as reader:
                # The member must be found with and without the
                # trailing slash.
                self.assertIsNotNone(reader.getmember(longdir))
                self.assertIsNotNone(reader.getmember(dirname))
1166
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long-name and sparse-file read tests for the "gnu" members."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME
    format = tarfile.GNU_FORMAT

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        """Extract member *name* into TEMPDIR and verify its content.

        When the filesystem is found to support holes, additionally
        check that the extracted file occupies fewer bytes on disk
        than its logical size.
        """
        self.tar.extract(name, TEMPDIR, filter='data')
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if not self._fs_supports_holes():
            return
        info = os.stat(extracted)
        self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith(("linux", "android")):
            return False
        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        info = os.stat(probe)
        os_helper.unlink(probe)
        return info.st_blocks * 512 < info.st_size
1225
1226
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """Long-name and pax-header read tests for the "pax" members."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE
    format = tarfile.PAX_FORMAT

    def test_pax_global_headers(self):
        # Check uname, gname and the VENDOR.umlauts pax header of three
        # consecutive "pax/regtype*" members.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype1")
            self.assertEqual(tarinfo.uname, "foo")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             umlauts)

            tarinfo = tar.getmember("pax/regtype2")
            self.assertEqual(tarinfo.uname, "")
            self.assertEqual(tarinfo.gname, "bar")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             umlauts)

            tarinfo = tar.getmember("pax/regtype3")
            self.assertEqual(tarinfo.uname, "tarfile")
            self.assertEqual(tarinfo.gname, "tarfile")
            self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                             umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)

    def test_pax_header_bad_formats(self):
        # Each entry below replaces the well-formed pax record
        # b"11 foo=bar\n" in a freshly written archive; opening the
        # resulting archive must fail with ReadError('invalid header').
        pax_header_replacements = (
            b" foo=bar\n",
            b"0 \n",
            b"1 \n",
            b"2 \n",
            b"3 =\n",
            b"4 =a\n",
            b"1000000 foo=bar\n",
            b"0 foo=bar\n",
            b"-12 foo=bar\n",
            b"000000000000000000000000036 foo=bar\n",
        )
        pax_headers = {"foo": "bar"}

        for replacement in pax_header_replacements:
            with self.subTest(header=replacement):
                # Write an archive with one member carrying the pax
                # header foo=bar.
                with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                                  encoding="iso8859-1") as tar:
                    t = tarfile.TarInfo()
                    t.name = "pax"
                    t.uid = 1
                    t.pax_headers = pax_headers
                    tar.addfile(t)

                with open(tmpname, "rb") as f:
                    data = f.read()
                self.assertIn(b"11 foo=bar\n", data)
                data = data.replace(b"11 foo=bar\n", replacement)

                # Mode "wb" already truncates the file on open.
                with open(tmpname, "wb") as f:
                    f.write(data)

                with self.assertRaisesRegex(tarfile.ReadError, r"method tar: ReadError\('invalid header'\)"):
                    tarfile.open(tmpname, encoding="iso8859-1")
1312
1313
class WriteTestBase(TarTest):
    # Write tests that are meant to hold in every possible mode
    # combination belong in this class.

    def test_fileobj_no_close(self):
        # A file object passed in by the caller must stay open after the
        # TarFile is closed and after it is garbage collected.
        buffer = io.BytesIO()
        with tarfile.open(fileobj=buffer, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(buffer.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the last reference and collecting must neither close
        # the buffer nor write anything further to it.
        snapshot = buffer.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(buffer.closed)
        self.assertEqual(snapshot, buffer.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        payload = b"a" * (tarfile.RECORDSIZE - tarfile.BLOCKSIZE)
        member = tarfile.TarInfo("foo")
        member.size = len(payload)
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(payload))

        # self.open is the opener configured on the test class.
        with self.open(tmpname, "rb") as raw:
            written = raw.read()
        self.assertEqual(len(written), tarfile.RECORDSIZE * 2)
1342
1343
1344class WriteTest(WriteTestBase, unittest.TestCase):
1345
1346    prefix = "w:"
1347
1348    def test_100_char_name(self):
1349        # The name field in a tar header stores strings of at most 100 chars.
1350        # If a string is shorter than 100 chars it has to be padded with '\0',
1351        # which implies that a string of exactly 100 chars is stored without
1352        # a trailing '\0'.
1353        name = "0123456789" * 10
1354        tar = tarfile.open(tmpname, self.mode)
1355        try:
1356            t = tarfile.TarInfo(name)
1357            tar.addfile(t)
1358        finally:
1359            tar.close()
1360
1361        tar = tarfile.open(tmpname)
1362        try:
1363            self.assertEqual(tar.getnames()[0], name,
1364                    "failed to store 100 char filename")
1365        finally:
1366            tar.close()
1367
1368    def test_tar_size(self):
1369        # Test for bug #1013882.
1370        tar = tarfile.open(tmpname, self.mode)
1371        try:
1372            path = os.path.join(TEMPDIR, "file")
1373            with open(path, "wb") as fobj:
1374                fobj.write(b"aaa")
1375            tar.add(path)
1376        finally:
1377            tar.close()
1378        self.assertGreater(os.path.getsize(tmpname), 0,
1379                "tarfile is empty")
1380
1381    # The test_*_size tests test for bug #1167128.
1382    def test_file_size(self):
1383        tar = tarfile.open(tmpname, self.mode)
1384        try:
1385            path = os.path.join(TEMPDIR, "file")
1386            with open(path, "wb"):
1387                pass
1388            tarinfo = tar.gettarinfo(path)
1389            self.assertEqual(tarinfo.size, 0)
1390
1391            with open(path, "wb") as fobj:
1392                fobj.write(b"aaa")
1393            tarinfo = tar.gettarinfo(path)
1394            self.assertEqual(tarinfo.size, 3)
1395        finally:
1396            tar.close()
1397
1398    def test_directory_size(self):
1399        path = os.path.join(TEMPDIR, "directory")
1400        os.mkdir(path)
1401        try:
1402            tar = tarfile.open(tmpname, self.mode)
1403            try:
1404                tarinfo = tar.gettarinfo(path)
1405                self.assertEqual(tarinfo.size, 0)
1406            finally:
1407                tar.close()
1408        finally:
1409            os_helper.rmdir(path)
1410
1411    # mock the following:
1412    #  os.listdir: so we know that files are in the wrong order
1413    def test_ordered_recursion(self):
1414        path = os.path.join(TEMPDIR, "directory")
1415        os.mkdir(path)
1416        open(os.path.join(path, "1"), "a").close()
1417        open(os.path.join(path, "2"), "a").close()
1418        try:
1419            tar = tarfile.open(tmpname, self.mode)
1420            try:
1421                with unittest.mock.patch('os.listdir') as mock_listdir:
1422                    mock_listdir.return_value = ["2", "1"]
1423                    tar.add(path)
1424                paths = []
1425                for m in tar.getmembers():
1426                    paths.append(os.path.split(m.name)[-1])
1427                self.assertEqual(paths, ["directory", "1", "2"]);
1428            finally:
1429                tar.close()
1430        finally:
1431            os_helper.unlink(os.path.join(path, "1"))
1432            os_helper.unlink(os.path.join(path, "2"))
1433            os_helper.rmdir(path)
1434
1435    def test_gettarinfo_pathlike_name(self):
1436        with tarfile.open(tmpname, self.mode) as tar:
1437            path = os.path.join(TEMPDIR, "file")
1438            with open(path, "wb") as fobj:
1439                fobj.write(b"aaa")
1440            tarinfo = tar.gettarinfo(os_helper.FakePath(path))
1441            tarinfo2 = tar.gettarinfo(path)
1442            self.assertIsInstance(tarinfo.name, str)
1443            self.assertEqual(tarinfo.name, tarinfo2.name)
1444            self.assertEqual(tarinfo.size, 3)
1445
1446    @unittest.skipUnless(hasattr(os, "link"),
1447                         "Missing hardlink implementation")
1448    def test_link_size(self):
1449        link = os.path.join(TEMPDIR, "link")
1450        target = os.path.join(TEMPDIR, "link_target")
1451        with open(target, "wb") as fobj:
1452            fobj.write(b"aaa")
1453        try:
1454            os.link(target, link)
1455        except PermissionError as e:
1456            self.skipTest('os.link(): %s' % e)
1457        try:
1458            tar = tarfile.open(tmpname, self.mode)
1459            try:
1460                # Record the link target in the inodes list.
1461                tar.gettarinfo(target)
1462                tarinfo = tar.gettarinfo(link)
1463                self.assertEqual(tarinfo.size, 0)
1464            finally:
1465                tar.close()
1466        finally:
1467            os_helper.unlink(target)
1468            os_helper.unlink(link)
1469
1470    @os_helper.skip_unless_symlink
1471    def test_symlink_size(self):
1472        path = os.path.join(TEMPDIR, "symlink")
1473        os.symlink("link_target", path)
1474        try:
1475            tar = tarfile.open(tmpname, self.mode)
1476            try:
1477                tarinfo = tar.gettarinfo(path)
1478                self.assertEqual(tarinfo.size, 0)
1479            finally:
1480                tar.close()
1481        finally:
1482            os_helper.unlink(path)
1483
1484    def test_add_self(self):
1485        # Test for #1257255.
1486        dstname = os.path.abspath(tmpname)
1487        tar = tarfile.open(tmpname, self.mode)
1488        try:
1489            self.assertEqual(tar.name, dstname,
1490                    "archive name must be absolute")
1491            tar.add(dstname)
1492            self.assertEqual(tar.getnames(), [],
1493                    "added the archive to itself")
1494
1495            with os_helper.change_cwd(TEMPDIR):
1496                tar.add(dstname)
1497            self.assertEqual(tar.getnames(), [],
1498                    "added the archive to itself")
1499        finally:
1500            tar.close()
1501
1502    def test_filter(self):
1503        tempdir = os.path.join(TEMPDIR, "filter")
1504        os.mkdir(tempdir)
1505        try:
1506            for name in ("foo", "bar", "baz"):
1507                name = os.path.join(tempdir, name)
1508                os_helper.create_empty_file(name)
1509
1510            def filter(tarinfo):
1511                if os.path.basename(tarinfo.name) == "bar":
1512                    return
1513                tarinfo.uid = 123
1514                tarinfo.uname = "foo"
1515                return tarinfo
1516
1517            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
1518            try:
1519                tar.add(tempdir, arcname="empty_dir", filter=filter)
1520            finally:
1521                tar.close()
1522
1523            # Verify that filter is a keyword-only argument
1524            with self.assertRaises(TypeError):
1525                tar.add(tempdir, "empty_dir", True, None, filter)
1526
1527            tar = tarfile.open(tmpname, "r")
1528            try:
1529                for tarinfo in tar:
1530                    self.assertEqual(tarinfo.uid, 123)
1531                    self.assertEqual(tarinfo.uname, "foo")
1532                self.assertEqual(len(tar.getmembers()), 3)
1533            finally:
1534                tar.close()
1535        finally:
1536            os_helper.rmtree(tempdir)
1537
1538    # Guarantee that stored pathnames are not modified. Don't
1539    # remove ./ or ../ or double slashes. Still make absolute
1540    # pathnames relative.
1541    # For details see bug #6054.
1542    def _test_pathname(self, path, cmp_path=None, dir=False):
1543        # Create a tarfile with an empty member named path
1544        # and compare the stored name with the original.
1545        foo = os.path.join(TEMPDIR, "foo")
1546        if not dir:
1547            os_helper.create_empty_file(foo)
1548        else:
1549            os.mkdir(foo)
1550
1551        tar = tarfile.open(tmpname, self.mode)
1552        try:
1553            tar.add(foo, arcname=path)
1554        finally:
1555            tar.close()
1556
1557        tar = tarfile.open(tmpname, "r")
1558        try:
1559            t = tar.next()
1560        finally:
1561            tar.close()
1562
1563        if not dir:
1564            os_helper.unlink(foo)
1565        else:
1566            os_helper.rmdir(foo)
1567
1568        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
1569
1570
1571    @os_helper.skip_unless_symlink
1572    def test_extractall_symlinks(self):
1573        # Test if extractall works properly when tarfile contains symlinks
1574        tempdir = os.path.join(TEMPDIR, "testsymlinks")
1575        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
1576        os.mkdir(tempdir)
1577        try:
1578            source_file = os.path.join(tempdir,'source')
1579            target_file = os.path.join(tempdir,'symlink')
1580            with open(source_file,'w') as f:
1581                f.write('something\n')
1582            os.symlink(source_file, target_file)
1583            with tarfile.open(temparchive, 'w') as tar:
1584                tar.add(source_file, arcname="source")
1585                tar.add(target_file, arcname="symlink")
1586            # Let's extract it to the location which contains the symlink
1587            with tarfile.open(temparchive, errorlevel=2) as tar:
1588                # this should not raise OSError: [Errno 17] File exists
1589                try:
1590                    tar.extractall(path=tempdir,
1591                                   filter='fully_trusted')
1592                except OSError:
1593                    self.fail("extractall failed with symlinked files")
1594        finally:
1595            os_helper.unlink(temparchive)
1596            os_helper.rmtree(tempdir)
1597
1598    def test_pathnames(self):
1599        self._test_pathname("foo")
1600        self._test_pathname(os.path.join("foo", ".", "bar"))
1601        self._test_pathname(os.path.join("foo", "..", "bar"))
1602        self._test_pathname(os.path.join(".", "foo"))
1603        self._test_pathname(os.path.join(".", "foo", "."))
1604        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
1605        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1606        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
1607        self._test_pathname(os.path.join("..", "foo"))
1608        self._test_pathname(os.path.join("..", "foo", ".."))
1609        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
1610        self._test_pathname(os.path.join("..", "foo", "..", "bar"))
1611
1612        self._test_pathname("foo" + os.sep + os.sep + "bar")
1613        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
1614
1615    def test_abs_pathnames(self):
1616        if sys.platform == "win32":
1617            self._test_pathname("C:\\foo", "foo")
1618        else:
1619            self._test_pathname("/foo", "foo")
1620            self._test_pathname("///foo", "foo")
1621
1622    def test_cwd(self):
1623        # Test adding the current working directory.
1624        with os_helper.change_cwd(TEMPDIR):
1625            tar = tarfile.open(tmpname, self.mode)
1626            try:
1627                tar.add(".")
1628            finally:
1629                tar.close()
1630
1631            tar = tarfile.open(tmpname, "r")
1632            try:
1633                for t in tar:
1634                    if t.name != ".":
1635                        self.assertTrue(t.name.startswith("./"), t.name)
1636            finally:
1637                tar.close()
1638
1639    def test_open_nonwritable_fileobj(self):
1640        for exctype in OSError, EOFError, RuntimeError:
1641            class BadFile(io.BytesIO):
1642                first = True
1643                def write(self, data):
1644                    if self.first:
1645                        self.first = False
1646                        raise exctype
1647
1648            f = BadFile()
1649            with self.assertRaises(exctype):
1650                tar = tarfile.open(tmpname, self.mode, fileobj=f,
1651                                   format=tarfile.PAX_FORMAT,
1652                                   pax_headers={'non': 'empty'})
1653            self.assertFalse(f.closed)
1654
1655    def test_missing_fileobj(self):
1656        with tarfile.open(tmpname, self.mode) as tar:
1657            tarinfo = tar.gettarinfo(tarname)
1658            with self.assertRaises(ValueError):
1659                tar.addfile(tarinfo)
1660
1661
class GzipWriteTest(GzipTest, WriteTest):
    """WriteTest cases run with the GzipTest mixin's settings."""
1664
1665
class Bz2WriteTest(Bz2Test, WriteTest):
    """WriteTest cases run with the Bz2Test mixin's settings."""
1668
1669
class LzmaWriteTest(LzmaTest, WriteTest):
    """WriteTest cases run with the LzmaTest mixin's settings."""
1672
1673
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for stream mode (mode prefix "w|")."""

    prefix = "w|"
    # Subclasses for compressed streams set this to a decompressor factory;
    # when it is None the file is read back through self.open instead.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")

        # An empty archive must consist of exactly one record of NULs.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    @unittest.skipIf(
        support.is_emscripten or support.is_wasi,
        "Emscripten's/WASI's umask is a stub."
    )
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os_helper.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Restore the process-wide umask whatever happened above.
            os.umask(saved_umask)
1715
1716
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the compressed file
        # can be searched as text without decode errors.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # A unittest assertion (not a bare ``assert``) keeps the check alive
        # under ``python -O`` and produces a useful failure message.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1726
1727
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Factory used by test_stream_padding to decompress the written file;
    # None when the bz2 module could not be imported.
    decompressor = None if not bz2 else bz2.BZ2Decompressor
1730
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Factory used by test_stream_padding to decompress the written file;
    # None when the lzma module could not be imported.
    decompressor = None if not lzma else lzma.LZMADecompressor
1733
class _CompressedWriteTest(TarTest):
    # This is not actually a standalone test.
    # It does not inherit WriteTest because it only makes sense with gz,bz2
    source = (b"And we move to Bristol where they have a special, "
              b"Very Silly candidate")

    def _compressed_tar(self, compresslevel):
        """Write a one-member archive into memory at *compresslevel*.

        Returns the io.BytesIO object the archive was written to.
        """
        buffer = io.BytesIO()
        member = tarfile.TarInfo("foo")
        payload = io.BytesIO(self.source)
        with tarfile.open(tmpname, self.mode, buffer,
                          compresslevel=compresslevel) as archive:
            archive.addfile(member, payload)
        return buffer

    def _test_bz2_header(self, compresslevel):
        """Check the first ten bytes: "BZh", the level digit, block magic."""
        head = self._compressed_tar(compresslevel).getvalue()[0:10]
        self.assertEqual(head, b"BZh%d1AY&SY" % compresslevel)

    def _test_gz_header(self, compresslevel):
        """Check that the output starts with the bytes 1f 8b 08."""
        head = self._compressed_tar(compresslevel).getvalue()[:3]
        self.assertEqual(head, b"\x1f\x8b\x08")
1755
class Bz2CompressWriteTest(Bz2Test, _CompressedWriteTest, unittest.TestCase):
    prefix = "w:"

    def test_compression_levels(self):
        # Lowest, middle and highest bz2 levels.
        for level in (1, 5, 9):
            self._test_bz2_header(level)
1762
class Bz2CompressStreamWriteTest(Bz2Test, _CompressedWriteTest,
                                 unittest.TestCase):
    prefix = "w|"

    def test_compression_levels(self):
        # Lowest, middle and highest bz2 levels, in stream mode.
        for level in (1, 5, 9):
            self._test_bz2_header(level)
1770
class GzCompressWriteTest(GzipTest, _CompressedWriteTest, unittest.TestCase):
    prefix = "w:"

    def test_compression_levels(self):
        # The gzip header check is the same for every level tried.
        for level in (1, 5, 9):
            self._test_gz_header(level)
1777
class GzCompressStreamWriteTest(GzipTest, _CompressedWriteTest,
                                unittest.TestCase):
    prefix = "w|"

    def test_compression_levels(self):
        # The gzip header check is the same for every level tried.
        for level in (1, 5, 9):
            self._test_gz_header(level)
1785
class CompressLevelRaises(unittest.TestCase):
    """tarfile.open() must reject unusable ``compresslevel`` arguments."""

    def test_compresslevel_wrong_modes(self):
        # Plain "w:" mode is expected to refuse compresslevel outright.
        self.assertRaises(TypeError, tarfile.open, tmpname, "w:",
                          io.BytesIO(), compresslevel=5)

    @support.requires_bz2()
    def test_wrong_compresslevels(self):
        # BZ2 checks that the compresslevel is in [1,9]. gz does not
        fobj = io.BytesIO()
        out_of_range = (("w:bz2", 0), ("w:bz2", 10), ("w|bz2", 10))
        for mode, level in out_of_range:
            with self.assertRaises(ValueError):
                tarfile.open(tmpname, mode, fobj, compresslevel=level)
1803
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    @staticmethod
    def _long(stem, tail):
        # 127 repetitions of "<stem>/" followed by *tail*.
        return (stem + "/") * 127 + tail

    def _length(self, s):
        """Return (len(s) // 512 + 1) * 512, the padded size used for *s*."""
        full_blocks, _ = divmod(len(s), 512)
        return (full_blocks + 1) * 512

    def _calc_size(self, name, link=None):
        """Return the archive offset expected after adding one member."""
        needs_longname = len(name) > tarfile.LENGTH_NAME
        needs_longlink = (link is not None
                          and len(link) > tarfile.LENGTH_LINK)

        # Initial tar header
        total = 512
        if needs_longname:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if needs_longlink:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        """Write one member in GNU format and read it back unchanged."""
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(written)
            self.assertEqual(self._calc_size(name, link), tar.offset,
                             "GNU longname/longlink creation failed")
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            stored = tar.next()
            self.assertIsNotNone(stored,
                                 "unable to read longname member")
            self.assertEqual(written.name, stored.name,
                             "unable to read longname member")
            self.assertEqual(written.linkname, stored.linkname,
                             "unable to read longname member")
        finally:
            tar.close()

    def test_longname_1023(self):
        self._test(self._long("longnam", "longnam"))

    def test_longname_1024(self):
        self._test(self._long("longnam", "longname"))

    def test_longname_1025(self):
        self._test(self._long("longnam", "longname_"))

    def test_longlink_1023(self):
        self._test("name", self._long("longlnk", "longlnk"))

    def test_longlink_1024(self):
        self._test("name", self._long("longlnk", "longlink"))

    def test_longlink_1025(self):
        self._test("name", self._long("longlnk", "longlink_"))

    def test_longnamelink_1023(self):
        self._test(self._long("longnam", "longnam"),
                   self._long("longlnk", "longlnk"))

    def test_longnamelink_1024(self):
        self._test(self._long("longnam", "longname"),
                   self._long("longlnk", "longlink"))

    def test_longnamelink_1025(self):
        self._test(self._long("longnam", "longname_"),
                   self._long("longlnk", "longlink_"))
1884
1885
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        tempdir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(tempdir)
        try:
            blk_in = tarfile.TarInfo(name="my_block_device")
            blk_in.type = tarfile.BLKTYPE
            reg_in = tarfile.TarInfo(name="my_regular_file")
            reg_in.type = tarfile.REGTYPE

            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.addfile(blk_in)
                tar.addfile(reg_in)
            finally:
                tar.close()

            # devmajor and devminor should be *interpreted* as 0 in both...
            tar = tarfile.open(tmpname, "r")
            try:
                blk_out = tar.getmember("my_block_device")
                reg_out = tar.getmember("my_regular_file")
            finally:
                tar.close()
            for member in (blk_out, reg_out):
                self.assertEqual(member.devmajor, 0)
                self.assertEqual(member.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                raw = infile.read()
            blk_header = raw[blk_out.offset:blk_out.offset_data]
            reg_header = raw[reg_out.offset:reg_out.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            device_fields = slice(329, 329 + 16)
            self.assertEqual(blk_header[device_fields], b"0000000\0" * 2)
            self.assertEqual(reg_header[device_fields], b"\0" * 16)
        finally:
            os_helper.rmtree(tempdir)
1930
1931
class CreateTest(WriteTestBase, unittest.TestCase):
    """Tests for exclusive-creation mode (mode prefix "x:")."""

    prefix = "x:"

    # Small file shared by all tests of the class; see setUpClass.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def setUp(self):
        # Exclusive creation needs the target to be absent.
        os_helper.unlink(tmpname)

    def _check_single_member(self, names):
        # Exactly one member, whose name contains 'spameggs42'.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive_on_disk(self):
        # Re-open tmpname through self.taropen and check its member list.
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_single_member(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive open of the same path must fail...
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # ...and must leave the existing archive intact.
        self._check_archive_on_disk()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive_on_disk()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_archive_on_disk()

    def test_create_pathlike_name(self):
        with tarfile.open(os_helper.FakePath(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(os_helper.FakePath(self.file_path))
            names = tobj.getnames()
        self._check_single_member(names)

        self._check_archive_on_disk()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(os_helper.FakePath(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(os_helper.FakePath(self.file_path))
            names = tobj.getnames()
        self._check_single_member(names)

        self._check_archive_on_disk()
2020
2021
class GzipCreateTest(GzipTest, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel is passed both when creating and when re-opening.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        # Opening for reading with compresslevel only has to succeed.
        tarfile.open(tmpname, 'r:gz', compresslevel=1).close()
2029
2030
class Bz2CreateTest(Bz2Test, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel is passed both when creating and when re-opening.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        # Opening for reading with compresslevel only has to succeed.
        tarfile.open(tmpname, 'r:bz2', compresslevel=1).close()
2038
2039
class LzmaCreateTest(LzmaTest, CreateTest):

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        archive = tarfile.open(tmpname, self.mode, preset=1)
        with archive:
            archive.add(self.file_path)
2047
2048
class CreateWithXModeTest(CreateTest):
    """CreateTest re-run with the bare "x" mode prefix."""

    prefix = "x"

    # Setting the inherited taropen-based tests to None removes them
    # from this class.
    test_create_taropen = test_create_existing_taropen = None
2055
2056
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        """Create foo, hard-link bar to it, and add foo to a new archive.

        The test is skipped when os.link() fails with PermissionError.
        """
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            # skipTest() raises, and unittest does not run tearDown() when
            # setUp() raises, so foo has to be removed here or it would be
            # left behind in TEMPDIR.
            os_helper.unlink(self.foo)
            self.skipTest('os.link(): %s' % e)

        self.tar = tarfile.open(tmpname, "w")
        self.tar.add(self.foo)

    def tearDown(self):
        self.tar.close()
        os_helper.unlink(self.foo)
        os_helper.unlink(self.bar)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # bar shares its inode with the already-added foo, so it is
        # expected to be described as a hard link.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the link is expected to be stored as a
        # regular file instead.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
2098
2099
class PaxWriteTest(GNUWriteTest):
    """GNUWriteTest's long name/link cases against PAX_FORMAT, plus checks
    of global and extended pax headers."""

    def _test(self, name, link=None):
        # See GNUWriteTest.
        written = tarfile.TarInfo(name)
        if link:
            written.linkname = link
            written.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
        try:
            tar.addfile(written)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            first = tar.getmembers()[0]
            if link:
                self.assertEqual(link, first.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, first.name,
                                 "PAX longname creation failed")
        finally:
            tar.close()

    def test_pax_global_header(self):
        pax_headers = {
            "foo": "bar",
            "uid": "0",
            "mtime": "1.23",
            "test": "\xe4\xf6\xfc",
            "\xe4\xf6\xfc": "test",
        }

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           pax_headers=pax_headers)
        try:
            tar.addfile(tarfile.TarInfo("test"))
        finally:
            tar.close()

        # Test if the global header was written correctly.
        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key not in tarfile.PAX_NUMBER_FIELDS:
                    continue
                # Numeric fields must be convertible by their converter.
                convert = tarfile.PAX_NUMBER_FIELDS[key]
                try:
                    convert(val)
                except (TypeError, ValueError):
                    self.fail("unable to convert pax header field")
        finally:
            tar.close()

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        member = tarfile.TarInfo()
        member.name = "\xe4\xf6\xfc" # non-ASCII
        member.uid = 8**8 # too large
        member.pax_headers = pax_headers

        tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                           encoding="iso8859-1")
        try:
            tar.addfile(member)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = tar.getmembers()[0]
            self.assertEqual(stored.pax_headers, pax_headers)
            self.assertEqual(stored.name, "foo")
            self.assertEqual(stored.uid, 123)
        finally:
            tar.close()

    def test_create_pax_header(self):
        # The ustar header should contain values that can be
        # represented reasonably, even if a better (e.g. higher
        # precision) version is set in the pax header.
        # Issue #45863

        # values that should be kept
        kept = tarfile.TarInfo()
        kept.name = "foo"
        kept.mtime = 1000.1
        kept.size = 100
        kept.uid = 123
        kept.gid = 124
        info = kept.get_info()
        # create_pax_header() is called before info is inspected, as the
        # checks below look at info afterwards.
        header = kept.create_pax_header(info, encoding="iso8859-1")
        self.assertEqual(info['name'], "foo")
        # mtime should be rounded to nearest second
        self.assertIsInstance(info['mtime'], int)
        for field, value in (('mtime', 1000), ('size', 100),
                             ('uid', 123), ('gid', 124)):
            self.assertEqual(info[field], value)
        expected = (
            b'././@PaxHeader' + bytes(86)
            + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x'
            + bytes(100) + b'ustar\x0000' + bytes(247)
            + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97)
            + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0'
            + bytes(100) + b'ustar\x0000' + bytes(247)
        )
        self.assertEqual(header, expected)

        # values that should be changed
        changed = tarfile.TarInfo()
        changed.name = "foo\u3374" # can't be represented in ascii
        changed.mtime = 10**10 # too big
        changed.size = 10**10 # too big
        changed.uid = 8**8 # too big
        changed.gid = 8**8+1 # too big
        info = changed.get_info()
        header = changed.create_pax_header(info, encoding="iso8859-1")
        # name is kept as-is in info but should be added to pax header
        self.assertEqual(info['name'], "foo\u3374")
        for field in ('mtime', 'size', 'uid', 'gid'):
            self.assertEqual(info[field], 0)
        expected = (
            b'././@PaxHeader' + bytes(86)
            + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x'
            + bytes(100) + b'ustar\x0000' + bytes(247)
            + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n'
            + b'16 gid=16777217\n20 size=10000000000\n'
            + b'21 mtime=10000000000\n' + bytes(424) + b'foo?' + bytes(96)
            + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0'
            + bytes(100) + b'ustar\x0000' + bytes(247)
        )
        self.assertEqual(header, expected)
2237
2238
class UnicodeTest:
    """Mixin of unicode name tests; subclasses provide ``self.format``."""

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        """Write a non-ASCII member name with *encoding* and read it back."""
        name = "\xe4\xf6\xfc"
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding=encoding, errors="strict")
        try:
            tar.addfile(tarfile.TarInfo(name))
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding=encoding)
        try:
            first = tar.getmembers()[0]
            self.assertEqual(first.name, name)
        finally:
            tar.close()

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", a non-ASCII name or
        # uname must make addfile() raise UnicodeError.
        tar = tarfile.open(tmpname, "w", format=self.format,
                           encoding="ascii", errors="strict")
        try:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            with self.assertRaises(UnicodeError):
                tar.addfile(tarinfo)
        finally:
            tar.close()

    def test_unicode_argument(self):
        # Every textual field of every member must be exactly str.
        text_fields = ("name", "linkname", "uname", "gname")
        tar = tarfile.open(tarname, "r",
                           encoding="iso8859-1", errors="strict")
        try:
            for member in tar:
                for field in text_fields:
                    self.assertIs(type(getattr(member, field)), str)
        finally:
            tar.close()

    def test_uname_unicode(self):
        non_ascii = "\xe4\xf6\xfc"
        written = tarfile.TarInfo("foo")
        written.uname = non_ascii
        written.gname = non_ascii

        tar = tarfile.open(tmpname, mode="w", format=self.format,
                           encoding="iso8859-1")
        try:
            tar.addfile(written)
        finally:
            tar.close()

        tar = tarfile.open(tmpname, encoding="iso8859-1")
        try:
            stored = tar.getmember("foo")
            self.assertEqual(stored.uname, "\xe4\xf6\xfc")
            self.assertEqual(stored.gname, "\xe4\xf6\xfc")

            if self.format != tarfile.PAX_FORMAT:
                # Outside pax format, re-reading as ascii is expected to
                # yield the undecodable bytes as lone surrogates.
                tar.close()
                tar = tarfile.open(tmpname, encoding="ascii")
                stored = tar.getmember("foo")
                self.assertEqual(stored.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(stored.gname, "\udce4\udcf6\udcfc")
        finally:
            tar.close()
2318
2319
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):
    # Checks the ustar field-size limits against UTF-8 *encoded* names:
    # the limits count bytes, so non-ASCII characters use up more room
    # than their character count suggests.

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        self._test_ustar_name("0123456789" * 10)
        self._test_ustar_name("0123456789" * 10 + "0", ValueError)
        self._test_ustar_name("0123456789" * 9 + "01234567\xff")
        self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        self._test_ustar_name("0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10)
        self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10)
        self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError)

    def test_unicode_longname2(self):
        self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError)

    def test_unicode_longname3(self):
        self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError)
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff")
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_longname4(self):
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Write a single member called `name` to tmpname.  With `exc` given,
        # adding the member must raise it; otherwise the archive is read
        # back and must contain exactly that one name.
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            t = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, tar.addfile, t)
                return
            tar.addfile(t)

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # Compare the whole member list: a loop over the archive that
            # asserts on the first member would pass without checking
            # anything if the archive came back empty.
            self.assertEqual([m.name for m in tar.getmembers()], [name])

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        self._test_ustar_link("0123456789" * 10)
        self._test_ustar_link("0123456789" * 10 + "0", ValueError)
        self._test_ustar_link("0123456789" * 9 + "01234567\xff")
        self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        self._test_ustar_link("0123456789" * 9 + "012345\xff\xff")
        self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Same procedure as _test_ustar_name(), but `name` is stored as the
        # linkname of a member called "foo".
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            t = tarfile.TarInfo("foo")
            t.linkname = name
            if exc is not None:
                self.assertRaises(exc, tar.addfile, t)
                return
            tar.addfile(t)

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            # See _test_ustar_name(): an empty archive must fail the check.
            self.assertEqual([m.linkname for m in tar.getmembers()], [name])
2397
2398
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Regression test for issue #8633: GNU tar <= 1.23 writes raw binary
        # fields without announcing them through a hdrcharset=BINARY header.
        # The member must be retrievable under either decoding of its name.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            archive = tarfile.open(tarname, encoding=encoding,
                                   errors="surrogateescape")
            with archive:
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2415
2416
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode, so the inherited
    # filename-error test is disabled for this format.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # A POSIX.1-2008 compatible header carrying a hdrcharset=BINARY
        # field must be readable under either decoding of the member name.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, member_name in cases:
            archive = tarfile.open(tarname, encoding=encoding,
                                   errors="surrogateescape")
            with archive:
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2435
2436
class AppendTestBase:
    # Shared fixtures for the append mode tests (cp. patch #1652681).

    def setUp(self):
        # Each test starts without a leftover archive at the target path.
        target = self.tarname = tmpname
        if os.path.exists(target):
            os_helper.unlink(target)

    def _create_testtar(self, mode="w:"):
        # Copy "ustar/regtype" out of the reference archive into a new
        # archive at self.tarname (opened with `mode`), renamed to "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as dest:
                dest.addfile(member, payload)

    def test_append_compressed(self):
        # Opening an archive written with this class's compression suffix
        # in append mode must raise ReadError.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2456
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append mode on uncompressed archives.  The compressed-archive check
    # inherited from AppendTestBase does not apply and is disabled.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member called "bar" to the archive (or to
        # `fileobj` when one is given).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Assert that the archive lists exactly `names` (default: ["bar"]).
        # The default is built per call instead of using a shared mutable
        # list object as the parameter default.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write raw `data` as the archive file, then expect appending to it
        # to raise ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2520
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with the gzip settings
    # supplied by GzipTest (suffix 'gz'); nothing is added here.
    pass
2523
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with the bzip2 settings
    # supplied by Bz2Test (suffix 'bz2'); nothing is added here.
    pass
2526
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs the tests inherited from AppendTestBase with whatever settings
    # LzmaTest supplies (presumably the xz/lzma suffix - LzmaTest is defined
    # elsewhere in this file); nothing is added here.
    pass
2529
2530
class LimitsTest(unittest.TestCase):
    # Field-size limits applied by TarInfo.tobuf() for each archive format.
    # A bare tobuf() call means "must not raise".

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT
        deep_path = "123/" * 126 + "longname"   # 512 characters

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo(deep_path)
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = deep_path
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT
        deep_path = "123/" * 126 + "longname"

        # Long names and long link names are both accepted.
        tarfile.TarInfo(deep_path).tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = deep_path
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT
        deep_path = "123/" * 126 + "longname"

        # None of the values below may raise for the pax format.
        tarfile.TarInfo(deep_path).tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = deep_path
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
2588
2589
class MiscTest(unittest.TestCase):
    # Tests for tarfile's low-level field helpers (stn/nts/nti/itn), the
    # module's __all__, and the error text produced by tarfile.open().

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length;
        # nts() stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # Mirror image of test_read_number_fields(): octal for values that
        # fit, base-256 (GNU format) for the rest.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # Values that do not fit the field for the given format must be
        # rejected with ValueError.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Public-looking module names that are deliberately left out of
        # tarfile.__all__.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE',
            'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME',
            'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES',
            'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS',
            'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums',
            'copyfileobj', 'filemode', 'EmptyHeaderError',
            'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject', 'main'}
        support.check__all__(self, tarfile, not_exported=not_exported)

    def test_useful_error_message_when_modules_missing(self):
        # When a compression opener fails with CompressionError, the
        # ReadError raised by tarfile.open() must name the method and the
        # underlying error.
        fname = os.path.join(os.path.dirname(__file__), 'archivetestdata', 'testtar.tar.xz')
        # Pass the exception instance itself as side_effect.  A 1-tuple
        # (stray trailing comma) would be treated as an iterable of results
        # and would be exhausted after a single call.
        error = tarfile.CompressionError('lzma module is not available')
        with self.assertRaises(tarfile.ReadError) as excinfo:
            with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error):
                tarfile.open(fname)

        self.assertIn(
            "\n- method xz: CompressionError('lzma module is not available')\n",
            str(excinfo.exception),
        )
2681
2682
class CommandLineTest(unittest.TestCase):
    # Drives the "python -m tarfile" command line interface through
    # script_helper and checks its output and exit status.

    def tarfilecmd(self, *args, **kwargs):
        # Run the CLI expecting success; return its stdout with the
        # platform line separator replaced by b'\n'.
        _, stdout, _ = script_helper.assert_python_ok('-m', 'tarfile',
                                                      *args, **kwargs)
        native_newline = os.linesep.encode()
        return stdout.replace(native_newline, b'\n')

    def tarfilecmd_failure(self, *args):
        # Run the CLI expecting failure; callers unpack the result as
        # (rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    @staticmethod
    def _tokenize_data_files():
        # The two data files used as archive content by several tests.
        basenames = ('tokenize_tests.txt',
                     'tokenize_tests-no-coding-cookie-'
                     'and-utf8-bom-sig-only.txt')
        return [support.findfile(basename, subdir='tokenizedata')
                for basename in basenames]

    def _assert_readable(self, path, opener=tarfile.open):
        # The archive at `path` must open and list its members.
        with opener(path) as tar:
            tar.getmembers()

    def _assert_not_a_tar_archive(self, result):
        # `result` is the (rc, out, err) triple of a failed CLI run on a
        # file that is not a tar archive.
        rc, out, err = result
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def _expected_listing(self, tar_name, verbose):
        # What TarFile.list() prints for `tar_name`, encoded the way the
        # child process encodes it under PYTHONIOENCODING='ascii'.
        with support.captured_stdout() as captured:
            with tarfile.open(tar_name, 'r') as tf:
                tf.list(verbose=verbose)
        return captured.getvalue().encode('ascii', 'backslashreplace')

    def make_simple_tarfile(self, tar_name):
        # Create an archive holding the two tokenize data files; it is
        # removed again when the test finishes.
        sources = self._tokenize_data_files()
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for source in sources:
                tf.add(source, arcname=os.path.basename(source))

    def make_evil_tarfile(self, tar_name):
        # Create an archive with one harmless member and one member whose
        # name points outside the extraction directory.
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for member_name in ('benign', '../evil'):
                member = tarfile.TarInfo(member_name)
                tf.addfile(member, fileobj=io.BytesIO(b''))

    def test_bad_use(self):
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        lowered = err.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for flag in ('-t', '--test'):
                self.assertEqual(self.tarfilecmd(flag, tar_name), b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for flag in ('-v', '--verbose'):
                output = self.tarfilecmd(flag, '-t', tar_name,
                                         PYTHONIOENCODING='utf-8')
                self.assertIn(b'is a tar archive.\n', output)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip', subdir='archivetestdata')
        self._assert_not_a_tar_archive(self.tarfilecmd_failure('-t', zipname))

        # An archive cut off inside its first 512-byte block must be
        # rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    truncated = f.read()[:511]
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(truncated)
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    os_helper.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, False)
            for flag in ('-l', '--list'):
                output = self.tarfilecmd(flag, tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            expected = self._expected_listing(tar_name, True)
            for flag in ('-v', '--verbose'):
                output = self.tarfilecmd(flag, '-l', tar_name,
                                         PYTHONIOENCODING='ascii')
                self.assertEqual(output, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip', subdir='archivetestdata')
        self._assert_not_a_tar_archive(self.tarfilecmd_failure('-l', zipname))

    def test_create_command(self):
        sources = self._tokenize_data_files()
        for flag in ('-c', '--create'):
            try:
                output = self.tarfilecmd(flag, tmpname, *sources)
                self.assertEqual(output, b'')
                self._assert_readable(tmpname)
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_verbose(self):
        sources = self._tokenize_data_files()
        for flag in ('-v', '--verbose'):
            try:
                output = self.tarfilecmd(flag, '-c', tmpname, *sources,
                                         PYTHONIOENCODING='utf-8')
                self.assertIn(b' file created.', output)
                self._assert_readable(tmpname)
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        source = support.findfile('tokenize_tests.txt', subdir='tokenizedata')
        try:
            output = self.tarfilecmd('-c', dotlessname, source)
            self.assertEqual(output, b'')
            self._assert_readable(dotlessname)
        finally:
            os_helper.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        source = support.findfile('tokenize_tests.txt', subdir='tokenizedata')
        try:
            output = self.tarfilecmd('-c', tar_name, source)
            self.assertEqual(output, b'')
            self._assert_readable(tar_name)
        finally:
            os_helper.unlink(tar_name)

    def test_create_command_compressed(self):
        sources = self._tokenize_data_files()
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # A falsy `open` marks a compression module that is missing.
            if not filetype.open:
                continue
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *sources)
                self._assert_readable(tar_name, opener=filetype.taropen)
            finally:
                os_helper.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for flag in ('-e', '--extract'):
            try:
                with os_helper.temp_cwd(tarextdir):
                    output = self.tarfilecmd(flag, tmpname)
                self.assertEqual(output, b'')
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for flag in ('-v', '--verbose'):
            try:
                with os_helper.temp_cwd(tarextdir):
                    output = self.tarfilecmd(flag, '-e', tmpname,
                                             PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', output)
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_filter(self):
        self.make_evil_tarfile(tmpname)
        # Make an inner directory, so the member named '../evil'
        # is still extracted into `tarextdir`
        destdir = os.path.join(tarextdir, 'dest')
        os.mkdir(tarextdir)
        try:
            with os_helper.temp_cwd(destdir):
                # The 'data' filter makes the command fail on this archive;
                # 'fully_trusted' lets the extraction go through.
                self.tarfilecmd_failure('-e', tmpname,
                                        '-v',
                                        '--filter', 'data')
                output = self.tarfilecmd('-e', tmpname,
                                         '-v',
                                         '--filter', 'fully_trusted',
                                         PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', output)
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with os_helper.temp_cwd(tarextdir):
                output = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(output, b'')
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip', subdir='archivetestdata')
        with os_helper.temp_cwd(tarextdir):
            result = self.tarfilecmd_failure('-e', zipname)
        self._assert_not_a_tar_archive(result)
2909
2910
class ContextManagerTest(unittest.TestCase):
    # TarFile used as a context manager: closing, exception propagation
    # and end-of-archive handling.

    def test_basic(self):
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Swallow only the exception raised on purpose above; a bare
            # except would also hide KeyboardInterrupt and SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # As in test_no_eof(): ignore only the deliberate error.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2967
2968
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741. Where symbolic or hard links are
    # not supported, tarfile tries to extract link members as the regular
    # files they point to.
    def _test_link_extraction(self, name):
        # Extract member `name` into TEMPDIR and compare the checksum of
        # the resulting file with that of the regular-file member.
        self.tar.extract(name, TEMPDIR, filter='fully_trusted')
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = sha256sum(extracted.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        member = "ustar/lnktype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        member = "./ustar/linktest2/lnktype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        member = "ustar/symtype"
        self._test_link_extraction(member)

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        member = "./ustar/linktest2/symtype"
        self._test_link_extraction(member)
3001
3002
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a small bzip2-compressed archive with
        # `mode`.  The file object below turns a read issued after
        # end-of-file has already been reached into an AssertionError, so
        # an endless read loop fails the test instead of hanging it.
        class MyBytesIO(io.BytesIO):
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                # Remember whether this read starts at end-of-file.
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                # Repositioning makes a further read legitimate again.
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # The open succeeded: close the archive so the TarFile is
                # not leaked.
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
3032
3033
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both carry the name 'root'.

    Returns False when the pwd/grp modules cannot be imported, when the
    user or group database has no entry for id 0, or when either entry
    has a different name.
    """
    try:
        import pwd, grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # The lookups raise KeyError when id 0 has no database entry;
        # report that as "not root" instead of letting it propagate.
        return False
    return user_name == 'root' and group_name == 'root'
3044
3045
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch:
    #  os.chown:   to record which uid/gid extraction asks for
    #  os.chmod:   so modes are not really changed (otherwise the extracted
    #              files/directories could not be deleted afterwards)
    #  os.geteuid: so we can pretend to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write a tar file at `tmpname` holding a file, a directory and a
        # file inside that directory, each with its own uid/gid but with
        # 'root' as both uname and gname.  Returns the archive's path.
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, fileobj)
        entries = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for name, uid, gid, member_type, fileobj in entries:
                info = tarfile.TarInfo(name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Build the test archive and yield it opened for reading, together
        # with the three member names stored in it.
        mock_geteuid.return_value = 0  # claim to be root

        top_file = 'numeric-owner-testfile'
        subdir = 'dir'
        nested_file = os.path.join(subdir, top_file)

        archive_path = NumericOwnerTest._make_test_archive(
            top_file, subdir, nested_file)

        with tarfile.open(archive_path) as archive:
            yield archive, top_file, subdir, nested_file

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (archive, top_file, _,
                                                nested_file):
            for member_name in (top_file, nested_file):
                archive.extract(member_name, TEMPDIR, numeric_owner=True,
                                filter='fully_trusted')

        # chown must have been asked for the numeric ids from the archive,
        # addressed by the on-disk paths
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, top_file), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, nested_file), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, top_file, subdir,
                                                nested_file):
            archive.extractall(TEMPDIR, numeric_owner=True,
                               filter='fully_trusted')

        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, top_file), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, subdir), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, nested_file), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test needs uid=0 and gid=0 to really be named 'root': the
    #  archive stores 'root' as uname and gname, and extract() resolves
    #  those through pwd and grp to the ids we check for here (0 and 0).
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, top_file, _, _):
            archive.extract(top_file, TEMPDIR, numeric_owner=False,
                            filter='fully_trusted')

        extracted_path = os.path.join(TEMPDIR, top_file)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing more than two positional arguments to extract() must fail.
        with self._setup_test(mock_geteuid) as (archive, top_file, _, _):
            with self.assertRaises(TypeError):
                archive.extract(top_file, TEMPDIR, False, True)
3168
3169
class ReplaceTests(ReadTest, unittest.TestCase):
    # Checks of TarInfo.replace() on members of the archive in self.tar.

    def test_replace_name(self):
        original = self.tar.getmember('ustar/regtype')
        renamed = original.replace(name='misc/other')
        self.assertEqual(renamed.name, 'misc/other')
        # neither the source object nor the archive's own record changes
        self.assertEqual(original.name, 'ustar/regtype')
        looked_up = self.tar.getmember('ustar/regtype')
        self.assertEqual(looked_up.name, 'ustar/regtype')

    def test_replace_deep(self):
        # By default the copy gets its own pax_headers mapping.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace()
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        # With deep=False the pax_headers mapping is shared with the source.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace(deep=False)
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        # Each replaceable attribute can be set to None on the copy while
        # the source member keeps its value.
        original = self.tar.getmember('ustar/regtype')
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                override = {attr_name: None}
                copy = original.replace(**override)
                self.assertEqual(getattr(copy, attr_name), None)
                self.assertNotEqual(getattr(original, attr_name), None)

    def test_replace_internal(self):
        # 'offset' is not an accepted keyword for replace().
        original = self.tar.getmember('ustar/regtype')
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
3208
3209
class NoneInfoExtractTests(ReadTest):
    # These mainly check that all kinds of members are extracted successfully
    # if some metadata is None.
    # Some of the methods do additional spot checks.

    # We also test that the default filters can deal with None.

    # Filter passed to extractall() when building the control tree.
    # Concrete subclasses override it.
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        # Extract the unmodified test archive once.  The set of relative
        # paths it produces is the reference for check_files_present().
        cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
        # Use the archive as a context manager so that it is closed even
        # when extraction raises.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            tar.errorlevel = 0
            with ExitStack() as cm:
                if cls.extraction_filter is None:
                    # Ignore DeprecationWarning while extracting with
                    # filter=None.
                    cm.enter_context(warnings.catch_warnings(
                        action="ignore", category=DeprecationWarning))
                tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = {
            p.relative_to(cls.control_dir)
            for p in pathlib.Path(cls.control_dir).glob('**/*')}

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        """Assert that `directory` holds the same relative paths as the
        control extraction made in setUpClass()."""
        got_paths = {
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*')}
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        """Set the given attributes to None on every member of self.tar,
        extract everything into a temporary directory, check the resulting
        paths and yield the directory.

        The directory is removed when the context exits.
        """
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should be later than 'now' -- the mtime
        # of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Compare the value obtained above instead of
                        # calling stat() again outside the guarded block.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    # The remaining tests only check that extraction succeeds and yields
    # the expected set of paths (both verified by extract_with_none()).

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
3307
# Runs the NoneInfoExtractTests checks with extraction_filter = 'data'.
class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'data'
3310
# Runs the NoneInfoExtractTests checks with
# extraction_filter = 'fully_trusted'.
class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests,
                                        unittest.TestCase):
    extraction_filter = 'fully_trusted'
3314
# Runs the NoneInfoExtractTests checks with extraction_filter = 'tar'.
class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'tar'
3317
# Runs the NoneInfoExtractTests checks with extraction_filter = None,
# i.e. extractall() is called with filter=None.
class NoneInfoExtractTests_Default(NoneInfoExtractTests,
                                   unittest.TestCase):
    extraction_filter = None
3321
class NoneInfoTests_Misc(unittest.TestCase):
    # Behaviour of addfile() and list() when TarInfo metadata is None.

    def test_add(self):
        # When addfile() encounters None metadata, it raises a ValueError
        bio = io.BytesIO()
        for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                          tarfile.PAX_FORMAT):
            # Open the archive as a context manager so that it is closed
            # again for every format instead of being left open.
            with (self.subTest(tarformat=tarformat),
                  tarfile.open(fileobj=bio, mode='w',
                               format=tarformat) as tar):
                tarinfo = tar.gettarinfo(tarname)
                try:
                    with open(tarname, 'rb') as f:
                        tar.addfile(tarinfo, f)
                except Exception:
                    if tarformat == tarfile.USTAR_FORMAT:
                        # In the old, limited format, adding might fail for
                        # reasons like the UID being too large
                        pass
                    else:
                        raise
                else:
                    for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                      'uname', 'gname'):
                        with self.subTest(attr_name=attr_name):
                            replaced = tarinfo.replace(**{attr_name: None})
                            with self.assertRaisesRegex(ValueError,
                                                        f"{attr_name}"):
                                with open(tarname, 'rb') as f:
                                    tar.addfile(replaced, f)

    def test_list(self):
        # Change some metadata to None, then compare list() output
        # word-for-word. We want list() to not raise, and to only change
        # printout for the affected piece of metadata.
        # (n.b.: some contents of the test archive are hardcoded.)
        for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'},
                           {'uname'}, {'gname'},
                           {'uid', 'uname'}, {'gid', 'gname'}):
            with (self.subTest(attr_names=attr_names),
                  tarfile.open(tarname, encoding="iso8859-1") as tar):
                # Listing of the unmodified archive.
                tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_prev):
                    tar.list()
                for member in tar.getmembers():
                    for attr_name in attr_names:
                        setattr(member, attr_name, None)
                # Listing with the selected metadata set to None.
                tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_new):
                    tar.list()
                words_prev = tio_prev.detach().getvalue().split()
                words_new = tio_new.detach().getvalue().split()
                for expected, got in zip(words_prev, words_new):
                    if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected):
                        self.assertEqual(got, b'????-??-??')
                    elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected):
                        self.assertEqual(got, b'??:??:??')
                    elif attr_names == {'mode'} and re.match(
                            rb'.([r-][w-][x-]){3}', expected):
                        self.assertEqual(got, b'??????????')
                    elif attr_names == {'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/')):
                        # user part is expected to turn into digits
                        _, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertRegex(got_user, b'[0-9]+')
                    elif attr_names == {'gname'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar')):
                        # group part is expected to turn into digits
                        exp_user, _ = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertRegex(got_group, b'[0-9]+')
                    elif attr_names == {'uid'} and expected.startswith(
                            b'1000/'):
                        _, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gid'} and expected.endswith(b'/100'):
                        exp_user, _ = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    elif attr_names == {'uid', 'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/', b'1000/')):
                        _, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gname', 'gid'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar', b'/100')):
                        exp_user, _ = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    else:
                        # In other cases the output should be the same
                        self.assertEqual(expected, got)
3417
3418def _filemode_to_int(mode):
3419    """Inverse of `stat.filemode` (for permission bits)
3420
3421    Using mode strings rather than numbers makes the later tests more readable.
3422    """
3423    str_mode = mode[1:]
3424    result = (
3425          {'r': stat.S_IRUSR, '-': 0}[str_mode[0]]
3426        | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]]
3427        | {'x': stat.S_IXUSR, '-': 0,
3428           's': stat.S_IXUSR | stat.S_ISUID,
3429           'S': stat.S_ISUID}[str_mode[2]]
3430        | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]]
3431        | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]]
3432        | {'x': stat.S_IXGRP, '-': 0,
3433           's': stat.S_IXGRP | stat.S_ISGID,
3434           'S': stat.S_ISGID}[str_mode[5]]
3435        | {'r': stat.S_IROTH, '-': 0}[str_mode[6]]
3436        | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]]
3437        | {'x': stat.S_IXOTH, '-': 0,
3438           't': stat.S_IXOTH | stat.S_ISVTX,
3439           'T': stat.S_ISVTX}[str_mode[8]]
3440        )
3441    # check we did this right
3442    assert stat.filemode(result)[1:] == mode[1:]
3443
3444    return result
3445
3446class ArchiveMaker:
3447    """Helper to create a tar file with specific contents
3448
3449    Usage:
3450
3451        with ArchiveMaker() as t:
3452            t.add('filename', ...)
3453
3454        with t.open() as tar:
3455            ... # `tar` is now a TarFile with 'filename' in it!
3456    """
3457    def __init__(self):
3458        self.bio = io.BytesIO()
3459
3460    def __enter__(self):
3461        self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio)
3462        return self
3463
3464    def __exit__(self, *exc):
3465        self.tar_w.close()
3466        self.contents = self.bio.getvalue()
3467        self.bio = None
3468
3469    def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
3470            mode=None, size=None, **kwargs):
3471        """Add a member to the test archive. Call within `with`."""
3472        name = str(name)
3473        tarinfo = tarfile.TarInfo(name).replace(**kwargs)
3474        if size is not None:
3475            tarinfo.size = size
3476        if mode:
3477            tarinfo.mode = _filemode_to_int(mode)
3478        if symlink_to is not None:
3479            type = tarfile.SYMTYPE
3480            tarinfo.linkname = str(symlink_to)
3481        if hardlink_to is not None:
3482            type = tarfile.LNKTYPE
3483            tarinfo.linkname = str(hardlink_to)
3484        if name.endswith('/') and type is None:
3485            type = tarfile.DIRTYPE
3486        if type is not None:
3487            tarinfo.type = type
3488        if tarinfo.isreg():
3489            fileobj = io.BytesIO(bytes(tarinfo.size))
3490        else:
3491            fileobj = None
3492        self.tar_w.addfile(tarinfo, fileobj)
3493
3494    def open(self, **kwargs):
3495        """Open the resulting archive as TarFile. Call after `with`."""
3496        bio = io.BytesIO(self.contents)
3497        return tarfile.open(fileobj=bio, **kwargs)
3498
3499# Under WASI, `os_helper.can_symlink` is False to make
# `skip_unless_symlink` skip symlink tests.
3501# But in the following tests we use can_symlink to *determine* which
3502# behavior is expected.
3503# Like other symlink tests, skip these on WASI for now.
def symlink_test(f):
    # Decorator: on WASI wrap the test in an unconditional skip; on every
    # other platform return it unchanged.
    if support.is_wasi:
        return unittest.skip("WASI: Skip symlink test for now")(f)
    return f
3510
3511
3512class TestExtractionFilters(unittest.TestCase):
3513
3514    # A temporary directory for the extraction results.
3515    # All files that "escape" the destination path should still end
3516    # up in this directory.
3517    outerdir = pathlib.Path(TEMPDIR) / 'outerdir'
3518
3519    # The destination for the extraction, within `outerdir`
3520    destdir = outerdir / 'dest'
3521
3522    @contextmanager
3523    def check_context(self, tar, filter):
3524        """Extracts `tar` to `self.destdir` and allows checking the result
3525
3526        If an error occurs, it must be checked using `expect_exception`
3527
3528        Otherwise, all resulting files must be checked using `expect_file`,
3529        except the destination directory itself and parent directories of
3530        other files.
3531        When checking directories, do so before their contents.
3532        """
3533        with os_helper.temp_dir(self.outerdir):
3534            try:
3535                tar.extractall(self.destdir, filter=filter)
3536            except Exception as exc:
3537                self.raised_exception = exc
3538                self.expected_paths = set()
3539            else:
3540                self.raised_exception = None
3541                self.expected_paths = set(self.outerdir.glob('**/*'))
3542                self.expected_paths.discard(self.destdir)
3543            try:
3544                yield
3545            finally:
3546                tar.close()
3547            if self.raised_exception:
3548                raise self.raised_exception
3549            self.assertEqual(self.expected_paths, set())
3550
3551    def expect_file(self, name, type=None, symlink_to=None, mode=None,
3552                    size=None):
3553        """Check a single file. See check_context."""
3554        if self.raised_exception:
3555            raise self.raised_exception
3556        # use normpath() rather than resolve() so we don't follow symlinks
3557        path = pathlib.Path(os.path.normpath(self.destdir / name))
3558        self.assertIn(path, self.expected_paths)
3559        self.expected_paths.remove(path)
3560        if mode is not None and os_helper.can_chmod() and os.name != 'nt':
3561            got = stat.filemode(stat.S_IMODE(path.stat().st_mode))
3562            self.assertEqual(got, mode)
3563        if type is None and isinstance(name, str) and name.endswith('/'):
3564            type = tarfile.DIRTYPE
3565        if symlink_to is not None:
3566            got = (self.destdir / name).readlink()
3567            expected = pathlib.Path(symlink_to)
3568            # The symlink might be the same (textually) as what we expect,
3569            # but some systems change the link to an equivalent path, so
3570            # we fall back to samefile().
3571            if expected != got:
3572                self.assertTrue(got.samefile(expected))
3573        elif type == tarfile.REGTYPE or type is None:
3574            self.assertTrue(path.is_file())
3575        elif type == tarfile.DIRTYPE:
3576            self.assertTrue(path.is_dir())
3577        elif type == tarfile.FIFOTYPE:
3578            self.assertTrue(path.is_fifo())
3579        else:
3580            raise NotImplementedError(type)
3581        if size is not None:
3582            self.assertEqual(path.stat().st_size, size)
3583        for parent in path.parents:
3584            self.expected_paths.discard(parent)
3585
3586    def expect_exception(self, exc_type, message_re='.'):
3587        with self.assertRaisesRegex(exc_type, message_re):
3588            if self.raised_exception is not None:
3589                raise self.raised_exception
3590        self.raised_exception = None
3591
3592    def test_benign_file(self):
3593        with ArchiveMaker() as arc:
3594            arc.add('benign.txt')
3595        for filter in 'fully_trusted', 'tar', 'data':
3596            with self.check_context(arc.open(), filter):
3597                self.expect_file('benign.txt')
3598
3599    def test_absolute(self):
3600        # Test handling a member with an absolute path
3601        # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
3602        with ArchiveMaker() as arc:
3603            arc.add(self.outerdir / 'escaped.evil')
3604
3605        with self.check_context(arc.open(), 'fully_trusted'):
3606            self.expect_file('../escaped.evil')
3607
3608        for filter in 'tar', 'data':
3609            with self.check_context(arc.open(), filter):
3610                if str(self.outerdir).startswith('/'):
3611                    # We strip leading slashes, as e.g. GNU tar does
3612                    # (without --absolute-filenames).
3613                    outerdir_stripped = str(self.outerdir).lstrip('/')
3614                    self.expect_file(f'{outerdir_stripped}/escaped.evil')
3615                else:
3616                    # On this system, absolute paths don't have leading
3617                    # slashes.
3618                    # So, there's nothing to strip. We refuse to unpack
3619                    # to an absolute path, nonetheless.
3620                    self.expect_exception(
3621                        tarfile.AbsolutePathError,
3622                        """['"].*escaped.evil['"] has an absolute path""")
3623
3624    @symlink_test
3625    def test_parent_symlink(self):
3626        # Test interplaying symlinks
3627        # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
3628        with ArchiveMaker() as arc:
3629
3630            # `current` links to `.` which is both:
3631            #   - the destination directory
3632            #   - `current` itself
3633            arc.add('current', symlink_to='.')
3634
3635            # effectively points to ./../
3636            arc.add('parent', symlink_to='current/..')
3637
3638            arc.add('parent/evil')
3639
3640        if os_helper.can_symlink():
3641            with self.check_context(arc.open(), 'fully_trusted'):
3642                if self.raised_exception is not None:
3643                    # Windows will refuse to create a file that's a symlink to itself
3644                    # (and tarfile doesn't swallow that exception)
3645                    self.expect_exception(FileExistsError)
3646                    # The other cases will fail with this error too.
3647                    # Skip the rest of this test.
3648                    return
3649                else:
3650                    self.expect_file('current', symlink_to='.')
3651                    self.expect_file('parent', symlink_to='current/..')
3652                    self.expect_file('../evil')
3653
3654            with self.check_context(arc.open(), 'tar'):
3655                self.expect_exception(
3656                    tarfile.OutsideDestinationError,
3657                    """'parent/evil' would be extracted to ['"].*evil['"], """
3658                    + "which is outside the destination")
3659
3660            with self.check_context(arc.open(), 'data'):
3661                self.expect_exception(
3662                    tarfile.LinkOutsideDestinationError,
3663                    """'parent' would link to ['"].*outerdir['"], """
3664                    + "which is outside the destination")
3665
3666        else:
3667            # No symlink support. The symlinks are ignored.
3668            with self.check_context(arc.open(), 'fully_trusted'):
3669                self.expect_file('parent/evil')
3670            with self.check_context(arc.open(), 'tar'):
3671                self.expect_file('parent/evil')
3672            with self.check_context(arc.open(), 'data'):
3673                self.expect_file('parent/evil')
3674
3675    @symlink_test
3676    def test_parent_symlink2(self):
3677        # Test interplaying symlinks
3678        # Inspired by 'dirsymlink2b' in jwilk/traversal-archives
3679
3680        # Posix and Windows have different pathname resolution:
3681        # either symlink or a '..' component resolve first.
3682        # Let's see which we are on.
3683        if os_helper.can_symlink():
3684            testpath = os.path.join(TEMPDIR, 'resolution_test')
3685            os.mkdir(testpath)
3686
3687            # testpath/current links to `.` which is all of:
3688            #   - `testpath`
3689            #   - `testpath/current`
3690            #   - `testpath/current/current`
3691            #   - etc.
3692            os.symlink('.', os.path.join(testpath, 'current'))
3693
3694            # we'll test where `testpath/current/../file` ends up
3695            with open(os.path.join(testpath, 'current', '..', 'file'), 'w'):
3696                pass
3697
3698            if os.path.exists(os.path.join(testpath, 'file')):
3699                # Windows collapses 'current\..' to '.' first, leaving
3700                # 'testpath\file'
3701                dotdot_resolves_early = True
3702            elif os.path.exists(os.path.join(testpath, '..', 'file')):
3703                # Posix resolves 'current' to '.' first, leaving
3704                # 'testpath/../file'
3705                dotdot_resolves_early = False
3706            else:
3707                raise AssertionError('Could not determine link resolution')
3708
3709        with ArchiveMaker() as arc:
3710
3711            # `current` links to `.` which is both the destination directory
3712            # and `current` itself
3713            arc.add('current', symlink_to='.')
3714
3715            # `current/parent` is also available as `./parent`,
3716            # and effectively points to `./../`
3717            arc.add('current/parent', symlink_to='..')
3718
3719            arc.add('parent/evil')
3720
3721        with self.check_context(arc.open(), 'fully_trusted'):
3722            if os_helper.can_symlink():
3723                self.expect_file('current', symlink_to='.')
3724                self.expect_file('parent', symlink_to='..')
3725                self.expect_file('../evil')
3726            else:
3727                self.expect_file('current/')
3728                self.expect_file('parent/evil')
3729
3730        with self.check_context(arc.open(), 'tar'):
3731            if os_helper.can_symlink():
3732                # Fail when extracting a file outside destination
3733                self.expect_exception(
3734                        tarfile.OutsideDestinationError,
3735                        "'parent/evil' would be extracted to "
3736                        + """['"].*evil['"], which is outside """
3737                        + "the destination")
3738            else:
3739                self.expect_file('current/')
3740                self.expect_file('parent/evil')
3741
3742        with self.check_context(arc.open(), 'data'):
3743            if os_helper.can_symlink():
3744                if dotdot_resolves_early:
3745                    # Fail when extracting a file outside destination
3746                    self.expect_exception(
3747                            tarfile.OutsideDestinationError,
3748                            "'parent/evil' would be extracted to "
3749                            + """['"].*evil['"], which is outside """
3750                            + "the destination")
3751                else:
3752                    # Fail as soon as we have a symlink outside the destination
3753                    self.expect_exception(
3754                            tarfile.LinkOutsideDestinationError,
3755                            "'current/parent' would link to "
3756                            + """['"].*outerdir['"], which is outside """
3757                            + "the destination")
3758            else:
3759                self.expect_file('current/')
3760                self.expect_file('parent/evil')
3761
3762    @symlink_test
3763    def test_absolute_symlink(self):
3764        # Test symlink to an absolute path
3765        # Inspired by 'dirsymlink' in jwilk/traversal-archives
3766        with ArchiveMaker() as arc:
3767            arc.add('parent', symlink_to=self.outerdir)
3768            arc.add('parent/evil')
3769
3770        with self.check_context(arc.open(), 'fully_trusted'):
3771            if os_helper.can_symlink():
3772                self.expect_file('parent', symlink_to=self.outerdir)
3773                self.expect_file('../evil')
3774            else:
3775                self.expect_file('parent/evil')
3776
3777        with self.check_context(arc.open(), 'tar'):
3778            if os_helper.can_symlink():
3779                self.expect_exception(
3780                        tarfile.OutsideDestinationError,
3781                        "'parent/evil' would be extracted to "
3782                        + """['"].*evil['"], which is outside """
3783                        + "the destination")
3784            else:
3785                self.expect_file('parent/evil')
3786
3787        with self.check_context(arc.open(), 'data'):
3788            self.expect_exception(
3789                tarfile.AbsoluteLinkError,
3790                "'parent' is a link to an absolute path")
3791
3792    def test_absolute_hardlink(self):
3793        # Test hardlink to an absolute path
3794        # Inspired by 'dirsymlink' in https://github.com/jwilk/traversal-archives
3795        with ArchiveMaker() as arc:
3796            arc.add('parent', hardlink_to=self.outerdir / 'foo')
3797
3798        with self.check_context(arc.open(), 'fully_trusted'):
3799            self.expect_exception(KeyError, ".*foo. not found")
3800
3801        with self.check_context(arc.open(), 'tar'):
3802            self.expect_exception(KeyError, ".*foo. not found")
3803
3804        with self.check_context(arc.open(), 'data'):
3805            self.expect_exception(
3806                tarfile.AbsoluteLinkError,
3807                "'parent' is a link to an absolute path")
3808
3809    @symlink_test
3810    def test_sly_relative0(self):
3811        # Inspired by 'relative0' in jwilk/traversal-archives
3812        with ArchiveMaker() as arc:
3813            # points to `../../tmp/moo`
3814            arc.add('../moo', symlink_to='..//tmp/moo')
3815
3816        try:
3817            with self.check_context(arc.open(), filter='fully_trusted'):
3818                if os_helper.can_symlink():
3819                    if isinstance(self.raised_exception, FileExistsError):
3820                        # XXX TarFile happens to fail creating a parent
3821                        # directory.
3822                        # This might be a bug, but fixing it would hurt
3823                        # security.
3824                        # Note that e.g. GNU `tar` rejects '..' components,
3825                        # so you could argue this is an invalid archive and we
3826                        # just raise an bad type of exception.
3827                        self.expect_exception(FileExistsError)
3828                    else:
3829                        self.expect_file('../moo', symlink_to='..//tmp/moo')
3830                else:
3831                    # The symlink can't be extracted and is ignored
3832                    pass
3833        except FileExistsError:
3834            pass
3835
3836        for filter in 'tar', 'data':
3837            with self.check_context(arc.open(), filter):
3838                self.expect_exception(
3839                        tarfile.OutsideDestinationError,
3840                        "'../moo' would be extracted to "
3841                        + "'.*moo', which is outside "
3842                        + "the destination")
3843
3844    @symlink_test
3845    def test_sly_relative2(self):
3846        # Inspired by 'relative2' in jwilk/traversal-archives
3847        with ArchiveMaker() as arc:
3848            arc.add('tmp/')
3849            arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo')
3850
3851        with self.check_context(arc.open(), 'fully_trusted'):
3852            self.expect_file('tmp', type=tarfile.DIRTYPE)
3853            if os_helper.can_symlink():
3854                self.expect_file('../moo', symlink_to='tmp/../../tmp/moo')
3855
3856        for filter in 'tar', 'data':
3857            with self.check_context(arc.open(), filter):
3858                self.expect_exception(
3859                    tarfile.OutsideDestinationError,
3860                    "'tmp/../../moo' would be extracted to "
3861                    + """['"].*moo['"], which is outside the """
3862                    + "destination")
3863
3864    @symlink_test
3865    def test_deep_symlink(self):
3866        # Test that symlinks and hardlinks inside a directory
3867        # point to the correct file (`target` of size 3).
3868        # If links aren't supported we get a copy of the file.
3869        with ArchiveMaker() as arc:
3870            arc.add('targetdir/target', size=3)
3871            # a hardlink's linkname is relative to the archive
3872            arc.add('linkdir/hardlink', hardlink_to=os.path.join(
3873                'targetdir', 'target'))
3874            # a symlink's  linkname is relative to the link's directory
3875            arc.add('linkdir/symlink', symlink_to=os.path.join(
3876                '..', 'targetdir', 'target'))
3877
3878        for filter in 'tar', 'data', 'fully_trusted':
3879            with self.check_context(arc.open(), filter):
3880                self.expect_file('targetdir/target', size=3)
3881                self.expect_file('linkdir/hardlink', size=3)
3882                if os_helper.can_symlink():
3883                    self.expect_file('linkdir/symlink', size=3,
3884                                     symlink_to='../targetdir/target')
3885                else:
3886                    self.expect_file('linkdir/symlink', size=3)
3887
3888    @symlink_test
3889    def test_chains(self):
3890        # Test chaining of symlinks/hardlinks.
3891        # Symlinks are created before the files they point to.
3892        with ArchiveMaker() as arc:
3893            arc.add('linkdir/symlink', symlink_to='hardlink')
3894            arc.add('symlink2', symlink_to=os.path.join(
3895                'linkdir', 'hardlink2'))
3896            arc.add('targetdir/target', size=3)
3897            arc.add('linkdir/hardlink', hardlink_to='targetdir/target')
3898            arc.add('linkdir/hardlink2', hardlink_to='linkdir/symlink')
3899
3900        for filter in 'tar', 'data', 'fully_trusted':
3901            with self.check_context(arc.open(), filter):
3902                self.expect_file('targetdir/target', size=3)
3903                self.expect_file('linkdir/hardlink', size=3)
3904                self.expect_file('linkdir/hardlink2', size=3)
3905                if os_helper.can_symlink():
3906                    self.expect_file('linkdir/symlink', size=3,
3907                                     symlink_to='hardlink')
3908                    self.expect_file('symlink2', size=3,
3909                                     symlink_to='linkdir/hardlink2')
3910                else:
3911                    self.expect_file('linkdir/symlink', size=3)
3912                    self.expect_file('symlink2', size=3)
3913
3914    def test_modes(self):
3915        # Test how file modes are extracted
3916        # (Note that the modes are ignored on platforms without working chmod)
3917        with ArchiveMaker() as arc:
3918            arc.add('all_bits', mode='?rwsrwsrwt')
3919            arc.add('perm_bits', mode='?rwxrwxrwx')
3920            arc.add('exec_group_other', mode='?rw-rwxrwx')
3921            arc.add('read_group_only', mode='?---r-----')
3922            arc.add('no_bits', mode='?---------')
3923            arc.add('dir/', mode='?---rwsrwt')
3924            arc.add('dir_all_bits/', mode='?rwsrwsrwt')
3925
3926        # On some systems, setting the uid, gid, and/or sticky bit is a no-ops.
3927        # Check which bits we can set, so we can compare tarfile machinery to
3928        # a simple chmod.
3929        tmp_filename = os.path.join(TEMPDIR, "tmp.file")
3930        with open(tmp_filename, 'w'):
3931            pass
3932        try:
3933            new_mode = (os.stat(tmp_filename).st_mode
3934                        | stat.S_ISVTX | stat.S_ISGID | stat.S_ISUID)
3935            try:
3936                os.chmod(tmp_filename, new_mode)
3937            except OSError as exc:
3938                if exc.errno == getattr(errno, "EFTYPE", 0):
3939                    # gh-108948: On FreeBSD, regular users cannot set
3940                    # the sticky bit.
3941                    self.skipTest("chmod() failed with EFTYPE: "
3942                                  "regular users cannot set sticky bit")
3943                else:
3944                    raise
3945
3946            got_mode = os.stat(tmp_filename).st_mode
3947            _t_file = 't' if (got_mode & stat.S_ISVTX) else 'x'
3948            _suid_file = 's' if (got_mode & stat.S_ISUID) else 'x'
3949            _sgid_file = 's' if (got_mode & stat.S_ISGID) else 'x'
3950        finally:
3951            os.unlink(tmp_filename)
3952
3953        os.mkdir(tmp_filename)
3954        new_mode = (os.stat(tmp_filename).st_mode
3955                    | stat.S_ISVTX | stat.S_ISGID | stat.S_ISUID)
3956        os.chmod(tmp_filename, new_mode)
3957        got_mode = os.stat(tmp_filename).st_mode
3958        _t_dir = 't' if (got_mode & stat.S_ISVTX) else 'x'
3959        _suid_dir = 's' if (got_mode & stat.S_ISUID) else 'x'
3960        _sgid_dir = 's' if (got_mode & stat.S_ISGID) else 'x'
3961        os.rmdir(tmp_filename)
3962
3963        with self.check_context(arc.open(), 'fully_trusted'):
3964            self.expect_file('all_bits',
3965                             mode=f'?rw{_suid_file}rw{_sgid_file}rw{_t_file}')
3966            self.expect_file('perm_bits', mode='?rwxrwxrwx')
3967            self.expect_file('exec_group_other', mode='?rw-rwxrwx')
3968            self.expect_file('read_group_only', mode='?---r-----')
3969            self.expect_file('no_bits', mode='?---------')
3970            self.expect_file('dir/', mode=f'?---rw{_sgid_dir}rw{_t_dir}')
3971            self.expect_file('dir_all_bits/',
3972                             mode=f'?rw{_suid_dir}rw{_sgid_dir}rw{_t_dir}')
3973
3974        with self.check_context(arc.open(), 'tar'):
3975            self.expect_file('all_bits', mode='?rwxr-xr-x')
3976            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3977            self.expect_file('exec_group_other', mode='?rw-r-xr-x')
3978            self.expect_file('read_group_only', mode='?---r-----')
3979            self.expect_file('no_bits', mode='?---------')
3980            self.expect_file('dir/', mode='?---r-xr-x')
3981            self.expect_file('dir_all_bits/', mode='?rwxr-xr-x')
3982
3983        with self.check_context(arc.open(), 'data'):
3984            normal_dir_mode = stat.filemode(stat.S_IMODE(
3985                self.outerdir.stat().st_mode))
3986            self.expect_file('all_bits', mode='?rwxr-xr-x')
3987            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3988            self.expect_file('exec_group_other', mode='?rw-r--r--')
3989            self.expect_file('read_group_only', mode='?rw-r-----')
3990            self.expect_file('no_bits', mode='?rw-------')
3991            self.expect_file('dir/', mode=normal_dir_mode)
3992            self.expect_file('dir_all_bits/', mode=normal_dir_mode)
3993
3994    def test_pipe(self):
3995        # Test handling of a special file
3996        with ArchiveMaker() as arc:
3997            arc.add('foo', type=tarfile.FIFOTYPE)
3998
3999        for filter in 'fully_trusted', 'tar':
4000            with self.check_context(arc.open(), filter):
4001                if hasattr(os, 'mkfifo'):
4002                    self.expect_file('foo', type=tarfile.FIFOTYPE)
4003                else:
4004                    # The pipe can't be extracted and is skipped.
4005                    pass
4006
4007        with self.check_context(arc.open(), 'data'):
4008            self.expect_exception(
4009                tarfile.SpecialFileError,
4010                "'foo' is a special file")
4011
4012    def test_special_files(self):
4013        # Creating device files is tricky. Instead of attempting that let's
4014        # only check the filter result.
4015        for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE:
4016            tarinfo = tarfile.TarInfo('foo')
4017            tarinfo.type = special_type
4018            trusted = tarfile.fully_trusted_filter(tarinfo, '')
4019            self.assertIs(trusted, tarinfo)
4020            tar = tarfile.tar_filter(tarinfo, '')
4021            self.assertEqual(tar.type, special_type)
4022            with self.assertRaises(tarfile.SpecialFileError) as cm:
4023                tarfile.data_filter(tarinfo, '')
4024            self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo)
4025            self.assertEqual(cm.exception.tarinfo.name, 'foo')
4026
4027    def test_fully_trusted_filter(self):
4028        # The 'fully_trusted' filter returns the original TarInfo objects.
4029        with tarfile.TarFile.open(tarname) as tar:
4030            for tarinfo in tar.getmembers():
4031                filtered = tarfile.fully_trusted_filter(tarinfo, '')
4032                self.assertIs(filtered, tarinfo)
4033
4034    def test_tar_filter(self):
4035        # The 'tar' filter returns TarInfo objects with the same name/type.
4036        # (It can also fail for particularly "evil" input, but we don't have
4037        # that in the test archive.)
4038        with tarfile.TarFile.open(tarname) as tar:
4039            for tarinfo in tar.getmembers():
4040                filtered = tarfile.tar_filter(tarinfo, '')
4041                self.assertIs(filtered.name, tarinfo.name)
4042                self.assertIs(filtered.type, tarinfo.type)
4043
4044    def test_data_filter(self):
4045        # The 'data' filter either raises, or returns TarInfo with the same
4046        # name/type.
4047        with tarfile.TarFile.open(tarname) as tar:
4048            for tarinfo in tar.getmembers():
4049                try:
4050                    filtered = tarfile.data_filter(tarinfo, '')
4051                except tarfile.FilterError:
4052                    continue
4053                self.assertIs(filtered.name, tarinfo.name)
4054                self.assertIs(filtered.type, tarinfo.type)
4055
4056    def test_default_filter_warns(self):
4057        """Ensure the default filter warns"""
4058        with ArchiveMaker() as arc:
4059            arc.add('foo')
4060        with warnings_helper.check_warnings(
4061                ('Python 3.14', DeprecationWarning)):
4062            with self.check_context(arc.open(), None):
4063                self.expect_file('foo')
4064
4065    def test_change_default_filter_on_instance(self):
4066        tar = tarfile.TarFile(tarname, 'r')
4067        def strict_filter(tarinfo, path):
4068            if tarinfo.name == 'ustar/regtype':
4069                return tarinfo
4070            else:
4071                return None
4072        tar.extraction_filter = strict_filter
4073        with self.check_context(tar, None):
4074            self.expect_file('ustar/regtype')
4075
4076    def test_change_default_filter_on_class(self):
4077        def strict_filter(tarinfo, path):
4078            if tarinfo.name == 'ustar/regtype':
4079                return tarinfo
4080            else:
4081                return None
4082        tar = tarfile.TarFile(tarname, 'r')
4083        with support.swap_attr(tarfile.TarFile, 'extraction_filter',
4084                               staticmethod(strict_filter)):
4085            with self.check_context(tar, None):
4086                self.expect_file('ustar/regtype')
4087
4088    def test_change_default_filter_on_subclass(self):
4089        class TarSubclass(tarfile.TarFile):
4090            def extraction_filter(self, tarinfo, path):
4091                if tarinfo.name == 'ustar/regtype':
4092                    return tarinfo
4093                else:
4094                    return None
4095
4096        tar = TarSubclass(tarname, 'r')
4097        with self.check_context(tar, None):
4098            self.expect_file('ustar/regtype')
4099
4100    def test_change_default_filter_to_string(self):
4101        tar = tarfile.TarFile(tarname, 'r')
4102        tar.extraction_filter = 'data'
4103        with self.check_context(tar, None):
4104            self.expect_exception(TypeError)
4105
4106    def test_custom_filter(self):
4107        def custom_filter(tarinfo, path):
4108            self.assertIs(path, self.destdir)
4109            if tarinfo.name == 'move_this':
4110                return tarinfo.replace(name='moved')
4111            if tarinfo.name == 'ignore_this':
4112                return None
4113            return tarinfo
4114
4115        with ArchiveMaker() as arc:
4116            arc.add('move_this')
4117            arc.add('ignore_this')
4118            arc.add('keep')
4119        with self.check_context(arc.open(), custom_filter):
4120            self.expect_file('moved')
4121            self.expect_file('keep')
4122
4123    def test_bad_filter_name(self):
4124        with ArchiveMaker() as arc:
4125            arc.add('foo')
4126        with self.check_context(arc.open(), 'bad filter name'):
4127            self.expect_exception(ValueError)
4128
4129    def test_stateful_filter(self):
4130        # Stateful filters should be possible.
4131        # (This doesn't really test tarfile. Rather, it demonstrates
4132        # that third parties can implement a stateful filter.)
4133        class StatefulFilter:
4134            def __enter__(self):
4135                self.num_files_processed = 0
4136                return self
4137
4138            def __call__(self, tarinfo, path):
4139                try:
4140                    tarinfo = tarfile.data_filter(tarinfo, path)
4141                except tarfile.FilterError:
4142                    return None
4143                self.num_files_processed += 1
4144                return tarinfo
4145
4146            def __exit__(self, *exc_info):
4147                self.done = True
4148
4149        with ArchiveMaker() as arc:
4150            arc.add('good')
4151            arc.add('bad', symlink_to='/')
4152            arc.add('good')
4153        with StatefulFilter() as custom_filter:
4154            with self.check_context(arc.open(), custom_filter):
4155                self.expect_file('good')
4156        self.assertEqual(custom_filter.num_files_processed, 2)
4157        self.assertEqual(custom_filter.done, True)
4158
4159    def test_errorlevel(self):
4160        def extracterror_filter(tarinfo, path):
4161            raise tarfile.ExtractError('failed with ExtractError')
4162        def filtererror_filter(tarinfo, path):
4163            raise tarfile.FilterError('failed with FilterError')
4164        def oserror_filter(tarinfo, path):
4165            raise OSError('failed with OSError')
4166        def tarerror_filter(tarinfo, path):
4167            raise tarfile.TarError('failed with base TarError')
4168        def valueerror_filter(tarinfo, path):
4169            raise ValueError('failed with ValueError')
4170
4171        with ArchiveMaker() as arc:
4172            arc.add('file')
4173
4174        # If errorlevel is 0, errors affected by errorlevel are ignored
4175
4176        with self.check_context(arc.open(errorlevel=0), extracterror_filter):
4177            self.expect_file('file')
4178
4179        with self.check_context(arc.open(errorlevel=0), filtererror_filter):
4180            self.expect_file('file')
4181
4182        with self.check_context(arc.open(errorlevel=0), oserror_filter):
4183            self.expect_file('file')
4184
4185        with self.check_context(arc.open(errorlevel=0), tarerror_filter):
4186            self.expect_exception(tarfile.TarError)
4187
4188        with self.check_context(arc.open(errorlevel=0), valueerror_filter):
4189            self.expect_exception(ValueError)
4190
4191        # If 1, all fatal errors are raised
4192
4193        with self.check_context(arc.open(errorlevel=1), extracterror_filter):
4194            self.expect_file('file')
4195
4196        with self.check_context(arc.open(errorlevel=1), filtererror_filter):
4197            self.expect_exception(tarfile.FilterError)
4198
4199        with self.check_context(arc.open(errorlevel=1), oserror_filter):
4200            self.expect_exception(OSError)
4201
4202        with self.check_context(arc.open(errorlevel=1), tarerror_filter):
4203            self.expect_exception(tarfile.TarError)
4204
4205        with self.check_context(arc.open(errorlevel=1), valueerror_filter):
4206            self.expect_exception(ValueError)
4207
4208        # If 2, all non-fatal errors are raised as well.
4209
4210        with self.check_context(arc.open(errorlevel=2), extracterror_filter):
4211            self.expect_exception(tarfile.ExtractError)
4212
4213        with self.check_context(arc.open(errorlevel=2), filtererror_filter):
4214            self.expect_exception(tarfile.FilterError)
4215
4216        with self.check_context(arc.open(errorlevel=2), oserror_filter):
4217            self.expect_exception(OSError)
4218
4219        with self.check_context(arc.open(errorlevel=2), tarerror_filter):
4220            self.expect_exception(tarfile.TarError)
4221
4222        with self.check_context(arc.open(errorlevel=2), valueerror_filter):
4223            self.expect_exception(ValueError)
4224
4225        # We only handle ExtractionError, FilterError & OSError specially.
4226
4227        with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter):
4228            self.expect_exception(TypeError)  # errorlevel is not int
4229
4230
class OverwriteTests(archiver_tests.OverwriteTests, unittest.TestCase):
    # tar-specific fixtures and hooks for the tests inherited from
    # archiver_tests.OverwriteTests.
    testdir = os.path.join(TEMPDIR, "testoverwrite")

    @classmethod
    def setUpClass(cls):
        # Create three archives in TEMPDIR: one holding a regular file
        # 'test', one holding a directory 'test', and one holding
        # 'test/file' with no explicit entry for its parent directory.
        def new_archive(filename):
            # Path for an archive in TEMPDIR, unlinked at class cleanup.
            archive_path = os.path.join(TEMPDIR, filename)
            cls.addClassCleanup(os_helper.unlink, archive_path)
            return archive_path

        def write_regular_member(archive_path, member_name):
            # Write a tar archive with one regular file member.
            content = b'newcontent'
            with tarfile.open(archive_path, 'w') as tar:
                info = tarfile.TarInfo(member_name)
                info.size = len(content)
                tar.addfile(info, io.BytesIO(content))

        cls.ar_with_file = new_archive('tar-with-file.tar')
        write_regular_member(cls.ar_with_file, 'test')

        cls.ar_with_dir = new_archive('tar-with-dir.tar')
        with tarfile.open(cls.ar_with_dir, 'w') as tar:
            # Use the current directory's metadata, stored as 'test'.
            tar.addfile(tar.gettarinfo(os.curdir, 'test'))

        cls.ar_with_implicit_dir = new_archive('tar-with-implicit-dir.tar')
        write_regular_member(cls.ar_with_implicit_dir, 'test/file')

    def open(self, path):
        # Open the archive at `path` for reading.
        return tarfile.open(path, 'r')

    def extractall(self, ar):
        # Extract everything into `testdir` without any filtering.
        ar.extractall(self.testdir, filter='fully_trusted')
4261
4262
def setUpModule():
    # Create TEMPDIR and fill it with compressed copies of the test
    # archive; the paths of all archives are collected in `testtarnames`.
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        raw_tar = source.read()

    # Create compressed tarfiles, skipping formats whose `open` is unset.
    for variant in (GzipTest, Bz2Test, LzmaTest):
        if not variant.open:
            continue
        os_helper.unlink(variant.tarname)
        testtarnames.append(variant.tarname)
        with variant.open(variant.tarname, "wb") as compressed:
            compressed.write(raw_tar)
4279
def tearDownModule():
    # Remove TEMPDIR and its contents, if it still exists.
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)
4283
4284if __name__ == "__main__":
4285    unittest.main()
4286