1"""Test script for the dumbdbm module 2 Original by Roger E. Masse 3""" 4 5import contextlib 6import io 7import operator 8import os 9import stat 10import unittest 11import dbm.dumb as dumbdbm 12from test import support 13from test.support import os_helper 14from functools import partial 15 16_fname = os_helper.TESTFN 17 18 19def _delete_files(): 20 for ext in [".dir", ".dat", ".bak"]: 21 try: 22 os.unlink(_fname + ext) 23 except OSError: 24 pass 25 26class DumbDBMTestCase(unittest.TestCase): 27 _dict = {b'0': b'', 28 b'a': b'Python:', 29 b'b': b'Programming', 30 b'c': b'the', 31 b'd': b'way', 32 b'f': b'Guido', 33 b'g': b'intended', 34 '\u00fc'.encode('utf-8') : b'!', 35 } 36 37 def test_dumbdbm_creation(self): 38 with contextlib.closing(dumbdbm.open(_fname, 'c')) as f: 39 self.assertEqual(list(f.keys()), []) 40 for key in self._dict: 41 f[key] = self._dict[key] 42 self.read_helper(f) 43 44 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 45 def test_dumbdbm_creation_mode(self): 46 try: 47 old_umask = os.umask(0o002) 48 f = dumbdbm.open(_fname, 'c', 0o637) 49 f.close() 50 finally: 51 os.umask(old_umask) 52 53 expected_mode = 0o635 54 if os.name != 'posix': 55 # Windows only supports setting the read-only attribute. 56 # This shouldn't fail, but doesn't work like Unix either. 57 expected_mode = 0o666 58 59 import stat 60 st = os.stat(_fname + '.dat') 61 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 62 st = os.stat(_fname + '.dir') 63 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 64 65 def test_close_twice(self): 66 f = dumbdbm.open(_fname) 67 f[b'a'] = b'b' 68 self.assertEqual(f[b'a'], b'b') 69 f.close() 70 f.close() 71 72 def test_dumbdbm_modification(self): 73 self.init_db() 74 with contextlib.closing(dumbdbm.open(_fname, 'w')) as f: 75 self._dict[b'g'] = f[b'g'] = b"indented" 76 self.read_helper(f) 77 # setdefault() works as in the dict interface 78 self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo') 79 self.assertEqual(f[b'xxx'], b'foo') 80 81 def test_dumbdbm_read(self): 82 self.init_db() 83 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 84 self.read_helper(f) 85 with self.assertRaisesRegex(dumbdbm.error, 86 'The database is opened for reading only'): 87 f[b'g'] = b'x' 88 with self.assertRaisesRegex(dumbdbm.error, 89 'The database is opened for reading only'): 90 del f[b'a'] 91 # get() works as in the dict interface 92 self.assertEqual(f.get(b'a'), self._dict[b'a']) 93 self.assertEqual(f.get(b'xxx', b'foo'), b'foo') 94 self.assertIsNone(f.get(b'xxx')) 95 with self.assertRaises(KeyError): 96 f[b'xxx'] 97 98 def test_dumbdbm_keys(self): 99 self.init_db() 100 with contextlib.closing(dumbdbm.open(_fname)) as f: 101 keys = self.keys_helper(f) 102 103 def test_write_contains(self): 104 with contextlib.closing(dumbdbm.open(_fname)) as f: 105 f[b'1'] = b'hello' 106 self.assertIn(b'1', f) 107 108 def test_write_write_read(self): 109 # test for bug #482460 110 with contextlib.closing(dumbdbm.open(_fname)) as f: 111 f[b'1'] = b'hello' 112 f[b'1'] = b'hello2' 113 with contextlib.closing(dumbdbm.open(_fname)) as f: 114 self.assertEqual(f[b'1'], b'hello2') 115 116 def test_str_read(self): 117 self.init_db() 118 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 119 self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')]) 120 121 def test_str_write_contains(self): 122 self.init_db() 123 with contextlib.closing(dumbdbm.open(_fname)) as f: 124 f['\u00fc'] = b'!' 125 f['1'] = 'a' 126 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 127 self.assertIn('\u00fc', f) 128 self.assertEqual(f['\u00fc'.encode('utf-8')], 129 self._dict['\u00fc'.encode('utf-8')]) 130 self.assertEqual(f[b'1'], b'a') 131 132 def test_line_endings(self): 133 # test for bug #1172763: dumbdbm would die if the line endings 134 # weren't what was expected. 135 with contextlib.closing(dumbdbm.open(_fname)) as f: 136 f[b'1'] = b'hello' 137 f[b'2'] = b'hello2' 138 139 # Mangle the file by changing the line separator to Windows or Unix 140 with io.open(_fname + '.dir', 'rb') as file: 141 data = file.read() 142 if os.linesep == '\n': 143 data = data.replace(b'\n', b'\r\n') 144 else: 145 data = data.replace(b'\r\n', b'\n') 146 with io.open(_fname + '.dir', 'wb') as file: 147 file.write(data) 148 149 f = dumbdbm.open(_fname) 150 self.assertEqual(f[b'1'], b'hello') 151 self.assertEqual(f[b'2'], b'hello2') 152 153 154 def read_helper(self, f): 155 keys = self.keys_helper(f) 156 for key in self._dict: 157 self.assertEqual(self._dict[key], f[key]) 158 159 def init_db(self): 160 with contextlib.closing(dumbdbm.open(_fname, 'n')) as f: 161 for k in self._dict: 162 f[k] = self._dict[k] 163 164 def keys_helper(self, f): 165 keys = sorted(f.keys()) 166 dkeys = sorted(self._dict.keys()) 167 self.assertEqual(keys, dkeys) 168 return keys 169 170 # Perform randomized operations. This doesn't make assumptions about 171 # what *might* fail. 172 def test_random(self): 173 import random 174 d = {} # mirror the database 175 for dummy in range(5): 176 with contextlib.closing(dumbdbm.open(_fname)) as f: 177 for dummy in range(100): 178 k = random.choice('abcdefghijklm') 179 if random.random() < 0.2: 180 if k in d: 181 del d[k] 182 del f[k] 183 else: 184 v = random.choice((b'a', b'b', b'c')) * random.randrange(10000) 185 d[k] = v 186 f[k] = v 187 self.assertEqual(f[k], v) 188 189 with contextlib.closing(dumbdbm.open(_fname)) as f: 190 expected = sorted((k.encode("latin-1"), v) for k, v in d.items()) 191 got = sorted(f.items()) 192 self.assertEqual(expected, got) 193 194 def test_context_manager(self): 195 with dumbdbm.open(_fname, 'c') as db: 196 db["dumbdbm context manager"] = "context manager" 197 198 with dumbdbm.open(_fname, 'r') as db: 199 self.assertEqual(list(db.keys()), [b"dumbdbm context manager"]) 200 201 with self.assertRaises(dumbdbm.error): 202 db.keys() 203 204 def test_check_closed(self): 205 f = dumbdbm.open(_fname, 'c') 206 f.close() 207 208 for meth in (partial(operator.delitem, f), 209 partial(operator.setitem, f, 'b'), 210 partial(operator.getitem, f), 211 partial(operator.contains, f)): 212 with self.assertRaises(dumbdbm.error) as cm: 213 meth('test') 214 self.assertEqual(str(cm.exception), 215 "DBM object has already been closed") 216 217 for meth in (operator.methodcaller('keys'), 218 operator.methodcaller('iterkeys'), 219 operator.methodcaller('items'), 220 len): 221 with self.assertRaises(dumbdbm.error) as cm: 222 meth(f) 223 self.assertEqual(str(cm.exception), 224 "DBM object has already been closed") 225 226 def test_create_new(self): 227 with dumbdbm.open(_fname, 'n') as f: 228 for k in self._dict: 229 f[k] = self._dict[k] 230 231 with dumbdbm.open(_fname, 'n') as f: 232 self.assertEqual(f.keys(), []) 233 234 def test_eval(self): 235 with open(_fname + '.dir', 'w', encoding="utf-8") as stream: 236 stream.write("str(print('Hacked!')), 0\n") 237 with support.captured_stdout() as stdout: 238 with self.assertRaises(ValueError): 239 with dumbdbm.open(_fname) as f: 240 pass 241 self.assertEqual(stdout.getvalue(), '') 242 243 def test_missing_data(self): 244 for value in ('r', 'w'): 245 _delete_files() 246 with self.assertRaises(FileNotFoundError): 247 dumbdbm.open(_fname, value) 248 self.assertFalse(os.path.exists(_fname + '.dir')) 249 self.assertFalse(os.path.exists(_fname + '.bak')) 250 251 def test_missing_index(self): 252 with dumbdbm.open(_fname, 'n') as f: 253 pass 254 os.unlink(_fname + '.dir') 255 for value in ('r', 'w'): 256 with self.assertRaises(FileNotFoundError): 257 dumbdbm.open(_fname, value) 258 self.assertFalse(os.path.exists(_fname + '.dir')) 259 self.assertFalse(os.path.exists(_fname + '.bak')) 260 261 def test_invalid_flag(self): 262 for flag in ('x', 'rf', None): 263 with self.assertRaisesRegex(ValueError, 264 "Flag must be one of " 265 "'r', 'w', 'c', or 'n'"): 266 dumbdbm.open(_fname, flag) 267 268 def test_readonly_files(self): 269 with os_helper.temp_dir() as dir: 270 fname = os.path.join(dir, 'db') 271 with dumbdbm.open(fname, 'n') as f: 272 self.assertEqual(list(f.keys()), []) 273 for key in self._dict: 274 f[key] = self._dict[key] 275 os.chmod(fname + ".dir", stat.S_IRUSR) 276 os.chmod(fname + ".dat", stat.S_IRUSR) 277 os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR) 278 with dumbdbm.open(fname, 'r') as f: 279 self.assertEqual(sorted(f.keys()), sorted(self._dict)) 280 f.close() # don't write 281 282 @unittest.skipUnless(os_helper.TESTFN_NONASCII, 283 'requires OS support of non-ASCII encodings') 284 def test_nonascii_filename(self): 285 filename = os_helper.TESTFN_NONASCII 286 for suffix in ['.dir', '.dat', '.bak']: 287 self.addCleanup(os_helper.unlink, filename + suffix) 288 with dumbdbm.open(filename, 'c') as db: 289 db[b'key'] = b'value' 290 self.assertTrue(os.path.exists(filename + '.dat')) 291 self.assertTrue(os.path.exists(filename + '.dir')) 292 with dumbdbm.open(filename, 'r') as db: 293 self.assertEqual(list(db.keys()), [b'key']) 294 self.assertTrue(b'key' in db) 295 self.assertEqual(db[b'key'], b'value') 296 297 def tearDown(self): 298 _delete_files() 299 300 def setUp(self): 301 _delete_files() 302 303 304if __name__ == "__main__": 305 unittest.main() 306