1"""Test script for the dumbdbm module 2 Original by Roger E. Masse 3""" 4 5import contextlib 6import io 7import operator 8import os 9import stat 10import unittest 11import dbm.dumb as dumbdbm 12from test import support 13from functools import partial 14 15_fname = support.TESTFN 16 17def _delete_files(): 18 for ext in [".dir", ".dat", ".bak"]: 19 try: 20 os.unlink(_fname + ext) 21 except OSError: 22 pass 23 24class DumbDBMTestCase(unittest.TestCase): 25 _dict = {b'0': b'', 26 b'a': b'Python:', 27 b'b': b'Programming', 28 b'c': b'the', 29 b'd': b'way', 30 b'f': b'Guido', 31 b'g': b'intended', 32 '\u00fc'.encode('utf-8') : b'!', 33 } 34 35 def test_dumbdbm_creation(self): 36 with contextlib.closing(dumbdbm.open(_fname, 'c')) as f: 37 self.assertEqual(list(f.keys()), []) 38 for key in self._dict: 39 f[key] = self._dict[key] 40 self.read_helper(f) 41 42 @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()') 43 def test_dumbdbm_creation_mode(self): 44 try: 45 old_umask = os.umask(0o002) 46 f = dumbdbm.open(_fname, 'c', 0o637) 47 f.close() 48 finally: 49 os.umask(old_umask) 50 51 expected_mode = 0o635 52 if os.name != 'posix': 53 # Windows only supports setting the read-only attribute. 54 # This shouldn't fail, but doesn't work like Unix either. 55 expected_mode = 0o666 56 57 import stat 58 st = os.stat(_fname + '.dat') 59 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 60 st = os.stat(_fname + '.dir') 61 self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode) 62 63 def test_close_twice(self): 64 f = dumbdbm.open(_fname) 65 f[b'a'] = b'b' 66 self.assertEqual(f[b'a'], b'b') 67 f.close() 68 f.close() 69 70 def test_dumbdbm_modification(self): 71 self.init_db() 72 with contextlib.closing(dumbdbm.open(_fname, 'w')) as f: 73 self._dict[b'g'] = f[b'g'] = b"indented" 74 self.read_helper(f) 75 # setdefault() works as in the dict interface 76 self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo') 77 self.assertEqual(f[b'xxx'], b'foo') 78 79 def test_dumbdbm_read(self): 80 self.init_db() 81 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 82 self.read_helper(f) 83 with self.assertRaisesRegex(dumbdbm.error, 84 'The database is opened for reading only'): 85 f[b'g'] = b'x' 86 with self.assertRaisesRegex(dumbdbm.error, 87 'The database is opened for reading only'): 88 del f[b'a'] 89 # get() works as in the dict interface 90 self.assertEqual(f.get(b'a'), self._dict[b'a']) 91 self.assertEqual(f.get(b'xxx', b'foo'), b'foo') 92 self.assertIsNone(f.get(b'xxx')) 93 with self.assertRaises(KeyError): 94 f[b'xxx'] 95 96 def test_dumbdbm_keys(self): 97 self.init_db() 98 with contextlib.closing(dumbdbm.open(_fname)) as f: 99 keys = self.keys_helper(f) 100 101 def test_write_contains(self): 102 with contextlib.closing(dumbdbm.open(_fname)) as f: 103 f[b'1'] = b'hello' 104 self.assertIn(b'1', f) 105 106 def test_write_write_read(self): 107 # test for bug #482460 108 with contextlib.closing(dumbdbm.open(_fname)) as f: 109 f[b'1'] = b'hello' 110 f[b'1'] = b'hello2' 111 with contextlib.closing(dumbdbm.open(_fname)) as f: 112 self.assertEqual(f[b'1'], b'hello2') 113 114 def test_str_read(self): 115 self.init_db() 116 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 117 self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')]) 118 119 def test_str_write_contains(self): 120 self.init_db() 121 with contextlib.closing(dumbdbm.open(_fname)) as f: 122 f['\u00fc'] = b'!' 123 f['1'] = 'a' 124 with contextlib.closing(dumbdbm.open(_fname, 'r')) as f: 125 self.assertIn('\u00fc', f) 126 self.assertEqual(f['\u00fc'.encode('utf-8')], 127 self._dict['\u00fc'.encode('utf-8')]) 128 self.assertEqual(f[b'1'], b'a') 129 130 def test_line_endings(self): 131 # test for bug #1172763: dumbdbm would die if the line endings 132 # weren't what was expected. 133 with contextlib.closing(dumbdbm.open(_fname)) as f: 134 f[b'1'] = b'hello' 135 f[b'2'] = b'hello2' 136 137 # Mangle the file by changing the line separator to Windows or Unix 138 with io.open(_fname + '.dir', 'rb') as file: 139 data = file.read() 140 if os.linesep == '\n': 141 data = data.replace(b'\n', b'\r\n') 142 else: 143 data = data.replace(b'\r\n', b'\n') 144 with io.open(_fname + '.dir', 'wb') as file: 145 file.write(data) 146 147 f = dumbdbm.open(_fname) 148 self.assertEqual(f[b'1'], b'hello') 149 self.assertEqual(f[b'2'], b'hello2') 150 151 152 def read_helper(self, f): 153 keys = self.keys_helper(f) 154 for key in self._dict: 155 self.assertEqual(self._dict[key], f[key]) 156 157 def init_db(self): 158 with contextlib.closing(dumbdbm.open(_fname, 'n')) as f: 159 for k in self._dict: 160 f[k] = self._dict[k] 161 162 def keys_helper(self, f): 163 keys = sorted(f.keys()) 164 dkeys = sorted(self._dict.keys()) 165 self.assertEqual(keys, dkeys) 166 return keys 167 168 # Perform randomized operations. This doesn't make assumptions about 169 # what *might* fail. 170 def test_random(self): 171 import random 172 d = {} # mirror the database 173 for dummy in range(5): 174 with contextlib.closing(dumbdbm.open(_fname)) as f: 175 for dummy in range(100): 176 k = random.choice('abcdefghijklm') 177 if random.random() < 0.2: 178 if k in d: 179 del d[k] 180 del f[k] 181 else: 182 v = random.choice((b'a', b'b', b'c')) * random.randrange(10000) 183 d[k] = v 184 f[k] = v 185 self.assertEqual(f[k], v) 186 187 with contextlib.closing(dumbdbm.open(_fname)) as f: 188 expected = sorted((k.encode("latin-1"), v) for k, v in d.items()) 189 got = sorted(f.items()) 190 self.assertEqual(expected, got) 191 192 def test_context_manager(self): 193 with dumbdbm.open(_fname, 'c') as db: 194 db["dumbdbm context manager"] = "context manager" 195 196 with dumbdbm.open(_fname, 'r') as db: 197 self.assertEqual(list(db.keys()), [b"dumbdbm context manager"]) 198 199 with self.assertRaises(dumbdbm.error): 200 db.keys() 201 202 def test_check_closed(self): 203 f = dumbdbm.open(_fname, 'c') 204 f.close() 205 206 for meth in (partial(operator.delitem, f), 207 partial(operator.setitem, f, 'b'), 208 partial(operator.getitem, f), 209 partial(operator.contains, f)): 210 with self.assertRaises(dumbdbm.error) as cm: 211 meth('test') 212 self.assertEqual(str(cm.exception), 213 "DBM object has already been closed") 214 215 for meth in (operator.methodcaller('keys'), 216 operator.methodcaller('iterkeys'), 217 operator.methodcaller('items'), 218 len): 219 with self.assertRaises(dumbdbm.error) as cm: 220 meth(f) 221 self.assertEqual(str(cm.exception), 222 "DBM object has already been closed") 223 224 def test_create_new(self): 225 with dumbdbm.open(_fname, 'n') as f: 226 for k in self._dict: 227 f[k] = self._dict[k] 228 229 with dumbdbm.open(_fname, 'n') as f: 230 self.assertEqual(f.keys(), []) 231 232 def test_eval(self): 233 with open(_fname + '.dir', 'w') as stream: 234 stream.write("str(print('Hacked!')), 0\n") 235 with support.captured_stdout() as stdout: 236 with self.assertRaises(ValueError): 237 with dumbdbm.open(_fname) as f: 238 pass 239 self.assertEqual(stdout.getvalue(), '') 240 241 def test_missing_data(self): 242 for value in ('r', 'w'): 243 _delete_files() 244 with self.assertRaises(FileNotFoundError): 245 dumbdbm.open(_fname, value) 246 self.assertFalse(os.path.exists(_fname + '.dir')) 247 self.assertFalse(os.path.exists(_fname + '.bak')) 248 249 def test_missing_index(self): 250 with dumbdbm.open(_fname, 'n') as f: 251 pass 252 os.unlink(_fname + '.dir') 253 for value in ('r', 'w'): 254 with self.assertRaises(FileNotFoundError): 255 dumbdbm.open(_fname, value) 256 self.assertFalse(os.path.exists(_fname + '.dir')) 257 self.assertFalse(os.path.exists(_fname + '.bak')) 258 259 def test_invalid_flag(self): 260 for flag in ('x', 'rf', None): 261 with self.assertRaisesRegex(ValueError, 262 "Flag must be one of " 263 "'r', 'w', 'c', or 'n'"): 264 dumbdbm.open(_fname, flag) 265 266 def test_readonly_files(self): 267 with support.temp_dir() as dir: 268 fname = os.path.join(dir, 'db') 269 with dumbdbm.open(fname, 'n') as f: 270 self.assertEqual(list(f.keys()), []) 271 for key in self._dict: 272 f[key] = self._dict[key] 273 os.chmod(fname + ".dir", stat.S_IRUSR) 274 os.chmod(fname + ".dat", stat.S_IRUSR) 275 os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR) 276 with dumbdbm.open(fname, 'r') as f: 277 self.assertEqual(sorted(f.keys()), sorted(self._dict)) 278 f.close() # don't write 279 280 @unittest.skipUnless(support.TESTFN_NONASCII, 281 'requires OS support of non-ASCII encodings') 282 def test_nonascii_filename(self): 283 filename = support.TESTFN_NONASCII 284 for suffix in ['.dir', '.dat', '.bak']: 285 self.addCleanup(support.unlink, filename + suffix) 286 with dumbdbm.open(filename, 'c') as db: 287 db[b'key'] = b'value' 288 self.assertTrue(os.path.exists(filename + '.dat')) 289 self.assertTrue(os.path.exists(filename + '.dir')) 290 with dumbdbm.open(filename, 'r') as db: 291 self.assertEqual(list(db.keys()), [b'key']) 292 self.assertTrue(b'key' in db) 293 self.assertEqual(db[b'key'], b'value') 294 295 def tearDown(self): 296 _delete_files() 297 298 def setUp(self): 299 _delete_files() 300 301 302if __name__ == "__main__": 303 unittest.main() 304