1""" 2Tests for the threading module. 3""" 4 5import test.support 6from test.support import threading_helper 7from test.support import verbose, cpython_only, os_helper 8from test.support.import_helper import import_module 9from test.support.script_helper import assert_python_ok, assert_python_failure 10 11import random 12import sys 13import _thread 14import threading 15import time 16import unittest 17import weakref 18import os 19import subprocess 20import signal 21import textwrap 22import traceback 23 24from unittest import mock 25from test import lock_tests 26from test import support 27 28 29# Between fork() and exec(), only async-safe functions are allowed (issues 30# #12316 and #11870), and fork() from a worker thread is known to trigger 31# problems with some operating systems (issue #3863): skip problematic tests 32# on platforms known to behave badly. 33platforms_to_skip = ('netbsd5', 'hp-ux11') 34 35# Is Python built with Py_DEBUG macro defined? 36Py_DEBUG = hasattr(sys, 'gettotalrefcount') 37 38 39def restore_default_excepthook(testcase): 40 testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook) 41 threading.excepthook = threading.__excepthook__ 42 43 44# A trivial mutable counter. 45class Counter(object): 46 def __init__(self): 47 self.value = 0 48 def inc(self): 49 self.value += 1 50 def dec(self): 51 self.value -= 1 52 def get(self): 53 return self.value 54 55class TestThread(threading.Thread): 56 def __init__(self, name, testcase, sema, mutex, nrunning): 57 threading.Thread.__init__(self, name=name) 58 self.testcase = testcase 59 self.sema = sema 60 self.mutex = mutex 61 self.nrunning = nrunning 62 63 def run(self): 64 delay = random.random() / 10000.0 65 if verbose: 66 print('task %s will run for %.1f usec' % 67 (self.name, delay * 1e6)) 68 69 with self.sema: 70 with self.mutex: 71 self.nrunning.inc() 72 if verbose: 73 print(self.nrunning.get(), 'tasks are running') 74 self.testcase.assertLessEqual(self.nrunning.get(), 3) 75 76 time.sleep(delay) 77 if verbose: 78 print('task', self.name, 'done') 79 80 with self.mutex: 81 self.nrunning.dec() 82 self.testcase.assertGreaterEqual(self.nrunning.get(), 0) 83 if verbose: 84 print('%s is finished. %d tasks are running' % 85 (self.name, self.nrunning.get())) 86 87 88class BaseTestCase(unittest.TestCase): 89 def setUp(self): 90 self._threads = threading_helper.threading_setup() 91 92 def tearDown(self): 93 threading_helper.threading_cleanup(*self._threads) 94 test.support.reap_children() 95 96 97class ThreadTests(BaseTestCase): 98 99 @cpython_only 100 def test_name(self): 101 def func(): pass 102 103 thread = threading.Thread(name="myname1") 104 self.assertEqual(thread.name, "myname1") 105 106 # Convert int name to str 107 thread = threading.Thread(name=123) 108 self.assertEqual(thread.name, "123") 109 110 # target name is ignored if name is specified 111 thread = threading.Thread(target=func, name="myname2") 112 self.assertEqual(thread.name, "myname2") 113 114 with mock.patch.object(threading, '_counter', return_value=2): 115 thread = threading.Thread(name="") 116 self.assertEqual(thread.name, "Thread-2") 117 118 with mock.patch.object(threading, '_counter', return_value=3): 119 thread = threading.Thread() 120 self.assertEqual(thread.name, "Thread-3") 121 122 with mock.patch.object(threading, '_counter', return_value=5): 123 thread = threading.Thread(target=func) 124 self.assertEqual(thread.name, "Thread-5 (func)") 125 126 @cpython_only 127 def test_disallow_instantiation(self): 128 # Ensure that the type disallows instantiation (bpo-43916) 129 lock = threading.Lock() 130 test.support.check_disallow_instantiation(self, type(lock)) 131 132 # Create a bunch of threads, let each do some work, wait until all are 133 # done. 134 def test_various_ops(self): 135 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 136 # times about 1 second per clump). 137 NUMTASKS = 10 138 139 # no more than 3 of the 10 can run at once 140 sema = threading.BoundedSemaphore(value=3) 141 mutex = threading.RLock() 142 numrunning = Counter() 143 144 threads = [] 145 146 for i in range(NUMTASKS): 147 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 148 threads.append(t) 149 self.assertIsNone(t.ident) 150 self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$') 151 t.start() 152 153 if hasattr(threading, 'get_native_id'): 154 native_ids = set(t.native_id for t in threads) | {threading.get_native_id()} 155 self.assertNotIn(None, native_ids) 156 self.assertEqual(len(native_ids), NUMTASKS + 1) 157 158 if verbose: 159 print('waiting for all tasks to complete') 160 for t in threads: 161 t.join() 162 self.assertFalse(t.is_alive()) 163 self.assertNotEqual(t.ident, 0) 164 self.assertIsNotNone(t.ident) 165 self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$') 166 if verbose: 167 print('all tasks done') 168 self.assertEqual(numrunning.get(), 0) 169 170 def test_ident_of_no_threading_threads(self): 171 # The ident still must work for the main thread and dummy threads. 172 self.assertIsNotNone(threading.current_thread().ident) 173 def f(): 174 ident.append(threading.current_thread().ident) 175 done.set() 176 done = threading.Event() 177 ident = [] 178 with threading_helper.wait_threads_exit(): 179 tid = _thread.start_new_thread(f, ()) 180 done.wait() 181 self.assertEqual(ident[0], tid) 182 # Kill the "immortal" _DummyThread 183 del threading._active[ident[0]] 184 185 # run with a small(ish) thread stack size (256 KiB) 186 def test_various_ops_small_stack(self): 187 if verbose: 188 print('with 256 KiB thread stack size...') 189 try: 190 threading.stack_size(262144) 191 except _thread.error: 192 raise unittest.SkipTest( 193 'platform does not support changing thread stack size') 194 self.test_various_ops() 195 threading.stack_size(0) 196 197 # run with a large thread stack size (1 MiB) 198 def test_various_ops_large_stack(self): 199 if verbose: 200 print('with 1 MiB thread stack size...') 201 try: 202 threading.stack_size(0x100000) 203 except _thread.error: 204 raise unittest.SkipTest( 205 'platform does not support changing thread stack size') 206 self.test_various_ops() 207 threading.stack_size(0) 208 209 def test_foreign_thread(self): 210 # Check that a "foreign" thread can use the threading module. 211 def f(mutex): 212 # Calling current_thread() forces an entry for the foreign 213 # thread to get made in the threading._active map. 214 threading.current_thread() 215 mutex.release() 216 217 mutex = threading.Lock() 218 mutex.acquire() 219 with threading_helper.wait_threads_exit(): 220 tid = _thread.start_new_thread(f, (mutex,)) 221 # Wait for the thread to finish. 222 mutex.acquire() 223 self.assertIn(tid, threading._active) 224 self.assertIsInstance(threading._active[tid], threading._DummyThread) 225 #Issue 29376 226 self.assertTrue(threading._active[tid].is_alive()) 227 self.assertRegex(repr(threading._active[tid]), '_DummyThread') 228 del threading._active[tid] 229 230 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 231 # exposed at the Python level. This test relies on ctypes to get at it. 232 def test_PyThreadState_SetAsyncExc(self): 233 ctypes = import_module("ctypes") 234 235 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 236 set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object) 237 238 class AsyncExc(Exception): 239 pass 240 241 exception = ctypes.py_object(AsyncExc) 242 243 # First check it works when setting the exception from the same thread. 244 tid = threading.get_ident() 245 self.assertIsInstance(tid, int) 246 self.assertGreater(tid, 0) 247 248 try: 249 result = set_async_exc(tid, exception) 250 # The exception is async, so we might have to keep the VM busy until 251 # it notices. 252 while True: 253 pass 254 except AsyncExc: 255 pass 256 else: 257 # This code is unreachable but it reflects the intent. If we wanted 258 # to be smarter the above loop wouldn't be infinite. 259 self.fail("AsyncExc not raised") 260 try: 261 self.assertEqual(result, 1) # one thread state modified 262 except UnboundLocalError: 263 # The exception was raised too quickly for us to get the result. 264 pass 265 266 # `worker_started` is set by the thread when it's inside a try/except 267 # block waiting to catch the asynchronously set AsyncExc exception. 268 # `worker_saw_exception` is set by the thread upon catching that 269 # exception. 270 worker_started = threading.Event() 271 worker_saw_exception = threading.Event() 272 273 class Worker(threading.Thread): 274 def run(self): 275 self.id = threading.get_ident() 276 self.finished = False 277 278 try: 279 while True: 280 worker_started.set() 281 time.sleep(0.1) 282 except AsyncExc: 283 self.finished = True 284 worker_saw_exception.set() 285 286 t = Worker() 287 t.daemon = True # so if this fails, we don't hang Python at shutdown 288 t.start() 289 if verbose: 290 print(" started worker thread") 291 292 # Try a thread id that doesn't make sense. 293 if verbose: 294 print(" trying nonsensical thread id") 295 result = set_async_exc(-1, exception) 296 self.assertEqual(result, 0) # no thread states modified 297 298 # Now raise an exception in the worker thread. 299 if verbose: 300 print(" waiting for worker thread to get started") 301 ret = worker_started.wait() 302 self.assertTrue(ret) 303 if verbose: 304 print(" verifying worker hasn't exited") 305 self.assertFalse(t.finished) 306 if verbose: 307 print(" attempting to raise asynch exception in worker") 308 result = set_async_exc(t.id, exception) 309 self.assertEqual(result, 1) # one thread state modified 310 if verbose: 311 print(" waiting for worker to say it caught the exception") 312 worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT) 313 self.assertTrue(t.finished) 314 if verbose: 315 print(" all OK -- joining worker") 316 if t.finished: 317 t.join() 318 # else the thread is still running, and we have no way to kill it 319 320 def test_limbo_cleanup(self): 321 # Issue 7481: Failure to start thread should cleanup the limbo map. 322 def fail_new_thread(*args): 323 raise threading.ThreadError() 324 _start_new_thread = threading._start_new_thread 325 threading._start_new_thread = fail_new_thread 326 try: 327 t = threading.Thread(target=lambda: None) 328 self.assertRaises(threading.ThreadError, t.start) 329 self.assertFalse( 330 t in threading._limbo, 331 "Failed to cleanup _limbo map on failure of Thread.start().") 332 finally: 333 threading._start_new_thread = _start_new_thread 334 335 def test_finalize_running_thread(self): 336 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 337 # very late on python exit: on deallocation of a running thread for 338 # example. 339 import_module("ctypes") 340 341 rc, out, err = assert_python_failure("-c", """if 1: 342 import ctypes, sys, time, _thread 343 344 # This lock is used as a simple event variable. 345 ready = _thread.allocate_lock() 346 ready.acquire() 347 348 # Module globals are cleared before __del__ is run 349 # So we save the functions in class dict 350 class C: 351 ensure = ctypes.pythonapi.PyGILState_Ensure 352 release = ctypes.pythonapi.PyGILState_Release 353 def __del__(self): 354 state = self.ensure() 355 self.release(state) 356 357 def waitingThread(): 358 x = C() 359 ready.release() 360 time.sleep(100) 361 362 _thread.start_new_thread(waitingThread, ()) 363 ready.acquire() # Be sure the other thread is waiting. 364 sys.exit(42) 365 """) 366 self.assertEqual(rc, 42) 367 368 def test_finalize_with_trace(self): 369 # Issue1733757 370 # Avoid a deadlock when sys.settrace steps into threading._shutdown 371 assert_python_ok("-c", """if 1: 372 import sys, threading 373 374 # A deadlock-killer, to prevent the 375 # testsuite to hang forever 376 def killer(): 377 import os, time 378 time.sleep(2) 379 print('program blocked; aborting') 380 os._exit(2) 381 t = threading.Thread(target=killer) 382 t.daemon = True 383 t.start() 384 385 # This is the trace function 386 def func(frame, event, arg): 387 threading.current_thread() 388 return func 389 390 sys.settrace(func) 391 """) 392 393 def test_join_nondaemon_on_shutdown(self): 394 # Issue 1722344 395 # Raising SystemExit skipped threading._shutdown 396 rc, out, err = assert_python_ok("-c", """if 1: 397 import threading 398 from time import sleep 399 400 def child(): 401 sleep(1) 402 # As a non-daemon thread we SHOULD wake up and nothing 403 # should be torn down yet 404 print("Woke up, sleep function is:", sleep) 405 406 threading.Thread(target=child).start() 407 raise SystemExit 408 """) 409 self.assertEqual(out.strip(), 410 b"Woke up, sleep function is: <built-in function sleep>") 411 self.assertEqual(err, b"") 412 413 def test_enumerate_after_join(self): 414 # Try hard to trigger #1703448: a thread is still returned in 415 # threading.enumerate() after it has been join()ed. 416 enum = threading.enumerate 417 old_interval = sys.getswitchinterval() 418 try: 419 for i in range(1, 100): 420 sys.setswitchinterval(i * 0.0002) 421 t = threading.Thread(target=lambda: None) 422 t.start() 423 t.join() 424 l = enum() 425 self.assertNotIn(t, l, 426 "#1703448 triggered after %d trials: %s" % (i, l)) 427 finally: 428 sys.setswitchinterval(old_interval) 429 430 def test_no_refcycle_through_target(self): 431 class RunSelfFunction(object): 432 def __init__(self, should_raise): 433 # The links in this refcycle from Thread back to self 434 # should be cleaned up when the thread completes. 435 self.should_raise = should_raise 436 self.thread = threading.Thread(target=self._run, 437 args=(self,), 438 kwargs={'yet_another':self}) 439 self.thread.start() 440 441 def _run(self, other_ref, yet_another): 442 if self.should_raise: 443 raise SystemExit 444 445 restore_default_excepthook(self) 446 447 cyclic_object = RunSelfFunction(should_raise=False) 448 weak_cyclic_object = weakref.ref(cyclic_object) 449 cyclic_object.thread.join() 450 del cyclic_object 451 self.assertIsNone(weak_cyclic_object(), 452 msg=('%d references still around' % 453 sys.getrefcount(weak_cyclic_object()))) 454 455 raising_cyclic_object = RunSelfFunction(should_raise=True) 456 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 457 raising_cyclic_object.thread.join() 458 del raising_cyclic_object 459 self.assertIsNone(weak_raising_cyclic_object(), 460 msg=('%d references still around' % 461 sys.getrefcount(weak_raising_cyclic_object()))) 462 463 def test_old_threading_api(self): 464 # Just a quick sanity check to make sure the old method names are 465 # still present 466 t = threading.Thread() 467 with self.assertWarnsRegex(DeprecationWarning, 468 r'get the daemon attribute'): 469 t.isDaemon() 470 with self.assertWarnsRegex(DeprecationWarning, 471 r'set the daemon attribute'): 472 t.setDaemon(True) 473 with self.assertWarnsRegex(DeprecationWarning, 474 r'get the name attribute'): 475 t.getName() 476 with self.assertWarnsRegex(DeprecationWarning, 477 r'set the name attribute'): 478 t.setName("name") 479 480 e = threading.Event() 481 with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'): 482 e.isSet() 483 484 cond = threading.Condition() 485 cond.acquire() 486 with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'): 487 cond.notifyAll() 488 489 with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'): 490 threading.activeCount() 491 with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'): 492 threading.currentThread() 493 494 def test_repr_daemon(self): 495 t = threading.Thread() 496 self.assertNotIn('daemon', repr(t)) 497 t.daemon = True 498 self.assertIn('daemon', repr(t)) 499 500 def test_daemon_param(self): 501 t = threading.Thread() 502 self.assertFalse(t.daemon) 503 t = threading.Thread(daemon=False) 504 self.assertFalse(t.daemon) 505 t = threading.Thread(daemon=True) 506 self.assertTrue(t.daemon) 507 508 @unittest.skipUnless(hasattr(os, 'fork'), 'needs os.fork()') 509 def test_fork_at_exit(self): 510 # bpo-42350: Calling os.fork() after threading._shutdown() must 511 # not log an error. 512 code = textwrap.dedent(""" 513 import atexit 514 import os 515 import sys 516 from test.support import wait_process 517 518 # Import the threading module to register its "at fork" callback 519 import threading 520 521 def exit_handler(): 522 pid = os.fork() 523 if not pid: 524 print("child process ok", file=sys.stderr, flush=True) 525 # child process 526 else: 527 wait_process(pid, exitcode=0) 528 529 # exit_handler() will be called after threading._shutdown() 530 atexit.register(exit_handler) 531 """) 532 _, out, err = assert_python_ok("-c", code) 533 self.assertEqual(out, b'') 534 self.assertEqual(err.rstrip(), b'child process ok') 535 536 @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') 537 def test_dummy_thread_after_fork(self): 538 # Issue #14308: a dummy thread in the active list doesn't mess up 539 # the after-fork mechanism. 540 code = """if 1: 541 import _thread, threading, os, time 542 543 def background_thread(evt): 544 # Creates and registers the _DummyThread instance 545 threading.current_thread() 546 evt.set() 547 time.sleep(10) 548 549 evt = threading.Event() 550 _thread.start_new_thread(background_thread, (evt,)) 551 evt.wait() 552 assert threading.active_count() == 2, threading.active_count() 553 if os.fork() == 0: 554 assert threading.active_count() == 1, threading.active_count() 555 os._exit(0) 556 else: 557 os.wait() 558 """ 559 _, out, err = assert_python_ok("-c", code) 560 self.assertEqual(out, b'') 561 self.assertEqual(err, b'') 562 563 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 564 def test_is_alive_after_fork(self): 565 # Try hard to trigger #18418: is_alive() could sometimes be True on 566 # threads that vanished after a fork. 567 old_interval = sys.getswitchinterval() 568 self.addCleanup(sys.setswitchinterval, old_interval) 569 570 # Make the bug more likely to manifest. 571 test.support.setswitchinterval(1e-6) 572 573 for i in range(20): 574 t = threading.Thread(target=lambda: None) 575 t.start() 576 pid = os.fork() 577 if pid == 0: 578 os._exit(11 if t.is_alive() else 10) 579 else: 580 t.join() 581 582 support.wait_process(pid, exitcode=10) 583 584 def test_main_thread(self): 585 main = threading.main_thread() 586 self.assertEqual(main.name, 'MainThread') 587 self.assertEqual(main.ident, threading.current_thread().ident) 588 self.assertEqual(main.ident, threading.get_ident()) 589 590 def f(): 591 self.assertNotEqual(threading.main_thread().ident, 592 threading.current_thread().ident) 593 th = threading.Thread(target=f) 594 th.start() 595 th.join() 596 597 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 598 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 599 def test_main_thread_after_fork(self): 600 code = """if 1: 601 import os, threading 602 from test import support 603 604 pid = os.fork() 605 if pid == 0: 606 main = threading.main_thread() 607 print(main.name) 608 print(main.ident == threading.current_thread().ident) 609 print(main.ident == threading.get_ident()) 610 else: 611 support.wait_process(pid, exitcode=0) 612 """ 613 _, out, err = assert_python_ok("-c", code) 614 data = out.decode().replace('\r', '') 615 self.assertEqual(err, b"") 616 self.assertEqual(data, "MainThread\nTrue\nTrue\n") 617 618 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 619 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 620 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 621 def test_main_thread_after_fork_from_nonmain_thread(self): 622 code = """if 1: 623 import os, threading, sys 624 from test import support 625 626 def func(): 627 pid = os.fork() 628 if pid == 0: 629 main = threading.main_thread() 630 print(main.name) 631 print(main.ident == threading.current_thread().ident) 632 print(main.ident == threading.get_ident()) 633 # stdout is fully buffered because not a tty, 634 # we have to flush before exit. 635 sys.stdout.flush() 636 else: 637 support.wait_process(pid, exitcode=0) 638 639 th = threading.Thread(target=func) 640 th.start() 641 th.join() 642 """ 643 _, out, err = assert_python_ok("-c", code) 644 data = out.decode().replace('\r', '') 645 self.assertEqual(err, b"") 646 self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n") 647 648 def test_main_thread_during_shutdown(self): 649 # bpo-31516: current_thread() should still point to the main thread 650 # at shutdown 651 code = """if 1: 652 import gc, threading 653 654 main_thread = threading.current_thread() 655 assert main_thread is threading.main_thread() # sanity check 656 657 class RefCycle: 658 def __init__(self): 659 self.cycle = self 660 661 def __del__(self): 662 print("GC:", 663 threading.current_thread() is main_thread, 664 threading.main_thread() is main_thread, 665 threading.enumerate() == [main_thread]) 666 667 RefCycle() 668 gc.collect() # sanity check 669 x = RefCycle() 670 """ 671 _, out, err = assert_python_ok("-c", code) 672 data = out.decode() 673 self.assertEqual(err, b"") 674 self.assertEqual(data.splitlines(), 675 ["GC: True True True"] * 2) 676 677 def test_finalization_shutdown(self): 678 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait 679 # until Python thread states of all non-daemon threads get deleted. 680 # 681 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but 682 # test the finalization of the main interpreter. 683 code = """if 1: 684 import os 685 import threading 686 import time 687 import random 688 689 def random_sleep(): 690 seconds = random.random() * 0.010 691 time.sleep(seconds) 692 693 class Sleeper: 694 def __del__(self): 695 random_sleep() 696 697 tls = threading.local() 698 699 def f(): 700 # Sleep a bit so that the thread is still running when 701 # Py_Finalize() is called. 702 random_sleep() 703 tls.x = Sleeper() 704 random_sleep() 705 706 threading.Thread(target=f).start() 707 random_sleep() 708 """ 709 rc, out, err = assert_python_ok("-c", code) 710 self.assertEqual(err, b"") 711 712 def test_tstate_lock(self): 713 # Test an implementation detail of Thread objects. 714 started = _thread.allocate_lock() 715 finish = _thread.allocate_lock() 716 started.acquire() 717 finish.acquire() 718 def f(): 719 started.release() 720 finish.acquire() 721 time.sleep(0.01) 722 # The tstate lock is None until the thread is started 723 t = threading.Thread(target=f) 724 self.assertIs(t._tstate_lock, None) 725 t.start() 726 started.acquire() 727 self.assertTrue(t.is_alive()) 728 # The tstate lock can't be acquired when the thread is running 729 # (or suspended). 730 tstate_lock = t._tstate_lock 731 self.assertFalse(tstate_lock.acquire(timeout=0), False) 732 finish.release() 733 # When the thread ends, the state_lock can be successfully 734 # acquired. 735 self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) 736 # But is_alive() is still True: we hold _tstate_lock now, which 737 # prevents is_alive() from knowing the thread's end-of-life C code 738 # is done. 739 self.assertTrue(t.is_alive()) 740 # Let is_alive() find out the C code is done. 741 tstate_lock.release() 742 self.assertFalse(t.is_alive()) 743 # And verify the thread disposed of _tstate_lock. 744 self.assertIsNone(t._tstate_lock) 745 t.join() 746 747 def test_repr_stopped(self): 748 # Verify that "stopped" shows up in repr(Thread) appropriately. 749 started = _thread.allocate_lock() 750 finish = _thread.allocate_lock() 751 started.acquire() 752 finish.acquire() 753 def f(): 754 started.release() 755 finish.acquire() 756 t = threading.Thread(target=f) 757 t.start() 758 started.acquire() 759 self.assertIn("started", repr(t)) 760 finish.release() 761 # "stopped" should appear in the repr in a reasonable amount of time. 762 # Implementation detail: as of this writing, that's trivially true 763 # if .join() is called, and almost trivially true if .is_alive() is 764 # called. The detail we're testing here is that "stopped" shows up 765 # "all on its own". 766 LOOKING_FOR = "stopped" 767 for i in range(500): 768 if LOOKING_FOR in repr(t): 769 break 770 time.sleep(0.01) 771 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds 772 t.join() 773 774 def test_BoundedSemaphore_limit(self): 775 # BoundedSemaphore should raise ValueError if released too often. 776 for limit in range(1, 10): 777 bs = threading.BoundedSemaphore(limit) 778 threads = [threading.Thread(target=bs.acquire) 779 for _ in range(limit)] 780 for t in threads: 781 t.start() 782 for t in threads: 783 t.join() 784 threads = [threading.Thread(target=bs.release) 785 for _ in range(limit)] 786 for t in threads: 787 t.start() 788 for t in threads: 789 t.join() 790 self.assertRaises(ValueError, bs.release) 791 792 @cpython_only 793 def test_frame_tstate_tracing(self): 794 # Issue #14432: Crash when a generator is created in a C thread that is 795 # destroyed while the generator is still used. The issue was that a 796 # generator contains a frame, and the frame kept a reference to the 797 # Python state of the destroyed C thread. The crash occurs when a trace 798 # function is setup. 799 800 def noop_trace(frame, event, arg): 801 # no operation 802 return noop_trace 803 804 def generator(): 805 while 1: 806 yield "generator" 807 808 def callback(): 809 if callback.gen is None: 810 callback.gen = generator() 811 return next(callback.gen) 812 callback.gen = None 813 814 old_trace = sys.gettrace() 815 sys.settrace(noop_trace) 816 try: 817 # Install a trace function 818 threading.settrace(noop_trace) 819 820 # Create a generator in a C thread which exits after the call 821 import _testcapi 822 _testcapi.call_in_temporary_c_thread(callback) 823 824 # Call the generator in a different Python thread, check that the 825 # generator didn't keep a reference to the destroyed thread state 826 for test in range(3): 827 # The trace function is still called here 828 callback() 829 finally: 830 sys.settrace(old_trace) 831 832 def test_gettrace(self): 833 def noop_trace(frame, event, arg): 834 # no operation 835 return noop_trace 836 old_trace = threading.gettrace() 837 try: 838 threading.settrace(noop_trace) 839 trace_func = threading.gettrace() 840 self.assertEqual(noop_trace,trace_func) 841 finally: 842 threading.settrace(old_trace) 843 844 def test_getprofile(self): 845 def fn(*args): pass 846 old_profile = threading.getprofile() 847 try: 848 threading.setprofile(fn) 849 self.assertEqual(fn, threading.getprofile()) 850 finally: 851 threading.setprofile(old_profile) 852 853 @cpython_only 854 def test_shutdown_locks(self): 855 for daemon in (False, True): 856 with self.subTest(daemon=daemon): 857 event = threading.Event() 858 thread = threading.Thread(target=event.wait, daemon=daemon) 859 860 # Thread.start() must add lock to _shutdown_locks, 861 # but only for non-daemon thread 862 thread.start() 863 tstate_lock = thread._tstate_lock 864 if not daemon: 865 self.assertIn(tstate_lock, threading._shutdown_locks) 866 else: 867 self.assertNotIn(tstate_lock, threading._shutdown_locks) 868 869 # unblock the thread and join it 870 event.set() 871 thread.join() 872 873 # Thread._stop() must remove tstate_lock from _shutdown_locks. 874 # Daemon threads must never add it to _shutdown_locks. 875 self.assertNotIn(tstate_lock, threading._shutdown_locks) 876 877 def test_locals_at_exit(self): 878 # bpo-19466: thread locals must not be deleted before destructors 879 # are called 880 rc, out, err = assert_python_ok("-c", """if 1: 881 import threading 882 883 class Atexit: 884 def __del__(self): 885 print("thread_dict.atexit = %r" % thread_dict.atexit) 886 887 thread_dict = threading.local() 888 thread_dict.atexit = "value" 889 890 atexit = Atexit() 891 """) 892 self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'") 893 894 def test_boolean_target(self): 895 # bpo-41149: A thread that had a boolean value of False would not 896 # run, regardless of whether it was callable. The correct behaviour 897 # is for a thread to do nothing if its target is None, and to call 898 # the target otherwise. 899 class BooleanTarget(object): 900 def __init__(self): 901 self.ran = False 902 def __bool__(self): 903 return False 904 def __call__(self): 905 self.ran = True 906 907 target = BooleanTarget() 908 thread = threading.Thread(target=target) 909 thread.start() 910 thread.join() 911 self.assertTrue(target.ran) 912 913 def test_leak_without_join(self): 914 # bpo-37788: Test that a thread which is not joined explicitly 915 # does not leak. Test written for reference leak checks. 916 def noop(): pass 917 with threading_helper.wait_threads_exit(): 918 threading.Thread(target=noop).start() 919 # Thread.join() is not called 920 921 @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)') 922 def test_debug_deprecation(self): 923 # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated 924 rc, out, err = assert_python_ok("-Wdefault", "-c", "pass", 925 PYTHONTHREADDEBUG="1") 926 msg = (b'DeprecationWarning: The threading debug ' 927 b'(PYTHONTHREADDEBUG environment variable) ' 928 b'is deprecated and will be removed in Python 3.12') 929 self.assertIn(msg, err) 930 931 def test_import_from_another_thread(self): 932 # bpo-1596321: If the threading module is first import from a thread 933 # different than the main thread, threading._shutdown() must handle 934 # this case without logging an error at Python exit. 935 code = textwrap.dedent(''' 936 import _thread 937 import sys 938 939 event = _thread.allocate_lock() 940 event.acquire() 941 942 def import_threading(): 943 import threading 944 event.release() 945 946 if 'threading' in sys.modules: 947 raise Exception('threading is already imported') 948 949 _thread.start_new_thread(import_threading, ()) 950 951 # wait until the threading module is imported 952 event.acquire() 953 event.release() 954 955 if 'threading' not in sys.modules: 956 raise Exception('threading is not imported') 957 958 # don't wait until the thread completes 959 ''') 960 rc, out, err = assert_python_ok("-c", code) 961 self.assertEqual(out, b'') 962 self.assertEqual(err, b'') 963 964 965class ThreadJoinOnShutdown(BaseTestCase): 966 967 def _run_and_join(self, script): 968 script = """if 1: 969 import sys, os, time, threading 970 971 # a thread, which waits for the main program to terminate 972 def joiningfunc(mainthread): 973 mainthread.join() 974 print('end of thread') 975 # stdout is fully buffered because not a tty, we have to flush 976 # before exit. 977 sys.stdout.flush() 978 \n""" + script 979 980 rc, out, err = assert_python_ok("-c", script) 981 data = out.decode().replace('\r', '') 982 self.assertEqual(data, "end of main\nend of thread\n") 983 984 def test_1_join_on_shutdown(self): 985 # The usual case: on exit, wait for a non-daemon thread 986 script = """if 1: 987 import os 988 t = threading.Thread(target=joiningfunc, 989 args=(threading.current_thread(),)) 990 t.start() 991 time.sleep(0.1) 992 print('end of main') 993 """ 994 self._run_and_join(script) 995 996 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 997 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 998 def test_2_join_in_forked_process(self): 999 # Like the test above, but from a forked interpreter 1000 script = """if 1: 1001 from test import support 1002 1003 childpid = os.fork() 1004 if childpid != 0: 1005 # parent process 1006 support.wait_process(childpid, exitcode=0) 1007 sys.exit(0) 1008 1009 # child process 1010 t = threading.Thread(target=joiningfunc, 1011 args=(threading.current_thread(),)) 1012 t.start() 1013 print('end of main') 1014 """ 1015 self._run_and_join(script) 1016 1017 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 1018 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1019 def test_3_join_in_forked_from_thread(self): 1020 # Like the test above, but fork() was called from a worker thread 1021 # In the forked process, the main Thread object must be marked as stopped. 1022 1023 script = """if 1: 1024 from test import support 1025 1026 main_thread = threading.current_thread() 1027 def worker(): 1028 childpid = os.fork() 1029 if childpid != 0: 1030 # parent process 1031 support.wait_process(childpid, exitcode=0) 1032 sys.exit(0) 1033 1034 # child process 1035 t = threading.Thread(target=joiningfunc, 1036 args=(main_thread,)) 1037 print('end of main') 1038 t.start() 1039 t.join() # Should not block: main_thread is already stopped 1040 1041 w = threading.Thread(target=worker) 1042 w.start() 1043 """ 1044 self._run_and_join(script) 1045 1046 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1047 def test_4_daemon_threads(self): 1048 # Check that a daemon thread cannot crash the interpreter on shutdown 1049 # by manipulating internal structures that are being disposed of in 1050 # the main thread. 1051 script = """if True: 1052 import os 1053 import random 1054 import sys 1055 import time 1056 import threading 1057 1058 thread_has_run = set() 1059 1060 def random_io(): 1061 '''Loop for a while sleeping random tiny amounts and doing some I/O.''' 1062 while True: 1063 with open(os.__file__, 'rb') as in_f: 1064 stuff = in_f.read(200) 1065 with open(os.devnull, 'wb') as null_f: 1066 null_f.write(stuff) 1067 time.sleep(random.random() / 1995) 1068 thread_has_run.add(threading.current_thread()) 1069 1070 def main(): 1071 count = 0 1072 for _ in range(40): 1073 new_thread = threading.Thread(target=random_io) 1074 new_thread.daemon = True 1075 new_thread.start() 1076 count += 1 1077 while len(thread_has_run) < count: 1078 time.sleep(0.001) 1079 # Trigger process shutdown 1080 sys.exit(0) 1081 1082 main() 1083 """ 1084 rc, out, err = assert_python_ok('-c', script) 1085 self.assertFalse(err) 1086 1087 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 1088 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1089 def test_reinit_tls_after_fork(self): 1090 # Issue #13817: fork() would deadlock in a multithreaded program with 1091 # the ad-hoc TLS implementation. 1092 1093 def do_fork_and_wait(): 1094 # just fork a child process and wait it 1095 pid = os.fork() 1096 if pid > 0: 1097 support.wait_process(pid, exitcode=50) 1098 else: 1099 os._exit(50) 1100 1101 # start a bunch of threads that will fork() child processes 1102 threads = [] 1103 for i in range(16): 1104 t = threading.Thread(target=do_fork_and_wait) 1105 threads.append(t) 1106 t.start() 1107 1108 for t in threads: 1109 t.join() 1110 1111 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 1112 def test_clear_threads_states_after_fork(self): 1113 # Issue #17094: check that threads states are cleared after fork() 1114 1115 # start a bunch of threads 1116 threads = [] 1117 for i in range(16): 1118 t = threading.Thread(target=lambda : time.sleep(0.3)) 1119 threads.append(t) 1120 t.start() 1121 1122 pid = os.fork() 1123 if pid == 0: 1124 # check that threads states have been cleared 1125 if len(sys._current_frames()) == 1: 1126 os._exit(51) 1127 else: 1128 os._exit(52) 1129 else: 1130 support.wait_process(pid, exitcode=51) 1131 1132 for t in threads: 1133 t.join() 1134 1135 1136class SubinterpThreadingTests(BaseTestCase): 1137 def pipe(self): 1138 r, w = os.pipe() 1139 self.addCleanup(os.close, r) 1140 self.addCleanup(os.close, w) 1141 if hasattr(os, 'set_blocking'): 1142 os.set_blocking(r, False) 1143 return (r, w) 1144 1145 def test_threads_join(self): 1146 # Non-daemon threads should be joined at subinterpreter shutdown 1147 # (issue #18808) 1148 r, w = self.pipe() 1149 code = textwrap.dedent(r""" 1150 import os 1151 import random 1152 import threading 1153 import time 1154 1155 def random_sleep(): 1156 seconds = random.random() * 0.010 1157 time.sleep(seconds) 1158 1159 def f(): 1160 # Sleep a bit so that the thread is still running when 1161 # Py_EndInterpreter is called. 1162 random_sleep() 1163 os.write(%d, b"x") 1164 1165 threading.Thread(target=f).start() 1166 random_sleep() 1167 """ % (w,)) 1168 ret = test.support.run_in_subinterp(code) 1169 self.assertEqual(ret, 0) 1170 # The thread was joined properly. 1171 self.assertEqual(os.read(r, 1), b"x") 1172 1173 def test_threads_join_2(self): 1174 # Same as above, but a delay gets introduced after the thread's 1175 # Python code returned but before the thread state is deleted. 1176 # To achieve this, we register a thread-local object which sleeps 1177 # a bit when deallocated. 1178 r, w = self.pipe() 1179 code = textwrap.dedent(r""" 1180 import os 1181 import random 1182 import threading 1183 import time 1184 1185 def random_sleep(): 1186 seconds = random.random() * 0.010 1187 time.sleep(seconds) 1188 1189 class Sleeper: 1190 def __del__(self): 1191 random_sleep() 1192 1193 tls = threading.local() 1194 1195 def f(): 1196 # Sleep a bit so that the thread is still running when 1197 # Py_EndInterpreter is called. 1198 random_sleep() 1199 tls.x = Sleeper() 1200 os.write(%d, b"x") 1201 1202 threading.Thread(target=f).start() 1203 random_sleep() 1204 """ % (w,)) 1205 ret = test.support.run_in_subinterp(code) 1206 self.assertEqual(ret, 0) 1207 # The thread was joined properly. 1208 self.assertEqual(os.read(r, 1), b"x") 1209 1210 @cpython_only 1211 def test_daemon_threads_fatal_error(self): 1212 subinterp_code = f"""if 1: 1213 import os 1214 import threading 1215 import time 1216 1217 def f(): 1218 # Make sure the daemon thread is still running when 1219 # Py_EndInterpreter is called. 1220 time.sleep({test.support.SHORT_TIMEOUT}) 1221 threading.Thread(target=f, daemon=True).start() 1222 """ 1223 script = r"""if 1: 1224 import _testcapi 1225 1226 _testcapi.run_in_subinterp(%r) 1227 """ % (subinterp_code,) 1228 with test.support.SuppressCrashReport(): 1229 rc, out, err = assert_python_failure("-c", script) 1230 self.assertIn("Fatal Python error: Py_EndInterpreter: " 1231 "not the last thread", err.decode()) 1232 1233 1234class ThreadingExceptionTests(BaseTestCase): 1235 # A RuntimeError should be raised if Thread.start() is called 1236 # multiple times. 1237 def test_start_thread_again(self): 1238 thread = threading.Thread() 1239 thread.start() 1240 self.assertRaises(RuntimeError, thread.start) 1241 thread.join() 1242 1243 def test_joining_current_thread(self): 1244 current_thread = threading.current_thread() 1245 self.assertRaises(RuntimeError, current_thread.join); 1246 1247 def test_joining_inactive_thread(self): 1248 thread = threading.Thread() 1249 self.assertRaises(RuntimeError, thread.join) 1250 1251 def test_daemonize_active_thread(self): 1252 thread = threading.Thread() 1253 thread.start() 1254 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 1255 thread.join() 1256 1257 def test_releasing_unacquired_lock(self): 1258 lock = threading.Lock() 1259 self.assertRaises(RuntimeError, lock.release) 1260 1261 def test_recursion_limit(self): 1262 # Issue 9670 1263 # test that excessive recursion within a non-main thread causes 1264 # an exception rather than crashing the interpreter on platforms 1265 # like Mac OS X or FreeBSD which have small default stack sizes 1266 # for threads 1267 script = """if True: 1268 import threading 1269 1270 def recurse(): 1271 return recurse() 1272 1273 def outer(): 1274 try: 1275 recurse() 1276 except RecursionError: 1277 pass 1278 1279 w = threading.Thread(target=outer) 1280 w.start() 1281 w.join() 1282 print('end of main thread') 1283 """ 1284 expected_output = "end of main thread\n" 1285 p = subprocess.Popen([sys.executable, "-c", script], 1286 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1287 stdout, stderr = p.communicate() 1288 data = stdout.decode().replace('\r', '') 1289 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) 1290 self.assertEqual(data, expected_output) 1291 1292 def test_print_exception(self): 1293 script = r"""if True: 1294 import threading 1295 import time 1296 1297 running = False 1298 def run(): 1299 global running 1300 running = True 1301 while running: 1302 time.sleep(0.01) 1303 1/0 1304 t = threading.Thread(target=run) 1305 t.start() 1306 while not running: 1307 time.sleep(0.01) 1308 running = False 1309 t.join() 1310 """ 1311 rc, out, err = assert_python_ok("-c", script) 1312 self.assertEqual(out, b'') 1313 err = err.decode() 1314 self.assertIn("Exception in thread", err) 1315 self.assertIn("Traceback (most recent call last):", err) 1316 self.assertIn("ZeroDivisionError", err) 1317 self.assertNotIn("Unhandled exception", err) 1318 1319 def test_print_exception_stderr_is_none_1(self): 1320 script = r"""if True: 1321 import sys 1322 import threading 1323 import time 1324 1325 running = False 1326 def run(): 1327 global running 1328 running = True 1329 while running: 1330 time.sleep(0.01) 1331 1/0 1332 t = threading.Thread(target=run) 1333 t.start() 1334 while not running: 1335 time.sleep(0.01) 1336 sys.stderr = None 1337 running = False 1338 t.join() 1339 """ 1340 rc, out, err = assert_python_ok("-c", script) 1341 self.assertEqual(out, b'') 1342 err = err.decode() 1343 self.assertIn("Exception in thread", err) 1344 self.assertIn("Traceback (most recent call last):", err) 1345 self.assertIn("ZeroDivisionError", err) 1346 self.assertNotIn("Unhandled exception", err) 1347 1348 def test_print_exception_stderr_is_none_2(self): 1349 script = r"""if True: 1350 import sys 1351 import threading 1352 import time 1353 1354 running = False 1355 def run(): 1356 global running 1357 running = True 1358 while running: 1359 time.sleep(0.01) 1360 1/0 1361 sys.stderr = None 1362 t = threading.Thread(target=run) 1363 t.start() 1364 while not running: 1365 time.sleep(0.01) 1366 running = False 1367 t.join() 1368 """ 1369 rc, out, err = assert_python_ok("-c", script) 1370 self.assertEqual(out, b'') 1371 self.assertNotIn("Unhandled exception", err.decode()) 1372 1373 def test_bare_raise_in_brand_new_thread(self): 1374 def bare_raise(): 1375 raise 1376 1377 class Issue27558(threading.Thread): 1378 exc = None 1379 1380 def run(self): 1381 try: 1382 bare_raise() 1383 except Exception as exc: 1384 self.exc = exc 1385 1386 thread = Issue27558() 1387 thread.start() 1388 thread.join() 1389 self.assertIsNotNone(thread.exc) 1390 self.assertIsInstance(thread.exc, RuntimeError) 1391 # explicitly break the reference cycle to not leak a dangling thread 1392 thread.exc = None 1393 1394 def test_multithread_modify_file_noerror(self): 1395 # See issue25872 1396 def modify_file(): 1397 with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp: 1398 fp.write(' ') 1399 traceback.format_stack() 1400 1401 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1402 threads = [ 1403 threading.Thread(target=modify_file) 1404 for i in range(100) 1405 ] 1406 for t in threads: 1407 t.start() 1408 t.join() 1409 1410 1411class ThreadRunFail(threading.Thread): 1412 def run(self): 1413 raise ValueError("run failed") 1414 1415 1416class ExceptHookTests(BaseTestCase): 1417 def setUp(self): 1418 restore_default_excepthook(self) 1419 super().setUp() 1420 1421 def test_excepthook(self): 1422 with support.captured_output("stderr") as stderr: 1423 thread = ThreadRunFail(name="excepthook thread") 1424 thread.start() 1425 thread.join() 1426 1427 stderr = stderr.getvalue().strip() 1428 self.assertIn(f'Exception in thread {thread.name}:\n', stderr) 1429 self.assertIn('Traceback (most recent call last):\n', stderr) 1430 self.assertIn(' raise ValueError("run failed")', stderr) 1431 self.assertIn('ValueError: run failed', stderr) 1432 1433 @support.cpython_only 1434 def test_excepthook_thread_None(self): 1435 # threading.excepthook called with thread=None: log the thread 1436 # identifier in this case. 1437 with support.captured_output("stderr") as stderr: 1438 try: 1439 raise ValueError("bug") 1440 except Exception as exc: 1441 args = threading.ExceptHookArgs([*sys.exc_info(), None]) 1442 try: 1443 threading.excepthook(args) 1444 finally: 1445 # Explicitly break a reference cycle 1446 args = None 1447 1448 stderr = stderr.getvalue().strip() 1449 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr) 1450 self.assertIn('Traceback (most recent call last):\n', stderr) 1451 self.assertIn(' raise ValueError("bug")', stderr) 1452 self.assertIn('ValueError: bug', stderr) 1453 1454 def test_system_exit(self): 1455 class ThreadExit(threading.Thread): 1456 def run(self): 1457 sys.exit(1) 1458 1459 # threading.excepthook() silently ignores SystemExit 1460 with support.captured_output("stderr") as stderr: 1461 thread = ThreadExit() 1462 thread.start() 1463 thread.join() 1464 1465 self.assertEqual(stderr.getvalue(), '') 1466 1467 def test_custom_excepthook(self): 1468 args = None 1469 1470 def hook(hook_args): 1471 nonlocal args 1472 args = hook_args 1473 1474 try: 1475 with support.swap_attr(threading, 'excepthook', hook): 1476 thread = ThreadRunFail() 1477 thread.start() 1478 thread.join() 1479 1480 self.assertEqual(args.exc_type, ValueError) 1481 self.assertEqual(str(args.exc_value), 'run failed') 1482 self.assertEqual(args.exc_traceback, args.exc_value.__traceback__) 1483 self.assertIs(args.thread, thread) 1484 finally: 1485 # Break reference cycle 1486 args = None 1487 1488 def test_custom_excepthook_fail(self): 1489 def threading_hook(args): 1490 raise ValueError("threading_hook failed") 1491 1492 err_str = None 1493 1494 def sys_hook(exc_type, exc_value, exc_traceback): 1495 nonlocal err_str 1496 err_str = str(exc_value) 1497 1498 with support.swap_attr(threading, 'excepthook', threading_hook), \ 1499 support.swap_attr(sys, 'excepthook', sys_hook), \ 1500 support.captured_output('stderr') as stderr: 1501 thread = ThreadRunFail() 1502 thread.start() 1503 thread.join() 1504 1505 self.assertEqual(stderr.getvalue(), 1506 'Exception in threading.excepthook:\n') 1507 self.assertEqual(err_str, 'threading_hook failed') 1508 1509 def test_original_excepthook(self): 1510 def run_thread(): 1511 with support.captured_output("stderr") as output: 1512 thread = ThreadRunFail(name="excepthook thread") 1513 thread.start() 1514 thread.join() 1515 return output.getvalue() 1516 1517 def threading_hook(args): 1518 print("Running a thread failed", file=sys.stderr) 1519 1520 default_output = run_thread() 1521 with support.swap_attr(threading, 'excepthook', threading_hook): 1522 custom_hook_output = run_thread() 1523 threading.excepthook = threading.__excepthook__ 1524 recovered_output = run_thread() 1525 1526 self.assertEqual(default_output, recovered_output) 1527 self.assertNotEqual(default_output, custom_hook_output) 1528 self.assertEqual(custom_hook_output, "Running a thread failed\n") 1529 1530 1531class TimerTests(BaseTestCase): 1532 1533 def setUp(self): 1534 BaseTestCase.setUp(self) 1535 self.callback_args = [] 1536 self.callback_event = threading.Event() 1537 1538 def test_init_immutable_default_args(self): 1539 # Issue 17435: constructor defaults were mutable objects, they could be 1540 # mutated via the object attributes and affect other Timer objects. 1541 timer1 = threading.Timer(0.01, self._callback_spy) 1542 timer1.start() 1543 self.callback_event.wait() 1544 timer1.args.append("blah") 1545 timer1.kwargs["foo"] = "bar" 1546 self.callback_event.clear() 1547 timer2 = threading.Timer(0.01, self._callback_spy) 1548 timer2.start() 1549 self.callback_event.wait() 1550 self.assertEqual(len(self.callback_args), 2) 1551 self.assertEqual(self.callback_args, [((), {}), ((), {})]) 1552 timer1.join() 1553 timer2.join() 1554 1555 def _callback_spy(self, *args, **kwargs): 1556 self.callback_args.append((args[:], kwargs.copy())) 1557 self.callback_event.set() 1558 1559class LockTests(lock_tests.LockTests): 1560 locktype = staticmethod(threading.Lock) 1561 1562class PyRLockTests(lock_tests.RLockTests): 1563 locktype = staticmethod(threading._PyRLock) 1564 1565@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') 1566class CRLockTests(lock_tests.RLockTests): 1567 locktype = staticmethod(threading._CRLock) 1568 1569class EventTests(lock_tests.EventTests): 1570 eventtype = staticmethod(threading.Event) 1571 1572class ConditionAsRLockTests(lock_tests.RLockTests): 1573 # Condition uses an RLock by default and exports its API. 1574 locktype = staticmethod(threading.Condition) 1575 1576class ConditionTests(lock_tests.ConditionTests): 1577 condtype = staticmethod(threading.Condition) 1578 1579class SemaphoreTests(lock_tests.SemaphoreTests): 1580 semtype = staticmethod(threading.Semaphore) 1581 1582class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 1583 semtype = staticmethod(threading.BoundedSemaphore) 1584 1585class BarrierTests(lock_tests.BarrierTests): 1586 barriertype = staticmethod(threading.Barrier) 1587 1588 1589class MiscTestCase(unittest.TestCase): 1590 def test__all__(self): 1591 restore_default_excepthook(self) 1592 1593 extra = {"ThreadError"} 1594 not_exported = {'currentThread', 'activeCount'} 1595 support.check__all__(self, threading, ('threading', '_thread'), 1596 extra=extra, not_exported=not_exported) 1597 1598 1599class InterruptMainTests(unittest.TestCase): 1600 def check_interrupt_main_with_signal_handler(self, signum): 1601 def handler(signum, frame): 1602 1/0 1603 1604 old_handler = signal.signal(signum, handler) 1605 self.addCleanup(signal.signal, signum, old_handler) 1606 1607 with self.assertRaises(ZeroDivisionError): 1608 _thread.interrupt_main() 1609 1610 def check_interrupt_main_noerror(self, signum): 1611 handler = signal.getsignal(signum) 1612 try: 1613 # No exception should arise. 1614 signal.signal(signum, signal.SIG_IGN) 1615 _thread.interrupt_main(signum) 1616 1617 signal.signal(signum, signal.SIG_DFL) 1618 _thread.interrupt_main(signum) 1619 finally: 1620 # Restore original handler 1621 signal.signal(signum, handler) 1622 1623 def test_interrupt_main_subthread(self): 1624 # Calling start_new_thread with a function that executes interrupt_main 1625 # should raise KeyboardInterrupt upon completion. 1626 def call_interrupt(): 1627 _thread.interrupt_main() 1628 t = threading.Thread(target=call_interrupt) 1629 with self.assertRaises(KeyboardInterrupt): 1630 t.start() 1631 t.join() 1632 t.join() 1633 1634 def test_interrupt_main_mainthread(self): 1635 # Make sure that if interrupt_main is called in main thread that 1636 # KeyboardInterrupt is raised instantly. 1637 with self.assertRaises(KeyboardInterrupt): 1638 _thread.interrupt_main() 1639 1640 def test_interrupt_main_with_signal_handler(self): 1641 self.check_interrupt_main_with_signal_handler(signal.SIGINT) 1642 self.check_interrupt_main_with_signal_handler(signal.SIGTERM) 1643 1644 def test_interrupt_main_noerror(self): 1645 self.check_interrupt_main_noerror(signal.SIGINT) 1646 self.check_interrupt_main_noerror(signal.SIGTERM) 1647 1648 def test_interrupt_main_invalid_signal(self): 1649 self.assertRaises(ValueError, _thread.interrupt_main, -1) 1650 self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG) 1651 self.assertRaises(ValueError, _thread.interrupt_main, 1000000) 1652 1653 @threading_helper.reap_threads 1654 def test_can_interrupt_tight_loops(self): 1655 cont = [True] 1656 started = [False] 1657 interrupted = [False] 1658 1659 def worker(started, cont, interrupted): 1660 iterations = 100_000_000 1661 started[0] = True 1662 while cont[0]: 1663 if iterations: 1664 iterations -= 1 1665 else: 1666 return 1667 pass 1668 interrupted[0] = True 1669 1670 t = threading.Thread(target=worker,args=(started, cont, interrupted)) 1671 t.start() 1672 while not started[0]: 1673 pass 1674 cont[0] = False 1675 t.join() 1676 self.assertTrue(interrupted[0]) 1677 1678 1679class AtexitTests(unittest.TestCase): 1680 1681 def test_atexit_output(self): 1682 rc, out, err = assert_python_ok("-c", """if True: 1683 import threading 1684 1685 def run_last(): 1686 print('parrot') 1687 1688 threading._register_atexit(run_last) 1689 """) 1690 1691 self.assertFalse(err) 1692 self.assertEqual(out.strip(), b'parrot') 1693 1694 def test_atexit_called_once(self): 1695 rc, out, err = assert_python_ok("-c", """if True: 1696 import threading 1697 from unittest.mock import Mock 1698 1699 mock = Mock() 1700 threading._register_atexit(mock) 1701 mock.assert_not_called() 1702 # force early shutdown to ensure it was called once 1703 threading._shutdown() 1704 mock.assert_called_once() 1705 """) 1706 1707 self.assertFalse(err) 1708 1709 def test_atexit_after_shutdown(self): 1710 # The only way to do this is by registering an atexit within 1711 # an atexit, which is intended to raise an exception. 1712 rc, out, err = assert_python_ok("-c", """if True: 1713 import threading 1714 1715 def func(): 1716 pass 1717 1718 def run_last(): 1719 threading._register_atexit(func) 1720 1721 threading._register_atexit(run_last) 1722 """) 1723 1724 self.assertTrue(err) 1725 self.assertIn("RuntimeError: can't register atexit after shutdown", 1726 err.decode()) 1727 1728 1729if __name__ == "__main__": 1730 unittest.main() 1731