• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
from test import support
# Resolve dbm.gnu first: import_module() skips this whole test module when
# the extension is not available on the current platform.
gdbm = support.import_module("dbm.gnu")
import unittest
import os
from test.support import TESTFN, unlink


# Path of the scratch database file shared by every test in this module.
filename = TESTFN

class TestGdbm(unittest.TestCase):
    # Tests for the dbm.gnu database object.  Every test works on the
    # module-level ``filename``; a test that keeps a handle open stores it in
    # ``self.g`` so that tearDown() can close it before removing the file.
    # (Comments are used instead of docstrings so that unittest's verbose
    # output keeps showing the test method names.)

    def setUp(self):
        # No database is open until the test itself opens one.
        self.g = None

    def tearDown(self):
        # Close the handle (if the test left one) before deleting the file.
        if self.g is not None:
            self.g.close()
        unlink(filename)

    def test_key_methods(self):
        self.g = gdbm.open(filename, 'c')
        self.assertEqual(self.g.keys(), [])
        self.g['a'] = 'b'
        self.g['12345678910'] = '019237410982340912840198242'
        self.g[b'bytes'] = b'data'
        # str keys are stored as bytes, so keys() only ever yields bytes.
        key_set = set(self.g.keys())
        self.assertEqual(key_set, {b'a', b'bytes', b'12345678910'})
        self.assertIn('a', self.g)
        self.assertIn(b'a', self.g)
        self.assertEqual(self.g[b'bytes'], b'data')
        # Walk the database with firstkey()/nextkey(), ticking off each key;
        # the end of the traversal is signalled by None.
        key = self.g.firstkey()
        while key is not None:
            self.assertIn(key, key_set)
            key_set.remove(key)
            key = self.g.nextkey(key)
        # Every stored key must have been visited, otherwise the traversal
        # stopped early.
        self.assertEqual(key_set, set())
        with self.assertRaises(KeyError):
            self.g['xxx']
        # get() and setdefault() work as in the dict interface
        self.assertEqual(self.g.get(b'xxx', b'foo'), b'foo')
        self.assertEqual(self.g.setdefault(b'xxx', b'foo'), b'foo')
        self.assertEqual(self.g[b'xxx'], b'foo')

    def test_error_conditions(self):
        # Try to open a non-existent database.
        unlink(filename)
        self.assertRaises(gdbm.error, gdbm.open, filename, 'r')
        # Try to access a closed database.
        self.g = gdbm.open(filename, 'c')
        self.g.close()
        with self.assertRaises(gdbm.error):
            self.g['a']
        # Try to pass an invalid open flag.
        with self.assertRaises(gdbm.error):
            gdbm.open(filename, 'rx').close()

    def test_flags(self):
        # Test the flag parameter open() by trying all supported flag modes.
        all_flags = set(gdbm.open_flags)
        # Test standard flags (presumably "crwn").
        modes = sorted(all_flags - set('fsu'))  # sorting puts "c" mode first
        for mode in modes:
            self.g = gdbm.open(filename, mode)
            self.g.close()

        # Test additional flags (presumably "fsu"), combined with every
        # standard mode in a deterministic order.
        flags = sorted(all_flags - set('crwn'))
        for mode in modes:
            for flag in flags:
                self.g = gdbm.open(filename, mode + flag)
                self.g.close()

    def test_reorganize(self):
        self.g = gdbm.open(filename, 'c')
        size0 = os.path.getsize(filename)

        # Some gdbm builds preallocate a large empty database file, in which
        # case a 10,000 byte entry does not change the file size (bpo-33901
        # reports this for gdbm 1.15 on macOS).  Store at least size0 bytes
        # so that the file is forced to grow.
        value_size = max(size0, 10000)
        self.g['x'] = 'x' * value_size
        size1 = os.path.getsize(filename)
        self.assertGreater(size1, size0)

        del self.g['x']
        # 'size' is supposed to be the same even after deleting an entry.
        self.assertEqual(os.path.getsize(filename), size1)

        # reorganize() is what actually gives the freed space back.
        self.g.reorganize()
        size2 = os.path.getsize(filename)
        self.assertLess(size2, size1)
        self.assertGreaterEqual(size2, size0)

    def test_context_manager(self):
        with gdbm.open(filename, 'c') as db:
            db["gdbm context manager"] = "context manager"

        with gdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b"gdbm context manager"])

        # Leaving the with-block closed ``db``; using it now must fail.
        with self.assertRaises(gdbm.error) as cm:
            db.keys()
        self.assertEqual(str(cm.exception),
                         "GDBM object has already been closed")

# Run the tests when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()