• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the dumbdbm module
2   Original by Roger E. Masse
3"""
4
5import io
6import operator
7import os
8import stat
9import unittest
10import dbm.dumb as dumbdbm
11from test import support
12from functools import partial
13
# Path prefix shared by the tests below; the database files they touch are
# this prefix plus the ".dir", ".dat" and ".bak" suffixes.
_fname = support.TESTFN
15
def _delete_files():
    """Remove the database files derived from ``_fname``, if they exist.

    Each of the ".dir", ".dat" and ".bak" files is unlinked on a best-effort
    basis: an OSError from os.unlink() is ignored.
    """
    for suffix in (".dir", ".dat", ".bak"):
        path = _fname + suffix
        try:
            os.unlink(path)
        except OSError:
            # Nothing to clean up for this suffix; move on to the next one.
            continue
22
class DumbDBMTestCase(unittest.TestCase):
    """Exercise dbm.dumb databases through their mapping-style interface.

    Unless a test builds its own path, it works on the files derived from
    the module-level ``_fname`` prefix.  setUp() and tearDown() delete those
    files, so every test starts from and leaves behind a clean slate.

    Test methods are described with comments rather than docstrings so that
    unittest keeps reporting them by method name.
    """

    # Reference contents written by init_db() and checked by the helpers.
    # This dict is shared by all tests: a test that needs different contents
    # must work on its own copy instead of mutating it.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
             }

    def test_dumbdbm_creation(self):
        # A freshly created database is empty and returns what is stored.
        with dumbdbm.open(_fname, 'c') as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_dumbdbm_creation_mode(self):
        # The mode given to open() ends up on both the .dat and .dir files,
        # filtered by the process umask (0o637 & ~0o002 == 0o635).
        #
        # The umask is changed *before* entering the try block so that the
        # finally clause can never refer to an unbound old_umask.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        st = os.stat(_fname + '.dat')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
        st = os.stat(_fname + '.dir')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # Calling close() a second time must not raise.
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        self.init_db()
        # Shadow the shared class-level dict with a per-instance copy before
        # changing it; mutating the class attribute would leak the modified
        # value into every test that runs afterwards.
        self._dict = dict(self._dict)
        with dumbdbm.open(_fname, 'w') as f:
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.read_helper(f)
            # Modifying a database opened with 'r' is expected to emit a
            # DeprecationWarning.
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'b'), self._dict[b'b'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        self.init_db()
        with dumbdbm.open(_fname) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key finds the entry stored under its UTF-8 encoded bytes.
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        # Entries written with str keys/values are readable as bytes.
        self.init_db()
        with dumbdbm.open(_fname) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with dumbdbm.open(_fname, 'r') as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    def read_helper(self, f):
        """Assert that *f* holds exactly the keys and values of ``_dict``."""
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    def init_db(self):
        """Create a new database at ``_fname`` filled with ``_dict``."""
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        """Assert that *f* has the same keys as ``_dict``.

        Returns the sorted list of keys found in *f*.
        """
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with dumbdbm.open(_fname) as f:
                for _step in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen the database and compare it with the mirror.
            with dumbdbm.open(_fname) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # Leaving the with block closed the database.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        # Every operation on a closed database raises dumbdbm.error with the
        # same message.
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        # Opening with 'n' discards whatever the database held before.
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(list(f.keys()), [])

    def test_eval(self):
        # An index file containing an expression must be rejected with
        # ValueError without the expression being executed (nothing may be
        # printed).
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_warn_on_ignored_flags(self):
        for value in ('r', 'w'):
            _delete_files()
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The database file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()

    def test_missing_index(self):
        with dumbdbm.open(_fname, 'n') as f:
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The index file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()
            # Only the 'w' flag is expected to leave an index file behind.
            self.assertEqual(os.path.exists(_fname + '.dir'), value == 'w')
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "Flag must be one of "
                                       "'r', 'w', 'c', or 'n'"):
                f = dumbdbm.open(_fname, flag)
            f.close()

    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_readonly_files(self):
        # A database whose files and directory are read-only can still be
        # opened with 'r' and read.
        with support.temp_dir() as dir:
            fname = os.path.join(dir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(support.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        filename = support.TESTFN_NONASCII
        # This test does not use _fname, so it registers its own cleanup.
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(support.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertIn(b'key', db)
            self.assertEqual(db[b'key'], b'value')

    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()
320
321
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
324