• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
"""
Test cases for testing the locking sub-system.
"""
4
5import time
6
7import unittest
8from test_all import db, test_support, verbose, have_threads, \
9        get_new_environment_path, get_new_database_path
10
11if have_threads :
12    from threading import Thread
13    import sys
14    if sys.version_info[0] < 3 :
15        from threading import currentThread
16    else :
17        from threading import current_thread as currentThread
18
19#----------------------------------------------------------------------
20
21class LockingTestCase(unittest.TestCase):
22    def setUp(self):
23        self.homeDir = get_new_environment_path()
24        self.env = db.DBEnv()
25        self.env.open(self.homeDir, db.DB_THREAD | db.DB_INIT_MPOOL |
26                                    db.DB_INIT_LOCK | db.DB_CREATE)
27
28
29    def tearDown(self):
30        self.env.close()
31        test_support.rmtree(self.homeDir)
32
33
34    def test01_simple(self):
35        if verbose:
36            print '\n', '-=' * 30
37            print "Running %s.test01_simple..." % self.__class__.__name__
38
39        anID = self.env.lock_id()
40        if verbose:
41            print "locker ID: %s" % anID
42        lock = self.env.lock_get(anID, "some locked thing", db.DB_LOCK_WRITE)
43        if verbose:
44            print "Acquired lock: %s" % lock
45        self.env.lock_put(lock)
46        if verbose:
47            print "Released lock: %s" % lock
48        self.env.lock_id_free(anID)
49
50
51    def test02_threaded(self):
52        if verbose:
53            print '\n', '-=' * 30
54            print "Running %s.test02_threaded..." % self.__class__.__name__
55
56        threads = []
57        threads.append(Thread(target = self.theThread,
58                              args=(db.DB_LOCK_WRITE,)))
59        threads.append(Thread(target = self.theThread,
60                              args=(db.DB_LOCK_READ,)))
61        threads.append(Thread(target = self.theThread,
62                              args=(db.DB_LOCK_READ,)))
63        threads.append(Thread(target = self.theThread,
64                              args=(db.DB_LOCK_WRITE,)))
65        threads.append(Thread(target = self.theThread,
66                              args=(db.DB_LOCK_READ,)))
67        threads.append(Thread(target = self.theThread,
68                              args=(db.DB_LOCK_READ,)))
69        threads.append(Thread(target = self.theThread,
70                              args=(db.DB_LOCK_WRITE,)))
71        threads.append(Thread(target = self.theThread,
72                              args=(db.DB_LOCK_WRITE,)))
73        threads.append(Thread(target = self.theThread,
74                              args=(db.DB_LOCK_WRITE,)))
75
76        for t in threads:
77            import sys
78            if sys.version_info[0] < 3 :
79                t.setDaemon(True)
80            else :
81                t.daemon = True
82            t.start()
83        for t in threads:
84            t.join()
85
86        def test03_lock_timeout(self):
87            self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
88            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)
89            self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
90            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)
91            self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
92            self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)
93            self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
94            self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)
95
96    def test04_lock_timeout2(self):
97        self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)
98        self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)
99        self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)
100        self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)
101
102        def deadlock_detection() :
103            while not deadlock_detection.end :
104                deadlock_detection.count = \
105                    self.env.lock_detect(db.DB_LOCK_EXPIRE)
106                if deadlock_detection.count :
107                    while not deadlock_detection.end :
108                        pass
109                    break
110                time.sleep(0.01)
111
112        deadlock_detection.end=False
113        deadlock_detection.count=0
114        t=Thread(target=deadlock_detection)
115        import sys
116        if sys.version_info[0] < 3 :
117            t.setDaemon(True)
118        else :
119            t.daemon = True
120        t.start()
121        self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)
122        anID = self.env.lock_id()
123        anID2 = self.env.lock_id()
124        self.assertNotEqual(anID, anID2)
125        lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)
126        start_time=time.time()
127        self.assertRaises(db.DBLockNotGrantedError,
128                self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)
129        end_time=time.time()
130        deadlock_detection.end=True
131        # Floating point rounding
132        self.assertTrue((end_time-start_time) >= 0.0999)
133        self.env.lock_put(lock)
134        t.join()
135
136        self.env.lock_id_free(anID)
137        self.env.lock_id_free(anID2)
138
139        if db.version() >= (4,6):
140            self.assertTrue(deadlock_detection.count>0)
141
142    def theThread(self, lockType):
143        import sys
144        if sys.version_info[0] < 3 :
145            name = currentThread().getName()
146        else :
147            name = currentThread().name
148
149        if lockType ==  db.DB_LOCK_WRITE:
150            lt = "write"
151        else:
152            lt = "read"
153
154        anID = self.env.lock_id()
155        if verbose:
156            print "%s: locker ID: %s" % (name, anID)
157
158        for i in xrange(1000) :
159            lock = self.env.lock_get(anID, "some locked thing", lockType)
160            if verbose:
161                print "%s: Acquired %s lock: %s" % (name, lt, lock)
162
163            self.env.lock_put(lock)
164            if verbose:
165                print "%s: Released %s lock: %s" % (name, lt, lock)
166
167        self.env.lock_id_free(anID)
168
169
170#----------------------------------------------------------------------
171
def test_suite():
    """Return a TestSuite holding the LockingTestCase tests to run.

    With thread support every test method is included; otherwise only the
    methods whose names start with 'test01' are loaded.
    """
    if have_threads:
        locking_tests = unittest.makeSuite(LockingTestCase)
    else:
        locking_tests = unittest.makeSuite(LockingTestCase, 'test01')

    collected = unittest.TestSuite()
    collected.addTest(locking_tests)
    return collected
181
182
if __name__ == '__main__':
    # Executed as a script: run the tests collected by test_suite().
    unittest.main(defaultTest='test_suite')
185