"""Test script for the dumbdbm module
   Original by Roger E. Masse
"""

import io
import operator
import os
import random
import stat
import unittest
import dbm.dumb as dumbdbm
from test import support
from functools import partial

# Base path of the scratch database; dumbdbm appends the extensions itself.
_fname = support.TESTFN


def _delete_files():
    # Best-effort removal of every file a dumbdbm database may leave behind.
    # A missing file is the normal case, so OSError is deliberately ignored.
    for ext in [".dir", ".dat", ".bak"]:
        try:
            os.unlink(_fname + ext)
        except OSError:
            pass


class DumbDBMTestCase(unittest.TestCase):
    # Reference content written to, and expected back from, the database.
    # Tests that need to alter it must work on a per-instance copy so the
    # class-level mapping stays pristine for the other tests.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8'): b'!',
             }

    def setUp(self):
        _delete_files()

    def tearDown(self):
        _delete_files()

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def init_db(self):
        # Create a fresh database ('n' flag) holding exactly self._dict.
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        # Check that the database has exactly the keys of self._dict and
        # return them sorted.
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    def read_helper(self, f):
        # Check both the key set and every stored value against self._dict.
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_dumbdbm_creation(self):
        with dumbdbm.open(_fname, 'c') as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_dumbdbm_creation_mode(self):
        # Set the umask *before* entering the try block so that old_umask
        # is always bound when the finally clause restores it.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        # 0o637 with the 0o002 umask applied.
        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        for ext in ('.dat', '.dir'):
            st = os.stat(_fname + ext)
            self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # A second close() on an already closed database must be harmless,
        # hence the explicit calls instead of a with statement.
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        # Shadow the class-level mapping with a private copy: this test
        # changes the expected content and must not affect other tests.
        self._dict = dict(self._dict)
        self.init_db()
        with dumbdbm.open(_fname, 'w') as f:
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.read_helper(f)
            # Writing to or deleting from a database opened with 'r' is
            # expected to emit a DeprecationWarning.
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'b'), self._dict[b'b'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        self.init_db()
        with dumbdbm.open(_fname) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key is expected to address the same entry as its UTF-8
        # encoded bytes form.
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        self.init_db()
        with dumbdbm.open(_fname) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with dumbdbm.open(_fname, 'r') as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        d = {}  # mirror the database
        for _ in range(5):
            with dumbdbm.open(_fname) as f:
                for _ in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = (random.choice((b'a', b'b', b'c'))
                             * random.randrange(10000))
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen and compare the whole content with the mirror.
            with dumbdbm.open(_fname) as f:
                expected = sorted((k.encode("latin-1"), v)
                                  for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # Leaving the with block must have closed the database.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        f = dumbdbm.open(_fname, 'c')
        f.close()

        # Item-style operations on a closed database.
        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        # Whole-database operations on a closed database.
        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        # Opening with 'n' again must start from an empty database.
        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # A crafted index file must be rejected with ValueError and must
        # not get its content executed (nothing may reach stdout).
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname):
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_warn_on_ignored_flags(self):
        for value in ('r', 'w'):
            _delete_files()
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The database file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()

    def test_missing_index(self):
        with dumbdbm.open(_fname, 'n'):
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The index file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()
            # Only a writable open is expected to recreate the index file.
            self.assertEqual(os.path.exists(_fname + '.dir'), value == 'w')
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "Flag must be one of "
                                       "'r', 'w', 'c', or 'n'"):
                f = dumbdbm.open(_fname, flag)
            f.close()

    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_readonly_files(self):
        with support.temp_dir() as tmpdir:
            fname = os.path.join(tmpdir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            # Make both database files and their directory read-only.
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(tmpdir, stat.S_IRUSR | stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(support.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        filename = support.TESTFN_NONASCII
        # setUp/tearDown only clean up _fname, so register this test's own
        # files for removal.
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(support.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertTrue(b'key' in db)
            self.assertEqual(db[b'key'], b'value')


if __name__ == "__main__":
    unittest.main()