"""Test script for the dumbdbm module
   Original by Roger E. Masse
"""

import contextlib
import io
import operator
import os
import stat
import unittest
import warnings
import dbm.dumb as dumbdbm
from test import support
from functools import partial

_fname = support.TESTFN


def _delete_files():
    # Best-effort removal of every file a dumbdbm database may create.
    # A missing file is the normal case, so OSError is deliberately ignored.
    for ext in [".dir", ".dat", ".bak"]:
        try:
            os.unlink(_fname + ext)
        except OSError:
            pass


class DumbDBMTestCase(unittest.TestCase):
    # Reference content written to / expected from the database.
    # setUp() rebinds ``self._dict`` to a per-test copy, so tests may
    # modify it without affecting each other.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8'): b'!',
             }

    def test_dumbdbm_creation(self):
        # A freshly created database is empty and accepts all reference items.
        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_dumbdbm_creation_mode(self):
        # Set the umask *before* entering the try block: if os.umask() itself
        # failed inside it, the finally clause would hit an unbound name.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        st = os.stat(_fname + '.dat')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
        st = os.stat(_fname + '.dir')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # Closing an already closed database must be harmless, so the two
        # explicit close() calls are the point of this test.
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
            # Update the mirror and the database in lock-step.
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)

    def test_dumbdbm_read(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.read_helper(f)
            # Writing to / deleting from a read-only database is expected
            # to emit a DeprecationWarning.
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                del f[b'a']

    def test_dumbdbm_keys(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key must find the entry stored under its UTF-8 encoding.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        # str keys and values are accepted on write and come back as bytes.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    def read_helper(self, f):
        # Check that *f* holds exactly the keys and values of self._dict.
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    def init_db(self):
        # (Re)create the database and fill it with the reference content.
        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        # Check the key set of *f* against self._dict; return the sorted keys.
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                for _op in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen after every round to verify what was actually persisted.
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # Leaving the with-block closes the database.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        f = dumbdbm.open(_fname, 'c')
        f.close()

        # Item-style operations on a closed database.
        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        # Whole-database operations on a closed database.
        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        # Opening with 'n' again must discard the existing content.
        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # A malicious index file must be rejected with ValueError and its
        # embedded expression must not be executed (nothing gets printed).
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_warn_on_ignored_flags(self):
        for value in ('r', 'w'):
            # subTest keeps one failing flag from hiding the other.
            with self.subTest(flag=value):
                _delete_files()
                with self.assertWarnsRegex(DeprecationWarning,
                                           "The database file is missing, the "
                                           "semantics of the 'c' flag will "
                                           "be used."):
                    f = dumbdbm.open(_fname, value)
                f.close()

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.subTest(flag=flag):
                with self.assertWarnsRegex(DeprecationWarning,
                                           "Flag must be one of "
                                           "'r', 'w', 'c', or 'n'"):
                    f = dumbdbm.open(_fname, flag)
                f.close()

    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_readonly_files(self):
        with support.temp_dir() as dir:
            fname = os.path.join(dir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()
        # Give every test a private copy of the reference data: some tests
        # (e.g. test_dumbdbm_modification) update it, and mutating the shared
        # class attribute would make results depend on test execution order.
        self._dict = dict(type(self)._dict)


if __name__ == "__main__":
    unittest.main()