"""
TestCases for testing the locking sub-system.
"""

import sys
import time

import unittest
from test_all import db, test_support, verbose, have_threads, \
        get_new_environment_path, get_new_database_path

if have_threads :
    from threading import Thread
    if sys.version_info[0] < 3 :
        from threading import currentThread
    else :
        from threading import current_thread as currentThread

#----------------------------------------------------------------------

class LockingTestCase(unittest.TestCase):
    """Exercise the lock API exposed by a ``db.DBEnv`` environment.

    Each test gets a fresh environment directory (see ``setUp``) opened
    with the locking and memory-pool subsystems enabled; ``tearDown``
    closes the environment and removes the directory.
    """

    def setUp(self):
        """Create a new environment directory and open a DBEnv in it."""
        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
                                    db.DB_INIT_LOCK | db.DB_CREATE)

    def tearDown(self):
        """Close the environment and delete its directory."""
        self.env.close()
        test_support.rmtree(self.homeDir)

    def test01_simple(self):
        """Acquire and release one write lock from a single locker ID."""
        # Single-argument print(...) calls behave identically as a
        # Python 2 print statement and as the Python 3 print function.
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_simple..." % self.__class__.__name__)

        anID = self.env.lock_id()
        if verbose:
            print("locker ID: %s" % anID)
        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
        if verbose:
            print("Acquired lock: %s" % lock)
        self.env.lock_put(lock)
        if verbose:
            print("Released lock: %s" % lock)
        self.env.lock_id_free(anID)

    def test02_threaded(self):
        """Run nine concurrent ``theThread`` workers on the same object.

        The workers use a fixed mix of read and write lock modes; the
        test waits for all of them to finish.
        """
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_threaded..." % self.__class__.__name__)

        # One worker per entry, started in this order.
        lock_modes = (
            db.DB_LOCK_WRITE,
            db.DB_LOCK_READ,
            db.DB_LOCK_READ,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_READ,
            db.DB_LOCK_READ,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_WRITE,
            db.DB_LOCK_WRITE,
        )
        threads = [Thread(target=self.theThread, args=(mode,))
                   for mode in lock_modes]

        for t in threads:
            # Daemon threads so a stuck worker cannot keep the
            # interpreter alive after the test run.
            t.daemon = True
            t.start()
        for t in threads:
            t.join()

    def test03_lock_timeout(self):
        """Check that set_timeout values are read back by get_timeout."""
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)

    def test04_lock_timeout2(self):
        """Check that a conflicting lock request times out.

        One locker holds a write lock on "shared lock"; a second locker
        requesting a read lock on the same object must fail with
        ``db.DBLockNotGrantedError`` after roughly the configured lock
        timeout.  A helper thread polls ``lock_detect(DB_LOCK_EXPIRE)``
        while the request is pending and records the first non-zero
        result.
        """
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)

        def deadlock_detection():
            # Poll until lock_detect reports something, then idle until
            # the main thread sets ``end``.  Sleeping on every iteration
            # avoids burning CPU in a tight spin while waiting.
            while not deadlock_detection.end:
                if not deadlock_detection.count:
                    deadlock_detection.count = \
                        self.env.lock_detect(db.DB_LOCK_EXPIRE)
                time.sleep(0.01)

        # State shared with the helper thread lives on the function object.
        deadlock_detection.end = False
        deadlock_detection.count = 0
        t = Thread(target=deadlock_detection)
        t.daemon = True
        t.start()
        try:
            self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
            anID = self.env.lock_id()
            anID2 = self.env.lock_id()
            self.assertNotEqual(anID, anID2)
            lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
            start_time = time.time()
            self.assertRaises(db.DBLockNotGrantedError,
                    self.env.lock_get, anID2, "shared lock", db.DB_LOCK_READ)
            end_time = time.time()
        finally:
            # Always stop and reap the helper thread, even when something
            # above failed: otherwise it would keep calling lock_detect on
            # an environment that tearDown is about to close.
            deadlock_detection.end = True
            t.join()

        # Floating point rounding
        if sys.platform == 'win32':
            # bpo-30850: On Windows, tolerate 50 ms whereas 100 ms is expected.
            # The lock sometimes times out after only 58 ms. Windows clocks
            # have a bad resolution and bad accuracy.
            min_dt = 0.050
        else:
            min_dt = 0.0999
        self.assertGreaterEqual(end_time - start_time, min_dt)
        self.env.lock_put(lock)

        self.env.lock_id_free(anID)
        self.env.lock_id_free(anID2)

        if db.version() >= (4, 6):
            self.assertGreater(deadlock_detection.count, 0)

    def theThread(self, lockType):
        """Worker body: take and release the same lock 1000 times.

        lockType is the lock mode passed to ``lock_get``
        (``db.DB_LOCK_WRITE`` is reported as "write" in verbose output,
        anything else as "read").
        """
        name = currentThread().name

        if lockType == db.DB_LOCK_WRITE:
            lt = "write"
        else:
            lt = "read"

        anID = self.env.lock_id()
        if verbose:
            print("%s: locker ID: %s" % (name, anID))

        for _ in range(1000):
            lock = self.env.lock_get(anID, "some locked thing", lockType)
            if verbose:
                print("%s: Acquired %s lock: %s" % (name, lt, lock))

            self.env.lock_put(lock)
            if verbose:
                print("%s: Released %s lock: %s" % (name, lt, lock))

        self.env.lock_id_free(anID)


#----------------------------------------------------------------------

def test_suite():
    """Build the suite for this module.

    All LockingTestCase tests are loaded when threads are available;
    otherwise only the methods whose names start with 'test01'.
    """
    suite = unittest.TestSuite()

    loader = unittest.TestLoader()
    if not have_threads:
        # The remaining tests need the threading names imported above.
        loader.testMethodPrefix = 'test01'
    suite.addTest(loader.loadTestsFromTestCase(LockingTestCase))

    return suite


if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')