1import unittest 2import dbm 3import os 4import shelve 5import glob 6import pickle 7 8from test import support 9from test.support import os_helper 10from collections.abc import MutableMapping 11from test.test_dbm import dbm_iterator 12 13def L1(s): 14 return s.decode("latin-1") 15 16class byteskeydict(MutableMapping): 17 "Mapping that supports bytes keys" 18 19 def __init__(self): 20 self.d = {} 21 22 def __getitem__(self, key): 23 return self.d[L1(key)] 24 25 def __setitem__(self, key, value): 26 self.d[L1(key)] = value 27 28 def __delitem__(self, key): 29 del self.d[L1(key)] 30 31 def __len__(self): 32 return len(self.d) 33 34 def iterkeys(self): 35 for k in self.d.keys(): 36 yield k.encode("latin-1") 37 38 __iter__ = iterkeys 39 40 def keys(self): 41 return list(self.iterkeys()) 42 43 def copy(self): 44 return byteskeydict(self.d) 45 46 47class TestCase(unittest.TestCase): 48 dirname = os_helper.TESTFN 49 fn = os.path.join(os_helper.TESTFN, "shelftemp.db") 50 51 def test_close(self): 52 d1 = {} 53 s = shelve.Shelf(d1, protocol=2, writeback=False) 54 s['key1'] = [1,2,3,4] 55 self.assertEqual(s['key1'], [1,2,3,4]) 56 self.assertEqual(len(s), 1) 57 s.close() 58 self.assertRaises(ValueError, len, s) 59 try: 60 s['key1'] 61 except ValueError: 62 pass 63 else: 64 self.fail('Closed shelf should not find a key') 65 66 def test_open_template(self, protocol=None): 67 os.mkdir(self.dirname) 68 self.addCleanup(os_helper.rmtree, self.dirname) 69 s = shelve.open(self.fn, protocol=protocol) 70 try: 71 s['key1'] = (1,2,3,4) 72 self.assertEqual(s['key1'], (1,2,3,4)) 73 finally: 74 s.close() 75 76 def test_ascii_file_shelf(self): 77 self.test_open_template(protocol=0) 78 79 def test_binary_file_shelf(self): 80 self.test_open_template(protocol=1) 81 82 def test_proto2_file_shelf(self): 83 self.test_open_template(protocol=2) 84 85 def test_in_memory_shelf(self): 86 d1 = byteskeydict() 87 with shelve.Shelf(d1, protocol=0) as s: 88 s['key1'] = (1,2,3,4) 89 self.assertEqual(s['key1'], (1,2,3,4)) 90 d2 = byteskeydict() 91 with shelve.Shelf(d2, protocol=1) as s: 92 s['key1'] = (1,2,3,4) 93 self.assertEqual(s['key1'], (1,2,3,4)) 94 95 self.assertEqual(len(d1), 1) 96 self.assertEqual(len(d2), 1) 97 self.assertNotEqual(d1.items(), d2.items()) 98 99 def test_mutable_entry(self): 100 d1 = byteskeydict() 101 with shelve.Shelf(d1, protocol=2, writeback=False) as s: 102 s['key1'] = [1,2,3,4] 103 self.assertEqual(s['key1'], [1,2,3,4]) 104 s['key1'].append(5) 105 self.assertEqual(s['key1'], [1,2,3,4]) 106 107 d2 = byteskeydict() 108 with shelve.Shelf(d2, protocol=2, writeback=True) as s: 109 s['key1'] = [1,2,3,4] 110 self.assertEqual(s['key1'], [1,2,3,4]) 111 s['key1'].append(5) 112 self.assertEqual(s['key1'], [1,2,3,4,5]) 113 114 self.assertEqual(len(d1), 1) 115 self.assertEqual(len(d2), 1) 116 117 def test_keyencoding(self): 118 d = {} 119 key = 'Pöp' 120 # the default keyencoding is utf-8 121 shelve.Shelf(d)[key] = [1] 122 self.assertIn(key.encode('utf-8'), d) 123 # but a different one can be given 124 shelve.Shelf(d, keyencoding='latin-1')[key] = [1] 125 self.assertIn(key.encode('latin-1'), d) 126 # with all consequences 127 s = shelve.Shelf(d, keyencoding='ascii') 128 self.assertRaises(UnicodeEncodeError, s.__setitem__, key, [1]) 129 130 def test_writeback_also_writes_immediately(self): 131 # Issue 5754 132 d = {} 133 key = 'key' 134 encodedkey = key.encode('utf-8') 135 with shelve.Shelf(d, writeback=True) as s: 136 s[key] = [1] 137 p1 = d[encodedkey] # Will give a KeyError if backing store not updated 138 s['key'].append(2) 139 p2 = d[encodedkey] 140 self.assertNotEqual(p1, p2) # Write creates new object in store 141 142 def test_with(self): 143 d1 = {} 144 with shelve.Shelf(d1, protocol=2, writeback=False) as s: 145 s['key1'] = [1,2,3,4] 146 self.assertEqual(s['key1'], [1,2,3,4]) 147 self.assertEqual(len(s), 1) 148 self.assertRaises(ValueError, len, s) 149 try: 150 s['key1'] 151 except ValueError: 152 pass 153 else: 154 self.fail('Closed shelf should not find a key') 155 156 def test_default_protocol(self): 157 with shelve.Shelf({}) as s: 158 self.assertEqual(s._protocol, pickle.DEFAULT_PROTOCOL) 159 160 161class TestShelveBase: 162 type2test = shelve.Shelf 163 164 def _reference(self): 165 return {"key1":"value1", "key2":2, "key3":(1,2,3)} 166 167 168class TestShelveInMemBase(TestShelveBase): 169 def _empty_mapping(self): 170 return shelve.Shelf(byteskeydict(), **self._args) 171 172 173class TestShelveFileBase(TestShelveBase): 174 counter = 0 175 176 def _empty_mapping(self): 177 self.counter += 1 178 x = shelve.open(self.base_path + str(self.counter), **self._args) 179 self.addCleanup(x.close) 180 return x 181 182 def setUp(self): 183 dirname = os_helper.TESTFN 184 os.mkdir(dirname) 185 self.addCleanup(os_helper.rmtree, dirname) 186 self.base_path = os.path.join(dirname, "shelftemp.db") 187 self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod) 188 dbm._defaultmod = self.dbm_mod 189 190 191from test import mapping_tests 192 193for proto in range(pickle.HIGHEST_PROTOCOL + 1): 194 bases = (TestShelveInMemBase, mapping_tests.BasicTestMappingProtocol) 195 name = f'TestProto{proto}MemShelve' 196 globals()[name] = type(name, bases, 197 {'_args': {'protocol': proto}}) 198 bases = (TestShelveFileBase, mapping_tests.BasicTestMappingProtocol) 199 for dbm_mod in dbm_iterator(): 200 assert dbm_mod.__name__.startswith('dbm.') 201 suffix = dbm_mod.__name__[4:] 202 name = f'TestProto{proto}File_{suffix}Shelve' 203 globals()[name] = type(name, bases, 204 {'dbm_mod': dbm_mod, '_args': {'protocol': proto}}) 205 206 207if __name__ == "__main__": 208 unittest.main() 209