• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import _dummy_thread as _thread
2import time
3import queue
4import random
5import unittest
6from test import support
7from unittest import mock
8
9DELAY = 0
10
11
12class LockTests(unittest.TestCase):
13    """Test lock objects."""
14
15    def setUp(self):
16        # Create a lock
17        self.lock = _thread.allocate_lock()
18
19    def test_initlock(self):
20        #Make sure locks start locked
21        self.assertFalse(self.lock.locked(),
22                        "Lock object is not initialized unlocked.")
23
24    def test_release(self):
25        # Test self.lock.release()
26        self.lock.acquire()
27        self.lock.release()
28        self.assertFalse(self.lock.locked(),
29                        "Lock object did not release properly.")
30
31    def test_LockType_context_manager(self):
32        with _thread.LockType():
33            pass
34        self.assertFalse(self.lock.locked(),
35                         "Acquired Lock was not released")
36
37    def test_improper_release(self):
38        #Make sure release of an unlocked thread raises RuntimeError
39        self.assertRaises(RuntimeError, self.lock.release)
40
41    def test_cond_acquire_success(self):
42        #Make sure the conditional acquiring of the lock works.
43        self.assertTrue(self.lock.acquire(0),
44                        "Conditional acquiring of the lock failed.")
45
46    def test_cond_acquire_fail(self):
47        #Test acquiring locked lock returns False
48        self.lock.acquire(0)
49        self.assertFalse(self.lock.acquire(0),
50                        "Conditional acquiring of a locked lock incorrectly "
51                         "succeeded.")
52
53    def test_uncond_acquire_success(self):
54        #Make sure unconditional acquiring of a lock works.
55        self.lock.acquire()
56        self.assertTrue(self.lock.locked(),
57                        "Uncondional locking failed.")
58
59    def test_uncond_acquire_return_val(self):
60        #Make sure that an unconditional locking returns True.
61        self.assertIs(self.lock.acquire(1), True,
62                        "Unconditional locking did not return True.")
63        self.assertIs(self.lock.acquire(), True)
64
65    def test_uncond_acquire_blocking(self):
66        #Make sure that unconditional acquiring of a locked lock blocks.
67        def delay_unlock(to_unlock, delay):
68            """Hold on to lock for a set amount of time before unlocking."""
69            time.sleep(delay)
70            to_unlock.release()
71
72        self.lock.acquire()
73        start_time = int(time.monotonic())
74        _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
75        if support.verbose:
76            print()
77            print("*** Waiting for thread to release the lock "\
78            "(approx. %s sec.) ***" % DELAY)
79        self.lock.acquire()
80        end_time = int(time.monotonic())
81        if support.verbose:
82            print("done")
83        self.assertGreaterEqual(end_time - start_time, DELAY,
84                        "Blocking by unconditional acquiring failed.")
85
86    @mock.patch('time.sleep')
87    def test_acquire_timeout(self, mock_sleep):
88        """Test invoking acquire() with a positive timeout when the lock is
89        already acquired. Ensure that time.sleep() is invoked with the given
90        timeout and that False is returned."""
91
92        self.lock.acquire()
93        retval = self.lock.acquire(waitflag=0, timeout=1)
94        self.assertTrue(mock_sleep.called)
95        mock_sleep.assert_called_once_with(1)
96        self.assertEqual(retval, False)
97
98    def test_lock_representation(self):
99        self.lock.acquire()
100        self.assertIn("locked", repr(self.lock))
101        self.lock.release()
102        self.assertIn("unlocked", repr(self.lock))
103
104
105class RLockTests(unittest.TestCase):
106    """Test dummy RLock objects."""
107
108    def setUp(self):
109        self.rlock = _thread.RLock()
110
111    def test_multiple_acquire(self):
112        self.assertIn("unlocked", repr(self.rlock))
113        self.rlock.acquire()
114        self.rlock.acquire()
115        self.assertIn("locked", repr(self.rlock))
116        self.rlock.release()
117        self.assertIn("locked", repr(self.rlock))
118        self.rlock.release()
119        self.assertIn("unlocked", repr(self.rlock))
120        self.assertRaises(RuntimeError, self.rlock.release)
121
122
123class MiscTests(unittest.TestCase):
124    """Miscellaneous tests."""
125
126    def test_exit(self):
127        self.assertRaises(SystemExit, _thread.exit)
128
129    def test_ident(self):
130        self.assertIsInstance(_thread.get_ident(), int,
131                              "_thread.get_ident() returned a non-integer")
132        self.assertGreater(_thread.get_ident(), 0)
133
134    def test_LockType(self):
135        self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
136                              "_thread.LockType is not an instance of what "
137                              "is returned by _thread.allocate_lock()")
138
139    def test_set_sentinel(self):
140        self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
141                              "_thread._set_sentinel() did not return a "
142                              "LockType instance.")
143
144    def test_interrupt_main(self):
145        #Calling start_new_thread with a function that executes interrupt_main
146        # should raise KeyboardInterrupt upon completion.
147        def call_interrupt():
148            _thread.interrupt_main()
149
150        self.assertRaises(KeyboardInterrupt,
151                          _thread.start_new_thread,
152                          call_interrupt,
153                          tuple())
154
155    def test_interrupt_in_main(self):
156        self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
157
158    def test_stack_size_None(self):
159        retval = _thread.stack_size(None)
160        self.assertEqual(retval, 0)
161
162    def test_stack_size_not_None(self):
163        with self.assertRaises(_thread.error) as cm:
164            _thread.stack_size("")
165        self.assertEqual(cm.exception.args[0],
166                         "setting thread stack size not supported")
167
168
169class ThreadTests(unittest.TestCase):
170    """Test thread creation."""
171
172    def test_arg_passing(self):
173        #Make sure that parameter passing works.
174        def arg_tester(queue, arg1=False, arg2=False):
175            """Use to test _thread.start_new_thread() passes args properly."""
176            queue.put((arg1, arg2))
177
178        testing_queue = queue.Queue(1)
179        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
180        result = testing_queue.get()
181        self.assertTrue(result[0] and result[1],
182                        "Argument passing for thread creation "
183                        "using tuple failed")
184
185        _thread.start_new_thread(
186                arg_tester,
187                tuple(),
188                {'queue':testing_queue, 'arg1':True, 'arg2':True})
189
190        result = testing_queue.get()
191        self.assertTrue(result[0] and result[1],
192                        "Argument passing for thread creation "
193                        "using kwargs failed")
194
195        _thread.start_new_thread(
196                arg_tester,
197                (testing_queue, True),
198                {'arg2':True})
199
200        result = testing_queue.get()
201        self.assertTrue(result[0] and result[1],
202                        "Argument passing for thread creation using both tuple"
203                        " and kwargs failed")
204
205    def test_multi_thread_creation(self):
206        def queue_mark(queue, delay):
207            time.sleep(delay)
208            queue.put(_thread.get_ident())
209
210        thread_count = 5
211        testing_queue = queue.Queue(thread_count)
212
213        if support.verbose:
214            print()
215            print("*** Testing multiple thread creation "
216                  "(will take approx. %s to %s sec.) ***" % (
217                    DELAY, thread_count))
218
219        for count in range(thread_count):
220            if DELAY:
221                local_delay = round(random.random(), 1)
222            else:
223                local_delay = 0
224            _thread.start_new_thread(queue_mark,
225                                     (testing_queue, local_delay))
226        time.sleep(DELAY)
227        if support.verbose:
228            print('done')
229        self.assertEqual(testing_queue.qsize(), thread_count,
230                         "Not all %s threads executed properly "
231                         "after %s sec." % (thread_count, DELAY))
232
233    def test_args_not_tuple(self):
234        """
235        Test invoking start_new_thread() with a non-tuple value for "args".
236        Expect TypeError with a meaningful error message to be raised.
237        """
238        with self.assertRaises(TypeError) as cm:
239            _thread.start_new_thread(mock.Mock(), [])
240        self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
241
242    def test_kwargs_not_dict(self):
243        """
244        Test invoking start_new_thread() with a non-dict value for "kwargs".
245        Expect TypeError with a meaningful error message to be raised.
246        """
247        with self.assertRaises(TypeError) as cm:
248            _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
249        self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
250
251    def test_SystemExit(self):
252        """
253        Test invoking start_new_thread() with a function that raises
254        SystemExit.
255        The exception should be discarded.
256        """
257        func = mock.Mock(side_effect=SystemExit())
258        try:
259            _thread.start_new_thread(func, tuple())
260        except SystemExit:
261            self.fail("start_new_thread raised SystemExit.")
262
263    @mock.patch('traceback.print_exc')
264    def test_RaiseException(self, mock_print_exc):
265        """
266        Test invoking start_new_thread() with a function that raises exception.
267
268        The exception should be discarded and the traceback should be printed
269        via traceback.print_exc()
270        """
271        func = mock.Mock(side_effect=Exception)
272        _thread.start_new_thread(func, tuple())
273        self.assertTrue(mock_print_exc.called)
274
275if __name__ == '__main__':
276    unittest.main()
277