• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the bsddb C module by Roger E. Masse
2   Adapted to unittest format and expanded scope by Raymond Hettinger
3"""
4import os, sys
5import unittest
6from test import test_support
7
8# Skip test if _bsddb wasn't built.
9test_support.import_module('_bsddb')
10
11bsddb = test_support.import_module('bsddb', deprecated=True)
12# Just so we know it's imported:
13test_support.import_module('dbhash', deprecated=True)
14
15
16class TestBSDDB(unittest.TestCase):
17    openflag = 'c'
18
19    def setUp(self):
20        self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
21        self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
22        for k, v in self.d.iteritems():
23            self.f[k] = v
24
25    def tearDown(self):
26        self.f.sync()
27        self.f.close()
28        if self.fname is None:
29            return
30        try:
31            os.remove(self.fname)
32        except os.error:
33            pass
34
35    def test_getitem(self):
36        for k, v in self.d.iteritems():
37            self.assertEqual(self.f[k], v)
38
39    def test_len(self):
40        self.assertEqual(len(self.f), len(self.d))
41
42    def test_change(self):
43        self.f['r'] = 'discovered'
44        self.assertEqual(self.f['r'], 'discovered')
45        self.assertIn('r', self.f.keys())
46        self.assertIn('discovered', self.f.values())
47
48    def test_close_and_reopen(self):
49        self.assertIsNotNone(self.fname)
50        self.f.close()
51        self.f = self.openmethod[0](self.fname, 'w')
52        for k, v in self.d.iteritems():
53            self.assertEqual(self.f[k], v)
54
55    def assertSetEquals(self, seqn1, seqn2):
56        self.assertEqual(set(seqn1), set(seqn2))
57
58    def test_mapping_iteration_methods(self):
59        f = self.f
60        d = self.d
61        self.assertSetEquals(d, f)
62        self.assertSetEquals(d.keys(), f.keys())
63        self.assertSetEquals(d.values(), f.values())
64        self.assertSetEquals(d.items(), f.items())
65        self.assertSetEquals(d.iterkeys(), f.iterkeys())
66        self.assertSetEquals(d.itervalues(), f.itervalues())
67        self.assertSetEquals(d.iteritems(), f.iteritems())
68
69    def test_iter_while_modifying_values(self):
70        di = iter(self.d)
71        while 1:
72            try:
73                key = di.next()
74                self.d[key] = 'modified '+key
75            except StopIteration:
76                break
77
78        # it should behave the same as a dict.  modifying values
79        # of existing keys should not break iteration.  (adding
80        # or removing keys should)
81        loops_left = len(self.f)
82        fi = iter(self.f)
83        while 1:
84            try:
85                key = fi.next()
86                self.f[key] = 'modified '+key
87                loops_left -= 1
88            except StopIteration:
89                break
90        self.assertEqual(loops_left, 0)
91
92        self.test_mapping_iteration_methods()
93
94    def test_iter_abort_on_changed_size(self):
95        def DictIterAbort():
96            di = iter(self.d)
97            while 1:
98                try:
99                    di.next()
100                    self.d['newkey'] = 'SPAM'
101                except StopIteration:
102                    break
103        self.assertRaises(RuntimeError, DictIterAbort)
104
105        def DbIterAbort():
106            fi = iter(self.f)
107            while 1:
108                try:
109                    fi.next()
110                    self.f['newkey'] = 'SPAM'
111                except StopIteration:
112                    break
113        self.assertRaises(RuntimeError, DbIterAbort)
114
115    def test_iteritems_abort_on_changed_size(self):
116        def DictIteritemsAbort():
117            di = self.d.iteritems()
118            while 1:
119                try:
120                    di.next()
121                    self.d['newkey'] = 'SPAM'
122                except StopIteration:
123                    break
124        self.assertRaises(RuntimeError, DictIteritemsAbort)
125
126        def DbIteritemsAbort():
127            fi = self.f.iteritems()
128            while 1:
129                try:
130                    key, value = fi.next()
131                    del self.f[key]
132                except StopIteration:
133                    break
134        self.assertRaises(RuntimeError, DbIteritemsAbort)
135
136    def test_iteritems_while_modifying_values(self):
137        di = self.d.iteritems()
138        while 1:
139            try:
140                k, v = di.next()
141                self.d[k] = 'modified '+v
142            except StopIteration:
143                break
144
145        # it should behave the same as a dict.  modifying values
146        # of existing keys should not break iteration.  (adding
147        # or removing keys should)
148        loops_left = len(self.f)
149        fi = self.f.iteritems()
150        while 1:
151            try:
152                k, v = fi.next()
153                self.f[k] = 'modified '+v
154                loops_left -= 1
155            except StopIteration:
156                break
157        self.assertEqual(loops_left, 0)
158
159        self.test_mapping_iteration_methods()
160
161    def test_first_next_looping(self):
162        items = [self.f.first()]
163        for i in xrange(1, len(self.f)):
164            items.append(self.f.next())
165        self.assertSetEquals(items, self.d.items())
166
167    def test_previous_last_looping(self):
168        items = [self.f.last()]
169        for i in xrange(1, len(self.f)):
170            items.append(self.f.previous())
171        self.assertSetEquals(items, self.d.items())
172
173    def test_first_while_deleting(self):
174        # Test for bug 1725856
175        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
176        for _ in self.d:
177            key = self.f.first()[0]
178            del self.f[key]
179        self.assertEqual([], self.f.items(), "expected empty db after test")
180
181    def test_last_while_deleting(self):
182        # Test for bug 1725856's evil twin
183        self.assertGreaterEqual(len(self.d), 2, "test requires >=2 items")
184        for _ in self.d:
185            key = self.f.last()[0]
186            del self.f[key]
187        self.assertEqual([], self.f.items(), "expected empty db after test")
188
189    def test_set_location(self):
190        self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
191
192    def test_contains(self):
193        for k in self.d:
194            self.assertIn(k, self.f)
195        self.assertNotIn('not here', self.f)
196
197    def test_has_key(self):
198        for k in self.d:
199            self.assertTrue(self.f.has_key(k))
200        self.assertFalse(self.f.has_key('not here'))
201
202    def test_clear(self):
203        self.f.clear()
204        self.assertEqual(len(self.f), 0)
205
206    def test__no_deadlock_first(self, debug=0):
207        # do this so that testers can see what function we're in in
208        # verbose mode when we deadlock.
209        sys.stdout.flush()
210
211        # in pybsddb's _DBWithCursor this causes an internal DBCursor
212        # object is created.  Other test_ methods in this class could
213        # inadvertently cause the deadlock but an explicit test is needed.
214        if debug: print "A"
215        k,v = self.f.first()
216        if debug: print "B", k
217        self.f[k] = "deadlock.  do not pass go.  do not collect $200."
218        if debug: print "C"
219        # if the bsddb implementation leaves the DBCursor open during
220        # the database write and locking+threading support is enabled
221        # the cursor's read lock will deadlock the write lock request..
222
223        # test the iterator interface
224        if True:
225            if debug: print "D"
226            i = self.f.iteritems()
227            k,v = i.next()
228            if debug: print "E"
229            self.f[k] = "please don't deadlock"
230            if debug: print "F"
231            while 1:
232                try:
233                    k,v = i.next()
234                except StopIteration:
235                    break
236            if debug: print "F2"
237
238            i = iter(self.f)
239            if debug: print "G"
240            while i:
241                try:
242                    if debug: print "H"
243                    k = i.next()
244                    if debug: print "I"
245                    self.f[k] = "deadlocks-r-us"
246                    if debug: print "J"
247                except StopIteration:
248                    i = None
249            if debug: print "K"
250
251        # test the legacy cursor interface mixed with writes
252        self.assertIn(self.f.first()[0], self.d)
253        k = self.f.next()[0]
254        self.assertIn(k, self.d)
255        self.f[k] = "be gone with ye deadlocks"
256        self.assertTrue(self.f[k], "be gone with ye deadlocks")
257
258    def test_for_cursor_memleak(self):
259        # do the bsddb._DBWithCursor iterator internals leak cursors?
260        nc1 = len(self.f._cursor_refs)
261        # create iterator
262        i = self.f.iteritems()
263        nc2 = len(self.f._cursor_refs)
264        # use the iterator (should run to the first yield, creating the cursor)
265        k, v = i.next()
266        nc3 = len(self.f._cursor_refs)
267        # destroy the iterator; this should cause the weakref callback
268        # to remove the cursor object from self.f._cursor_refs
269        del i
270        nc4 = len(self.f._cursor_refs)
271
272        self.assertEqual(nc1, nc2)
273        self.assertEqual(nc1, nc4)
274        self.assertEqual(nc3, nc1+1)
275
276    def test_popitem(self):
277        k, v = self.f.popitem()
278        self.assertIn(k, self.d)
279        self.assertIn(v, self.d.values())
280        self.assertNotIn(k, self.f)
281        self.assertEqual(len(self.d)-1, len(self.f))
282
283    def test_pop(self):
284        k = 'w'
285        v = self.f.pop(k)
286        self.assertEqual(v, self.d[k])
287        self.assertNotIn(k, self.f)
288        self.assertNotIn(v, self.f.values())
289        self.assertEqual(len(self.d)-1, len(self.f))
290
291    def test_get(self):
292        self.assertEqual(self.f.get('NotHere'), None)
293        self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
294        self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
295
296    def test_setdefault(self):
297        self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
298        self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
299
300    def test_update(self):
301        new = dict(y='life', u='of', i='brian')
302        self.f.update(new)
303        self.d.update(new)
304        for k, v in self.d.iteritems():
305            self.assertEqual(self.f[k], v)
306
307    def test_keyordering(self):
308        self.assertIs(self.openmethod[0], bsddb.btopen)
309        keys = self.d.keys()
310        keys.sort()
311        self.assertEqual(self.f.first()[0], keys[0])
312        self.assertEqual(self.f.next()[0], keys[1])
313        self.assertEqual(self.f.last()[0], keys[-1])
314        self.assertEqual(self.f.previous()[0], keys[-2])
315        self.assertEqual(list(self.f), keys)
316
class TestBTree(TestBSDDB):
    """Run the shared tests on a ``bsddb.btopen`` database at ``TESTFN``."""

    openmethod = [bsddb.btopen]
    fname = test_support.TESTFN
320
class TestBTree_InMemory(TestBSDDB):
    """Run the shared tests on a ``bsddb.btopen`` database with no file name."""

    openmethod = [bsddb.btopen]
    fname = None

    # An in-memory-only db cannot be reopened, so that test is disabled.
    test_close_and_reopen = None
327
class TestBTree_InMemory_Truncate(TestBSDDB):
    """Like ``TestBTree_InMemory`` but opened with flag ``'n'`` instead of ``'c'``."""

    openmethod = [bsddb.btopen]
    openflag = 'n'
    fname = None

    # An in-memory-only db cannot be reopened, so that test is disabled.
    test_close_and_reopen = None
335
class TestHashTable(TestBSDDB):
    """Run the shared tests on a ``bsddb.hashopen`` database at ``TESTFN``."""

    openmethod = [bsddb.hashopen]
    fname = test_support.TESTFN

    # Key ordering is specific to the btopen method, so that test is disabled.
    test_keyordering = None
342
class TestHashTable_InMemory(TestBSDDB):
    """Run the shared tests on a ``bsddb.hashopen`` database with no file name."""

    openmethod = [bsddb.hashopen]
    fname = None

    # Key ordering is specific to the btopen method, so that test is disabled.
    test_keyordering = None

    # An in-memory-only db cannot be reopened, so that test is disabled.
    test_close_and_reopen = None
352
# NOTE: bsddb.rnopen ('Record Numbers') is not exercised here: 'put' for RECNO
# in bsddb 1.85 appears broken, at least on Solaris Intel. -- rmasse 1/97
356
def test_main(verbose=None):
    """Hand every concrete bsddb test case to ``test_support.run_unittest``.

    ``verbose`` is accepted but not used by this function.
    """
    test_cases = (
        TestBTree,
        TestHashTable,
        TestBTree_InMemory,
        TestHashTable_InMemory,
        TestBTree_InMemory_Truncate,
    )
    test_support.run_unittest(*test_cases)
365
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main(verbose=True)
368