1"""Test script for the dumbdbm module 2 Original by Roger E. Masse 3""" 4 5import contextlib 6import io 7import operator 8import os 9import stat 10import unittest 11import dbm.dumb as dumbdbm 12from test import support 13from test.support import os_helper 14from functools import partial 15 16_fname = os_helper.TESTFN 17 18 19def _delete_files(): 20 for ext in [".dir", ".dat", ".bak"]: 21 try: 22 os.unlink(_fname + ext) 23 except OSError: 24 pass 25 26class DumbDBMTestCase(unittest.TestCase): 27 _dict = {b'0': b'', 28 b'a': b'Python:', 29 b'b': b'Programming', 30 b'c': b'the', 31 b'd': b'way', 32 b'f': b'Guido', 33 b'g': b'intended', 34 '\u00fc'.encode('utf-8') : b'!', 35 } 36 37 def test_dumbdbm_creation(self): 38 with contextlib.closing(dumbdbm.open(_fname, 'c')) as f: 39 self.assertEqual(list(f.keys()), []) 40 for key in self._dict: 41 f[key] = self._dict[key] 42 self.read_helper(f) 43 44 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 45 @os_helper.skip_unless_working_chmod 46 def test_dumbdbm_creation_mode(self): 47 try: 48 old_umask = os.umask(0o002) 49 f = dumbdbm.open(_fname, 'c', 0o637) 50 f.close() 51 finally: 52 os.umask(old_umask) 53 54 expected_mode = 0o635 55 if os.name != 'posix': 56 # Windows only supports setting the read-only attribute. 57 # This shouldn't fail, but doesn't work like Unix either. 58 expected_mode = 0o666 59 60 import stat 61 st = os.stat(_fname + '.dat') 62 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 63 st = os.stat(_fname + '.dir') 64 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 65 66 def test_close_twice(self): 67 f = dumbdbm.open(_fname) 68 f[b'a'] = b'b' 69 self.assertEqual(f[b'a'], b'b') 70 f.close() 71 f.close() 72 73 def test_dumbdbm_modification(self): 74 self.init_db() 75 with contextlib.closing(dumbdbm.open(_fname, 'w')) as f: 76 self._dict[b'g'] = f[b'g'] = b"indented" 77 self.read_helper(f) 78 # setdefault() works as in the dict interface 79 self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo') 80 self.assertEqual(f[b'xxx'], b'foo') 81 82 def test_dumbdbm_read(self): 83 self.init_db() 84 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 85 self.read_helper(f) 86 with self.assertRaisesRegex(dumbdbm.error, 87 'The database is opened for reading only'): 88 f[b'g'] = b'x' 89 with self.assertRaisesRegex(dumbdbm.error, 90 'The database is opened for reading only'): 91 del f[b'a'] 92 # get() works as in the dict interface 93 self.assertEqual(f.get(b'a'), self._dict[b'a']) 94 self.assertEqual(f.get(b'xxx', b'foo'), b'foo') 95 self.assertIsNone(f.get(b'xxx')) 96 with self.assertRaises(KeyError): 97 f[b'xxx'] 98 99 def test_dumbdbm_keys(self): 100 self.init_db() 101 with contextlib.closing(dumbdbm.open(_fname)) as f: 102 keys = self.keys_helper(f) 103 104 def test_write_contains(self): 105 with contextlib.closing(dumbdbm.open(_fname)) as f: 106 f[b'1'] = b'hello' 107 self.assertIn(b'1', f) 108 109 def test_write_write_read(self): 110 # test for bug #482460 111 with contextlib.closing(dumbdbm.open(_fname)) as f: 112 f[b'1'] = b'hello' 113 f[b'1'] = b'hello2' 114 with contextlib.closing(dumbdbm.open(_fname)) as f: 115 self.assertEqual(f[b'1'], b'hello2') 116 117 def test_str_read(self): 118 self.init_db() 119 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 120 self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')]) 121 122 def test_str_write_contains(self): 123 self.init_db() 124 with contextlib.closing(dumbdbm.open(_fname)) as f: 125 f['\u00fc'] = b'!' 126 f['1'] = 'a' 127 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 128 self.assertIn('\u00fc', f) 129 self.assertEqual(f['\u00fc'.encode('utf-8')], 130 self._dict['\u00fc'.encode('utf-8')]) 131 self.assertEqual(f[b'1'], b'a') 132 133 def test_line_endings(self): 134 # test for bug #1172763: dumbdbm would die if the line endings 135 # weren't what was expected. 136 with contextlib.closing(dumbdbm.open(_fname)) as f: 137 f[b'1'] = b'hello' 138 f[b'2'] = b'hello2' 139 140 # Mangle the file by changing the line separator to Windows or Unix 141 with io.open(_fname + '.dir', 'rb') as file: 142 data = file.read() 143 if os.linesep == '\n': 144 data = data.replace(b'\n', b'\r\n') 145 else: 146 data = data.replace(b'\r\n', b'\n') 147 with io.open(_fname + '.dir', 'wb') as file: 148 file.write(data) 149 150 f = dumbdbm.open(_fname) 151 self.assertEqual(f[b'1'], b'hello') 152 self.assertEqual(f[b'2'], b'hello2') 153 154 155 def read_helper(self, f): 156 keys = self.keys_helper(f) 157 for key in self._dict: 158 self.assertEqual(self._dict[key], f[key]) 159 160 def init_db(self): 161 with contextlib.closing(dumbdbm.open(_fname, 'n')) as f: 162 for k in self._dict: 163 f[k] = self._dict[k] 164 165 def keys_helper(self, f): 166 keys = sorted(f.keys()) 167 dkeys = sorted(self._dict.keys()) 168 self.assertEqual(keys, dkeys) 169 return keys 170 171 # Perform randomized operations. This doesn't make assumptions about 172 # what *might* fail. 173 def test_random(self): 174 import random 175 d = {} # mirror the database 176 for dummy in range(5): 177 with contextlib.closing(dumbdbm.open(_fname)) as f: 178 for dummy in range(100): 179 k = random.choice('abcdefghijklm') 180 if random.random() < 0.2: 181 if k in d: 182 del d[k] 183 del f[k] 184 else: 185 v = random.choice((b'a', b'b', b'c')) * random.randrange(10000) 186 d[k] = v 187 f[k] = v 188 self.assertEqual(f[k], v) 189 190 with contextlib.closing(dumbdbm.open(_fname)) as f: 191 expected = sorted((k.encode("latin-1"), v) for k, v in d.items()) 192 got = sorted(f.items()) 193 self.assertEqual(expected, got) 194 195 def test_context_manager(self): 196 with dumbdbm.open(_fname, 'c') as db: 197 db["dumbdbm context manager"] = "context manager" 198 199 with dumbdbm.open(_fname, 'r') as db: 200 self.assertEqual(list(db.keys()), [b"dumbdbm context manager"]) 201 202 with self.assertRaises(dumbdbm.error): 203 db.keys() 204 205 def test_check_closed(self): 206 f = dumbdbm.open(_fname, 'c') 207 f.close() 208 209 for meth in (partial(operator.delitem, f), 210 partial(operator.setitem, f, 'b'), 211 partial(operator.getitem, f), 212 partial(operator.contains, f)): 213 with self.assertRaises(dumbdbm.error) as cm: 214 meth('test') 215 self.assertEqual(str(cm.exception), 216 "DBM object has already been closed") 217 218 for meth in (operator.methodcaller('keys'), 219 operator.methodcaller('iterkeys'), 220 operator.methodcaller('items'), 221 len): 222 with self.assertRaises(dumbdbm.error) as cm: 223 meth(f) 224 self.assertEqual(str(cm.exception), 225 "DBM object has already been closed") 226 227 def test_create_new(self): 228 with dumbdbm.open(_fname, 'n') as f: 229 for k in self._dict: 230 f[k] = self._dict[k] 231 232 with dumbdbm.open(_fname, 'n') as f: 233 self.assertEqual(f.keys(), []) 234 235 def test_eval(self): 236 with open(_fname + '.dir', 'w', encoding="utf-8") as stream: 237 stream.write("str(print('Hacked!')), 0\n") 238 with support.captured_stdout() as stdout: 239 with self.assertRaises(ValueError): 240 with dumbdbm.open(_fname) as f: 241 pass 242 self.assertEqual(stdout.getvalue(), '') 243 244 def test_missing_data(self): 245 for value in ('r', 'w'): 246 _delete_files() 247 with self.assertRaises(FileNotFoundError): 248 dumbdbm.open(_fname, value) 249 self.assertFalse(os.path.exists(_fname + '.dat')) 250 self.assertFalse(os.path.exists(_fname + '.dir')) 251 self.assertFalse(os.path.exists(_fname + '.bak')) 252 253 for value in ('c', 'n'): 254 _delete_files() 255 with dumbdbm.open(_fname, value) as f: 256 self.assertTrue(os.path.exists(_fname + '.dat')) 257 self.assertTrue(os.path.exists(_fname + '.dir')) 258 self.assertFalse(os.path.exists(_fname + '.bak')) 259 self.assertFalse(os.path.exists(_fname + '.bak')) 260 261 for value in ('c', 'n'): 262 _delete_files() 263 with dumbdbm.open(_fname, value) as f: 264 f['key'] = 'value' 265 self.assertTrue(os.path.exists(_fname + '.dat')) 266 self.assertTrue(os.path.exists(_fname + '.dir')) 267 self.assertFalse(os.path.exists(_fname + '.bak')) 268 self.assertTrue(os.path.exists(_fname + '.bak')) 269 270 def test_missing_index(self): 271 with dumbdbm.open(_fname, 'n') as f: 272 pass 273 os.unlink(_fname + '.dir') 274 for value in ('r', 'w'): 275 with self.assertRaises(FileNotFoundError): 276 dumbdbm.open(_fname, value) 277 self.assertFalse(os.path.exists(_fname + '.dir')) 278 self.assertFalse(os.path.exists(_fname + '.bak')) 279 280 for value in ('c', 'n'): 281 with dumbdbm.open(_fname, value) as f: 282 self.assertTrue(os.path.exists(_fname + '.dir')) 283 self.assertFalse(os.path.exists(_fname + '.bak')) 284 self.assertFalse(os.path.exists(_fname + '.bak')) 285 os.unlink(_fname + '.dir') 286 287 for value in ('c', 'n'): 288 with dumbdbm.open(_fname, value) as f: 289 f['key'] = 'value' 290 self.assertTrue(os.path.exists(_fname + '.dir')) 291 self.assertFalse(os.path.exists(_fname + '.bak')) 292 self.assertTrue(os.path.exists(_fname + '.bak')) 293 os.unlink(_fname + '.dir') 294 os.unlink(_fname + '.bak') 295 296 def test_sync_empty_unmodified(self): 297 with dumbdbm.open(_fname, 'n') as f: 298 pass 299 os.unlink(_fname + '.dir') 300 for value in ('c', 'n'): 301 with dumbdbm.open(_fname, value) as f: 302 self.assertTrue(os.path.exists(_fname + '.dir')) 303 self.assertFalse(os.path.exists(_fname + '.bak')) 304 f.sync() 305 self.assertTrue(os.path.exists(_fname + '.dir')) 306 self.assertFalse(os.path.exists(_fname + '.bak')) 307 os.unlink(_fname + '.dir') 308 f.sync() 309 self.assertFalse(os.path.exists(_fname + '.dir')) 310 self.assertFalse(os.path.exists(_fname + '.bak')) 311 self.assertFalse(os.path.exists(_fname + '.dir')) 312 self.assertFalse(os.path.exists(_fname + '.bak')) 313 314 def test_sync_nonempty_unmodified(self): 315 with dumbdbm.open(_fname, 'n') as f: 316 pass 317 os.unlink(_fname + '.dir') 318 for value in ('c', 'n'): 319 with dumbdbm.open(_fname, value) as f: 320 f['key'] = 'value' 321 self.assertTrue(os.path.exists(_fname + '.dir')) 322 self.assertFalse(os.path.exists(_fname + '.bak')) 323 f.sync() 324 self.assertTrue(os.path.exists(_fname + '.dir')) 325 self.assertTrue(os.path.exists(_fname + '.bak')) 326 os.unlink(_fname + '.dir') 327 os.unlink(_fname + '.bak') 328 f.sync() 329 self.assertFalse(os.path.exists(_fname + '.dir')) 330 self.assertFalse(os.path.exists(_fname + '.bak')) 331 self.assertFalse(os.path.exists(_fname + '.dir')) 332 self.assertFalse(os.path.exists(_fname + '.bak')) 333 334 def test_invalid_flag(self): 335 for flag in ('x', 'rf', None): 336 with self.assertRaisesRegex(ValueError, 337 "Flag must be one of " 338 "'r', 'w', 'c', or 'n'"): 339 dumbdbm.open(_fname, flag) 340 341 @os_helper.skip_unless_working_chmod 342 def test_readonly_files(self): 343 with os_helper.temp_dir() as dir: 344 fname = os.path.join(dir, 'db') 345 with dumbdbm.open(fname, 'n') as f: 346 self.assertEqual(list(f.keys()), []) 347 for key in self._dict: 348 f[key] = self._dict[key] 349 os.chmod(fname + ".dir", stat.S_IRUSR) 350 os.chmod(fname + ".dat", stat.S_IRUSR) 351 os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR) 352 with dumbdbm.open(fname, 'r') as f: 353 self.assertEqual(sorted(f.keys()), sorted(self._dict)) 354 f.close() # don't write 355 356 @unittest.skipUnless(os_helper.TESTFN_NONASCII, 357 'requires OS support of non-ASCII encodings') 358 def test_nonascii_filename(self): 359 filename = os_helper.TESTFN_NONASCII 360 for suffix in ['.dir', '.dat', '.bak']: 361 self.addCleanup(os_helper.unlink, filename + suffix) 362 with dumbdbm.open(filename, 'c') as db: 363 db[b'key'] = b'value' 364 self.assertTrue(os.path.exists(filename + '.dat')) 365 self.assertTrue(os.path.exists(filename + '.dir')) 366 with dumbdbm.open(filename, 'r') as db: 367 self.assertEqual(list(db.keys()), [b'key']) 368 self.assertTrue(b'key' in db) 369 self.assertEqual(db[b'key'], b'value') 370 371 def test_open_with_pathlib_path(self): 372 dumbdbm.open(os_helper.FakePath(_fname), "c").close() 373 374 def test_open_with_bytes_path(self): 375 dumbdbm.open(os.fsencode(_fname), "c").close() 376 377 def test_open_with_pathlib_bytes_path(self): 378 dumbdbm.open(os_helper.FakePath(os.fsencode(_fname)), "c").close() 379 380 def tearDown(self): 381 _delete_files() 382 383 def setUp(self): 384 _delete_files() 385 386 387if __name__ == "__main__": 388 unittest.main() 389