• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import os
3import io
4from hashlib import md5
5from contextlib import contextmanager
6from random import Random
7import pathlib
8
9import unittest
10import unittest.mock
11import tarfile
12
13from test import support
14from test.support import script_helper
15
16# Check for our compression modules.
17try:
18    import gzip
19except ImportError:
20    gzip = None
21try:
22    import bz2
23except ImportError:
24    bz2 = None
25try:
26    import lzma
27except ImportError:
28    lzma = None
29
def md5sum(data):
    """Return the hexadecimal MD5 digest of the bytes-like object *data*."""
    digest = md5(data)
    return digest.hexdigest()
32
# Scratch directory used by the tests, derived from the temporary file name
# supplied by test.support.
TEMPDIR = os.path.abspath(support.TESTFN) + "-tardir"
# Sibling directory name reserved for extraction tests.
tarextdir = TEMPDIR + '-extract-test'
# The reference archive, located through test.support.findfile().
tarname = support.findfile("testtar.tar")
# Paths inside TEMPDIR for compressed counterparts of the reference archive.
# NOTE(review): nothing in this part of the file creates them; presumably
# module-level setup does -- confirm.
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
# Scratch archive path that individual tests create, truncate and re-read.
tmpname = os.path.join(TEMPDIR, "tmp.tar")
# Archive path without any file name extension.
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected MD5 hex digest of the "ustar/regtype" member's data.
md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
# Expected MD5 hex digest for sparse members (presumably; not used in the
# portion of the file visible here).
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
44
45
class TarTest:
    # Mixin describing the archive flavour under test; this base variant is
    # the uncompressed one.  GzipTest, Bz2Test and LzmaTest provide the same
    # four attributes for the compressed flavours.
    tarname = tarname                       # archive the tests operate on
    suffix = ''                             # compression part of the mode
    open = io.FileIO                        # opener for raw archive access
    taropen = tarfile.TarFile.taropen       # matching TarFile constructor

    @property
    def mode(self):
        # Mode string passed to tarfile.open(), e.g. "r:" + "gz".  ``prefix``
        # is not defined on this class; the concrete test classes set it.
        return self.prefix + self.suffix
55
@support.requires_gzip
class GzipTest:
    # Mixin selecting the gzip flavour; listed before the generic test class
    # in the bases so these attributes override the TarTest ones.
    tarname = gzipname
    suffix = 'gz'
    # None when the gzip module could not be imported.
    open = gzip.GzipFile if gzip else None
    taropen = tarfile.TarFile.gzopen
62
@support.requires_bz2
class Bz2Test:
    # Mixin selecting the bzip2 flavour; listed before the generic test class
    # in the bases so these attributes override the TarTest ones.
    tarname = bz2name
    suffix = 'bz2'
    # None when the bz2 module could not be imported.
    open = bz2.BZ2File if bz2 else None
    taropen = tarfile.TarFile.bz2open
69
@support.requires_lzma
class LzmaTest:
    # Mixin selecting the xz/lzma flavour; listed before the generic test
    # class in the bases so these attributes override the TarTest ones.
    tarname = xzname
    suffix = 'xz'
    # None when the lzma module could not be imported.
    open = lzma.LZMAFile if lzma else None
    taropen = tarfile.TarFile.xzopen
76
77
class ReadTest(TarTest):
    # Base fixture for read tests: every test gets the archive named by
    # ``self.tarname`` opened as ``self.tar`` and closed again afterwards.

    # Seekable read access; combined with ``suffix`` by TarTest.mode.
    prefix = "r:"

    def setUp(self):
        # Member names are decoded as iso8859-1.
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")

    def tearDown(self):
        # Also relied upon by subclasses that override setUp().
        self.tar.close()
88
89
class UstarReadTest(ReadTest, unittest.TestCase):
    # Exercises TarFile.extractfile() on members of the reference archive:
    # plain reads, line iteration, text wrapping, seeking and the resolution
    # of hard and symbolic links.

    def test_fileobj_regular_file(self):
        tarinfo = self.tar.getmember("ustar/regtype")
        with self.tar.extractfile(tarinfo) as fobj:
            data = fobj.read()
            self.assertEqual(len(data), tarinfo.size,
                    "regular file extraction failed")
            self.assertEqual(md5sum(data), md5_regtype,
                    "regular file extraction failed")

    def test_fileobj_readlines(self):
        # Lines read through extractfile() must match the lines of the same
        # member after it has been extracted to disk.
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()

        with self.tar.extractfile(tarinfo) as fobj:
            fobj2 = io.TextIOWrapper(fobj)
            lines2 = fobj2.readlines()
            self.assertEqual(lines1, lines2,
                    "fileobj.readlines() failed")
            self.assertEqual(len(lines2), 114,
                    "fileobj.readlines() failed")
            self.assertEqual(lines2[83],
                    "I will gladly admit that Python is not the fastest "
                    "running scripting language.\n",
                    "fileobj.readlines() failed")

    def test_fileobj_iter(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        tarinfo = self.tar.getmember("ustar/regtype")
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
            lines1 = fobj1.readlines()
        with self.tar.extractfile(tarinfo) as fobj2:
            lines2 = list(io.TextIOWrapper(fobj2))
            self.assertEqual(lines1, lines2,
                    "fileobj.__iter__() failed")

    def test_fileobj_seek(self):
        self.tar.extract("ustar/regtype", TEMPDIR)
        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
            data = fobj.read()

        tarinfo = self.tar.getmember("ustar/regtype")
        # The context manager guarantees the member file object is closed
        # even when one of the assertions below fails.
        with self.tar.extractfile(tarinfo) as fobj:
            # Consume the whole member first so the seeks below start from
            # the end of the data; the content itself is not needed.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                         "seek() to file's end failed")
            self.assertEqual(fobj.read(), b"",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                         "relative seek() to file's end failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                         "readlines() after seek failed")
            fobj.seek(0)
            self.assertEqual(len(fobj.readline()), fobj.tell(),
                         "tell() after readline() failed")
            fobj.seek(512)
            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
                         "tell() after seek() and readline() failed")
            fobj.seek(0)
            line = fobj.readline()
            self.assertEqual(fobj.read(), data[len(line):],
                         "read() after readline() failed")

    def test_fileobj_text(self):
        with self.tar.extractfile("ustar/regtype") as fobj:
            fobj = io.TextIOWrapper(fobj)
            data = fobj.read().encode("iso8859-1")
            self.assertEqual(md5sum(data), md5_regtype)
            try:
                fobj.seek(100)
            except AttributeError:
                # Issue #13815: seek() complained about a missing
                # flush() method.
                self.fail("seeking failed in text mode")

    # Test if symbolic and hard links are resolved by extractfile().  The
    # test link members each point to a regular member whose data is
    # supposed to be exported.
    def _test_fileobj_link(self, lnktype, regtype):
        # Both file objects must report the same name, i.e. the link has
        # been resolved to the regular member.
        with self.tar.extractfile(lnktype) as a, \
             self.tar.extractfile(regtype) as b:
            self.assertEqual(a.name, b.name)

    def test_fileobj_link1(self):
        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")

    def test_fileobj_link2(self):
        self._test_fileobj_link("./ustar/linktest2/lnktype",
                                "ustar/linktest1/regtype")

    def test_fileobj_symlink1(self):
        self._test_fileobj_link("ustar/symtype", "ustar/regtype")

    def test_fileobj_symlink2(self):
        self._test_fileobj_link("./ustar/linktest2/symtype",
                                "ustar/linktest1/regtype")

    def test_issue14160(self):
        self._test_fileobj_link("symtype2", "ustar/regtype")
215
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """Run the UstarReadTest cases with the gzip archive settings."""

class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """Run the UstarReadTest cases with the bzip2 archive settings."""

class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """Run the UstarReadTest cases with the xz archive settings."""
224
225
class ListTest(ReadTest, unittest.TestCase):
    # Checks what TarFile.list() prints, with and without the verbose flag
    # and with an explicit member selection.

    # Override setUp to use default encoding (UTF-8)
    def setUp(self):
        self.tar = tarfile.open(self.tarname, mode=self.mode)

    def _list_output(self, **list_kwargs):
        # Call self.tar.list(**list_kwargs) while sys.stdout is replaced by
        # an ASCII text layer over a BytesIO, and return the bytes written.
        stream = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
        with support.swap_attr(sys, 'stdout', stream):
            self.tar.list(**list_kwargs)
        return stream.detach().getvalue()

    def test_list(self):
        out = self._list_output(verbose=False)
        expected_names = (
            b'ustar/conttype',
            b'ustar/regtype',
            b'ustar/lnktype',
            b'ustar' + (b'/12345' * 40) + b'67/longname',
            b'./ustar/linktest2/symtype',
            b'./ustar/linktest2/lnktype',
            # Make sure it puts trailing slash for directory
            b'ustar/dirtype/',
            b'ustar/dirtype-with-size/',
        )
        for name in expected_names:
            self.assertIn(name, out)

        # Make sure it is able to print unencodable characters
        def conv(b):
            s = b.decode(self.tar.encoding, 'surrogateescape')
            return s.encode('ascii', 'backslashreplace')
        unencodable_names = (
            b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-hpux-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'misc/regtype-old-v7-signed-chksum-'
            b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf',
            b'pax/bad-pax-\xe4\xf6\xfc',
            b'pax/hdrcharset-\xe4\xf6\xfc',
        )
        for raw_name in unencodable_names:
            self.assertIn(conv(raw_name), out)

        # Make sure it prints files separated by one newline without any
        # 'ls -l'-like accessories if verbose flag is not being used
        # ...
        # ustar/conttype
        # ustar/regtype
        # ...
        self.assertRegex(out, br'ustar/conttype ?\r?\n'
                              br'ustar/regtype ?\r?\n')
        # Make sure it does not print the source of link without verbose flag
        for marker in (b'link to', b'->'):
            self.assertNotIn(marker, out)

    def test_list_verbose(self):
        out = self._list_output(verbose=True)
        # Make sure it prints files separated by one newline with 'ls -l'-like
        # accessories if verbose flag is being used
        # ...
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
        # ...
        one_entry = (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
                     br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
                     br'ustar/\w+type ?\r?\n')
        self.assertRegex(out, one_entry * 2)
        # Make sure it prints the source of link with verbose flag
        long_dirs = b'/123' * 125
        expected_links = (
            b'ustar/symtype -> regtype',
            b'./ustar/linktest2/symtype -> ../linktest1/regtype',
            b'./ustar/linktest2/lnktype link to '
            b'./ustar/linktest1/regtype',
            b'gnu' + long_dirs + b'/longlink link to gnu' +
            long_dirs + b'/longname',
            b'pax' + long_dirs + b'/longlink link to pax' +
            long_dirs + b'/longname',
        )
        for link_line in expected_links:
            self.assertIn(link_line, out)

    def test_list_members(self):
        # Only members whose name contains 'reg' are handed to list().
        def members(tar):
            for tarinfo in tar.getmembers():
                if 'reg' in tarinfo.name:
                    yield tarinfo
        out = self._list_output(verbose=False, members=members(self.tar))
        self.assertIn(b'ustar/regtype', out)
        self.assertNotIn(b'ustar/conttype', out)
304
305
class GzipListTest(GzipTest, ListTest):
    """Run the ListTest cases with the gzip archive settings."""


class Bz2ListTest(Bz2Test, ListTest):
    """Run the ListTest cases with the bzip2 archive settings."""


class LzmaListTest(LzmaTest, ListTest):
    """Run the ListTest cases with the xz archive settings."""
316
317
class CommonReadTest(ReadTest):
    # Read tests shared by the seekable ("r:") and the stream ("r|") test
    # classes that derive from this one.

    def test_empty_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test checks if tarfile.open() is able to open an empty tar
        # archive successfully. Note that an empty tar archive is not the
        # same as an empty file!
        with tarfile.open(tmpname, self.mode.replace("r", "w")):
            pass
        # The archive is opened by a context manager inside the try block,
        # so a ReadError raised by open() itself is reported through
        # self.fail() and no cleanup code touches an unbound ``tar``.
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.getnames()
                members = tar.getmembers()
        except tarfile.ReadError:
            self.fail("tarfile.open() failed on empty archive")
        self.assertListEqual(members, [])

    def test_non_existent_tarfile(self):
        # Test for issue11513: prevent non-existent gzipped tarfiles raising
        # multiple exceptions.
        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
            tarfile.open("xxx", self.mode)

    def test_null_tarfile(self):
        # Test for issue6123: Allow opening empty archives.
        # This test guarantees that tarfile.open() does not treat an empty
        # file as an empty tar archive.
        with open(tmpname, "wb"):
            pass
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)

    def test_ignore_zeros(self):
        # Test TarFile's ignore_zeros option.
        # generate 512 pseudorandom bytes
        data = Random(0).getrandbits(512*8).to_bytes(512, 'big')
        for char in (b'\0', b'a'):
            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
            # are ignored correctly.
            with self.open(tmpname, "w") as fobj:
                # Two blocks of filler precede the only real member.
                fobj.write(char * 1024)
                tarinfo = tarfile.TarInfo("foo")
                tarinfo.size = len(data)
                fobj.write(tarinfo.tobuf())
                fobj.write(data)

            with tarfile.open(tmpname, mode="r", ignore_zeros=True) as tar:
                self.assertListEqual(tar.getnames(), ["foo"],
                    "ignore_zeros=True should have skipped the %r-blocks" %
                    char)

    def test_premature_end_of_archive(self):
        # An archive holding one 1024-byte member is cut off at several
        # sizes; iterating, extracting and reading must all raise ReadError.
        for size in (512, 600, 1024, 1200):
            with tarfile.open(tmpname, "w:") as tar:
                t = tarfile.TarInfo("foo")
                t.size = 1024
                tar.addfile(t, io.BytesIO(b"a" * 1024))

            with open(tmpname, "r+b") as fobj:
                fobj.truncate(size)

            with tarfile.open(tmpname) as tar:
                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    for t in tar:
                        pass

            with tarfile.open(tmpname) as tar:
                t = tar.next()

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    tar.extract(t, TEMPDIR)

                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
                    # Close the member file object even though read() fails.
                    with tar.extractfile(t) as fobj:
                        fobj.read()
397
class MiscReadTestBase(CommonReadTest):
    # Miscellaneous read tests: how TarFile.name is derived, handling of
    # file objects, illegal modes and extraction of members to disk.

    def requires_name_attribute(self):
        # Hook for subclasses whose file objects carry no ``name``
        # attribute; they override this to skip the calling test.
        pass

    def test_no_name_argument(self):
        self.requires_name_attribute()
        with open(self.tarname, "rb") as fobj:
            self.assertIsInstance(fobj.name, str)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_no_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self.assertRaises(AttributeError, getattr, fobj, "name")
        # Close the archive when done, like the sibling name tests do.
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_empty_name_attribute(self):
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        fobj.name = ""
        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
            self.assertIsNone(tar.name)

    def test_int_name_attribute(self):
        # Issue 21044: tarfile.open() should handle fileobj with an integer
        # 'name' attribute.
        fd = os.open(self.tarname, os.O_RDONLY)
        with open(fd, 'rb') as fobj:
            self.assertIsInstance(fobj.name, int)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsNone(tar.name)

    def test_bytes_name_attribute(self):
        self.requires_name_attribute()
        tarname = os.fsencode(self.tarname)
        with open(tarname, 'rb') as fobj:
            self.assertIsInstance(fobj.name, bytes)
            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
                self.assertIsInstance(tar.name, bytes)
                self.assertEqual(tar.name, os.path.abspath(fobj.name))

    def test_pathlike_name(self):
        # Every way of opening the archive must accept a pathlib.Path and
        # expose the absolute path as a str in TarFile.name.
        tarname = pathlib.Path(self.tarname)
        with tarfile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with self.taropen(tarname) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
            self.assertIsInstance(tar.name, str)
            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
        if self.suffix == '':
            # The plain constructor is only exercised for the uncompressed
            # archive.
            with tarfile.TarFile(tarname, mode='r') as tar:
                self.assertIsInstance(tar.name, str)
                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))

    def test_illegal_mode_arg(self):
        with open(tmpname, 'wb'):
            pass
        for bad_mode in ('q', 'rw', ''):
            with self.assertRaisesRegex(ValueError, 'mode must be '):
                self.taropen(tmpname, bad_mode)

    def test_fileobj_with_offset(self):
        # Skip the first member and store values from the second member
        # of the testtar.
        with tarfile.open(self.tarname, mode=self.mode) as tar:
            tar.next()
            t = tar.next()
            name = t.name
            offset = t.offset
            with tar.extractfile(t) as f:
                data = f.read()

        # Open the testtar and seek to the offset of the second member.
        with self.open(self.tarname) as fobj:
            fobj.seek(offset)

            # Test if the tarfile starts with the second member.  The
            # context managers close the archive and the member file
            # object even when an assertion fails.
            with tarfile.open(self.tarname, mode="r:", fileobj=fobj) as tar:
                t = tar.next()
                self.assertEqual(t.name, name)
                # Read to the end of fileobj and test if seeking back to the
                # beginning works.
                tar.getmembers()
                with tar.extractfile(t) as f:
                    self.assertEqual(f.read(), data,
                            "seek back did not work")

    def test_fail_comp(self):
        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
        with open(tarname, "rb") as fobj:
            self.assertRaises(tarfile.ReadError, tarfile.open,
                              fileobj=fobj, mode=self.mode)

    def test_v7_dirtype(self):
        # Test old style dirtype member (bug #1336623):
        # Old V7 tars create directory members using an AREGTYPE
        # header with a "/" appended to the filename field.
        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
                "v7 dirtype failed")

    def test_xstar_type(self):
        # The xstar format stores extra atime and ctime fields inside the
        # space reserved for the prefix field. The prefix field must be
        # ignored in this case, otherwise it will mess up the name.
        try:
            self.tar.getmember("misc/regtype-xstar")
        except KeyError:
            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")

    def test_check_members(self):
        # Every member carries the same mtime; the uname is only checked
        # for members below "ustar/".
        for tarinfo in self.tar:
            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
                    "wrong mtime for %s" % tarinfo.name)
            if not tarinfo.name.startswith("ustar/"):
                continue
            self.assertEqual(tarinfo.uname, "tarfile",
                    "wrong uname for %s" % tarinfo.name)

    def test_find_members(self):
        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
                "could not find all members")

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    @support.skip_unless_symlink
    def test_extract_hardlink(self):
        # Test hardlink extraction (e.g. bug #857297).
        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
            tar.extract("ustar/regtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/regtype"))

            tar.extract("ustar/lnktype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

            tar.extract("ustar/symtype", TEMPDIR)
            self.addCleanup(support.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
                data = f.read()
            self.assertEqual(md5sum(data), md5_regtype)

    def test_extractall(self):
        # Test if extractall() correctly restores directory permissions
        # and times (see issue1735).
        def format_mtime(mtime):
            # Render a timestamp for the failure message; floats also get
            # their exact hexadecimal representation.
            if isinstance(mtime, float):
                return "{} ({})".format(mtime, mtime.hex())
            else:
                return "{!r} (int)".format(mtime)

        DIR = os.path.join(TEMPDIR, "extractall")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                directories = [t for t in tar if t.isdir()]
                tar.extractall(DIR, directories)
                for tarinfo in directories:
                    path = os.path.join(DIR, tarinfo.name)
                    if sys.platform != "win32":
                        # Win32 has no support for fine grained permissions.
                        self.assertEqual(tarinfo.mode & 0o777,
                                         os.stat(path).st_mode & 0o777)
                    file_mtime = os.path.getmtime(path)
                    errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
                        format_mtime(tarinfo.mtime),
                        format_mtime(file_mtime),
                        path)
                    self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
        finally:
            support.rmtree(DIR)

    def test_extract_directory(self):
        dirtype = "ustar/dirtype"
        DIR = os.path.join(TEMPDIR, "extractdir")
        os.mkdir(DIR)
        try:
            with tarfile.open(tarname, encoding="iso8859-1") as tar:
                tarinfo = tar.getmember(dirtype)
                tar.extract(tarinfo, path=DIR)
                extracted = os.path.join(DIR, dirtype)
                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
                if sys.platform != "win32":
                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
        finally:
            support.rmtree(DIR)

    def test_extractall_pathlike_name(self):
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            directories = [t for t in tar if t.isdir()]
            tar.extractall(DIR, directories)
            for tarinfo in directories:
                path = DIR / tarinfo.name
                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)

    def test_extract_pathlike_name(self):
        dirtype = "ustar/dirtype"
        DIR = pathlib.Path(TEMPDIR) / "extractall"
        with support.temp_dir(DIR), \
             tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember(dirtype)
            tar.extract(tarinfo, path=DIR)
            extracted = DIR / dirtype
            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)

    def test_init_close_fobj(self):
        # Issue #7341: Close the internal file object in the TarFile
        # constructor in case of an error. For the test we rely on
        # the fact that opening an empty file raises a ReadError.
        empty = os.path.join(TEMPDIR, "empty")
        with open(empty, "wb") as fobj:
            fobj.write(b"")

        try:
            # Allocate the instance separately so that it stays reachable
            # for inspection after __init__() has raised.
            tar = object.__new__(tarfile.TarFile)
            try:
                tar.__init__(empty)
            except tarfile.ReadError:
                self.assertTrue(tar.fileobj.closed)
            else:
                self.fail("ReadError not raised")
        finally:
            support.unlink(empty)

    def test_parallel_iteration(self):
        # Issue #16601: Restarting iteration over tarfile continued
        # from where it left off.
        with tarfile.open(self.tarname) as tar:
            for m1, m2 in zip(tar, tar):
                self.assertEqual(m1.offset, m2.offset)
                self.assertEqual(m1.get_info(), m2.get_info())
648
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    # The inherited test_fail_comp only applies to the compressed variants;
    # shadowing it with None removes it from this class.
    test_fail_comp = None

class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    """Run the MiscReadTestBase cases with the gzip archive settings."""

class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Tests that call this hook are skipped for the bzip2 variant.
    def requires_name_attribute(self):
        self.skipTest("BZ2File have no name attribute")

class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Tests that call this hook are skipped for the xz variant.
    def requires_name_attribute(self):
        self.skipTest("LZMAFile have no name attribute")
662
663
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for archives opened in stream mode.

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: A poorly designed _FileInFile.read() method
        # caused seeking errors with stream tar files.
        for member in self.tar:
            if not member.isreg():
                continue
            with self.tar.extractfile(member) as fobj:
                try:
                    # Drain the member in 512-byte pieces until read()
                    # returns an empty result.
                    while fobj.read(512):
                        pass
                except tarfile.StreamError:
                    self.fail("simple read-through using "
                              "TarFile.extractfile() failed")

    def test_fileobj_regular_file(self):
        first = self.tar.next() # get "regtype" (can't use getmember)
        with self.tar.extractfile(first) as fobj:
            payload = fobj.read()
        self.assertEqual(len(payload), first.size,
                "regular file extraction failed")
        self.assertEqual(md5sum(payload), md5_regtype,
                "regular file extraction failed")

    def test_provoke_stream_error(self):
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as f: # read the first member
            with self.assertRaises(tarfile.StreamError):
                f.read()

    def test_compare_members(self):
        # Walk the seekable reference archive and the stream archive in
        # lockstep and compare the data of each pair of members.
        reference = tarfile.open(tarname, encoding="iso8859-1")
        try:
            stream = self.tar

            while True:
                ref_info = reference.next()
                stream_info = stream.next()
                if ref_info is None:
                    break
                self.assertIsNotNone(stream_info, "stream.next() failed.")

                if stream_info.islnk() or stream_info.issym():
                    with self.assertRaises(tarfile.StreamError):
                        stream.extractfile(stream_info)
                    continue

                ref_file = reference.extractfile(ref_info)
                stream_file = stream.extractfile(stream_info)
                if ref_file is None:
                    continue
                self.assertIsNotNone(stream_file,
                        "stream.extractfile() failed")
                self.assertEqual(ref_file.read(), stream_file.read(),
                        "stream extraction failed")
        finally:
            reference.close()
724
class GzipStreamReadTest(GzipTest, StreamReadTest):
    """Run the StreamReadTest cases with the gzip archive settings."""

class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    """Run the StreamReadTest cases with the bzip2 archive settings."""

class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    """Run the StreamReadTest cases with the xz archive settings."""
733
734
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which read modes accept the archive, including the "*" modes
    # that leave the choice of compression to tarfile.

    def _testfunc_file(self, name, mode):
        # Opening *name* by path with *mode* must not raise ReadError.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r) raised ReadError: %s"
                      % (name, mode, e))
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Opening *name* through an already open binary file object with
        # *mode* must not raise ReadError.
        try:
            with open(name, "rb") as f:
                tar = tarfile.open(name, mode, fileobj=f)
                # Close the archive while its underlying file object is
                # still open.
                tar.close()
        except tarfile.ReadError as e:
            self.fail("tarfile.open(%r, %r, fileobj=...) raised "
                      "ReadError: %s" % (name, mode, e))

    def _test_modes(self, testfunc):
        if self.suffix:
            # A compressed variant: the uncompressed reference archive must
            # be rejected by the compression-specific modes, and this
            # variant's archive must be rejected by the plain modes.
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r:" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(tarname, mode="r|" + self.suffix)
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r:")
            with self.assertRaises(tarfile.ReadError):
                tarfile.open(self.tarname, mode="r|")
        testfunc(self.tarname, "r")
        testfunc(self.tarname, "r:" + self.suffix)
        testfunc(self.tarname, "r:*")
        testfunc(self.tarname, "r|" + self.suffix)
        testfunc(self.tarname, "r|*")

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
774
class GzipDetectReadTest(GzipTest, DetectReadTest):
    """Run the DetectReadTest cases with the gzip archive settings."""
777
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # Originally, tarfile's stream detection looked for the string
        # "BZh91" at the start of the file. This is incorrect because
        # the '9' represents the blocksize (900,000 bytes). If the file was
        # compressed using another blocksize autodetection fails.
        with open(tarname, "rb") as source:
            raw_archive = source.read()

        # Compress with blocksize 100,000 bytes, the file starts with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as compressed:
            compressed.write(raw_archive)

        self._testfunc_file(tmpname, "r|*")
792
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    """DetectReadTest combined with LzmaTest's archive name and suffix."""
795
796
class MemberReadTest(ReadTest, unittest.TestCase):
    """Look up individual members of the test archive and check their fields."""

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        """Check the content digest and header fields of *tarinfo*.

        If *chksum* is given, the member's data must hash to it (md5sum).
        Every keyword argument names a TarInfo attribute and its expected
        value; mtime, uid, gid and (except for "old-v7" members) uname and
        gname are always checked against fixed values.
        """
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as f:
                digest = md5sum(f.read())
            self.assertEqual(digest, chksum,
                    "wrong md5sum for %s" % tarinfo.name)

        expected = dict(kwargs, mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            expected.update(uname="tarfile", gname="tarfile")
        for field, value in expected.items():
            self.assertEqual(getattr(tarinfo, field), value,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=md5_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=md5_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=md5_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=md5_sparse)

    def test_find_umlauts(self):
        member_name = "ustar/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=md5_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=md5_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking the member up.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = "pax/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=md5_regtype)
892
893
class LongnameTest:
    """Mixin with tests for members that have very long names and links.

    The class it is mixed into must provide ``subdir``, ``longnametype``
    and an open archive in ``self.tar``.
    """

    def _deep_path(self, leaf):
        # Build "<subdir>/123/123/.../<leaf>" with 125 "123/" components.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        wanted = self._deep_path("longname")
        try:
            member = self.tar.getmember(wanted)
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._deep_path("longname")
        wanted = self._deep_path("longlink")
        try:
            member = self.tar.getmember(wanted)
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three blocks of the member into a new
        # in-memory file; opening that as an archive must raise ReadError.
        member = self.tar.getmember(self._deep_path("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        truncated = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=truncated)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._deep_path("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = tarfile.TarInfo.frombuf(fobj.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)
934
935
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """LongnameTest for the "gnu" members, plus sparse-file extraction."""

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME

    # Since 3.2 tarfile is supposed to accurately restore sparse members and
    # produce files with holes. This is what we actually want to test here.
    # Unfortunately, not all platforms/filesystems support sparse files, and
    # even on platforms that do it is non-trivial to make reliable assertions
    # about holes in files. Therefore, we first do one basic test which works
    # on all platforms, and after that a test that will work only on
    # platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        """Extract member *name* into TEMPDIR and verify the result."""
        self.tar.extract(name, TEMPDIR)
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = md5sum(fobj.read())
        self.assertEqual(digest, md5_sparse,
                "wrong md5sum for %s" % name)

        if not self._fs_supports_holes():
            return
        # The file must occupy fewer bytes on disk than its nominal size.
        st = os.stat(extracted)
        self.assertLess(st.st_blocks * 512, st.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute and
        # uses st_blocks units of 512 bytes, and if the filesystem is able to
        # store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek to "punch a hole" of 4 KiB
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        st = os.stat(probe)
        support.unlink(probe)
        return st.st_blocks * 512 < st.st_size
993
994
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    """LongnameTest for the "pax" members, plus pax header field checks."""

    subdir = "pax"
    longnametype = tarfile.XHDTYPE

    def test_pax_global_headers(self):
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        # (member name, expected uname, expected gname)
        cases = (
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for member, uname, gname in cases:
                tarinfo = tar.getmember(member)
                self.assertEqual(tarinfo.uname, uname)
                self.assertEqual(tarinfo.gname, gname)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            for key in ("atime", "ctime"):
                self.assertEqual(float(tarinfo.pax_headers[key]),
                                 1041808783.0)
1037
1038
class WriteTestBase(TarTest):
    # Put all write tests in here that are supposed to be tested
    # in all possible mode combinations.

    def test_fileobj_no_close(self):
        buf = io.BytesIO()
        archive = tarfile.open(fileobj=buf, mode=self.mode)
        archive.addfile(tarfile.TarInfo("foo"))
        archive.close()
        self.assertFalse(buf.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz"
        # Dropping the TarFile and collecting garbage must neither close
        # the buffer nor change what was written to it.
        snapshot = buf.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(buf.closed)
        self.assertEqual(snapshot, buf.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte recordsize.
        # So, we create an archive that has exactly 10240 bytes without the
        # marker, and has 20480 bytes once the marker is written.
        with tarfile.open(tmpname, self.mode) as archive:
            member = tarfile.TarInfo("foo")
            member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
            archive.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            written = fobj.read()
        self.assertEqual(len(written), tarfile.RECORDSIZE * 2)
1068
1069
class WriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the "w:" modes.

    The compression-specific subclasses combine this class with GzipTest,
    Bz2Test or LzmaTest, which supply the mode suffix.
    """

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        tar = tarfile.open(tmpname, self.mode)
        try:
            t = tarfile.TarInfo(name)
            tar.addfile(t)
        finally:
            tar.close()

        tar = tarfile.open(tmpname)
        try:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")
        finally:
            tar.close()

    def test_tar_size(self):
        # Test for bug #1013882.
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tar.add(path)
        finally:
            tar.close()
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        tar = tarfile.open(tmpname, self.mode)
        try:
            path = os.path.join(TEMPDIR, "file")
            with open(path, "wb"):
                pass
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            tar.close()

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            # Create the entries inside the try block so that the directory
            # is removed again even if creating one of them fails.
            for entry in ("1", "2"):
                open(os.path.join(path, entry), "a").close()
            tar = tarfile.open(tmpname, self.mode)
            try:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
            finally:
                tar.close()
        finally:
            support.unlink(os.path.join(path, "1"))
            support.unlink(os.path.join(path, "2"))
            support.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must accept a path-like object and yield the same
        # str name as for the equivalent string path.
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            # Creating the link inside the outer try makes sure the target
            # file is removed even when the test is skipped.
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            tar = tarfile.open(tmpname, self.mode)
            try:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(target)
            support.unlink(link)

    @support.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
            finally:
                tar.close()
        finally:
            support.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        tar = tarfile.open(tmpname, self.mode)
        try:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            with support.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")
        finally:
            tar.close()

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                name = os.path.join(tempdir, name)
                support.create_empty_file(name)

            def tarinfo_filter(tarinfo):
                # Drop "bar" and rewrite the owner of everything else.
                if os.path.basename(tarinfo.name) == "bar":
                    return
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
            try:
                tar.add(tempdir, arcname="empty_dir", filter=tarinfo_filter)
            finally:
                tar.close()

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, tarinfo_filter)

            tar = tarfile.open(tmpname, "r")
            try:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
            finally:
                tar.close()
        finally:
            support.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        """Store an empty member under *path* and check the stored name.

        The member is an empty file, or an empty directory if *dir* is
        true.  The stored name must equal *cmp_path* if given, otherwise
        *path* with os.sep replaced by "/".
        """
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        foo = os.path.join(TEMPDIR, "foo")
        if not dir:
            support.create_empty_file(foo)
        else:
            os.mkdir(foo)

        try:
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(foo, arcname=path)
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                t = tar.next()
            finally:
                tar.close()
        finally:
            # Always remove the scratch entry; a leftover "foo" would make
            # the next call fail when it tries to create it again.
            if not dir:
                support.unlink(foo)
            else:
                support.rmdir(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @support.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            tar = tarfile.open(temparchive, 'w')
            try:
                tar.add(source_file)
                tar.add(target_file)
            finally:
                # Close the archive even if adding a member fails.
                tar.close()
            # Let's extract it to the location which contains the symlink
            tar = tarfile.open(temparchive, 'r')
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=tempdir)
            except OSError:
                self.fail("extractall failed with symlinked files")
            finally:
                tar.close()
        finally:
            support.unlink(temparchive)
            support.rmtree(tempdir)

    def test_pathnames(self):
        self._test_pathname("foo")
        self._test_pathname(os.path.join("foo", ".", "bar"))
        self._test_pathname(os.path.join("foo", "..", "bar"))
        self._test_pathname(os.path.join(".", "foo"))
        self._test_pathname(os.path.join(".", "foo", "."))
        self._test_pathname(os.path.join(".", "foo", ".", "bar"))
        self._test_pathname(os.path.join(".", "foo", "..", "bar"))
        self._test_pathname(os.path.join("..", "foo"))
        self._test_pathname(os.path.join("..", "foo", ".."))
        self._test_pathname(os.path.join("..", "foo", ".", "bar"))
        self._test_pathname(os.path.join("..", "foo", "..", "bar"))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with support.change_cwd(TEMPDIR):
            tar = tarfile.open(tmpname, self.mode)
            try:
                tar.add(".")
            finally:
                tar.close()

            tar = tarfile.open(tmpname, "r")
            try:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)
            finally:
                tar.close()

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the file object's first write() must
        # propagate out of tarfile.open() without closing the file object.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1382
class GzipWriteTest(GzipTest, WriteTest):
    """WriteTest combined with GzipTest's archive name and suffix."""
1385
class Bz2WriteTest(Bz2Test, WriteTest):
    """WriteTest combined with Bz2Test's archive name and suffix."""
1388
class LzmaWriteTest(LzmaTest, WriteTest):
    """WriteTest combined with LzmaTest's archive name and suffix."""
1391
1392
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    """Write tests for the "w|" stream modes."""

    prefix = "w|"
    # Optional factory for a decompressor object; when set,
    # test_stream_padding uses it to decode the raw file contents.
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        tarfile.open(tmpname, self.mode).close()
        if self.decompressor:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")
        else:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        # An empty archive must consist of exactly RECORDSIZE zero bytes.
        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            support.unlink(tmpname)

        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            # Restore the process umask whatever the outcome.
            os.umask(saved_umask)
1430
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    """StreamWriteTest combined with GzipTest's archive name and suffix."""
1433
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    """StreamWriteTest combined with Bz2Test's archive name and suffix."""

    # Falls back to None when the bz2 module could not be imported.
    decompressor = bz2.BZ2Decompressor if bz2 else None
1436
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    """StreamWriteTest combined with LzmaTest's archive name and suffix."""

    # Falls back to None when the lzma module could not be imported.
    decompressor = lzma.LZMADecompressor if lzma else None
1439
1440
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        """Return the size in bytes of the 512-byte blocks counted for *s*.

        The block count is len(s) // 512 + 1, i.e. always at least one.
        """
        # NOTE(review): the "+ 1" presumably accounts for a terminating
        # NUL byte stored after the string -- confirm against tarfile.
        return (len(s) // 512 + 1) * 512

    def _calc_size(self, name, link=None):
        """Return the expected archive offset after adding one member."""
        # Initial tar header
        total = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            total += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            total += 512 + self._length(link)
        return total

    def _test(self, name, link=None):
        """Write one member in GNU format and read it back.

        If *link* is given the member is a hardlink pointing to it.
        """
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        tar = tarfile.open(tmpname, "w")
        try:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            expected = self._calc_size(name, link)
            actual = tar.offset
            self.assertEqual(expected, actual,
                    "GNU longname/longlink creation failed")
        finally:
            tar.close()

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    def test_longname_1023(self):
        self._test("longnam/" * 127 + "longnam")

    def test_longname_1024(self):
        self._test("longnam/" * 127 + "longname")

    def test_longname_1025(self):
        self._test("longnam/" * 127 + "longname_")

    def test_longlink_1023(self):
        self._test("name", "longlnk/" * 127 + "longlnk")

    def test_longlink_1024(self):
        self._test("name", "longlnk/" * 127 + "longlink")

    def test_longlink_1025(self):
        self._test("name", "longlnk/" * 127 + "longlink_")

    def test_longnamelink_1023(self):
        self._test("longnam/" * 127 + "longnam",
                   "longlnk/" * 127 + "longlnk")

    def test_longnamelink_1024(self):
        self._test("longnam/" * 127 + "longname",
                   "longlnk/" * 127 + "longlink")

    def test_longnamelink_1025(self):
        self._test("longnam/" * 127 + "longname_",
                   "longlnk/" * 127 + "longlink_")
1521
1522
class CreateTest(WriteTestBase, unittest.TestCase):
    """Tests for the exclusive-creation "x:" modes."""

    prefix = "x:"

    # File that every test adds to the archive; created once per class.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    def setUp(self):
        # Exclusive creation requires that the archive does not exist yet.
        support.unlink(tmpname)

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as fobj:
            fobj.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        support.unlink(cls.file_path)

    def _check_names(self, names):
        """Assert that *names* holds exactly the one spameggs42 member."""
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _check_archive(self):
        """Reopen the archive with self.taropen and check its members."""
        with self.taropen(tmpname) as tobj:
            names = tobj.getnames()
        self._check_names(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as tobj:
            tobj.add(self.file_path)

        # A second exclusive creation must fail ...
        with self.assertRaises(FileExistsError):
            tobj = tarfile.open(tmpname, self.mode)

        # ... and must leave the existing archive intact.
        self._check_archive()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        self._check_archive()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as tobj:
            tobj.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._check_archive()

    def test_create_pathlike_name(self):
        with tarfile.open(pathlib.Path(tmpname), self.mode) as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()

    def test_create_taropen_pathlike_name(self):
        with self.taropen(pathlib.Path(tmpname), "x") as tobj:
            self.assertIsInstance(tobj.name, str)
            self.assertEqual(tobj.name, os.path.abspath(tmpname))
            tobj.add(pathlib.Path(self.file_path))
            names = tobj.getnames()
        self._check_names(names)

        self._check_archive()
1611
1612
class GzipCreateTest(GzipTest, CreateTest):
    # Runs the inherited CreateTest cases with the gzip settings that
    # GzipTest supplies (archive name, mode suffix and openers).
    pass
1615
1616
class Bz2CreateTest(Bz2Test, CreateTest):
    # Runs the inherited CreateTest cases with the bz2 settings that
    # Bz2Test supplies (archive name, mode suffix and openers).
    pass
1619
1620
class LzmaCreateTest(LzmaTest, CreateTest):
    # Runs the inherited CreateTest cases with the xz settings that
    # LzmaTest supplies (archive name, mode suffix and openers).
    pass
1623
1624
class CreateWithXModeTest(CreateTest):
    # Re-runs the CreateTest cases with the mode prefix "x".

    prefix = "x"

    # Assigning None removes the two inherited taropen tests from this
    # class; they pass "x" to taropen() directly and never read self.mode.
    test_create_taropen = None
    test_create_existing_taropen = None
1631
1632
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create a regular file "foo" and a hardlink "bar" to it, then open
        # an archive that already contains "foo".
        #
        # Cleanup goes through addCleanup() rather than tearDown():
        # tearDown() is not run when setUp() skips or fails, which used to
        # leave "foo" behind whenever os.link() raised PermissionError.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")
        self.addCleanup(support.unlink, self.foo)

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        self.addCleanup(support.unlink, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, so it runs first: the archive is closed before
        # the files are removed, as the former tearDown() did.
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" links to the already added "foo", so it must be recorded
        # as a LNKTYPE member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the hardlink is recorded as a regular
        # file instead.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1674
1675
class PaxWriteTest(GNUWriteTest):
    # Overrides the _test() helper (see GNUWriteTest) so that archives are
    # written in PAX_FORMAT, and adds round-trip tests for pax headers.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        # Read the archive back and compare the stored name or link name.
        with tarfile.open(tmpname) as tar:
            member = tar.getmembers()[0]
            if link:
                self.assertEqual(link, member.linkname,
                                 "PAX longlink creation failed")
            else:
                self.assertEqual(name, member.name,
                                 "PAX longname creation failed")

    def test_pax_global_header(self):
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Numeric fields must be convertible by the converter
                    # registered for them in PAX_NUMBER_FIELDS.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)
1758
1759
class UnicodeTest:
    # Mixin with tests for non-ASCII names.  The tests read self.format,
    # which the concrete subclasses set to a tarfile format constant.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using the given encoding
        # and check that the same name is read back.
        name = "\xe4\xf6\xfc"
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", non-ASCII name and
        # uname fields must be rejected with UnicodeError.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual TarInfo fields read from the test archive must be str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # For the non-pax formats, reading the same archive as ASCII
            # must yield the raw bytes as lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
1839
1840
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 10)
        self._test_ustar_name(digits * 10 + "0", ValueError)
        self._test_ustar_name(digits * 9 + "01234567\xff")
        self._test_ustar_name(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_name2(self):
        digits = "0123456789"
        self._test_ustar_name(digits * 9 + "012345\xff\xff")
        self._test_ustar_name(digits * 9 + "0123456\xff\xff", ValueError)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        head = digits * 15
        tail = digits * 10
        self._test_ustar_name(head + "01234/" + tail)
        self._test_ustar_name(head + "0123/4" + tail, ValueError)
        self._test_ustar_name(head + "012\xff/" + tail)
        self._test_ustar_name(head + "0123\xff/" + tail, ValueError)

    def test_unicode_longname2(self):
        digits = "0123456789"
        head = digits * 15
        tail = digits * 10
        self._test_ustar_name(head + "01\xff/2" + tail, ValueError)
        self._test_ustar_name(head + "01\xff\xff/" + tail, ValueError)

    def test_unicode_longname3(self):
        digits = "0123456789"
        head = digits * 15
        self._test_ustar_name(head + "01\xff\xff/2" + digits * 10,
                              ValueError)
        self._test_ustar_name(head + "01234/" + digits * 9 + "01234567\xff")
        self._test_ustar_name(head + "01234/" + digits * 9 + "012345678\xff",
                              ValueError)

    def test_unicode_longname4(self):
        digits = "0123456789"
        head = digits * 15
        self._test_ustar_name(head + "01234/" + digits * 9 + "012345\xff\xff")
        self._test_ustar_name(head + "01234/" + digits * 9 + "0123456\xff\xff",
                              ValueError)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called *name* to a utf-8 ustar archive.  With *exc*
        # given, adding must raise it; otherwise the name must be read
        # back unchanged from the first member.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as writer:
            member = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, writer.addfile, member)
            else:
                writer.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as reader:
            for member in reader:
                self.assertEqual(name, member.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 10)
        self._test_ustar_link(digits * 10 + "0", ValueError)
        self._test_ustar_link(digits * 9 + "01234567\xff")
        self._test_ustar_link(digits * 9 + "012345678\xff", ValueError)

    def test_unicode_link2(self):
        digits = "0123456789"
        self._test_ustar_link(digits * 9 + "012345\xff\xff")
        self._test_ustar_link(digits * 9 + "0123456\xff\xff", ValueError)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but *name* is stored as the linkname of
        # a member called "foo".
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="utf-8") as writer:
            member = tarfile.TarInfo("foo")
            member.linkname = name
            if exc is not None:
                self.assertRaises(exc, writer.addfile, member)
            else:
                writer.addfile(member)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as reader:
            for member in reader:
                self.assertEqual(name, member.linkname)
                break
1918
1919
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for codec, member_name in cases:
            with tarfile.open(tarname, encoding=codec,
                              errors="surrogateescape") as archive:
                # Only the lookup matters; the returned TarInfo is unused.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
1936
1937
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for codec, member_name in cases:
            with tarfile.open(tarname, encoding=codec,
                              errors="surrogateescape") as archive:
                # Only the lookup matters; the returned TarInfo is unused.
                try:
                    archive.getmember(member_name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
1956
1957
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Each test works on tmpname and starts without a leftover file.
        self.tarname = tmpname
        stale = os.path.exists(self.tarname)
        if stale:
            support.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Copy the "ustar/regtype" member of the reference archive into a
        # fresh archive at self.tarname, renamed to "foo".
        with tarfile.open(tarname, encoding="iso8859-1") as source:
            member = source.getmember("ustar/regtype")
            member.name = "foo"
            with source.extractfile(member) as payload, \
                    tarfile.open(self.tarname, mode) as target:
                target.addfile(member, payload)

    def test_append_compressed(self):
        # Opening a compressed archive in plain "a" mode must fail.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
1977
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # The compressed-archive test from the base class does not apply here.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member named "bar" in mode "a".
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive lists exactly *names* (default ["bar"]).
        # The default is built per call instead of sharing one mutable
        # list object across calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        # Append to an in-memory copy of an archive that already has "foo".
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2041
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the "gz" suffix
    # supplied by GzipTest.
    pass
2044
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the "bz2" suffix
    # supplied by Bz2Test.
    pass
2047
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    # Runs AppendTestBase.test_append_compressed with the "xz" suffix
    # supplied by LzmaTest.
    pass
2050
2051
class LimitsTest(unittest.TestCase):
    # Checks which name, linkname and uid values TarInfo.tobuf() accepts
    # for each of the three archive formats.

    def test_ustar_limits(self):
        ustar = tarfile.USTAR_FORMAT

        # 100 char name
        fits = tarfile.TarInfo("0123456789" * 10)
        fits.tobuf(ustar)

        # 101 char name that cannot be stored
        too_long = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            too_long.tobuf(ustar)

        # 256 char name with a slash at pos 156
        splittable = tarfile.TarInfo("123/" * 62 + "longname")
        splittable.tobuf(ustar)

        # 256 char name that cannot be stored
        unsplittable = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            unsplittable.tobuf(ustar)

        # 512 char name
        huge_name = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            huge_name.tobuf(ustar)

        # 512 char linkname
        huge_link = tarfile.TarInfo("longlink")
        huge_link.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            huge_link.tobuf(ustar)

        # uid > 8 digits
        big_uid = tarfile.TarInfo("name")
        big_uid.uid = 0o10000000
        with self.assertRaises(ValueError):
            big_uid.tobuf(ustar)

    def test_gnu_limits(self):
        gnu = tarfile.GNU_FORMAT

        # 512 char name and linkname are both accepted.
        long_name = tarfile.TarInfo("123/" * 126 + "longname")
        long_name.tobuf(gnu)

        long_link = tarfile.TarInfo("longlink")
        long_link.linkname = "123/" * 126 + "longname"
        long_link.tobuf(gnu)

        # uid >= 256 ** 7
        big_uid = tarfile.TarInfo("name")
        big_uid.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            big_uid.tobuf(gnu)

    def test_pax_limits(self):
        pax = tarfile.PAX_FORMAT

        # None of the values rejected above raises for PAX_FORMAT.
        long_name = tarfile.TarInfo("123/" * 126 + "longname")
        long_name.tobuf(pax)

        long_link = tarfile.TarInfo("longlink")
        long_link.linkname = "123/" * 126 + "longname"
        long_link.tobuf(pax)

        big_uid = tarfile.TarInfo("name")
        big_uid.uid = 0o4000000000000000000
        big_uid.tobuf(pax)
2109
2110
class MiscTest(unittest.TestCase):
    # Tests for tarfile's module-level field conversion helpers
    # (stn, nts, nti, itn) and for the module's __all__.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the given length; nts()
        # stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # Values that fit the octal field do not depend on the format.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")

        # The base-256 encoding is GNU tar specific, so request GNU_FORMAT
        # explicitly instead of relying on tarfile.DEFAULT_FORMAT.
        gnu = tarfile.GNU_FORMAT
        self.assertEqual(tarfile.itn(0o10000000, format=gnu),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=gnu),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=gnu),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=gnu),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000, format=gnu),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # Values outside the range of the requested field width and
        # format must raise ValueError.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Names passed to check__all__() as exclusions from the check.
        blacklist = {'version', 'grp', 'pwd', 'symlink_exception',
                     'NUL', 'BLOCKSIZE', 'RECORDSIZE', 'GNU_MAGIC',
                     'POSIX_MAGIC', 'LENGTH_NAME', 'LENGTH_LINK',
                     'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
                     'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE',
                     'CONTTYPE', 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK',
                     'GNUTYPE_SPARSE', 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE',
                     'SUPPORTED_TYPES', 'REGULAR_TYPES', 'GNU_TYPES',
                     'PAX_FIELDS', 'PAX_NAME_FIELDS', 'PAX_NUMBER_FIELDS',
                     'stn', 'nts', 'nti', 'itn', 'calc_chksums', 'copyfileobj',
                     'filemode',
                     'EmptyHeaderError', 'TruncatedHeaderError',
                     'EOFHeaderError', 'InvalidHeaderError',
                     'SubsequentHeaderError', 'ExFileObject',
                     'main'}
        support.check__all__(self, tarfile, blacklist=blacklist)
2192
2193
class CommandLineTest(unittest.TestCase):
    # Tests for the "python -m tarfile" command line interface, run in a
    # child interpreter through script_helper.

    def tarfilecmd(self, *args, **kwargs):
        # Run "python -m tarfile *args", expecting success; return stdout
        # with the platform line separator normalised to b'\n'.
        rc, out, err = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                      **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        # Run "python -m tarfile *args", expecting failure; return the
        # result of assert_python_failure(), unpacked by callers as
        # (rc, out, err).
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def _sample_files(self):
        # The two text fixtures that are archived by several tests below.
        return [support.findfile('tokenize_tests.txt'),
                support.findfile('tokenize_tests-no-coding-cookie-'
                                 'and-utf8-bom-sig-only.txt')]

    def make_simple_tarfile(self, tar_name):
        # Create an uncompressed archive of the sample files at *tar_name*
        # and schedule its removal.
        files = self._sample_files()
        self.addCleanup(support.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def test_bad_use(self):
        # No arguments at all: usage error on stderr, nothing on stdout.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # Empty archive name: some error message must be printed.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-t', tar_name)
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # A test archive cut off after 511 bytes must be rejected as well.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    support.unlink(tmpname)

    def test_list_command(self):
        for tar_name in testtarnames:
            # The expected output is whatever TarFile.list() prints
            # in-process.
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = self._sample_files()
        for opt in '-c', '--create':
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                # The result must be readable as a tar archive.
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_verbose(self):
        files = self._sample_files()
        for opt in '-v', '--verbose':
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files)
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            support.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            support.unlink(tar_name)

    def test_create_command_compressed(self):
        files = self._sample_files()
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # "open" is None when the compression module is unavailable.
            if not filetype.open:
                continue
            # Bound before the try block so that the finally clause can
            # always refer to it.
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                support.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            try:
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            try:
                with support.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname)
                self.assertIn(b' file is extracted.', out)
            finally:
                support.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with support.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            support.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with support.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2382
2383
class ContextManagerTest(unittest.TestCase):
    # Checks of TarFile used as a context manager.

    def test_basic(self):
        # The archive is open inside the with block and closed after it.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        # assertRaises() replaces a bare "except: pass": it catches only
        # the exception raised on purpose and fails if none propagates.
        with self.assertRaises(Exception):
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            # As in test_no_eof, only the deliberately raised exception
            # is expected here; anything else must surface.
            with self.assertRaises(Exception):
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2440
2441
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Regression test for issue #8741. Where the platform supports neither
    # symbolic nor hard links, tarfile tries to extract members of those
    # types as the regular files they point to.
    def _test_link_extraction(self, name):
        # Extract *name* into TEMPDIR and compare the checksum of the
        # extracted file with the one recorded for the regular file.
        self.tar.extract(name, TEMPDIR)
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as stream:
            digest = md5sum(stream.read())
        self.assertEqual(digest, md5_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(hasattr(os.path, "islink"),
                     "Skip emulation - has os.path.islink but not os.link")
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(hasattr(os, "symlink"),
                     "Skip emulation if symlink exists")
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2474
2475
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        # Open every prefix of a bz2-compressed tar header with *mode* and
        # fail if the reader calls read() again after a read() that
        # already started at the end of the input.
        class MyBytesIO(io.BytesIO):
            # True once a read() was issued with the position at the end
            # of the buffer; cleared again by seek().
            hit_eof = False
            def read(self, n):
                if self.hit_eof:
                    raise AssertionError("infinite loop detected in "
                                         "tarfile.open()")
                self.hit_eof = self.tell() == len(self.getvalue())
                return super().read(n)
            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        data = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for x in range(len(data) + 1):
            try:
                tar = tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors
            else:
                # do not leave a successfully opened archive unclosed
                tar.close()

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2505
2506
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 both exist and are named 'root'.

    Returns False when the pwd or grp module cannot be imported, or when
    either database has no entry for id 0.  The lookups raise KeyError
    for a missing entry; since this function is evaluated in a decorator
    while the module is being imported, that error must not escape.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # no passwd/group entry for id 0 on this system
        return False
    return user_name == 'root' and group_name == 'root'
2517
2518
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch these functions:
    #  os.chown: to inspect the ownership changes extraction asks for
    #  os.chmod: to keep the modes unchanged; were they changed, the
    #            files/directories could not be deleted afterwards
    #  os.geteuid: to pretend the tests run as root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # Write an archive at tmpname holding a file, a directory and a
        #  file inside that directory, each with its own uid/gid pair.
        payload = io.BytesIO(b"content")
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in members:
                info = tarfile.TarInfo(member_name)
                info.type = member_type
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                archive.addfile(info, fileobj)

        # hand back the path of the archive just written
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        # Build the test archive and yield it, opened for reading,
        #  together with the three member names stored in it.
        mock_geteuid.return_value = 0  # pretend to be root

        # member names as they are stored in the archive
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        with tarfile.open(archive_path) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member in (filename_1, filename_2):
                tarfl.extract(member, TEMPDIR, numeric_owner=True)

        # chown must have been asked for the numeric ids stored in the
        #  archive, on the extracted filesystem paths
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True)

        # one expected chown call per member, on its filesystem path
        stored_owners = ((filename_1, 99, 98),
                         (dirname_1, 77, 76),
                         (filename_2, 88, 87))
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, member), uid, gid)
            for member, uid, gid in stored_owners
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test needs uid=0 and gid=0 to really be named 'root': the uname
    #  and gname stored in the test archive are 'root', and extract() looks
    #  them up through pwd and grp to obtain the uid and gid, which are
    #  expected to be 0 here.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False)

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Passing a fourth positional argument to extract() is rejected.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
2637
2638
def setUpModule():
    # Create an empty TEMPDIR and the compressed copies of the test
    # archive; record every archive path in the global testtarnames.

    # A previous run that was interrupted can leave TEMPDIR behind as a
    # directory.  support.unlink() cannot remove a directory, and
    # os.makedirs() would then fail because the path already exists, so
    # remove a leftover directory tree explicitly.
    if os.path.isdir(TEMPDIR) and not os.path.islink(TEMPDIR):
        support.rmtree(TEMPDIR)
    else:
        support.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    global testtarnames
    testtarnames = [tarname]
    with open(tarname, "rb") as fobj:
        data = fobj.read()

    # Create compressed tarfiles.
    for c in GzipTest, Bz2Test, LzmaTest:
        # c.open is None when the compression module is unavailable
        if c.open:
            support.unlink(c.tarname)
            testtarnames.append(c.tarname)
            with c.open(c.tarname, "wb") as tar:
                tar.write(data)
2655
def tearDownModule():
    # Remove the scratch directory if it is still present.
    if not os.path.exists(TEMPDIR):
        return
    support.rmtree(TEMPDIR)
2659
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2662