"""Unit tests for the ``_dummy_thread`` module (imported here as ``_thread``)."""

import _dummy_thread as _thread
import time
import queue
import random
import unittest
from test import support
from unittest import mock

# Number of seconds the timing-based tests wait; 0 keeps the suite fast.
DELAY = 0


class LockTests(unittest.TestCase):
    """Test lock objects."""

    def setUp(self):
        # Every test starts from a freshly allocated lock.
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # Make sure locks start unlocked.
        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")

    def test_release(self):
        # Test self.lock.release().
        self.lock.acquire()
        self.lock.release()
        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")

    def test_LockType_context_manager(self):
        # Check the very lock that was entered: asserting on an unrelated
        # lock (such as self.lock, which is never acquired here) could not
        # detect a context manager that fails to release.
        lock = _thread.LockType()
        with lock:
            self.assertTrue(lock.locked())
        self.assertFalse(lock.locked(),
                         "Acquired Lock was not released")

    def test_improper_release(self):
        # Make sure releasing an unlocked lock raises RuntimeError.
        self.assertRaises(RuntimeError, self.lock.release)

    def test_cond_acquire_success(self):
        # Make sure the conditional acquiring of the lock works.
        self.assertTrue(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # Test that conditionally acquiring a locked lock returns False.
        self.lock.acquire(0)
        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                         "succeeded.")

    def test_uncond_acquire_success(self):
        # Make sure unconditional acquiring of a lock works.
        self.lock.acquire()
        self.assertTrue(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # Make sure that an unconditional locking returns True.
        self.assertIs(self.lock.acquire(1), True,
                      "Unconditional locking did not return True.")
        self.assertIs(self.lock.acquire(), True)

    def test_uncond_acquire_blocking(self):
        # Make sure that unconditional acquiring of a locked lock blocks.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        # Whole seconds are compared so sub-second jitter cannot fail the test.
        start_time = int(time.monotonic())
        _thread.start_new_thread(delay_unlock, (self.lock, DELAY))
        if support.verbose:
            print()
            print("*** Waiting for thread to release the lock "
                  "(approx. %s sec.) ***" % DELAY)
        self.lock.acquire()
        end_time = int(time.monotonic())
        if support.verbose:
            print("done")
        self.assertGreaterEqual(end_time - start_time, DELAY,
                                "Blocking by unconditional acquiring failed.")

    @mock.patch('time.sleep')
    def test_acquire_timeout(self, mock_sleep):
        """Test invoking acquire() with a positive timeout when the lock is
        already acquired. Ensure that time.sleep() is invoked with the given
        timeout and that False is returned."""

        self.lock.acquire()
        retval = self.lock.acquire(waitflag=0, timeout=1)
        self.assertTrue(mock_sleep.called)
        mock_sleep.assert_called_once_with(1)
        self.assertEqual(retval, False)

    def test_lock_representation(self):
        # repr() must reflect the current state of the lock.
        self.lock.acquire()
        self.assertIn("locked", repr(self.lock))
        self.lock.release()
        self.assertIn("unlocked", repr(self.lock))


class MiscTests(unittest.TestCase):
    """Miscellaneous tests."""

    def test_exit(self):
        # _thread.exit() is expected to raise SystemExit.
        self.assertRaises(SystemExit, _thread.exit)

    def test_ident(self):
        # The thread identifier must be a positive integer.
        self.assertIsInstance(_thread.get_ident(), int,
                              "_thread.get_ident() returned a non-integer")
        self.assertGreater(_thread.get_ident(), 0)

    def test_LockType(self):
        # allocate_lock() must hand back a LockType instance.
        self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
                              "_thread.LockType is not an instance of what "
                              "is returned by _thread.allocate_lock()")

    def test_set_sentinel(self):
        # _set_sentinel() must hand back a LockType instance.
        self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
                              "_thread._set_sentinel() did not return a "
                              "LockType instance.")

    def test_interrupt_main(self):
        # Calling start_new_thread with a function that executes
        # interrupt_main should raise KeyboardInterrupt upon completion.
        def call_interrupt():
            _thread.interrupt_main()

        self.assertRaises(KeyboardInterrupt,
                          _thread.start_new_thread,
                          call_interrupt,
                          tuple())

    def test_interrupt_in_main(self):
        # Calling interrupt_main() directly raises KeyboardInterrupt.
        self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)

    def test_stack_size_None(self):
        # Querying the stack size (passing None) is expected to return 0.
        retval = _thread.stack_size(None)
        self.assertEqual(retval, 0)

    def test_stack_size_not_None(self):
        # Passing a non-None stack size is expected to raise _thread.error.
        with self.assertRaises(_thread.error) as cm:
            _thread.stack_size("")
        self.assertEqual(cm.exception.args[0],
                         "setting thread stack size not supported")


class ThreadTests(unittest.TestCase):
    """Test thread creation."""

    def test_arg_passing(self):
        # Make sure that parameter passing works.
        # NOTE: the helper's first parameter must stay named ``queue``
        # because it is also supplied by keyword below.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            queue.put((arg1, arg2))

        testing_queue = queue.Queue(1)

        # Positional arguments only.
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation "
                        "using tuple failed")

        # Keyword arguments only.
        _thread.start_new_thread(
            arg_tester,
            tuple(),
            {'queue': testing_queue, 'arg1': True, 'arg2': True})

        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation "
                        "using kwargs failed")

        # Mixed positional and keyword arguments.
        _thread.start_new_thread(
            arg_tester,
            (testing_queue, True),
            {'arg2': True})

        result = testing_queue.get()
        self.assertTrue(result[0] and result[1],
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_thread_creation(self):
        # Start several "threads" and make sure each one reports back.
        def queue_mark(queue, delay):
            """Sleep for *delay* seconds, then put the thread ident on *queue*."""
            time.sleep(delay)
            queue.put(_thread.get_ident())

        thread_count = 5
        testing_queue = queue.Queue(thread_count)

        if support.verbose:
            print()
            print("*** Testing multiple thread creation "
                  "(will take approx. %s to %s sec.) ***" % (
                      DELAY, thread_count))

        for _ in range(thread_count):
            # Only randomize the per-thread delay when delays are enabled.
            if DELAY:
                local_delay = round(random.random(), 1)
            else:
                local_delay = 0
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, local_delay))
        time.sleep(DELAY)
        if support.verbose:
            print('done')
        self.assertEqual(testing_queue.qsize(), thread_count,
                         "Not all %s threads executed properly "
                         "after %s sec." % (thread_count, DELAY))

    def test_args_not_tuple(self):
        """
        Test invoking start_new_thread() with a non-tuple value for "args".
        Expect TypeError with a meaningful error message to be raised.
        """
        with self.assertRaises(TypeError) as cm:
            _thread.start_new_thread(mock.Mock(), [])
        self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")

    def test_kwargs_not_dict(self):
        """
        Test invoking start_new_thread() with a non-dict value for "kwargs".
        Expect TypeError with a meaningful error message to be raised.
        """
        with self.assertRaises(TypeError) as cm:
            _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
        self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")

    def test_SystemExit(self):
        """
        Test invoking start_new_thread() with a function that raises
        SystemExit.
        The exception should be discarded.
        """
        func = mock.Mock(side_effect=SystemExit())
        try:
            _thread.start_new_thread(func, tuple())
        except SystemExit:
            self.fail("start_new_thread raised SystemExit.")

    @mock.patch('traceback.print_exc')
    def test_RaiseException(self, mock_print_exc):
        """
        Test invoking start_new_thread() with a function that raises exception.

        The exception should be discarded and the traceback should be printed
        via traceback.print_exc()
        """
        func = mock.Mock(side_effect=Exception)
        _thread.start_new_thread(func, tuple())
        self.assertTrue(mock_print_exc.called)