"""Test script for the bsddb C module by Roger E. Masse
   Adapted to unittest format and expanded scope by Raymond Hettinger
"""
import os
import sys
import unittest
from test import test_support

# Skip test if _bsddb wasn't built.
test_support.import_module('_bsddb')

bsddb = test_support.import_module('bsddb', deprecated=True)
# Just so we know it's imported:
test_support.import_module('dbhash', deprecated=True)


class TestBSDDB(unittest.TestCase):
    """Mapping-interface tests shared by every bsddb database flavour.

    This base class is not run directly (see test_main); concrete
    subclasses must provide:

    fname      -- path of the database file, or None for an in-memory
                  database (tearDown then skips the file removal).
    openmethod -- one-element list holding the bsddb open function.  It is
                  presumably wrapped in a list so that looking it up through
                  ``self`` does not turn it into a bound method.

    Subclasses disable a test that does not apply to them by rebinding
    its name to None.
    """

    openflag = 'c'

    def setUp(self):
        # Open the database and mirror a small reference dict into it;
        # the tests compare the database against self.d.
        self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
        self.d = dict(q='Guido', w='van', e='Rossum', r='invented',
                      t='Python', y='')
        for k, v in self.d.iteritems():
            self.f[k] = v

    def tearDown(self):
        self.f.sync()
        self.f.close()
        if self.fname is None:
            # in-memory database: there is no file to clean up
            return
        try:
            os.remove(self.fname)
        except os.error:
            # best-effort cleanup only
            pass

    def test_getitem(self):
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_len(self):
        self.assertEqual(len(self.f), len(self.d))

    def test_change(self):
        self.f['r'] = 'discovered'
        self.assertEqual(self.f['r'], 'discovered')
        self.assertIn('r', self.f.keys())
        self.assertIn('discovered', self.f.values())

    def test_close_and_reopen(self):
        # Only meaningful for file-backed databases; the in-memory
        # subclasses set this test to None.
        self.assertIsNotNone(self.fname)
        self.f.close()
        self.f = self.openmethod[0](self.fname, 'w')
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def assertSetEquals(self, seqn1, seqn2):
        # Order-insensitive comparison of two iterables.
        self.assertEqual(set(seqn1), set(seqn2))

    def test_mapping_iteration_methods(self):
        f = self.f
        d = self.d
        self.assertSetEquals(d, f)
        self.assertSetEquals(d.keys(), f.keys())
        self.assertSetEquals(d.values(), f.values())
        self.assertSetEquals(d.items(), f.items())
        self.assertSetEquals(d.iterkeys(), f.iterkeys())
        self.assertSetEquals(d.itervalues(), f.itervalues())
        self.assertSetEquals(d.iteritems(), f.iteritems())

    def test_iter_while_modifying_values(self):
        # Apply the same modification to the reference dict first so the
        # final comparison still holds.
        di = iter(self.d)
        while 1:
            try:
                key = di.next()
                self.d[key] = 'modified '+key
            except StopIteration:
                break

        # it should behave the same as a dict.  modifying values
        # of existing keys should not break iteration.  (adding
        # or removing keys should)
        loops_left = len(self.f)
        fi = iter(self.f)
        while 1:
            try:
                key = fi.next()
                self.f[key] = 'modified '+key
                loops_left -= 1
            except StopIteration:
                break
        # every key must have been visited exactly once
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_iter_abort_on_changed_size(self):
        # Adding a key during iteration must raise RuntimeError, for the
        # plain dict and for the database alike.
        def DictIterAbort():
            di = iter(self.d)
            while 1:
                try:
                    di.next()
                    self.d['newkey'] = 'SPAM'
                except StopIteration:
                    break
        self.assertRaises(RuntimeError, DictIterAbort)

        def DbIterAbort():
            fi = iter(self.f)
            while 1:
                try:
                    fi.next()
                    self.f['newkey'] = 'SPAM'
                except StopIteration:
                    break
        self.assertRaises(RuntimeError, DbIterAbort)

    def test_iteritems_abort_on_changed_size(self):
        def DictIteritemsAbort():
            di = self.d.iteritems()
            while 1:
                try:
                    di.next()
                    self.d['newkey'] = 'SPAM'
                except StopIteration:
                    break
        self.assertRaises(RuntimeError, DictIteritemsAbort)

        # The database variant changes the size by deleting the key that
        # was just yielded rather than by adding one.
        def DbIteritemsAbort():
            fi = self.f.iteritems()
            while 1:
                try:
                    key, value = fi.next()
                    del self.f[key]
                except StopIteration:
                    break
        self.assertRaises(RuntimeError, DbIteritemsAbort)

    def test_iteritems_while_modifying_values(self):
        # Apply the same modification to the reference dict first so the
        # final comparison still holds.
        di = self.d.iteritems()
        while 1:
            try:
                k, v = di.next()
                self.d[k] = 'modified '+v
            except StopIteration:
                break

        # it should behave the same as a dict.  modifying values
        # of existing keys should not break iteration.  (adding
        # or removing keys should)
        loops_left = len(self.f)
        fi = self.f.iteritems()
        while 1:
            try:
                k, v = fi.next()
                self.f[k] = 'modified '+v
                loops_left -= 1
            except StopIteration:
                break
        # every item must have been visited exactly once
        self.assertEqual(loops_left, 0)

        self.test_mapping_iteration_methods()

    def test_first_next_looping(self):
        # first() followed by len-1 calls to next() must visit every item.
        items = [self.f.first()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.next())
        self.assertSetEquals(items, self.d.items())

    def test_previous_last_looping(self):
        # last() followed by len-1 calls to previous() must visit every item.
        items = [self.f.last()]
        for _ in xrange(1, len(self.f)):
            items.append(self.f.previous())
        self.assertSetEquals(items, self.d.items())

    def test_first_while_deleting(self):
        # Test for bug 1725856
        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.first()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_last_while_deleting(self):
        # Test for bug 1725856's evil twin
        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
        for _ in self.d:
            key = self.f.last()[0]
            del self.f[key]
        self.assertEqual([], self.f.items(), "expected empty db after test")

    def test_set_location(self):
        self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))

    def test_contains(self):
        for k in self.d:
            self.assertIn(k, self.f)
        self.assertNotIn('not here', self.f)

    def test_has_key(self):
        for k in self.d:
            self.assertTrue(self.f.has_key(k))
        self.assertFalse(self.f.has_key('not here'))

    def test_clear(self):
        self.f.clear()
        self.assertEqual(len(self.f), 0)

    def test__no_deadlock_first(self, debug=0):
        # Pass a true value for `debug` to print a progress letter at each
        # step, which shows where a hang occurs.
        def trace(msg):
            if debug:
                print(msg)

        # do this so that testers can see what function we're in in
        # verbose mode when we deadlock.
        sys.stdout.flush()

        # in pybsddb's _DBWithCursor this causes an internal DBCursor
        # object to be created.  Other test_ methods in this class could
        # inadvertently cause the deadlock but an explicit test is needed.
        trace("A")
        k, v = self.f.first()
        trace("B %s" % k)
        self.f[k] = "deadlock.  do not pass go.  do not collect $200."
        trace("C")
        # if the bsddb implementation leaves the DBCursor open during
        # the database write and locking+threading support is enabled
        # the cursor's read lock will deadlock the write lock request..

        # test the iterator interface
        trace("D")
        i = self.f.iteritems()
        k, v = i.next()
        trace("E")
        self.f[k] = "please don't deadlock"
        trace("F")
        while 1:
            try:
                k, v = i.next()
            except StopIteration:
                break
        trace("F2")

        i = iter(self.f)
        trace("G")
        while i:
            try:
                trace("H")
                k = i.next()
                trace("I")
                self.f[k] = "deadlocks-r-us"
                trace("J")
            except StopIteration:
                # ends the loop and drops the reference to the iterator
                i = None
        trace("K")

        # test the legacy cursor interface mixed with writes
        self.assertIn(self.f.first()[0], self.d)
        k = self.f.next()[0]
        self.assertIn(k, self.d)
        self.f[k] = "be gone with ye deadlocks"
        # assertEqual, not assertTrue: with assertTrue the expected string
        # would only be the failure message and the stored value would
        # never actually be compared.
        self.assertEqual(self.f[k], "be gone with ye deadlocks")

    def test_for_cursor_memleak(self):
        # do the bsddb._DBWithCursor iterator internals leak cursors?
        nc1 = len(self.f._cursor_refs)
        # create iterator
        i = self.f.iteritems()
        nc2 = len(self.f._cursor_refs)
        # use the iterator (should run to the first yield, creating the cursor)
        k, v = i.next()
        nc3 = len(self.f._cursor_refs)
        # destroy the iterator; this should cause the weakref callback
        # to remove the cursor object from self.f._cursor_refs
        del i
        nc4 = len(self.f._cursor_refs)

        self.assertEqual(nc1, nc2)
        self.assertEqual(nc1, nc4)
        self.assertEqual(nc3, nc1+1)

    def test_popitem(self):
        k, v = self.f.popitem()
        self.assertIn(k, self.d)
        self.assertIn(v, self.d.values())
        self.assertNotIn(k, self.f)
        self.assertEqual(len(self.d)-1, len(self.f))

    def test_pop(self):
        k = 'w'
        v = self.f.pop(k)
        self.assertEqual(v, self.d[k])
        self.assertNotIn(k, self.f)
        self.assertNotIn(v, self.f.values())
        self.assertEqual(len(self.d)-1, len(self.f))

    def test_get(self):
        self.assertEqual(self.f.get('NotHere'), None)
        self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
        self.assertEqual(self.f.get('q', 'Default'), self.d['q'])

    def test_setdefault(self):
        self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
        self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])

    def test_update(self):
        new = dict(y='life', u='of', i='brian')
        self.f.update(new)
        self.d.update(new)
        for k, v in self.d.iteritems():
            self.assertEqual(self.f[k], v)

    def test_keyordering(self):
        # Only valid for btopen; the hash-table subclasses set this
        # test to None.
        self.assertIs(self.openmethod[0], bsddb.btopen)
        keys = sorted(self.d)
        self.assertEqual(self.f.first()[0], keys[0])
        self.assertEqual(self.f.next()[0], keys[1])
        self.assertEqual(self.f.last()[0], keys[-1])
        self.assertEqual(self.f.previous()[0], keys[-2])
        self.assertEqual(list(self.f), keys)


class TestBTree(TestBSDDB):
    fname = test_support.TESTFN
    openmethod = [bsddb.btopen]


class TestBTree_InMemory(TestBSDDB):
    fname = None
    openmethod = [bsddb.btopen]

    # if we're using an in-memory only db, we can't reopen it
    test_close_and_reopen = None


class TestBTree_InMemory_Truncate(TestBSDDB):
    fname = None
    openflag = 'n'
    openmethod = [bsddb.btopen]

    # if we're using an in-memory only db, we can't reopen it
    test_close_and_reopen = None


class TestHashTable(TestBSDDB):
    fname = test_support.TESTFN
    openmethod = [bsddb.hashopen]

    # keyordering is specific to btopen method
    test_keyordering = None


class TestHashTable_InMemory(TestBSDDB):
    fname = None
    openmethod = [bsddb.hashopen]

    # if we're using an in-memory only db, we can't reopen it
    test_close_and_reopen = None

    # keyordering is specific to btopen method
    test_keyordering = None

##         # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
##         #                                   appears broken... at least on
##         #                                   Solaris Intel - rmasse 1/97


def test_main(verbose=None):
    # `verbose` is accepted but not used here.
    test_support.run_unittest(
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )


if __name__ == "__main__":
    test_main(verbose=True)