1import os 2import unittest 3import random 4from test import support 5from test.support import threading_helper 6import _thread as thread 7import time 8import warnings 9import weakref 10 11from test import lock_tests 12 13threading_helper.requires_working_threading(module=True) 14 15NUMTASKS = 10 16NUMTRIPS = 3 17 18_print_mutex = thread.allocate_lock() 19 20def verbose_print(arg): 21 """Helper function for printing out debugging output.""" 22 if support.verbose: 23 with _print_mutex: 24 print(arg) 25 26 27class BasicThreadTest(unittest.TestCase): 28 29 def setUp(self): 30 self.done_mutex = thread.allocate_lock() 31 self.done_mutex.acquire() 32 self.running_mutex = thread.allocate_lock() 33 self.random_mutex = thread.allocate_lock() 34 self.created = 0 35 self.running = 0 36 self.next_ident = 0 37 38 key = threading_helper.threading_setup() 39 self.addCleanup(threading_helper.threading_cleanup, *key) 40 41 42class ThreadRunningTests(BasicThreadTest): 43 44 def newtask(self): 45 with self.running_mutex: 46 self.next_ident += 1 47 verbose_print("creating task %s" % self.next_ident) 48 thread.start_new_thread(self.task, (self.next_ident,)) 49 self.created += 1 50 self.running += 1 51 52 def task(self, ident): 53 with self.random_mutex: 54 delay = random.random() / 10000.0 55 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) 56 time.sleep(delay) 57 verbose_print("task %s done" % ident) 58 with self.running_mutex: 59 self.running -= 1 60 if self.created == NUMTASKS and self.running == 0: 61 self.done_mutex.release() 62 63 def test_starting_threads(self): 64 with threading_helper.wait_threads_exit(): 65 # Basic test for thread creation. 66 for i in range(NUMTASKS): 67 self.newtask() 68 verbose_print("waiting for tasks to complete...") 69 self.done_mutex.acquire() 70 verbose_print("all tasks done") 71 72 def test_stack_size(self): 73 # Various stack size tests. 74 self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") 75 76 thread.stack_size(0) 77 self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") 78 79 @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') 80 def test_nt_and_posix_stack_size(self): 81 try: 82 thread.stack_size(4096) 83 except ValueError: 84 verbose_print("caught expected ValueError setting " 85 "stack_size(4096)") 86 except thread.error: 87 self.skipTest("platform does not support changing thread stack " 88 "size") 89 90 fail_msg = "stack_size(%d) failed - should succeed" 91 for tss in (262144, 0x100000, 0): 92 thread.stack_size(tss) 93 self.assertEqual(thread.stack_size(), tss, fail_msg % tss) 94 verbose_print("successfully set stack_size(%d)" % tss) 95 96 for tss in (262144, 0x100000): 97 verbose_print("trying stack_size = (%d)" % tss) 98 self.next_ident = 0 99 self.created = 0 100 with threading_helper.wait_threads_exit(): 101 for i in range(NUMTASKS): 102 self.newtask() 103 104 verbose_print("waiting for all tasks to complete") 105 self.done_mutex.acquire() 106 verbose_print("all tasks done") 107 108 thread.stack_size(0) 109 110 def test__count(self): 111 # Test the _count() function. 112 orig = thread._count() 113 mut = thread.allocate_lock() 114 mut.acquire() 115 started = [] 116 117 def task(): 118 started.append(None) 119 mut.acquire() 120 mut.release() 121 122 with threading_helper.wait_threads_exit(): 123 thread.start_new_thread(task, ()) 124 for _ in support.sleeping_retry(support.LONG_TIMEOUT): 125 if started: 126 break 127 self.assertEqual(thread._count(), orig + 1) 128 129 # Allow the task to finish. 130 mut.release() 131 132 # The only reliable way to be sure that the thread ended from the 133 # interpreter's point of view is to wait for the function object to 134 # be destroyed. 135 done = [] 136 wr = weakref.ref(task, lambda _: done.append(None)) 137 del task 138 139 for _ in support.sleeping_retry(support.LONG_TIMEOUT): 140 if done: 141 break 142 support.gc_collect() # For PyPy or other GCs. 143 self.assertEqual(thread._count(), orig) 144 145 def test_unraisable_exception(self): 146 def task(): 147 started.release() 148 raise ValueError("task failed") 149 150 started = thread.allocate_lock() 151 with support.catch_unraisable_exception() as cm: 152 with threading_helper.wait_threads_exit(): 153 started.acquire() 154 thread.start_new_thread(task, ()) 155 started.acquire() 156 157 self.assertEqual(str(cm.unraisable.exc_value), "task failed") 158 self.assertIsNone(cm.unraisable.object) 159 self.assertEqual(cm.unraisable.err_msg, 160 f"Exception ignored in thread started by {task!r}") 161 self.assertIsNotNone(cm.unraisable.exc_traceback) 162 163 def test_join_thread(self): 164 finished = [] 165 166 def task(): 167 time.sleep(0.05) 168 finished.append(thread.get_ident()) 169 170 with threading_helper.wait_threads_exit(): 171 handle = thread.start_joinable_thread(task) 172 handle.join() 173 self.assertEqual(len(finished), 1) 174 self.assertEqual(handle.ident, finished[0]) 175 176 def test_join_thread_already_exited(self): 177 def task(): 178 pass 179 180 with threading_helper.wait_threads_exit(): 181 handle = thread.start_joinable_thread(task) 182 time.sleep(0.05) 183 handle.join() 184 185 def test_join_several_times(self): 186 def task(): 187 pass 188 189 with threading_helper.wait_threads_exit(): 190 handle = thread.start_joinable_thread(task) 191 handle.join() 192 # Subsequent join() calls should succeed 193 handle.join() 194 195 def test_joinable_not_joined(self): 196 handle_destroyed = thread.allocate_lock() 197 handle_destroyed.acquire() 198 199 def task(): 200 handle_destroyed.acquire() 201 202 with threading_helper.wait_threads_exit(): 203 handle = thread.start_joinable_thread(task) 204 del handle 205 handle_destroyed.release() 206 207 def test_join_from_self(self): 208 errors = [] 209 handles = [] 210 start_joinable_thread_returned = thread.allocate_lock() 211 start_joinable_thread_returned.acquire() 212 task_tried_to_join = thread.allocate_lock() 213 task_tried_to_join.acquire() 214 215 def task(): 216 start_joinable_thread_returned.acquire() 217 try: 218 handles[0].join() 219 except Exception as e: 220 errors.append(e) 221 finally: 222 task_tried_to_join.release() 223 224 with threading_helper.wait_threads_exit(): 225 handle = thread.start_joinable_thread(task) 226 handles.append(handle) 227 start_joinable_thread_returned.release() 228 # Can still join after joining failed in other thread 229 task_tried_to_join.acquire() 230 handle.join() 231 232 assert len(errors) == 1 233 with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): 234 raise errors[0] 235 236 def test_join_then_self_join(self): 237 # make sure we can't deadlock in the following scenario with 238 # threads t0 and t1 (see comment in `ThreadHandle_join()` for more 239 # details): 240 # 241 # - t0 joins t1 242 # - t1 self joins 243 def make_lock(): 244 lock = thread.allocate_lock() 245 lock.acquire() 246 return lock 247 248 error = None 249 self_joiner_handle = None 250 self_joiner_started = make_lock() 251 self_joiner_barrier = make_lock() 252 def self_joiner(): 253 nonlocal error 254 255 self_joiner_started.release() 256 self_joiner_barrier.acquire() 257 258 try: 259 self_joiner_handle.join() 260 except Exception as e: 261 error = e 262 263 joiner_started = make_lock() 264 def joiner(): 265 joiner_started.release() 266 self_joiner_handle.join() 267 268 with threading_helper.wait_threads_exit(): 269 self_joiner_handle = thread.start_joinable_thread(self_joiner) 270 # Wait for the self-joining thread to start 271 self_joiner_started.acquire() 272 273 # Start the thread that joins the self-joiner 274 joiner_handle = thread.start_joinable_thread(joiner) 275 276 # Wait for the joiner to start 277 joiner_started.acquire() 278 279 # Not great, but I don't think there's a deterministic way to make 280 # sure that the self-joining thread has been joined. 281 time.sleep(0.1) 282 283 # Unblock the self-joiner 284 self_joiner_barrier.release() 285 286 self_joiner_handle.join() 287 joiner_handle.join() 288 289 with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"): 290 raise error 291 292 def test_join_with_timeout(self): 293 lock = thread.allocate_lock() 294 lock.acquire() 295 296 def thr(): 297 lock.acquire() 298 299 with threading_helper.wait_threads_exit(): 300 handle = thread.start_joinable_thread(thr) 301 handle.join(0.1) 302 self.assertFalse(handle.is_done()) 303 lock.release() 304 handle.join() 305 self.assertTrue(handle.is_done()) 306 307 def test_join_unstarted(self): 308 handle = thread._ThreadHandle() 309 with self.assertRaisesRegex(RuntimeError, "thread not started"): 310 handle.join() 311 312 def test_set_done_unstarted(self): 313 handle = thread._ThreadHandle() 314 with self.assertRaisesRegex(RuntimeError, "thread not started"): 315 handle._set_done() 316 317 def test_start_duplicate_handle(self): 318 lock = thread.allocate_lock() 319 lock.acquire() 320 321 def func(): 322 lock.acquire() 323 324 handle = thread._ThreadHandle() 325 with threading_helper.wait_threads_exit(): 326 thread.start_joinable_thread(func, handle=handle) 327 with self.assertRaisesRegex(RuntimeError, "thread already started"): 328 thread.start_joinable_thread(func, handle=handle) 329 lock.release() 330 handle.join() 331 332 def test_start_with_none_handle(self): 333 def func(): 334 pass 335 336 with threading_helper.wait_threads_exit(): 337 handle = thread.start_joinable_thread(func, handle=None) 338 handle.join() 339 340 341class Barrier: 342 def __init__(self, num_threads): 343 self.num_threads = num_threads 344 self.waiting = 0 345 self.checkin_mutex = thread.allocate_lock() 346 self.checkout_mutex = thread.allocate_lock() 347 self.checkout_mutex.acquire() 348 349 def enter(self): 350 self.checkin_mutex.acquire() 351 self.waiting = self.waiting + 1 352 if self.waiting == self.num_threads: 353 self.waiting = self.num_threads - 1 354 self.checkout_mutex.release() 355 return 356 self.checkin_mutex.release() 357 358 self.checkout_mutex.acquire() 359 self.waiting = self.waiting - 1 360 if self.waiting == 0: 361 self.checkin_mutex.release() 362 return 363 self.checkout_mutex.release() 364 365 366class BarrierTest(BasicThreadTest): 367 368 def test_barrier(self): 369 with threading_helper.wait_threads_exit(): 370 self.bar = Barrier(NUMTASKS) 371 self.running = NUMTASKS 372 for i in range(NUMTASKS): 373 thread.start_new_thread(self.task2, (i,)) 374 verbose_print("waiting for tasks to end") 375 self.done_mutex.acquire() 376 verbose_print("tasks done") 377 378 def task2(self, ident): 379 for i in range(NUMTRIPS): 380 if ident == 0: 381 # give it a good chance to enter the next 382 # barrier before the others are all out 383 # of the current one 384 delay = 0 385 else: 386 with self.random_mutex: 387 delay = random.random() / 10000.0 388 verbose_print("task %s will run for %sus" % 389 (ident, round(delay * 1e6))) 390 time.sleep(delay) 391 verbose_print("task %s entering %s" % (ident, i)) 392 self.bar.enter() 393 verbose_print("task %s leaving barrier" % ident) 394 with self.running_mutex: 395 self.running -= 1 396 # Must release mutex before releasing done, else the main thread can 397 # exit and set mutex to None as part of global teardown; then 398 # mutex.release() raises AttributeError. 399 finished = self.running == 0 400 if finished: 401 self.done_mutex.release() 402 403class LockTests(lock_tests.LockTests): 404 locktype = thread.allocate_lock 405 406 407class TestForkInThread(unittest.TestCase): 408 def setUp(self): 409 self.read_fd, self.write_fd = os.pipe() 410 411 @support.requires_fork() 412 @threading_helper.reap_threads 413 def test_forkinthread(self): 414 pid = None 415 416 def fork_thread(read_fd, write_fd): 417 nonlocal pid 418 419 # Ignore the warning about fork with threads. 420 with warnings.catch_warnings(category=DeprecationWarning, 421 action="ignore"): 422 # fork in a thread (DANGER, undefined per POSIX) 423 if (pid := os.fork()): 424 # parent process 425 return 426 427 # child process 428 try: 429 os.close(read_fd) 430 os.write(write_fd, b"OK") 431 finally: 432 os._exit(0) 433 434 with threading_helper.wait_threads_exit(): 435 thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd)) 436 self.assertEqual(os.read(self.read_fd, 2), b"OK") 437 os.close(self.write_fd) 438 439 self.assertIsNotNone(pid) 440 support.wait_process(pid, exitcode=0) 441 442 def tearDown(self): 443 try: 444 os.close(self.read_fd) 445 except OSError: 446 pass 447 448 try: 449 os.close(self.write_fd) 450 except OSError: 451 pass 452 453 454if __name__ == "__main__": 455 unittest.main() 456