1"""Tests common to tarfile and zipfile.""" 2 3import os 4import sys 5 6from test.support import swap_attr 7from test.support import os_helper 8 9class OverwriteTests: 10 11 def setUp(self): 12 os.makedirs(self.testdir) 13 self.addCleanup(os_helper.rmtree, self.testdir) 14 15 def create_file(self, path, content=b''): 16 with open(path, 'wb') as f: 17 f.write(content) 18 19 def open(self, path): 20 raise NotImplementedError 21 22 def extractall(self, ar): 23 raise NotImplementedError 24 25 26 def test_overwrite_file_as_file(self): 27 target = os.path.join(self.testdir, 'test') 28 self.create_file(target, b'content') 29 with self.open(self.ar_with_file) as ar: 30 self.extractall(ar) 31 self.assertTrue(os.path.isfile(target)) 32 with open(target, 'rb') as f: 33 self.assertEqual(f.read(), b'newcontent') 34 35 def test_overwrite_dir_as_dir(self): 36 target = os.path.join(self.testdir, 'test') 37 os.mkdir(target) 38 with self.open(self.ar_with_dir) as ar: 39 self.extractall(ar) 40 self.assertTrue(os.path.isdir(target)) 41 42 def test_overwrite_dir_as_implicit_dir(self): 43 target = os.path.join(self.testdir, 'test') 44 os.mkdir(target) 45 with self.open(self.ar_with_implicit_dir) as ar: 46 self.extractall(ar) 47 self.assertTrue(os.path.isdir(target)) 48 self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) 49 with open(os.path.join(target, 'file'), 'rb') as f: 50 self.assertEqual(f.read(), b'newcontent') 51 52 def test_overwrite_dir_as_file(self): 53 target = os.path.join(self.testdir, 'test') 54 os.mkdir(target) 55 with self.open(self.ar_with_file) as ar: 56 with self.assertRaises(PermissionError if sys.platform == 'win32' 57 else IsADirectoryError): 58 self.extractall(ar) 59 self.assertTrue(os.path.isdir(target)) 60 61 def test_overwrite_file_as_dir(self): 62 target = os.path.join(self.testdir, 'test') 63 self.create_file(target, b'content') 64 with self.open(self.ar_with_dir) as ar: 65 with self.assertRaises(FileExistsError): 66 self.extractall(ar) 67 self.assertTrue(os.path.isfile(target)) 68 with open(target, 'rb') as f: 69 self.assertEqual(f.read(), b'content') 70 71 def test_overwrite_file_as_implicit_dir(self): 72 target = os.path.join(self.testdir, 'test') 73 self.create_file(target, b'content') 74 with self.open(self.ar_with_implicit_dir) as ar: 75 with self.assertRaises(FileNotFoundError if sys.platform == 'win32' 76 else NotADirectoryError): 77 self.extractall(ar) 78 self.assertTrue(os.path.isfile(target)) 79 with open(target, 'rb') as f: 80 self.assertEqual(f.read(), b'content') 81 82 @os_helper.skip_unless_symlink 83 def test_overwrite_file_symlink_as_file(self): 84 # XXX: It is potential security vulnerability. 85 target = os.path.join(self.testdir, 'test') 86 target2 = os.path.join(self.testdir, 'test2') 87 self.create_file(target2, b'content') 88 os.symlink('test2', target) 89 with self.open(self.ar_with_file) as ar: 90 self.extractall(ar) 91 self.assertTrue(os.path.islink(target)) 92 self.assertTrue(os.path.isfile(target2)) 93 with open(target2, 'rb') as f: 94 self.assertEqual(f.read(), b'newcontent') 95 96 @os_helper.skip_unless_symlink 97 def test_overwrite_broken_file_symlink_as_file(self): 98 # XXX: It is potential security vulnerability. 99 target = os.path.join(self.testdir, 'test') 100 target2 = os.path.join(self.testdir, 'test2') 101 os.symlink('test2', target) 102 with self.open(self.ar_with_file) as ar: 103 self.extractall(ar) 104 self.assertTrue(os.path.islink(target)) 105 self.assertTrue(os.path.isfile(target2)) 106 with open(target2, 'rb') as f: 107 self.assertEqual(f.read(), b'newcontent') 108 109 @os_helper.skip_unless_symlink 110 def test_overwrite_dir_symlink_as_dir(self): 111 # XXX: It is potential security vulnerability. 112 target = os.path.join(self.testdir, 'test') 113 target2 = os.path.join(self.testdir, 'test2') 114 os.mkdir(target2) 115 os.symlink('test2', target, target_is_directory=True) 116 with self.open(self.ar_with_dir) as ar: 117 self.extractall(ar) 118 self.assertTrue(os.path.islink(target)) 119 self.assertTrue(os.path.isdir(target2)) 120 121 @os_helper.skip_unless_symlink 122 def test_overwrite_dir_symlink_as_implicit_dir(self): 123 # XXX: It is potential security vulnerability. 124 target = os.path.join(self.testdir, 'test') 125 target2 = os.path.join(self.testdir, 'test2') 126 os.mkdir(target2) 127 os.symlink('test2', target, target_is_directory=True) 128 with self.open(self.ar_with_implicit_dir) as ar: 129 self.extractall(ar) 130 self.assertTrue(os.path.islink(target)) 131 self.assertTrue(os.path.isdir(target2)) 132 self.assertTrue(os.path.isfile(os.path.join(target2, 'file'))) 133 with open(os.path.join(target2, 'file'), 'rb') as f: 134 self.assertEqual(f.read(), b'newcontent') 135 136 @os_helper.skip_unless_symlink 137 def test_overwrite_broken_dir_symlink_as_dir(self): 138 target = os.path.join(self.testdir, 'test') 139 target2 = os.path.join(self.testdir, 'test2') 140 os.symlink('test2', target, target_is_directory=True) 141 with self.open(self.ar_with_dir) as ar: 142 with self.assertRaises(FileExistsError): 143 self.extractall(ar) 144 self.assertTrue(os.path.islink(target)) 145 self.assertFalse(os.path.exists(target2)) 146 147 @os_helper.skip_unless_symlink 148 def test_overwrite_broken_dir_symlink_as_implicit_dir(self): 149 target = os.path.join(self.testdir, 'test') 150 target2 = os.path.join(self.testdir, 'test2') 151 os.symlink('test2', target, target_is_directory=True) 152 with self.open(self.ar_with_implicit_dir) as ar: 153 with self.assertRaises(FileExistsError): 154 self.extractall(ar) 155 self.assertTrue(os.path.islink(target)) 156 self.assertFalse(os.path.exists(target2)) 157 158 def test_concurrent_extract_dir(self): 159 target = os.path.join(self.testdir, 'test') 160 def concurrent_mkdir(*args, **kwargs): 161 orig_mkdir(*args, **kwargs) 162 orig_mkdir(*args, **kwargs) 163 with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: 164 with self.open(self.ar_with_dir) as ar: 165 self.extractall(ar) 166 self.assertTrue(os.path.isdir(target)) 167 168 def test_concurrent_extract_implicit_dir(self): 169 target = os.path.join(self.testdir, 'test') 170 def concurrent_mkdir(*args, **kwargs): 171 orig_mkdir(*args, **kwargs) 172 orig_mkdir(*args, **kwargs) 173 with swap_attr(os, 'mkdir', concurrent_mkdir) as orig_mkdir: 174 with self.open(self.ar_with_implicit_dir) as ar: 175 self.extractall(ar) 176 self.assertTrue(os.path.isdir(target)) 177 self.assertTrue(os.path.isfile(os.path.join(target, 'file'))) 178