• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Tests common to tarfile and zipfile."""
2
3import os
4import sys
5
6from test.support import swap_attr
7from test.support import os_helper
8
9class OverwriteTests:
10
11    def setUp(self):
12        os.makedirs(self.testdir)
13        self.addCleanup(os_helper.rmtree, self.testdir)
14
15    def create_file(self, path, content=b''):
16        with open(path, 'wb') as f:
17            f.write(content)
18
19    def open(self, path):
20        raise NotImplementedError
21
22    def extractall(self, ar):
23        raise NotImplementedError
24
25
26    def test_overwrite_file_as_file(self):
27        target = os.path.join(self.testdir, 'test')
28        self.create_file(target, b'content')
29        with self.open(self.ar_with_file) as ar:
30            self.extractall(ar)
31        self.assertTrue(os.path.isfile(target))
32        with open(target, 'rb') as f:
33            self.assertEqual(f.read(), b'newcontent')
34
35    def test_overwrite_dir_as_dir(self):
36        target = os.path.join(self.testdir, 'test')
37        os.mkdir(target)
38        with self.open(self.ar_with_dir) as ar:
39            self.extractall(ar)
40        self.assertTrue(os.path.isdir(target))
41
42    def test_overwrite_dir_as_implicit_dir(self):
43        target = os.path.join(self.testdir, 'test')
44        os.mkdir(target)
45        with self.open(self.ar_with_implicit_dir) as ar:
46            self.extractall(ar)
47        self.assertTrue(os.path.isdir(target))
48        self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
49        with open(os.path.join(target, 'file'), 'rb') as f:
50            self.assertEqual(f.read(), b'newcontent')
51
52    def test_overwrite_dir_as_file(self):
53        target = os.path.join(self.testdir, 'test')
54        os.mkdir(target)
55        with self.open(self.ar_with_file) as ar:
56            with self.assertRaises(PermissionError if sys.platform == 'win32'
57                                   else IsADirectoryError):
58                self.extractall(ar)
59        self.assertTrue(os.path.isdir(target))
60
61    def test_overwrite_file_as_dir(self):
62        target = os.path.join(self.testdir, 'test')
63        self.create_file(target, b'content')
64        with self.open(self.ar_with_dir) as ar:
65            with self.assertRaises(FileExistsError):
66                self.extractall(ar)
67        self.assertTrue(os.path.isfile(target))
68        with open(target, 'rb') as f:
69            self.assertEqual(f.read(), b'content')
70
71    def test_overwrite_file_as_implicit_dir(self):
72        target = os.path.join(self.testdir, 'test')
73        self.create_file(target, b'content')
74        with self.open(self.ar_with_implicit_dir) as ar:
75            with self.assertRaises(FileNotFoundError if sys.platform == 'win32'
76                                   else NotADirectoryError):
77                self.extractall(ar)
78        self.assertTrue(os.path.isfile(target))
79        with open(target, 'rb') as f:
80            self.assertEqual(f.read(), b'content')
81
82    @os_helper.skip_unless_symlink
83    def test_overwrite_file_symlink_as_file(self):
84        # XXX: It is potential security vulnerability.
85        target = os.path.join(self.testdir, 'test')
86        target2 = os.path.join(self.testdir, 'test2')
87        self.create_file(target2, b'content')
88        os.symlink('test2', target)
89        with self.open(self.ar_with_file) as ar:
90            self.extractall(ar)
91        self.assertTrue(os.path.islink(target))
92        self.assertTrue(os.path.isfile(target2))
93        with open(target2, 'rb') as f:
94            self.assertEqual(f.read(), b'newcontent')
95
96    @os_helper.skip_unless_symlink
97    def test_overwrite_broken_file_symlink_as_file(self):
98        # XXX: It is potential security vulnerability.
99        target = os.path.join(self.testdir, 'test')
100        target2 = os.path.join(self.testdir, 'test2')
101        os.symlink('test2', target)
102        with self.open(self.ar_with_file) as ar:
103            self.extractall(ar)
104        self.assertTrue(os.path.islink(target))
105        self.assertTrue(os.path.isfile(target2))
106        with open(target2, 'rb') as f:
107            self.assertEqual(f.read(), b'newcontent')
108
109    @os_helper.skip_unless_symlink
110    def test_overwrite_dir_symlink_as_dir(self):
111        # XXX: It is potential security vulnerability.
112        target = os.path.join(self.testdir, 'test')
113        target2 = os.path.join(self.testdir, 'test2')
114        os.mkdir(target2)
115        os.symlink('test2', target, target_is_directory=True)
116        with self.open(self.ar_with_dir) as ar:
117            self.extractall(ar)
118        self.assertTrue(os.path.islink(target))
119        self.assertTrue(os.path.isdir(target2))
120
121    @os_helper.skip_unless_symlink
122    def test_overwrite_dir_symlink_as_implicit_dir(self):
123        # XXX: It is potential security vulnerability.
124        target = os.path.join(self.testdir, 'test')
125        target2 = os.path.join(self.testdir, 'test2')
126        os.mkdir(target2)
127        os.symlink('test2', target, target_is_directory=True)
128        with self.open(self.ar_with_implicit_dir) as ar:
129            self.extractall(ar)
130        self.assertTrue(os.path.islink(target))
131        self.assertTrue(os.path.isdir(target2))
132        self.assertTrue(os.path.isfile(os.path.join(target2, 'file')))
133        with open(os.path.join(target2, 'file'), 'rb') as f:
134            self.assertEqual(f.read(), b'newcontent')
135
136    @os_helper.skip_unless_symlink
137    def test_overwrite_broken_dir_symlink_as_dir(self):
138        target = os.path.join(self.testdir, 'test')
139        target2 = os.path.join(self.testdir, 'test2')
140        os.symlink('test2', target, target_is_directory=True)
141        with self.open(self.ar_with_dir) as ar:
142            with self.assertRaises(FileExistsError):
143                self.extractall(ar)
144        self.assertTrue(os.path.islink(target))
145        self.assertFalse(os.path.exists(target2))
146
147    @os_helper.skip_unless_symlink
148    def test_overwrite_broken_dir_symlink_as_implicit_dir(self):
149        target = os.path.join(self.testdir, 'test')
150        target2 = os.path.join(self.testdir, 'test2')
151        os.symlink('test2', target, target_is_directory=True)
152        with self.open(self.ar_with_implicit_dir) as ar:
153            with self.assertRaises(FileExistsError):
154                self.extractall(ar)
155        self.assertTrue(os.path.islink(target))
156        self.assertFalse(os.path.exists(target2))
157
158    def test_concurrent_extract_dir(self):
159        target = os.path.join(self.testdir, 'test')
160        def concurrent_mkdir(*args, **kwargs):
161            orig_mkdir(*args, **kwargs)
162            orig_mkdir(*args, **kwargs)
163        with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir:
164            with self.open(self.ar_with_dir) as ar:
165                self.extractall(ar)
166        self.assertTrue(os.path.isdir(target))
167
168    def test_concurrent_extract_implicit_dir(self):
169        target = os.path.join(self.testdir, 'test')
170        def concurrent_mkdir(*args, **kwargs):
171            orig_mkdir(*args, **kwargs)
172            orig_mkdir(*args, **kwargs)
173        with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir:
174            with self.open(self.ar_with_implicit_dir) as ar:
175                self.extractall(ar)
176        self.assertTrue(os.path.isdir(target))
177        self.assertTrue(os.path.isfile(os.path.join(target, 'file')))
178