1import _dummy_thread as _thread 2import time 3import queue 4import random 5import unittest 6from test import support 7from unittest import mock 8 9DELAY = 0 10 11 12class LockTests(unittest.TestCase): 13 """Test lock objects.""" 14 15 def setUp(self): 16 # Create a lock 17 self.lock = _thread.allocate_lock() 18 19 def test_initlock(self): 20 #Make sure locks start locked 21 self.assertFalse(self.lock.locked(), 22 "Lock object is not initialized unlocked.") 23 24 def test_release(self): 25 # Test self.lock.release() 26 self.lock.acquire() 27 self.lock.release() 28 self.assertFalse(self.lock.locked(), 29 "Lock object did not release properly.") 30 31 def test_LockType_context_manager(self): 32 with _thread.LockType(): 33 pass 34 self.assertFalse(self.lock.locked(), 35 "Acquired Lock was not released") 36 37 def test_improper_release(self): 38 #Make sure release of an unlocked thread raises RuntimeError 39 self.assertRaises(RuntimeError, self.lock.release) 40 41 def test_cond_acquire_success(self): 42 #Make sure the conditional acquiring of the lock works. 43 self.assertTrue(self.lock.acquire(0), 44 "Conditional acquiring of the lock failed.") 45 46 def test_cond_acquire_fail(self): 47 #Test acquiring locked lock returns False 48 self.lock.acquire(0) 49 self.assertFalse(self.lock.acquire(0), 50 "Conditional acquiring of a locked lock incorrectly " 51 "succeeded.") 52 53 def test_uncond_acquire_success(self): 54 #Make sure unconditional acquiring of a lock works. 55 self.lock.acquire() 56 self.assertTrue(self.lock.locked(), 57 "Uncondional locking failed.") 58 59 def test_uncond_acquire_return_val(self): 60 #Make sure that an unconditional locking returns True. 61 self.assertIs(self.lock.acquire(1), True, 62 "Unconditional locking did not return True.") 63 self.assertIs(self.lock.acquire(), True) 64 65 def test_uncond_acquire_blocking(self): 66 #Make sure that unconditional acquiring of a locked lock blocks. 67 def delay_unlock(to_unlock, delay): 68 """Hold on to lock for a set amount of time before unlocking.""" 69 time.sleep(delay) 70 to_unlock.release() 71 72 self.lock.acquire() 73 start_time = int(time.monotonic()) 74 _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) 75 if support.verbose: 76 print() 77 print("*** Waiting for thread to release the lock "\ 78 "(approx. %s sec.) ***" % DELAY) 79 self.lock.acquire() 80 end_time = int(time.monotonic()) 81 if support.verbose: 82 print("done") 83 self.assertGreaterEqual(end_time - start_time, DELAY, 84 "Blocking by unconditional acquiring failed.") 85 86 @mock.patch('time.sleep') 87 def test_acquire_timeout(self, mock_sleep): 88 """Test invoking acquire() with a positive timeout when the lock is 89 already acquired. Ensure that time.sleep() is invoked with the given 90 timeout and that False is returned.""" 91 92 self.lock.acquire() 93 retval = self.lock.acquire(waitflag=0, timeout=1) 94 self.assertTrue(mock_sleep.called) 95 mock_sleep.assert_called_once_with(1) 96 self.assertEqual(retval, False) 97 98 def test_lock_representation(self): 99 self.lock.acquire() 100 self.assertIn("locked", repr(self.lock)) 101 self.lock.release() 102 self.assertIn("unlocked", repr(self.lock)) 103 104 105class RLockTests(unittest.TestCase): 106 """Test dummy RLock objects.""" 107 108 def setUp(self): 109 self.rlock = _thread.RLock() 110 111 def test_multiple_acquire(self): 112 self.assertIn("unlocked", repr(self.rlock)) 113 self.rlock.acquire() 114 self.rlock.acquire() 115 self.assertIn("locked", repr(self.rlock)) 116 self.rlock.release() 117 self.assertIn("locked", repr(self.rlock)) 118 self.rlock.release() 119 self.assertIn("unlocked", repr(self.rlock)) 120 self.assertRaises(RuntimeError, self.rlock.release) 121 122 123class MiscTests(unittest.TestCase): 124 """Miscellaneous tests.""" 125 126 def test_exit(self): 127 self.assertRaises(SystemExit, _thread.exit) 128 129 def test_ident(self): 130 self.assertIsInstance(_thread.get_ident(), int, 131 "_thread.get_ident() returned a non-integer") 132 self.assertGreater(_thread.get_ident(), 0) 133 134 def test_LockType(self): 135 self.assertIsInstance(_thread.allocate_lock(), _thread.LockType, 136 "_thread.LockType is not an instance of what " 137 "is returned by _thread.allocate_lock()") 138 139 def test_set_sentinel(self): 140 self.assertIsInstance(_thread._set_sentinel(), _thread.LockType, 141 "_thread._set_sentinel() did not return a " 142 "LockType instance.") 143 144 def test_interrupt_main(self): 145 #Calling start_new_thread with a function that executes interrupt_main 146 # should raise KeyboardInterrupt upon completion. 147 def call_interrupt(): 148 _thread.interrupt_main() 149 150 self.assertRaises(KeyboardInterrupt, 151 _thread.start_new_thread, 152 call_interrupt, 153 tuple()) 154 155 def test_interrupt_in_main(self): 156 self.assertRaises(KeyboardInterrupt, _thread.interrupt_main) 157 158 def test_stack_size_None(self): 159 retval = _thread.stack_size(None) 160 self.assertEqual(retval, 0) 161 162 def test_stack_size_not_None(self): 163 with self.assertRaises(_thread.error) as cm: 164 _thread.stack_size("") 165 self.assertEqual(cm.exception.args[0], 166 "setting thread stack size not supported") 167 168 169class ThreadTests(unittest.TestCase): 170 """Test thread creation.""" 171 172 def test_arg_passing(self): 173 #Make sure that parameter passing works. 174 def arg_tester(queue, arg1=False, arg2=False): 175 """Use to test _thread.start_new_thread() passes args properly.""" 176 queue.put((arg1, arg2)) 177 178 testing_queue = queue.Queue(1) 179 _thread.start_new_thread(arg_tester, (testing_queue, True, True)) 180 result = testing_queue.get() 181 self.assertTrue(result[0] and result[1], 182 "Argument passing for thread creation " 183 "using tuple failed") 184 185 _thread.start_new_thread( 186 arg_tester, 187 tuple(), 188 {'queue':testing_queue, 'arg1':True, 'arg2':True}) 189 190 result = testing_queue.get() 191 self.assertTrue(result[0] and result[1], 192 "Argument passing for thread creation " 193 "using kwargs failed") 194 195 _thread.start_new_thread( 196 arg_tester, 197 (testing_queue, True), 198 {'arg2':True}) 199 200 result = testing_queue.get() 201 self.assertTrue(result[0] and result[1], 202 "Argument passing for thread creation using both tuple" 203 " and kwargs failed") 204 205 def test_multi_thread_creation(self): 206 def queue_mark(queue, delay): 207 time.sleep(delay) 208 queue.put(_thread.get_ident()) 209 210 thread_count = 5 211 testing_queue = queue.Queue(thread_count) 212 213 if support.verbose: 214 print() 215 print("*** Testing multiple thread creation " 216 "(will take approx. %s to %s sec.) ***" % ( 217 DELAY, thread_count)) 218 219 for count in range(thread_count): 220 if DELAY: 221 local_delay = round(random.random(), 1) 222 else: 223 local_delay = 0 224 _thread.start_new_thread(queue_mark, 225 (testing_queue, local_delay)) 226 time.sleep(DELAY) 227 if support.verbose: 228 print('done') 229 self.assertEqual(testing_queue.qsize(), thread_count, 230 "Not all %s threads executed properly " 231 "after %s sec." % (thread_count, DELAY)) 232 233 def test_args_not_tuple(self): 234 """ 235 Test invoking start_new_thread() with a non-tuple value for "args". 236 Expect TypeError with a meaningful error message to be raised. 237 """ 238 with self.assertRaises(TypeError) as cm: 239 _thread.start_new_thread(mock.Mock(), []) 240 self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple") 241 242 def test_kwargs_not_dict(self): 243 """ 244 Test invoking start_new_thread() with a non-dict value for "kwargs". 245 Expect TypeError with a meaningful error message to be raised. 246 """ 247 with self.assertRaises(TypeError) as cm: 248 _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[]) 249 self.assertEqual(cm.exception.args[0], "3rd arg must be a dict") 250 251 def test_SystemExit(self): 252 """ 253 Test invoking start_new_thread() with a function that raises 254 SystemExit. 255 The exception should be discarded. 256 """ 257 func = mock.Mock(side_effect=SystemExit()) 258 try: 259 _thread.start_new_thread(func, tuple()) 260 except SystemExit: 261 self.fail("start_new_thread raised SystemExit.") 262 263 @mock.patch('traceback.print_exc') 264 def test_RaiseException(self, mock_print_exc): 265 """ 266 Test invoking start_new_thread() with a function that raises exception. 267 268 The exception should be discarded and the traceback should be printed 269 via traceback.print_exc() 270 """ 271 func = mock.Mock(side_effect=Exception) 272 _thread.start_new_thread(func, tuple()) 273 self.assertTrue(mock_print_exc.called) 274 275if __name__ == '__main__': 276 unittest.main() 277