• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2TestCases for testing the locking sub-system.
3"""
4
5import sys
6import time
7
8import unittest
9from test_all import db, test_support, verbose, have_threads, \
10        get_new_environment_path, get_new_database_path
11
12if have_threads :
13    from threading import Thread
14    if sys.version_info[0] < 3 :
15        from threading import currentThread
16    else :
17        from threading import current_thread as currentThread
18
19#----------------------------------------------------------------------
20
class LockingTestCase(unittest.TestCase):
    """Tests for the lock sub-system exposed by ``db.DBEnv``.

    ``setUp`` opens a fresh environment with the ``DB_THREAD``,
    ``DB_INIT_MPOOL``, ``DB_INIT_LOCK`` and ``DB_CREATE`` flags in a new
    home directory; ``tearDown`` closes it and removes the directory.

    The test methods are described with ``#`` comments rather than
    docstrings so that unittest's verbose output keeps showing the method
    names.
    """

    def setUp(self):
        """Create a new environment home and open a lock-enabled DBEnv."""
        self.homeDir = get_new_environment_path()
        self.env = db.DBEnv()
        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
                                    db.DB_INIT_LOCK | db.DB_CREATE)

    def tearDown(self):
        """Close the environment and delete its home directory."""
        self.env.close()
        test_support.rmtree(self.homeDir)

    def test01_simple(self):
        # One locker acquires a write lock on a named object, releases it,
        # and then frees its locker ID.
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test01_simple..." % self.__class__.__name__)

        anID = self.env.lock_id()
        if verbose:
            print("locker ID: %s" % anID)
        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
        if verbose:
            print("Acquired lock: %s" % lock)
        self.env.lock_put(lock)
        if verbose:
            print("Released lock: %s" % lock)
        self.env.lock_id_free(anID)

    def test02_threaded(self):
        # Nine threads, each running theThread with the lock mode listed
        # below, repeatedly lock and unlock the same named object.
        if verbose:
            print('\n' + '-=' * 30)
            print("Running %s.test02_threaded..." % self.__class__.__name__)

        lock_types = (
            db.DB_LOCK_WRITE, db.DB_LOCK_READ, db.DB_LOCK_READ,
            db.DB_LOCK_WRITE, db.DB_LOCK_READ, db.DB_LOCK_READ,
            db.DB_LOCK_WRITE, db.DB_LOCK_WRITE, db.DB_LOCK_WRITE,
        )
        threads = [Thread(target=self.theThread, args=(lock_type,))
                   for lock_type in lock_types]

        for t in threads:
            if sys.version_info[0] < 3:
                t.setDaemon(True)
            else:
                t.daemon = True
            t.start()
        for t in threads:
            t.join()

    # NOTE: this method used to be indented inside test02_threaded, which
    # made it a local function that was never called or collected.  It must
    # live at class level for unittest to run it.
    def test03_lock_timeout(self):
        # Values stored with set_timeout must be returned unchanged by
        # get_timeout, independently for the lock and transaction timeouts.
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
        self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)

    def test04_lock_timeout2(self):
        # A read request that conflicts with a held write lock must fail
        # with DBLockNotGrantedError, and only after the configured lock
        # timeout has elapsed.
        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)

        def deadlock_detection():
            # Poll lock_detect(DB_LOCK_EXPIRE) every 10 ms until it returns
            # a non-zero count, then idle until the test sets ``end``.
            # State is shared with the test through function attributes.
            while not deadlock_detection.end:
                deadlock_detection.count = \
                    self.env.lock_detect(db.DB_LOCK_EXPIRE)
                if deadlock_detection.count:
                    # Keep the non-zero count untouched until told to stop.
                    while not deadlock_detection.end:
                        pass
                    break
                time.sleep(0.01)

        deadlock_detection.end = False
        deadlock_detection.count = 0
        t = Thread(target=deadlock_detection)
        if sys.version_info[0] < 3:
            t.setDaemon(True)
        else:
            t.daemon = True
        t.start()
        try:
            # The elapsed-time check below expects this value to make the
            # blocked request give up after roughly 100 ms.
            self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
            anID = self.env.lock_id()
            anID2 = self.env.lock_id()
            self.assertNotEqual(anID, anID2)
            lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
            start_time = time.time()
            self.assertRaises(db.DBLockNotGrantedError,
                    self.env.lock_get, anID2, "shared lock", db.DB_LOCK_READ)
            end_time = time.time()
        finally:
            # Always stop and reap the helper thread, even when a step above
            # fails, so it cannot keep calling lock_detect while tearDown
            # closes the environment.
            deadlock_detection.end = True
            t.join()

        # Floating point rounding
        if sys.platform == 'win32':
            # bpo-30850: On Windows, tolerate 50 ms whereas 100 ms is expected.
            # The lock sometimes times out after only 58 ms. Windows clocks
            # have a bad resolution and bad accuracy.
            min_dt = 0.050
        else:
            min_dt = 0.0999
        self.assertGreaterEqual(end_time - start_time, min_dt)
        self.env.lock_put(lock)

        self.env.lock_id_free(anID)
        self.env.lock_id_free(anID2)

        if db.version() >= (4, 6):
            self.assertGreater(deadlock_detection.count, 0)

    def theThread(self, lockType):
        """Thread body: lock and unlock "some locked thing" 1000 times.

        ``lockType`` is the lock mode handed to ``lock_get``;
        test02_threaded passes ``db.DB_LOCK_WRITE`` or ``db.DB_LOCK_READ``.
        Each thread allocates its own locker ID and frees it when done.
        """
        if sys.version_info[0] < 3:
            name = currentThread().getName()
        else:
            name = currentThread().name

        # Human-readable mode, used only in the verbose trace.
        lt = "write" if lockType == db.DB_LOCK_WRITE else "read"

        anID = self.env.lock_id()
        if verbose:
            print("%s: locker ID: %s" % (name, anID))

        for _ in range(1000):
            lock = self.env.lock_get(anID, "some locked thing", lockType)
            if verbose:
                print("%s: Acquired %s lock: %s" % (name, lt, lock))

            self.env.lock_put(lock)
            if verbose:
                print("%s: Released %s lock: %s" % (name, lt, lock))

        self.env.lock_id_free(anID)

176
177#----------------------------------------------------------------------
178
def test_suite():
    """Return a TestSuite holding the LockingTestCase tests.

    With thread support every test in the class is included; without it
    only the methods whose names start with ``test01`` are selected.
    """
    if have_threads:
        selected = unittest.makeSuite(LockingTestCase)
    else:
        selected = unittest.makeSuite(LockingTestCase, 'test01')

    suite = unittest.TestSuite()
    suite.addTest(selected)
    return suite
188
189
# Run the suite built by test_suite() when this file is executed as a script.
if __name__ == "__main__":
    unittest.main(defaultTest="test_suite")
192