• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Test script for the dumbdbm module
2   Original by Roger E. Masse
3"""
4
5import io
6import operator
7import os
8import stat
9import unittest
10import warnings
11import dbm.dumb as dumbdbm
12from test import support
13from functools import partial
14
# Base path of the test database; the code below accesses _fname + ".dir",
# _fname + ".dat" and _fname + ".bak".
_fname = support.TESTFN
16
def _delete_files():
    """Remove the ``.dir``, ``.dat`` and ``.bak`` files derived from ``_fname``.

    Removal is best effort: any ``OSError`` raised by ``os.unlink`` (for
    example because the file does not exist) is ignored and the remaining
    files are still processed.
    """
    for suffix in (".dir", ".dat", ".bak"):
        path = _fname + suffix
        try:
            os.unlink(path)
        except OSError:
            # Nothing to clean up for this suffix; move on to the next one.
            continue
23
class DumbDBMTestCase(unittest.TestCase):
    # Tests for dbm.dumb databases stored under _fname.
    #
    # setUp() and tearDown() both remove the database files, so every test
    # starts from (and leaves behind) a clean slate.  Database handles are
    # opened in ``with`` blocks (or closed in ``finally``) so that they are
    # released even when an assertion fails; otherwise tearDown() could not
    # unlink the still-open files on platforms such as Windows.
    #
    # Test methods are documented with comments rather than docstrings so
    # that unittest's verbose output keeps showing the method names.

    # Reference contents: written by init_db() and compared by the helpers.
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
             }

    def test_dumbdbm_creation(self):
        # A database created with 'c' starts empty and returns what was
        # stored in it.
        with dumbdbm.open(_fname, 'c') as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_dumbdbm_creation_mode(self):
        # The mode argument, filtered by the umask, is applied to both the
        # .dat and the .dir file.
        #
        # Set the umask *before* entering the try block: if os.umask()
        # itself failed, the finally clause would otherwise reference an
        # unbound old_umask and mask the real error.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        # 0o637 with the 0o002 bit masked out.
        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        for ext in ('.dat', '.dir'):
            st = os.stat(_fname + ext)
            self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        # Calling close() on an already closed database must not raise.
        f = dumbdbm.open(_fname)
        try:
            f[b'a'] = b'b'
            self.assertEqual(f[b'a'], b'b')
        finally:
            f.close()
        f.close()

    def test_dumbdbm_modification(self):
        # Overwriting an existing key in a database opened with 'w'.
        self.init_db()
        # Work on a per-test copy so the class-level reference mapping is
        # not modified for the tests that run afterwards.
        self._dict = dict(self._dict)
        with dumbdbm.open(_fname, 'w') as f:
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)

    def test_dumbdbm_read(self):
        # Reading works in 'r' mode; writing or deleting emits a
        # DeprecationWarning.
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.read_helper(f)
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertWarnsRegex(DeprecationWarning,
                                       'The database is opened for reading only'):
                del f[b'a']

    def test_dumbdbm_keys(self):
        # keys() returns exactly the keys written by init_db().
        self.init_db()
        with dumbdbm.open(_fname) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        # A key is visible through ``in`` right after it was written.
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        # A str key looks up the entry stored under its UTF-8 encoding.
        self.init_db()
        with dumbdbm.open(_fname, 'r') as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        # str keys and values written to the database can be read back as
        # bytes.
        self.init_db()
        with dumbdbm.open(_fname) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with dumbdbm.open(_fname, 'r') as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with dumbdbm.open(_fname) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        with dumbdbm.open(_fname) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')

    def read_helper(self, f):
        # Check that database *f* holds exactly the keys and values of
        # self._dict.
        self.keys_helper(f)
        for key, value in self._dict.items():
            self.assertEqual(value, f[key])

    def init_db(self):
        # Create a new database (flag 'n') populated with self._dict.
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        # Assert that the keys of *f* match those of self._dict and return
        # them as a sorted list.
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with dumbdbm.open(_fname) as f:
                for _step in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            # Reopen the database and compare it with the mirror.
            with dumbdbm.open(_fname) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        # The database works as a context manager and is closed on exit.
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        # Every operation on a closed database raises dumbdbm.error with
        # the same message.
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        # Opening with 'n' discards the contents of an existing database.
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # Code placed in the .dir file must be rejected with ValueError and
        # must not be executed (nothing is printed).
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname):
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_warn_on_ignored_flags(self):
        # Opening a missing database with 'r' or 'w' emits a
        # DeprecationWarning.
        for value in ('r', 'w'):
            _delete_files()
            with self.assertWarnsRegex(DeprecationWarning,
                                       "The database file is missing, the "
                                       "semantics of the 'c' flag will "
                                       "be used."):
                f = dumbdbm.open(_fname, value)
            f.close()

    def test_invalid_flag(self):
        # Unknown flag values emit a DeprecationWarning.
        for flag in ('x', 'rf', None):
            with self.assertWarnsRegex(DeprecationWarning,
                                       "Flag must be one of "
                                       "'r', 'w', 'c', or 'n'"):
                f = dumbdbm.open(_fname, flag)
            f.close()

    @unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
    def test_readonly_files(self):
        # A database whose files and directory are read-only can still be
        # opened with 'r' and read.
        with support.temp_dir() as tmpdir:
            fname = os.path.join(tmpdir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(tmpdir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    def tearDown(self):
        _delete_files()

    def setUp(self):
        _delete_files()
283
284
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
287