1import _dummy_thread as _thread 2import time 3import queue 4import random 5import unittest 6from test import support 7from unittest import mock 8 9DELAY = 0 10 11 12class LockTests(unittest.TestCase): 13 """Test lock objects.""" 14 15 def setUp(self): 16 # Create a lock 17 self.lock = _thread.allocate_lock() 18 19 def test_initlock(self): 20 #Make sure locks start locked 21 self.assertFalse(self.lock.locked(), 22 "Lock object is not initialized unlocked.") 23 24 def test_release(self): 25 # Test self.lock.release() 26 self.lock.acquire() 27 self.lock.release() 28 self.assertFalse(self.lock.locked(), 29 "Lock object did not release properly.") 30 31 def test_LockType_context_manager(self): 32 with _thread.LockType(): 33 pass 34 self.assertFalse(self.lock.locked(), 35 "Acquired Lock was not released") 36 37 def test_improper_release(self): 38 #Make sure release of an unlocked thread raises RuntimeError 39 self.assertRaises(RuntimeError, self.lock.release) 40 41 def test_cond_acquire_success(self): 42 #Make sure the conditional acquiring of the lock works. 43 self.assertTrue(self.lock.acquire(0), 44 "Conditional acquiring of the lock failed.") 45 46 def test_cond_acquire_fail(self): 47 #Test acquiring locked lock returns False 48 self.lock.acquire(0) 49 self.assertFalse(self.lock.acquire(0), 50 "Conditional acquiring of a locked lock incorrectly " 51 "succeeded.") 52 53 def test_uncond_acquire_success(self): 54 #Make sure unconditional acquiring of a lock works. 55 self.lock.acquire() 56 self.assertTrue(self.lock.locked(), 57 "Uncondional locking failed.") 58 59 def test_uncond_acquire_return_val(self): 60 #Make sure that an unconditional locking returns True. 61 self.assertIs(self.lock.acquire(1), True, 62 "Unconditional locking did not return True.") 63 self.assertIs(self.lock.acquire(), True) 64 65 def test_uncond_acquire_blocking(self): 66 #Make sure that unconditional acquiring of a locked lock blocks. 67 def delay_unlock(to_unlock, delay): 68 """Hold on to lock for a set amount of time before unlocking.""" 69 time.sleep(delay) 70 to_unlock.release() 71 72 self.lock.acquire() 73 start_time = int(time.time()) 74 _thread.start_new_thread(delay_unlock,(self.lock, DELAY)) 75 if support.verbose: 76 print() 77 print("*** Waiting for thread to release the lock "\ 78 "(approx. %s sec.) ***" % DELAY) 79 self.lock.acquire() 80 end_time = int(time.time()) 81 if support.verbose: 82 print("done") 83 self.assertGreaterEqual(end_time - start_time, DELAY, 84 "Blocking by unconditional acquiring failed.") 85 86 @mock.patch('time.sleep') 87 def test_acquire_timeout(self, mock_sleep): 88 """Test invoking acquire() with a positive timeout when the lock is 89 already acquired. Ensure that time.sleep() is invoked with the given 90 timeout and that False is returned.""" 91 92 self.lock.acquire() 93 retval = self.lock.acquire(waitflag=0, timeout=1) 94 self.assertTrue(mock_sleep.called) 95 mock_sleep.assert_called_once_with(1) 96 self.assertEqual(retval, False) 97 98 def test_lock_representation(self): 99 self.lock.acquire() 100 self.assertIn("locked", repr(self.lock)) 101 self.lock.release() 102 self.assertIn("unlocked", repr(self.lock)) 103 104 105class MiscTests(unittest.TestCase): 106 """Miscellaneous tests.""" 107 108 def test_exit(self): 109 self.assertRaises(SystemExit, _thread.exit) 110 111 def test_ident(self): 112 self.assertIsInstance(_thread.get_ident(), int, 113 "_thread.get_ident() returned a non-integer") 114 self.assertNotEqual(_thread.get_ident(), 0, 115 "_thread.get_ident() returned 0") 116 117 def test_LockType(self): 118 self.assertIsInstance(_thread.allocate_lock(), _thread.LockType, 119 "_thread.LockType is not an instance of what " 120 "is returned by _thread.allocate_lock()") 121 122 def test_set_sentinel(self): 123 self.assertIsInstance(_thread._set_sentinel(), _thread.LockType, 124 "_thread._set_sentinel() did not return a " 125 "LockType instance.") 126 127 def test_interrupt_main(self): 128 #Calling start_new_thread with a function that executes interrupt_main 129 # should raise KeyboardInterrupt upon completion. 130 def call_interrupt(): 131 _thread.interrupt_main() 132 133 self.assertRaises(KeyboardInterrupt, 134 _thread.start_new_thread, 135 call_interrupt, 136 tuple()) 137 138 def test_interrupt_in_main(self): 139 self.assertRaises(KeyboardInterrupt, _thread.interrupt_main) 140 141 def test_stack_size_None(self): 142 retval = _thread.stack_size(None) 143 self.assertEqual(retval, 0) 144 145 def test_stack_size_not_None(self): 146 with self.assertRaises(_thread.error) as cm: 147 _thread.stack_size("") 148 self.assertEqual(cm.exception.args[0], 149 "setting thread stack size not supported") 150 151 152class ThreadTests(unittest.TestCase): 153 """Test thread creation.""" 154 155 def test_arg_passing(self): 156 #Make sure that parameter passing works. 157 def arg_tester(queue, arg1=False, arg2=False): 158 """Use to test _thread.start_new_thread() passes args properly.""" 159 queue.put((arg1, arg2)) 160 161 testing_queue = queue.Queue(1) 162 _thread.start_new_thread(arg_tester, (testing_queue, True, True)) 163 result = testing_queue.get() 164 self.assertTrue(result[0] and result[1], 165 "Argument passing for thread creation " 166 "using tuple failed") 167 168 _thread.start_new_thread( 169 arg_tester, 170 tuple(), 171 {'queue':testing_queue, 'arg1':True, 'arg2':True}) 172 173 result = testing_queue.get() 174 self.assertTrue(result[0] and result[1], 175 "Argument passing for thread creation " 176 "using kwargs failed") 177 178 _thread.start_new_thread( 179 arg_tester, 180 (testing_queue, True), 181 {'arg2':True}) 182 183 result = testing_queue.get() 184 self.assertTrue(result[0] and result[1], 185 "Argument passing for thread creation using both tuple" 186 " and kwargs failed") 187 188 def test_multi_thread_creation(self): 189 def queue_mark(queue, delay): 190 time.sleep(delay) 191 queue.put(_thread.get_ident()) 192 193 thread_count = 5 194 testing_queue = queue.Queue(thread_count) 195 196 if support.verbose: 197 print() 198 print("*** Testing multiple thread creation " 199 "(will take approx. %s to %s sec.) ***" % ( 200 DELAY, thread_count)) 201 202 for count in range(thread_count): 203 if DELAY: 204 local_delay = round(random.random(), 1) 205 else: 206 local_delay = 0 207 _thread.start_new_thread(queue_mark, 208 (testing_queue, local_delay)) 209 time.sleep(DELAY) 210 if support.verbose: 211 print('done') 212 self.assertEqual(testing_queue.qsize(), thread_count, 213 "Not all %s threads executed properly " 214 "after %s sec." % (thread_count, DELAY)) 215 216 def test_args_not_tuple(self): 217 """ 218 Test invoking start_new_thread() with a non-tuple value for "args". 219 Expect TypeError with a meaningful error message to be raised. 220 """ 221 with self.assertRaises(TypeError) as cm: 222 _thread.start_new_thread(mock.Mock(), []) 223 self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple") 224 225 def test_kwargs_not_dict(self): 226 """ 227 Test invoking start_new_thread() with a non-dict value for "kwargs". 228 Expect TypeError with a meaningful error message to be raised. 229 """ 230 with self.assertRaises(TypeError) as cm: 231 _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[]) 232 self.assertEqual(cm.exception.args[0], "3rd arg must be a dict") 233 234 def test_SystemExit(self): 235 """ 236 Test invoking start_new_thread() with a function that raises 237 SystemExit. 238 The exception should be discarded. 239 """ 240 func = mock.Mock(side_effect=SystemExit()) 241 try: 242 _thread.start_new_thread(func, tuple()) 243 except SystemExit: 244 self.fail("start_new_thread raised SystemExit.") 245 246 @mock.patch('traceback.print_exc') 247 def test_RaiseException(self, mock_print_exc): 248 """ 249 Test invoking start_new_thread() with a function that raises exception. 250 251 The exception should be discarded and the traceback should be printed 252 via traceback.print_exc() 253 """ 254 func = mock.Mock(side_effect=Exception) 255 _thread.start_new_thread(func, tuple()) 256 self.assertTrue(mock_print_exc.called) 257