• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from . import util as test_util
2
3init = test_util.import_importlib('importlib')
4
5import sys
6import threading
7import unittest
8import weakref
9
10from test import support
11from test.support import threading_helper
12from test import lock_tests
13
14
15class ModuleLockAsRLockTests:
16    locktype = classmethod(lambda cls: cls.LockType("some_lock"))
17
18    # _is_owned() unsupported
19    test__is_owned = None
20    # acquire(blocking=False) unsupported
21    test_try_acquire = None
22    test_try_acquire_contended = None
23    # `with` unsupported
24    test_with = None
25    # acquire(timeout=...) unsupported
26    test_timeout = None
27    # _release_save() unsupported
28    test_release_save_unacquired = None
29    # lock status in repr unsupported
30    test_repr = None
31    test_locked_repr = None
32
33LOCK_TYPES = {kind: splitinit._bootstrap._ModuleLock
34              for kind, splitinit in init.items()}
35
36(Frozen_ModuleLockAsRLockTests,
37 Source_ModuleLockAsRLockTests
38 ) = test_util.test_both(ModuleLockAsRLockTests, lock_tests.RLockTests,
39                         LockType=LOCK_TYPES)
40
41
42class DeadlockAvoidanceTests:
43
44    def setUp(self):
45        try:
46            self.old_switchinterval = sys.getswitchinterval()
47            support.setswitchinterval(0.000001)
48        except AttributeError:
49            self.old_switchinterval = None
50
51    def tearDown(self):
52        if self.old_switchinterval is not None:
53            sys.setswitchinterval(self.old_switchinterval)
54
55    def run_deadlock_avoidance_test(self, create_deadlock):
56        NLOCKS = 10
57        locks = [self.LockType(str(i)) for i in range(NLOCKS)]
58        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
59        if create_deadlock:
60            NTHREADS = NLOCKS
61        else:
62            NTHREADS = NLOCKS - 1
63        barrier = threading.Barrier(NTHREADS)
64        results = []
65
66        def _acquire(lock):
67            """Try to acquire the lock. Return True on success,
68            False on deadlock."""
69            try:
70                lock.acquire()
71            except self.DeadlockError:
72                return False
73            else:
74                return True
75
76        def f():
77            a, b = pairs.pop()
78            ra = _acquire(a)
79            barrier.wait()
80            rb = _acquire(b)
81            results.append((ra, rb))
82            if rb:
83                b.release()
84            if ra:
85                a.release()
86        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
87        self.assertEqual(len(results), NTHREADS)
88        return results
89
90    def test_deadlock(self):
91        results = self.run_deadlock_avoidance_test(True)
92        # At least one of the threads detected a potential deadlock on its
93        # second acquire() call.  It may be several of them, because the
94        # deadlock avoidance mechanism is conservative.
95        nb_deadlocks = results.count((True, False))
96        self.assertGreaterEqual(nb_deadlocks, 1)
97        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
98
99    def test_no_deadlock(self):
100        results = self.run_deadlock_avoidance_test(False)
101        self.assertEqual(results.count((True, False)), 0)
102        self.assertEqual(results.count((True, True)), len(results))
103
104
105DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
106                   for kind, splitinit in init.items()}
107
108(Frozen_DeadlockAvoidanceTests,
109 Source_DeadlockAvoidanceTests
110 ) = test_util.test_both(DeadlockAvoidanceTests,
111                         LockType=LOCK_TYPES,
112                         DeadlockError=DEADLOCK_ERRORS)
113
114
115class LifetimeTests:
116
117    @property
118    def bootstrap(self):
119        return self.init._bootstrap
120
121    def test_lock_lifetime(self):
122        name = "xyzzy"
123        self.assertNotIn(name, self.bootstrap._module_locks)
124        lock = self.bootstrap._get_module_lock(name)
125        self.assertIn(name, self.bootstrap._module_locks)
126        wr = weakref.ref(lock)
127        del lock
128        support.gc_collect()
129        self.assertNotIn(name, self.bootstrap._module_locks)
130        self.assertIsNone(wr())
131
132    def test_all_locks(self):
133        support.gc_collect()
134        self.assertEqual(0, len(self.bootstrap._module_locks),
135                         self.bootstrap._module_locks)
136
137
138(Frozen_LifetimeTests,
139 Source_LifetimeTests
140 ) = test_util.test_both(LifetimeTests, init=init)
141
142
143def setUpModule():
144    thread_info = threading_helper.threading_setup()
145    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
146
147
148if __name__ == '__main__':
149    unittets.main()
150