• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the dumbdbm module
2   Original by Roger E. Masse
3"""
4
5import contextlib
6import io
7import operator
8import os
9import stat
10import unittest
11import dbm.dumb as dumbdbm
12from test import support
13from functools import partial
14
# Base path (without extension) of the scratch database used by the tests;
# the files actually touched are _fname + ".dir", ".dat" and ".bak".
_fname = support.TESTFN
16
def _delete_files():
    """Best-effort removal of the scratch database files.

    Tries to unlink ``_fname`` with each of the ".dir", ".dat" and ".bak"
    suffixes.  Any OSError raised by an unlink (for instance because the
    file does not exist) is deliberately ignored.
    """
    for suffix in (".dir", ".dat", ".bak"):
        with contextlib.suppress(OSError):
            os.unlink(_fname + suffix)
23
class DumbDBMTestCase(unittest.TestCase):
    """Exercise dbm.dumb (imported in this module as ``dumbdbm``).

    Most tests operate on the scratch database rooted at the module-level
    ``_fname`` path; setUp() and tearDown() delete its files so every test
    starts and finishes without leftovers.
    """

    # Reference contents used to populate and verify databases.  setUp()
    # hands each test its own copy, so a test may modify self._dict without
    # affecting the tests that run after it.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
             }

    def test_dumbdbm_creation(self):
        # A database opened with 'c' starts empty and returns what is stored.
        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_dumbdbm_creation_mode(self):
        # Set the umask before entering the try block: if os.umask() itself
        # failed there would be nothing to restore in the finally clause.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        # 0o637 with the 0o002 umask bits cleared.
        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        st = os.stat(_fname + '.dat')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
        st = os.stat(_fname + '.dir')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # Calling close() on an already closed database must not raise.
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
            # Keep the reference mapping in step with the database so that
            # read_helper() compares against the modified value.
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.read_helper(f)
            # Writing to or deleting from a read-only database is an error.
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'a'), self._dict[b'a'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key finds the entry stored under its UTF-8 encoded bytes.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        # Entries written with str keys/values are readable as bytes.
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        # Close the reopened database explicitly instead of leaving it to
        # the garbage collector.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    def read_helper(self, f):
        """Assert that *f* holds exactly the keys and values of self._dict."""
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    def init_db(self):
        """Create a new database at _fname (flag 'n') filled from self._dict."""
        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        """Assert that *f* and self._dict have the same keys.

        Returns the sorted list of keys read from *f*.
        """
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                for _step in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen the database and compare it with the mirror; the
            # mirror uses str keys while the database returns bytes keys.
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        # Leaving the with block closed the database, so using it now fails.
        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        # Every operation on a closed database raises dumbdbm.error with
        # the same message.
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        # Opening with 'n' again must discard the entries written above.
        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # An index file containing a Python expression must be rejected
        # with ValueError, and nothing may be printed while it is read.
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_missing_data(self):
        # Opening a nonexistent database with 'r' or 'w' fails and must
        # not leave index or backup files behind.
        for value in ('r', 'w'):
            _delete_files()
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_missing_index(self):
        # Same as test_missing_data, but only the .dir file is missing.
        with dumbdbm.open(_fname, 'n') as f:
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertRaisesRegex(ValueError,
                                        "Flag must be one of "
                                        "'r', 'w', 'c', or 'n'"):
                dumbdbm.open(_fname, flag)

    def test_readonly_files(self):
        # A database whose files and directory are read-only can still be
        # opened with 'r' and read.
        with support.temp_dir() as dir:
            fname = os.path.join(dir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(support.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        filename = support.TESTFN_NONASCII
        # This test uses its own file name, which setUp()/tearDown() do not
        # clean up, so register the removal of its files here.
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(support.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertIn(b'key', db)
            self.assertEqual(db[b'key'], b'value')

    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()
        # Shadow the class-level reference mapping with a per-test copy so
        # that a test which changes it (test_dumbdbm_modification) cannot
        # influence the tests that run afterwards.
        self._dict = dict(type(self)._dict)
300
301
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
304