• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the dumbdbm module
2   Original by Roger E. Masse
3"""
4
5import contextlib
6import io
7import operator
8import os
9import stat
10import unittest
11import dbm.dumb as dumbdbm
12from test import support
13from test.support import os_helper
14from functools import partial
15
# Base path of the test database; the tests and _delete_files() append the
# '.dir', '.dat' and '.bak' suffixes to it.
_fname = os_helper.TESTFN
17
18
def _delete_files():
    """Remove the .dir, .dat and .bak files that belong to ``_fname``.

    Cleanup is best-effort: any OSError raised by os.unlink() (for example
    because a file does not exist) is silently ignored.
    """
    for suffix in (".dir", ".dat", ".bak"):
        with contextlib.suppress(OSError):
            os.unlink(_fname + suffix)
25
class DumbDBMTestCase(unittest.TestCase):
    """Tests for dbm.dumb (imported here as ``dumbdbm``).

    Unless a test builds its own path, it works on the database rooted at
    the module-level ``_fname``.  Both setUp() and tearDown() call
    _delete_files(), so every test starts and finishes without leftover
    database files.
    """

    # Reference contents used to populate and to verify the database.
    # setUp() gives every test its own copy, so a test may modify
    # self._dict without affecting the tests that run after it.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
             }

    def test_dumbdbm_creation(self):
        # setUp() removed any old files, so a database opened with 'c' must
        # be empty; it is then filled and read back.
        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
            self.assertEqual(list(f.keys()), [])
            for key, value in self._dict.items():
                f[key] = value
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_dumbdbm_creation_mode(self):
        # Change the umask *before* entering the try block: if os.umask()
        # itself failed there would be no previous value to restore, and the
        # finally clause would only obscure the real error.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        # 0o637 with the bits of umask 0o002 cleared.
        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        for ext in ('.dat', '.dir'):
            st = os.stat(_fname + ext)
            self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # Calling close() on an already closed database must not raise.
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
            # Update the database and this test's private reference copy in
            # lockstep so that read_helper() compares against the new value.
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.read_helper(f)
            # Both kinds of modification are rejected in read-only mode.
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'a'), self._dict[b'a'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        # A key is visible through ``in`` right after it has been written.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key finds the entry stored under its UTF-8 encoded bytes.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        # Entries written with str keys/values are read back as bytes.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        # Close the reopened database as well; otherwise the handle would
        # still be open when tearDown() tries to delete the files.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')


    def read_helper(self, f):
        """Assert that *f* holds exactly the keys and values of self._dict."""
        self.keys_helper(f)
        for key, value in self._dict.items():
            self.assertEqual(value, f[key])

    def init_db(self):
        """Create a fresh database (flag 'n') filled with self._dict."""
        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
            for key, value in self._dict.items():
                f[key] = value

    def keys_helper(self, f):
        """Assert that the sorted keys of *f* equal those of self._dict.

        Returns the sorted list of keys read from *f*.
        """
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _cycle in range(5):
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                for _op in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen the database and compare it with the mirror; the mirror
            # uses str keys while the database returns bytes keys.
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # ``db`` still names the object whose ``with`` block has ended; using
        # it now must fail.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        # Every operation on a closed database raises dumbdbm.error with the
        # same message.
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        # Opening an existing database with 'n' must yield an empty one.
        with dumbdbm.open(_fname, 'n') as f:
            for key, value in self._dict.items():
                f[key] = value

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # An index file containing an arbitrary expression must be rejected
        # with ValueError, and the expression must not be executed: nothing
        # may reach stdout.
        with open(_fname + '.dir', 'w', encoding="utf-8") as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_missing_data(self):
        # Opening a nonexistent database with 'r' or 'w' fails and must not
        # leave index or backup files behind.
        for value in ('r', 'w'):
            _delete_files()
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_missing_index(self):
        # Same as test_missing_data, but only the index file is missing.
        with dumbdbm.open(_fname, 'n') as f:
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertRaisesRegex(ValueError,
                                        "Flag must be one of "
                                        "'r', 'w', 'c', or 'n'"):
                dumbdbm.open(_fname, flag)

    def test_readonly_files(self):
        # A database whose files and directory are read-only can still be
        # opened with 'r' and read.
        with os_helper.temp_dir() as tmpdir:
            fname = os.path.join(tmpdir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key, value in self._dict.items():
                    f[key] = value
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(tmpdir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(os_helper.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        # This test uses its own base name, so it registers its own cleanup
        # instead of relying on _delete_files().
        filename = os_helper.TESTFN_NONASCII
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(os_helper.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertIn(b'key', db)
            self.assertEqual(db[b'key'], b'value')

    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()
        # Shadow the class-level mapping with a per-test copy so that a test
        # which modifies self._dict (test_dumbdbm_modification) cannot leak
        # its changes into tests that run afterwards.
        self._dict = dict(self._dict)
302
303
# Run the tests when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()
306