1""" 2Tests for the threading module. 3""" 4 5import test.support 6from test.support import threading_helper, requires_subprocess, requires_gil_enabled 7from test.support import verbose, cpython_only, os_helper 8from test.support.import_helper import import_module 9from test.support.script_helper import assert_python_ok, assert_python_failure 10from test.support import force_not_colorized 11 12import random 13import sys 14import _thread 15import threading 16import time 17import unittest 18import weakref 19import os 20import subprocess 21import signal 22import textwrap 23import traceback 24import warnings 25 26from unittest import mock 27from test import lock_tests 28from test import support 29 30try: 31 from test.support import interpreters 32except ImportError: 33 interpreters = None 34 35threading_helper.requires_working_threading(module=True) 36 37# Between fork() and exec(), only async-safe functions are allowed (issues 38# #12316 and #11870), and fork() from a worker thread is known to trigger 39# problems with some operating systems (issue #3863): skip problematic tests 40# on platforms known to behave badly. 
platforms_to_skip = ('netbsd5', 'hp-ux11')


def skip_unless_reliable_fork(test):
    """Return *test* unchanged, or wrapped in unittest.skip().

    The test is skipped when fork is unsupported, when sys.platform is
    listed in platforms_to_skip, or when an ASAN/TSAN build is detected
    through the test.support helpers.
    """
    if not support.has_fork_support:
        return unittest.skip("requires working os.fork()")(test)
    if sys.platform in platforms_to_skip:
        return unittest.skip("due to known OS bug related to thread+fork")(test)
    if support.HAVE_ASAN_FORK_BUG:
        return unittest.skip("libasan has a pthread_create() dead lock related to thread+fork")(test)
    if support.check_sanitizer(thread=True):
        return unittest.skip("TSAN doesn't support threads after fork")(test)
    return test


def requires_subinterpreters(meth):
    """Decorator to skip a test if subinterpreters are not supported."""
    return unittest.skipIf(interpreters is None,
                           'subinterpreters required')(meth)


def restore_default_excepthook(testcase):
    """Install threading.__excepthook__ for the duration of *testcase*.

    The hook that was active on entry is put back by a cleanup registered
    on *testcase*.
    """
    testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
    threading.excepthook = threading.__excepthook__


# A trivial mutable counter.
class Counter:
    def __init__(self):
        self.value = 0

    def inc(self):
        self.value += 1

    def dec(self):
        self.value -= 1

    def get(self):
        return self.value


class TestThread(threading.Thread):
    """Worker used by ThreadTests.test_various_ops.

    Each worker enters *sema*, bumps the shared *nrunning* counter under
    *mutex*, sleeps for a short random delay and then decrements the
    counter again, asserting on *testcase* that the counter stays in the
    range 0..3.
    """

    def __init__(self, name, testcase, sema, mutex, nrunning):
        threading.Thread.__init__(self, name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        # At most 100 microseconds.
        delay = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, delay * 1e6))

        with self.sema:
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print(self.nrunning.get(), 'tasks are running')
                self.testcase.assertLessEqual(self.nrunning.get(), 3)

            time.sleep(delay)
            if verbose:
                print('task', self.name, 'done')

            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, self.nrunning.get()))


class BaseTestCase(unittest.TestCase):
    # Snapshot the threading state before each test and hand it back to
    # threading_helper.threading_cleanup() afterwards; child processes are
    # reaped as well.
    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)
        test.support.reap_children()


class ThreadTests(BaseTestCase):
    maxDiff = 9999

    @cpython_only
    def test_name(self):
        # An explicit name is used as-is (non-str values are converted),
        # while a missing or empty name falls back to "Thread-N", with the
        # target's name appended when a target is given.
        def func(): pass

        thread = threading.Thread(name="myname1")
        self.assertEqual(thread.name, "myname1")

        # Convert int name to str
        thread = threading.Thread(name=123)
        self.assertEqual(thread.name, "123")

        # target name is ignored if name is specified
        thread = threading.Thread(target=func, name="myname2")
        self.assertEqual(thread.name, "myname2")

        with mock.patch.object(threading, '_counter', return_value=2):
            thread = threading.Thread(name="")
            self.assertEqual(thread.name, "Thread-2")

        with mock.patch.object(threading, '_counter', return_value=3):
            thread = threading.Thread()
            self.assertEqual(thread.name, "Thread-3")

        with mock.patch.object(threading, '_counter', return_value=5):
            thread = threading.Thread(target=func)
            self.assertEqual(thread.name, "Thread-5 (func)")

    def test_args_argument(self):
        # bpo-45735: Using list or tuple as *args* in constructor could
        # achieve the same effect.
        num_list = [1]
        num_tuple = (1,)

        str_list = ["str"]
        str_tuple = ("str",)

        list_in_tuple = ([1],)
        tuple_in_list = [(1,)]

        test_cases = (
            (num_list, lambda arg: self.assertEqual(arg, 1)),
            (num_tuple, lambda arg: self.assertEqual(arg, 1)),
            (str_list, lambda arg: self.assertEqual(arg, "str")),
            (str_tuple, lambda arg: self.assertEqual(arg, "str")),
            (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
            (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
        )

        for args, target in test_cases:
            with self.subTest(target=target, args=args):
                t = threading.Thread(target=target, args=args)
                t.start()
                t.join()

    def test_lock_no_args(self):
        # threading.Lock() takes neither positional nor keyword arguments.
        threading.Lock()  # works
        self.assertRaises(TypeError, threading.Lock, 1)
        self.assertRaises(TypeError, threading.Lock, a=1)
        self.assertRaises(TypeError, threading.Lock, 1, 2, a=1, b=2)

    def test_lock_no_subclass(self):
        # Intentionally disallow subclasses of threading.Lock because they have
        # never been allowed, so why start now just because the type is public?
        with self.assertRaises(TypeError):
            class MyLock(threading.Lock): pass

    def test_lock_or_none(self):
        # threading.Lock must be usable in "X | None" union expressions.
        import types
        self.assertIsInstance(threading.Lock | None, types.UnionType)

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # Each task sleeps for at most ~100 microseconds (see
        # TestThread.run), so the whole test is quick.
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []

        for i in range(NUMTASKS):
            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
            threads.append(t)
            self.assertIsNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
            t.start()

        if hasattr(threading, 'get_native_id'):
            native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
            self.assertNotIn(None, native_ids)
            self.assertEqual(len(native_ids), NUMTASKS + 1)

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            t.join()
            self.assertFalse(t.is_alive())
            self.assertNotEqual(t.ident, 0)
            self.assertIsNotNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.current_thread().ident)
        def f():
            ident.append(threading.current_thread().ident)
            done.set()
        done = threading.Event()
        ident = []
        with threading_helper.wait_threads_exit():
            tid = _thread.start_new_thread(f, ())
            done.wait()
            self.assertEqual(ident[0], tid)

    # run with a small(ish) thread stack size (256 KiB)
    def test_various_ops_small_stack(self):
        if verbose:
            print('with 256 KiB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Reset to the default even when the test fails, so the custom
            # stack size cannot leak into later tests.
            threading.stack_size(0)

    # run with a large thread stack size (1 MiB)
    def test_various_ops_large_stack(self):
        if verbose:
            print('with 1 MiB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Reset to the default even when the test fails, so the custom
            # stack size cannot leak into later tests.
            threading.stack_size(0)

    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        dummy_thread = None
        error = None
        def f(mutex):
            try:
                nonlocal dummy_thread
                nonlocal error
                # Calling current_thread() forces an entry for the foreign
                # thread to get made in the threading._active map.
                dummy_thread = threading.current_thread()
                tid = dummy_thread.ident
                self.assertIn(tid, threading._active)
                self.assertIsInstance(dummy_thread, threading._DummyThread)
                self.assertIs(threading._active.get(tid), dummy_thread)
                # gh-29376
                self.assertTrue(
                    dummy_thread.is_alive(),
                    'Expected _DummyThread to be considered alive.'
                )
                self.assertIn('_DummyThread', repr(dummy_thread))
            except BaseException as e:
                # Assertion failures raised here would be lost in the
                # foreign thread; hand them to the main thread instead.
                error = e
            finally:
                mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        with threading_helper.wait_threads_exit():
            tid = _thread.start_new_thread(f, (mutex,))
            # Wait for the thread to finish.
            mutex.acquire()
        if error is not None:
            raise error
        self.assertEqual(tid, dummy_thread.ident)
        # Issue gh-106236:
        with self.assertRaises(RuntimeError):
            dummy_thread.join()
        dummy_thread._started.clear()
        with self.assertRaises(RuntimeError):
            dummy_thread.is_alive()
        # Busy wait for the following condition: after the thread dies, the
        # related dummy thread must be removed from threading._active.
        timeout = 5
        timeout_at = time.monotonic() + timeout
        while time.monotonic() < timeout_at:
            if threading._active.get(dummy_thread.ident) is not dummy_thread:
                break
            time.sleep(.1)
        else:
            self.fail('It was expected that the created threading._DummyThread was removed from threading._active.')

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        ctypes = import_module("ctypes")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)

        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = threading.get_ident()
        self.assertIsInstance(tid, int)
        self.assertGreater(tid, 0)

        try:
            result = set_async_exc(tid, exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1)  # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                self.id = threading.get_ident()
                self.finished = False

                try:
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True  # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print(" started worker thread")

        # Try a thread id that doesn't make sense.
        if verbose:
            print(" trying nonsensical thread id")
        result = set_async_exc(-1, exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print(" waiting for worker thread to get started")
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print(" verifying worker hasn't exited")
        self.assertFalse(t.finished)
        if verbose:
            print(" attempting to raise asynch exception in worker")
        result = set_async_exc(t.id, exception)
        self.assertEqual(result, 1)  # one thread state modified
        if verbose:
            print(" waiting for worker to say it caught the exception")
        worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(t.finished)
        if verbose:
            print(" all OK -- joining worker")
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it

    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args, **kwargs):
            raise threading.ThreadError()
        _start_joinable_thread = threading._start_joinable_thread
        threading._start_joinable_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertNotIn(
                t, threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_joinable_thread = _start_joinable_thread

    def test_finalize_running_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        if support.check_sanitizer(thread=True):
            # the thread running `time.sleep(100)` below will still be alive
            # at process exit
            self.skipTest("TSAN would report thread leak")
        import_module("ctypes")

        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)

    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        if support.check_sanitizer(thread=True):
            # the thread running `time.sleep(2)` below will still be alive
            # at process exit
            self.skipTest("TSAN would report thread leak")

        assert_python_ok("-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """)

    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        rc, out, err = assert_python_ok("-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            """)
        self.assertEqual(out.strip(),
            b"Woke up, sleep function is: <built-in function sleep>")
        self.assertEqual(err, b"")

def test_enumerate_after_join(self): 513 # Try hard to trigger #1703448: a thread is still returned in 514 # threading.enumerate() after it has been join()ed. 515 enum = threading.enumerate 516 old_interval = sys.getswitchinterval() 517 try: 518 for i in range(1, 100): 519 support.setswitchinterval(i * 0.0002) 520 t = threading.Thread(target=lambda: None) 521 t.start() 522 t.join() 523 l = enum() 524 self.assertNotIn(t, l, 525 "#1703448 triggered after %d trials: %s" % (i, l)) 526 finally: 527 sys.setswitchinterval(old_interval) 528 529 def test_join_from_multiple_threads(self): 530 # Thread.join() should be thread-safe 531 errors = [] 532 533 def worker(): 534 time.sleep(0.005) 535 536 def joiner(thread): 537 try: 538 thread.join() 539 except Exception as e: 540 errors.append(e) 541 542 for N in range(2, 20): 543 threads = [threading.Thread(target=worker)] 544 for i in range(N): 545 threads.append(threading.Thread(target=joiner, 546 args=(threads[0],))) 547 for t in threads: 548 t.start() 549 time.sleep(0.01) 550 for t in threads: 551 t.join() 552 if errors: 553 raise errors[0] 554 555 def test_join_with_timeout(self): 556 lock = _thread.allocate_lock() 557 lock.acquire() 558 559 def worker(): 560 lock.acquire() 561 562 thread = threading.Thread(target=worker) 563 thread.start() 564 thread.join(timeout=0.01) 565 assert thread.is_alive() 566 lock.release() 567 thread.join() 568 assert not thread.is_alive() 569 570 def test_no_refcycle_through_target(self): 571 class RunSelfFunction(object): 572 def __init__(self, should_raise): 573 # The links in this refcycle from Thread back to self 574 # should be cleaned up when the thread completes. 
575 self.should_raise = should_raise 576 self.thread = threading.Thread(target=self._run, 577 args=(self,), 578 kwargs={'yet_another':self}) 579 self.thread.start() 580 581 def _run(self, other_ref, yet_another): 582 if self.should_raise: 583 raise SystemExit 584 585 restore_default_excepthook(self) 586 587 cyclic_object = RunSelfFunction(should_raise=False) 588 weak_cyclic_object = weakref.ref(cyclic_object) 589 cyclic_object.thread.join() 590 del cyclic_object 591 self.assertIsNone(weak_cyclic_object(), 592 msg=('%d references still around' % 593 sys.getrefcount(weak_cyclic_object()))) 594 595 raising_cyclic_object = RunSelfFunction(should_raise=True) 596 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 597 raising_cyclic_object.thread.join() 598 del raising_cyclic_object 599 self.assertIsNone(weak_raising_cyclic_object(), 600 msg=('%d references still around' % 601 sys.getrefcount(weak_raising_cyclic_object()))) 602 603 def test_old_threading_api(self): 604 # Just a quick sanity check to make sure the old method names are 605 # still present 606 t = threading.Thread() 607 with self.assertWarnsRegex(DeprecationWarning, 608 r'get the daemon attribute'): 609 t.isDaemon() 610 with self.assertWarnsRegex(DeprecationWarning, 611 r'set the daemon attribute'): 612 t.setDaemon(True) 613 with self.assertWarnsRegex(DeprecationWarning, 614 r'get the name attribute'): 615 t.getName() 616 with self.assertWarnsRegex(DeprecationWarning, 617 r'set the name attribute'): 618 t.setName("name") 619 620 e = threading.Event() 621 with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'): 622 e.isSet() 623 624 cond = threading.Condition() 625 cond.acquire() 626 with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'): 627 cond.notifyAll() 628 629 with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'): 630 threading.activeCount() 631 with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'): 632 threading.currentThread() 633 634 
def test_repr_daemon(self): 635 t = threading.Thread() 636 self.assertNotIn('daemon', repr(t)) 637 t.daemon = True 638 self.assertIn('daemon', repr(t)) 639 640 def test_daemon_param(self): 641 t = threading.Thread() 642 self.assertFalse(t.daemon) 643 t = threading.Thread(daemon=False) 644 self.assertFalse(t.daemon) 645 t = threading.Thread(daemon=True) 646 self.assertTrue(t.daemon) 647 648 @skip_unless_reliable_fork 649 def test_dummy_thread_after_fork(self): 650 # Issue #14308: a dummy thread in the active list doesn't mess up 651 # the after-fork mechanism. 652 code = """if 1: 653 import _thread, threading, os, time, warnings 654 655 def background_thread(evt): 656 # Creates and registers the _DummyThread instance 657 threading.current_thread() 658 evt.set() 659 time.sleep(10) 660 661 evt = threading.Event() 662 _thread.start_new_thread(background_thread, (evt,)) 663 evt.wait() 664 assert threading.active_count() == 2, threading.active_count() 665 with warnings.catch_warnings(record=True) as ws: 666 warnings.filterwarnings( 667 "always", category=DeprecationWarning) 668 if os.fork() == 0: 669 assert threading.active_count() == 1, threading.active_count() 670 os._exit(0) 671 else: 672 assert ws[0].category == DeprecationWarning, ws[0] 673 assert 'fork' in str(ws[0].message), ws[0] 674 os.wait() 675 """ 676 _, out, err = assert_python_ok("-c", code) 677 self.assertEqual(out, b'') 678 self.assertEqual(err, b'') 679 680 @skip_unless_reliable_fork 681 def test_is_alive_after_fork(self): 682 # Try hard to trigger #18418: is_alive() could sometimes be True on 683 # threads that vanished after a fork. 684 old_interval = sys.getswitchinterval() 685 self.addCleanup(sys.setswitchinterval, old_interval) 686 687 # Make the bug more likely to manifest. 688 test.support.setswitchinterval(1e-6) 689 690 for i in range(20): 691 t = threading.Thread(target=lambda: None) 692 t.start() 693 # Ignore the warning about fork with threads. 
694 with warnings.catch_warnings(category=DeprecationWarning, 695 action="ignore"): 696 if (pid := os.fork()) == 0: 697 os._exit(11 if t.is_alive() else 10) 698 else: 699 t.join() 700 701 support.wait_process(pid, exitcode=10) 702 703 def test_main_thread(self): 704 main = threading.main_thread() 705 self.assertEqual(main.name, 'MainThread') 706 self.assertEqual(main.ident, threading.current_thread().ident) 707 self.assertEqual(main.ident, threading.get_ident()) 708 709 def f(): 710 self.assertNotEqual(threading.main_thread().ident, 711 threading.current_thread().ident) 712 th = threading.Thread(target=f) 713 th.start() 714 th.join() 715 716 @skip_unless_reliable_fork 717 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 718 def test_main_thread_after_fork(self): 719 code = """if 1: 720 import os, threading 721 from test import support 722 723 ident = threading.get_ident() 724 pid = os.fork() 725 if pid == 0: 726 print("current ident", threading.get_ident() == ident) 727 main = threading.main_thread() 728 print("main", main.name) 729 print("main ident", main.ident == ident) 730 print("current is main", threading.current_thread() is main) 731 else: 732 support.wait_process(pid, exitcode=0) 733 """ 734 _, out, err = assert_python_ok("-c", code) 735 data = out.decode().replace('\r', '') 736 self.assertEqual(err, b"") 737 self.assertEqual(data, 738 "current ident True\n" 739 "main MainThread\n" 740 "main ident True\n" 741 "current is main True\n") 742 743 @skip_unless_reliable_fork 744 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 745 def test_main_thread_after_fork_from_nonmain_thread(self): 746 code = """if 1: 747 import os, threading, sys, warnings 748 from test import support 749 750 def func(): 751 ident = threading.get_ident() 752 with warnings.catch_warnings(record=True) as ws: 753 warnings.filterwarnings( 754 "always", category=DeprecationWarning) 755 pid = os.fork() 756 if pid == 0: 757 print("current ident", 
threading.get_ident() == ident) 758 main = threading.main_thread() 759 print("main", main.name, type(main).__name__) 760 print("main ident", main.ident == ident) 761 print("current is main", threading.current_thread() is main) 762 # stdout is fully buffered because not a tty, 763 # we have to flush before exit. 764 sys.stdout.flush() 765 else: 766 assert ws[0].category == DeprecationWarning, ws[0] 767 assert 'fork' in str(ws[0].message), ws[0] 768 support.wait_process(pid, exitcode=0) 769 770 th = threading.Thread(target=func) 771 th.start() 772 th.join() 773 """ 774 _, out, err = assert_python_ok("-c", code) 775 data = out.decode().replace('\r', '') 776 self.assertEqual(err.decode('utf-8'), "") 777 self.assertEqual(data, 778 "current ident True\n" 779 "main Thread-1 (func) Thread\n" 780 "main ident True\n" 781 "current is main True\n" 782 ) 783 784 @skip_unless_reliable_fork 785 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 786 def test_main_thread_after_fork_from_foreign_thread(self, create_dummy=False): 787 code = """if 1: 788 import os, threading, sys, traceback, _thread 789 from test import support 790 791 def func(lock): 792 ident = threading.get_ident() 793 if %s: 794 # call current_thread() before fork to allocate DummyThread 795 current = threading.current_thread() 796 print("current", current.name, type(current).__name__) 797 print("ident in _active", ident in threading._active) 798 # flush before fork, so child won't flush it again 799 sys.stdout.flush() 800 pid = os.fork() 801 if pid == 0: 802 print("current ident", threading.get_ident() == ident) 803 main = threading.main_thread() 804 print("main", main.name, type(main).__name__) 805 print("main ident", main.ident == ident) 806 print("current is main", threading.current_thread() is main) 807 print("_dangling", [t.name for t in list(threading._dangling)]) 808 # stdout is fully buffered because not a tty, 809 # we have to flush before exit. 
810 sys.stdout.flush() 811 try: 812 threading._shutdown() 813 os._exit(0) 814 except: 815 traceback.print_exc() 816 sys.stderr.flush() 817 os._exit(1) 818 else: 819 try: 820 support.wait_process(pid, exitcode=0) 821 except Exception: 822 # avoid 'could not acquire lock for 823 # <_io.BufferedWriter name='<stderr>'> at interpreter shutdown,' 824 traceback.print_exc() 825 sys.stderr.flush() 826 finally: 827 lock.release() 828 829 join_lock = _thread.allocate_lock() 830 join_lock.acquire() 831 th = _thread.start_new_thread(func, (join_lock,)) 832 join_lock.acquire() 833 """ % create_dummy 834 # "DeprecationWarning: This process is multi-threaded, use of fork() 835 # may lead to deadlocks in the child" 836 _, out, err = assert_python_ok("-W", "ignore::DeprecationWarning", "-c", code) 837 data = out.decode().replace('\r', '') 838 self.assertEqual(err.decode(), "") 839 self.assertEqual(data, 840 ("current Dummy-1 _DummyThread\n" if create_dummy else "") + 841 f"ident in _active {create_dummy!s}\n" + 842 "current ident True\n" 843 "main MainThread _MainThread\n" 844 "main ident True\n" 845 "current is main True\n" 846 "_dangling ['MainThread']\n") 847 848 def test_main_thread_after_fork_from_dummy_thread(self, create_dummy=False): 849 self.test_main_thread_after_fork_from_foreign_thread(create_dummy=True) 850 851 def test_main_thread_during_shutdown(self): 852 # bpo-31516: current_thread() should still point to the main thread 853 # at shutdown 854 code = """if 1: 855 import gc, threading 856 857 main_thread = threading.current_thread() 858 assert main_thread is threading.main_thread() # sanity check 859 860 class RefCycle: 861 def __init__(self): 862 self.cycle = self 863 864 def __del__(self): 865 print("GC:", 866 threading.current_thread() is main_thread, 867 threading.main_thread() is main_thread, 868 threading.enumerate() == [main_thread]) 869 870 RefCycle() 871 gc.collect() # sanity check 872 x = RefCycle() 873 """ 874 _, out, err = assert_python_ok("-c", code) 875 
data = out.decode() 876 self.assertEqual(err, b"") 877 self.assertEqual(data.splitlines(), 878 ["GC: True True True"] * 2) 879 880 def test_finalization_shutdown(self): 881 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait 882 # until Python thread states of all non-daemon threads get deleted. 883 # 884 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but 885 # test the finalization of the main interpreter. 886 code = """if 1: 887 import os 888 import threading 889 import time 890 import random 891 892 def random_sleep(): 893 seconds = random.random() * 0.010 894 time.sleep(seconds) 895 896 class Sleeper: 897 def __del__(self): 898 random_sleep() 899 900 tls = threading.local() 901 902 def f(): 903 # Sleep a bit so that the thread is still running when 904 # Py_Finalize() is called. 905 random_sleep() 906 tls.x = Sleeper() 907 random_sleep() 908 909 threading.Thread(target=f).start() 910 random_sleep() 911 """ 912 rc, out, err = assert_python_ok("-c", code) 913 self.assertEqual(err, b"") 914 915 def test_repr_stopped(self): 916 # Verify that "stopped" shows up in repr(Thread) appropriately. 917 started = _thread.allocate_lock() 918 finish = _thread.allocate_lock() 919 started.acquire() 920 finish.acquire() 921 def f(): 922 started.release() 923 finish.acquire() 924 t = threading.Thread(target=f) 925 t.start() 926 started.acquire() 927 self.assertIn("started", repr(t)) 928 finish.release() 929 # "stopped" should appear in the repr in a reasonable amount of time. 930 # Implementation detail: as of this writing, that's trivially true 931 # if .join() is called, and almost trivially true if .is_alive() is 932 # called. The detail we're testing here is that "stopped" shows up 933 # "all on its own". 
934 LOOKING_FOR = "stopped" 935 for i in range(500): 936 if LOOKING_FOR in repr(t): 937 break 938 time.sleep(0.01) 939 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds 940 t.join() 941 942 def test_BoundedSemaphore_limit(self): 943 # BoundedSemaphore should raise ValueError if released too often. 944 for limit in range(1, 10): 945 bs = threading.BoundedSemaphore(limit) 946 threads = [threading.Thread(target=bs.acquire) 947 for _ in range(limit)] 948 for t in threads: 949 t.start() 950 for t in threads: 951 t.join() 952 threads = [threading.Thread(target=bs.release) 953 for _ in range(limit)] 954 for t in threads: 955 t.start() 956 for t in threads: 957 t.join() 958 self.assertRaises(ValueError, bs.release) 959 960 @cpython_only 961 def test_frame_tstate_tracing(self): 962 _testcapi = import_module("_testcapi") 963 # Issue #14432: Crash when a generator is created in a C thread that is 964 # destroyed while the generator is still used. The issue was that a 965 # generator contains a frame, and the frame kept a reference to the 966 # Python state of the destroyed C thread. The crash occurs when a trace 967 # function is setup. 
968 969 def noop_trace(frame, event, arg): 970 # no operation 971 return noop_trace 972 973 def generator(): 974 while 1: 975 yield "generator" 976 977 def callback(): 978 if callback.gen is None: 979 callback.gen = generator() 980 return next(callback.gen) 981 callback.gen = None 982 983 old_trace = sys.gettrace() 984 sys.settrace(noop_trace) 985 try: 986 # Install a trace function 987 threading.settrace(noop_trace) 988 989 # Create a generator in a C thread which exits after the call 990 _testcapi.call_in_temporary_c_thread(callback) 991 992 # Call the generator in a different Python thread, check that the 993 # generator didn't keep a reference to the destroyed thread state 994 for test in range(3): 995 # The trace function is still called here 996 callback() 997 finally: 998 sys.settrace(old_trace) 999 threading.settrace(old_trace) 1000 1001 def test_gettrace(self): 1002 def noop_trace(frame, event, arg): 1003 # no operation 1004 return noop_trace 1005 old_trace = threading.gettrace() 1006 try: 1007 threading.settrace(noop_trace) 1008 trace_func = threading.gettrace() 1009 self.assertEqual(noop_trace,trace_func) 1010 finally: 1011 threading.settrace(old_trace) 1012 1013 def test_gettrace_all_threads(self): 1014 def fn(*args): pass 1015 old_trace = threading.gettrace() 1016 first_check = threading.Event() 1017 second_check = threading.Event() 1018 1019 trace_funcs = [] 1020 def checker(): 1021 trace_funcs.append(sys.gettrace()) 1022 first_check.set() 1023 second_check.wait() 1024 trace_funcs.append(sys.gettrace()) 1025 1026 try: 1027 t = threading.Thread(target=checker) 1028 t.start() 1029 first_check.wait() 1030 threading.settrace_all_threads(fn) 1031 second_check.set() 1032 t.join() 1033 self.assertEqual(trace_funcs, [None, fn]) 1034 self.assertEqual(threading.gettrace(), fn) 1035 self.assertEqual(sys.gettrace(), fn) 1036 finally: 1037 threading.settrace_all_threads(old_trace) 1038 1039 self.assertEqual(threading.gettrace(), old_trace) 1040 
self.assertEqual(sys.gettrace(), old_trace) 1041 1042 def test_getprofile(self): 1043 def fn(*args): pass 1044 old_profile = threading.getprofile() 1045 try: 1046 threading.setprofile(fn) 1047 self.assertEqual(fn, threading.getprofile()) 1048 finally: 1049 threading.setprofile(old_profile) 1050 1051 def test_getprofile_all_threads(self): 1052 def fn(*args): pass 1053 old_profile = threading.getprofile() 1054 first_check = threading.Event() 1055 second_check = threading.Event() 1056 1057 profile_funcs = [] 1058 def checker(): 1059 profile_funcs.append(sys.getprofile()) 1060 first_check.set() 1061 second_check.wait() 1062 profile_funcs.append(sys.getprofile()) 1063 1064 try: 1065 t = threading.Thread(target=checker) 1066 t.start() 1067 first_check.wait() 1068 threading.setprofile_all_threads(fn) 1069 second_check.set() 1070 t.join() 1071 self.assertEqual(profile_funcs, [None, fn]) 1072 self.assertEqual(threading.getprofile(), fn) 1073 self.assertEqual(sys.getprofile(), fn) 1074 finally: 1075 threading.setprofile_all_threads(old_profile) 1076 1077 self.assertEqual(threading.getprofile(), old_profile) 1078 self.assertEqual(sys.getprofile(), old_profile) 1079 1080 def test_locals_at_exit(self): 1081 # bpo-19466: thread locals must not be deleted before destructors 1082 # are called 1083 rc, out, err = assert_python_ok("-c", """if 1: 1084 import threading 1085 1086 class Atexit: 1087 def __del__(self): 1088 print("thread_dict.atexit = %r" % thread_dict.atexit) 1089 1090 thread_dict = threading.local() 1091 thread_dict.atexit = "value" 1092 1093 atexit = Atexit() 1094 """) 1095 self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'") 1096 1097 def test_boolean_target(self): 1098 # bpo-41149: A thread that had a boolean value of False would not 1099 # run, regardless of whether it was callable. The correct behaviour 1100 # is for a thread to do nothing if its target is None, and to call 1101 # the target otherwise. 
1102 class BooleanTarget(object): 1103 def __init__(self): 1104 self.ran = False 1105 def __bool__(self): 1106 return False 1107 def __call__(self): 1108 self.ran = True 1109 1110 target = BooleanTarget() 1111 thread = threading.Thread(target=target) 1112 thread.start() 1113 thread.join() 1114 self.assertTrue(target.ran) 1115 1116 def test_leak_without_join(self): 1117 # bpo-37788: Test that a thread which is not joined explicitly 1118 # does not leak. Test written for reference leak checks. 1119 def noop(): pass 1120 with threading_helper.wait_threads_exit(): 1121 threading.Thread(target=noop).start() 1122 # Thread.join() is not called 1123 1124 def test_import_from_another_thread(self): 1125 # bpo-1596321: If the threading module is first import from a thread 1126 # different than the main thread, threading._shutdown() must handle 1127 # this case without logging an error at Python exit. 1128 code = textwrap.dedent(''' 1129 import _thread 1130 import sys 1131 1132 event = _thread.allocate_lock() 1133 event.acquire() 1134 1135 def import_threading(): 1136 import threading 1137 event.release() 1138 1139 if 'threading' in sys.modules: 1140 raise Exception('threading is already imported') 1141 1142 _thread.start_new_thread(import_threading, ()) 1143 1144 # wait until the threading module is imported 1145 event.acquire() 1146 event.release() 1147 1148 if 'threading' not in sys.modules: 1149 raise Exception('threading is not imported') 1150 1151 # don't wait until the thread completes 1152 ''') 1153 rc, out, err = assert_python_ok("-c", code) 1154 self.assertEqual(out, b'') 1155 self.assertEqual(err, b'') 1156 1157 def test_start_new_thread_at_finalization(self): 1158 code = """if 1: 1159 import _thread 1160 1161 def f(): 1162 print("shouldn't be printed") 1163 1164 class AtFinalization: 1165 def __del__(self): 1166 print("OK") 1167 _thread.start_new_thread(f, ()) 1168 at_finalization = AtFinalization() 1169 """ 1170 _, out, err = assert_python_ok("-c", code) 1171 
self.assertEqual(out.strip(), b"OK") 1172 self.assertIn(b"can't create new thread at interpreter shutdown", err) 1173 1174 def test_start_new_thread_failed(self): 1175 # gh-109746: if Python fails to start newly created thread 1176 # due to failure of underlying PyThread_start_new_thread() call, 1177 # its state should be removed from interpreter' thread states list 1178 # to avoid its double cleanup 1179 try: 1180 from resource import setrlimit, RLIMIT_NPROC 1181 except ImportError as err: 1182 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 1183 code = """if 1: 1184 import resource 1185 import _thread 1186 1187 def f(): 1188 print("shouldn't be printed") 1189 1190 limits = resource.getrlimit(resource.RLIMIT_NPROC) 1191 [_, hard] = limits 1192 resource.setrlimit(resource.RLIMIT_NPROC, (0, hard)) 1193 1194 try: 1195 _thread.start_new_thread(f, ()) 1196 except RuntimeError: 1197 print('ok') 1198 else: 1199 print('!skip!') 1200 """ 1201 _, out, err = assert_python_ok("-u", "-c", code) 1202 out = out.strip() 1203 if b'!skip!' in out: 1204 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 1205 self.assertEqual(out, b'ok') 1206 self.assertEqual(err, b'') 1207 1208 1209class ThreadJoinOnShutdown(BaseTestCase): 1210 1211 def _run_and_join(self, script): 1212 script = """if 1: 1213 import sys, os, time, threading 1214 1215 # a thread, which waits for the main program to terminate 1216 def joiningfunc(mainthread): 1217 mainthread.join() 1218 print('end of thread') 1219 # stdout is fully buffered because not a tty, we have to flush 1220 # before exit. 
1221 sys.stdout.flush() 1222 \n""" + script 1223 1224 rc, out, err = assert_python_ok("-c", script) 1225 data = out.decode().replace('\r', '') 1226 self.assertEqual(data, "end of main\nend of thread\n") 1227 1228 def test_1_join_on_shutdown(self): 1229 # The usual case: on exit, wait for a non-daemon thread 1230 script = """if 1: 1231 import os 1232 t = threading.Thread(target=joiningfunc, 1233 args=(threading.current_thread(),)) 1234 t.start() 1235 time.sleep(0.1) 1236 print('end of main') 1237 """ 1238 self._run_and_join(script) 1239 1240 @skip_unless_reliable_fork 1241 def test_2_join_in_forked_process(self): 1242 # Like the test above, but from a forked interpreter 1243 script = """if 1: 1244 from test import support 1245 1246 childpid = os.fork() 1247 if childpid != 0: 1248 # parent process 1249 support.wait_process(childpid, exitcode=0) 1250 sys.exit(0) 1251 1252 # child process 1253 t = threading.Thread(target=joiningfunc, 1254 args=(threading.current_thread(),)) 1255 t.start() 1256 print('end of main') 1257 """ 1258 self._run_and_join(script) 1259 1260 @skip_unless_reliable_fork 1261 def test_3_join_in_forked_from_thread(self): 1262 # Like the test above, but fork() was called from a worker thread 1263 # In the forked process, the main Thread object must be marked as stopped. 
1264 1265 script = """if 1: 1266 from test import support 1267 1268 main_thread = threading.current_thread() 1269 def worker(): 1270 childpid = os.fork() 1271 if childpid != 0: 1272 # parent process 1273 support.wait_process(childpid, exitcode=0) 1274 sys.exit(0) 1275 1276 # child process 1277 t = threading.Thread(target=joiningfunc, 1278 args=(main_thread,)) 1279 print('end of main') 1280 t.start() 1281 t.join() # Should not block: main_thread is already stopped 1282 1283 w = threading.Thread(target=worker) 1284 w.start() 1285 """ 1286 self._run_and_join(script) 1287 1288 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 1289 def test_4_daemon_threads(self): 1290 # Check that a daemon thread cannot crash the interpreter on shutdown 1291 # by manipulating internal structures that are being disposed of in 1292 # the main thread. 1293 if support.check_sanitizer(thread=True): 1294 # some of the threads running `random_io` below will still be alive 1295 # at process exit 1296 self.skipTest("TSAN would report thread leak") 1297 1298 script = """if True: 1299 import os 1300 import random 1301 import sys 1302 import time 1303 import threading 1304 1305 thread_has_run = set() 1306 1307 def random_io(): 1308 '''Loop for a while sleeping random tiny amounts and doing some I/O.''' 1309 import test.test_threading as mod 1310 while True: 1311 with open(mod.__file__, 'rb') as in_f: 1312 stuff = in_f.read(200) 1313 with open(os.devnull, 'wb') as null_f: 1314 null_f.write(stuff) 1315 time.sleep(random.random() / 1995) 1316 thread_has_run.add(threading.current_thread()) 1317 1318 def main(): 1319 count = 0 1320 for _ in range(40): 1321 new_thread = threading.Thread(target=random_io) 1322 new_thread.daemon = True 1323 new_thread.start() 1324 count += 1 1325 while len(thread_has_run) < count: 1326 time.sleep(0.001) 1327 # Trigger process shutdown 1328 sys.exit(0) 1329 1330 main() 1331 """ 1332 rc, out, err = assert_python_ok('-c', script) 1333 
self.assertFalse(err) 1334 1335 def test_thread_from_thread(self): 1336 script = """if True: 1337 import threading 1338 import time 1339 1340 def thread2(): 1341 time.sleep(0.05) 1342 print("OK") 1343 1344 def thread1(): 1345 time.sleep(0.05) 1346 t2 = threading.Thread(target=thread2) 1347 t2.start() 1348 1349 t = threading.Thread(target=thread1) 1350 t.start() 1351 # do not join() -- the interpreter waits for non-daemon threads to 1352 # finish. 1353 """ 1354 rc, out, err = assert_python_ok('-c', script) 1355 self.assertEqual(err, b"") 1356 self.assertEqual(out.strip(), b"OK") 1357 self.assertEqual(rc, 0) 1358 1359 @skip_unless_reliable_fork 1360 def test_reinit_tls_after_fork(self): 1361 # Issue #13817: fork() would deadlock in a multithreaded program with 1362 # the ad-hoc TLS implementation. 1363 1364 def do_fork_and_wait(): 1365 # just fork a child process and wait it 1366 pid = os.fork() 1367 if pid > 0: 1368 support.wait_process(pid, exitcode=50) 1369 else: 1370 os._exit(50) 1371 1372 # Ignore the warning about fork with threads. 1373 with warnings.catch_warnings(category=DeprecationWarning, 1374 action="ignore"): 1375 # start a bunch of threads that will fork() child processes 1376 threads = [] 1377 for i in range(16): 1378 t = threading.Thread(target=do_fork_and_wait) 1379 threads.append(t) 1380 t.start() 1381 1382 for t in threads: 1383 t.join() 1384 1385 @skip_unless_reliable_fork 1386 def test_clear_threads_states_after_fork(self): 1387 # Issue #17094: check that threads states are cleared after fork() 1388 1389 # start a bunch of threads 1390 threads = [] 1391 for i in range(16): 1392 t = threading.Thread(target=lambda : time.sleep(0.3)) 1393 threads.append(t) 1394 t.start() 1395 1396 try: 1397 # Ignore the warning about fork with threads. 
1398 with warnings.catch_warnings(category=DeprecationWarning, 1399 action="ignore"): 1400 pid = os.fork() 1401 if pid == 0: 1402 # check that threads states have been cleared 1403 if len(sys._current_frames()) == 1: 1404 os._exit(51) 1405 else: 1406 os._exit(52) 1407 else: 1408 support.wait_process(pid, exitcode=51) 1409 finally: 1410 for t in threads: 1411 t.join() 1412 1413 1414class SubinterpThreadingTests(BaseTestCase): 1415 def pipe(self): 1416 r, w = os.pipe() 1417 self.addCleanup(os.close, r) 1418 self.addCleanup(os.close, w) 1419 if hasattr(os, 'set_blocking'): 1420 os.set_blocking(r, False) 1421 return (r, w) 1422 1423 def test_threads_join(self): 1424 # Non-daemon threads should be joined at subinterpreter shutdown 1425 # (issue #18808) 1426 r, w = self.pipe() 1427 code = textwrap.dedent(r""" 1428 import os 1429 import random 1430 import threading 1431 import time 1432 1433 def random_sleep(): 1434 seconds = random.random() * 0.010 1435 time.sleep(seconds) 1436 1437 def f(): 1438 # Sleep a bit so that the thread is still running when 1439 # Py_EndInterpreter is called. 1440 random_sleep() 1441 os.write(%d, b"x") 1442 1443 threading.Thread(target=f).start() 1444 random_sleep() 1445 """ % (w,)) 1446 ret = test.support.run_in_subinterp(code) 1447 self.assertEqual(ret, 0) 1448 # The thread was joined properly. 1449 self.assertEqual(os.read(r, 1), b"x") 1450 1451 def test_threads_join_2(self): 1452 # Same as above, but a delay gets introduced after the thread's 1453 # Python code returned but before the thread state is deleted. 1454 # To achieve this, we register a thread-local object which sleeps 1455 # a bit when deallocated. 
1456 r, w = self.pipe() 1457 code = textwrap.dedent(r""" 1458 import os 1459 import random 1460 import threading 1461 import time 1462 1463 def random_sleep(): 1464 seconds = random.random() * 0.010 1465 time.sleep(seconds) 1466 1467 class Sleeper: 1468 def __del__(self): 1469 random_sleep() 1470 1471 tls = threading.local() 1472 1473 def f(): 1474 # Sleep a bit so that the thread is still running when 1475 # Py_EndInterpreter is called. 1476 random_sleep() 1477 tls.x = Sleeper() 1478 os.write(%d, b"x") 1479 1480 threading.Thread(target=f).start() 1481 random_sleep() 1482 """ % (w,)) 1483 ret = test.support.run_in_subinterp(code) 1484 self.assertEqual(ret, 0) 1485 # The thread was joined properly. 1486 self.assertEqual(os.read(r, 1), b"x") 1487 1488 @requires_subinterpreters 1489 def test_threads_join_with_no_main(self): 1490 r_interp, w_interp = self.pipe() 1491 1492 INTERP = b'I' 1493 FINI = b'F' 1494 DONE = b'D' 1495 1496 interp = interpreters.create() 1497 interp.exec(f"""if True: 1498 import os 1499 import threading 1500 import time 1501 1502 done = False 1503 1504 def notify_fini(): 1505 global done 1506 done = True 1507 os.write({w_interp}, {FINI!r}) 1508 t.join() 1509 threading._register_atexit(notify_fini) 1510 1511 def task(): 1512 while not done: 1513 time.sleep(0.1) 1514 os.write({w_interp}, {DONE!r}) 1515 t = threading.Thread(target=task) 1516 t.start() 1517 1518 os.write({w_interp}, {INTERP!r}) 1519 """) 1520 interp.close() 1521 1522 self.assertEqual(os.read(r_interp, 1), INTERP) 1523 self.assertEqual(os.read(r_interp, 1), FINI) 1524 self.assertEqual(os.read(r_interp, 1), DONE) 1525 1526 @cpython_only 1527 def test_daemon_threads_fatal_error(self): 1528 import_module("_testcapi") 1529 subinterp_code = f"""if 1: 1530 import os 1531 import threading 1532 import time 1533 1534 def f(): 1535 # Make sure the daemon thread is still running when 1536 # Py_EndInterpreter is called. 
1537 time.sleep({test.support.SHORT_TIMEOUT}) 1538 threading.Thread(target=f, daemon=True).start() 1539 """ 1540 script = r"""if 1: 1541 import _testcapi 1542 1543 _testcapi.run_in_subinterp(%r) 1544 """ % (subinterp_code,) 1545 with test.support.SuppressCrashReport(): 1546 rc, out, err = assert_python_failure("-c", script) 1547 self.assertIn("Fatal Python error: Py_EndInterpreter: " 1548 "not the last thread", err.decode()) 1549 1550 def _check_allowed(self, before_start='', *, 1551 allowed=True, 1552 daemon_allowed=True, 1553 daemon=False, 1554 ): 1555 import_module("_testinternalcapi") 1556 subinterp_code = textwrap.dedent(f""" 1557 import test.support 1558 import threading 1559 def func(): 1560 print('this should not have run!') 1561 t = threading.Thread(target=func, daemon={daemon}) 1562 {before_start} 1563 t.start() 1564 """) 1565 check_multi_interp_extensions = bool(support.Py_GIL_DISABLED) 1566 script = textwrap.dedent(f""" 1567 import test.support 1568 test.support.run_in_subinterp_with_config( 1569 {subinterp_code!r}, 1570 use_main_obmalloc=True, 1571 allow_fork=True, 1572 allow_exec=True, 1573 allow_threads={allowed}, 1574 allow_daemon_threads={daemon_allowed}, 1575 check_multi_interp_extensions={check_multi_interp_extensions}, 1576 own_gil=False, 1577 ) 1578 """) 1579 with test.support.SuppressCrashReport(): 1580 _, _, err = assert_python_ok("-c", script) 1581 return err.decode() 1582 1583 @cpython_only 1584 def test_threads_not_allowed(self): 1585 err = self._check_allowed( 1586 allowed=False, 1587 daemon_allowed=False, 1588 daemon=False, 1589 ) 1590 self.assertIn('RuntimeError', err) 1591 1592 @cpython_only 1593 def test_daemon_threads_not_allowed(self): 1594 with self.subTest('via Thread()'): 1595 err = self._check_allowed( 1596 allowed=True, 1597 daemon_allowed=False, 1598 daemon=True, 1599 ) 1600 self.assertIn('RuntimeError', err) 1601 1602 with self.subTest('via Thread.daemon setter'): 1603 err = self._check_allowed( 1604 't.daemon = True', 1605 
allowed=True, 1606 daemon_allowed=False, 1607 daemon=False, 1608 ) 1609 self.assertIn('RuntimeError', err) 1610 1611 1612class ThreadingExceptionTests(BaseTestCase): 1613 # A RuntimeError should be raised if Thread.start() is called 1614 # multiple times. 1615 def test_start_thread_again(self): 1616 thread = threading.Thread() 1617 thread.start() 1618 self.assertRaises(RuntimeError, thread.start) 1619 thread.join() 1620 1621 def test_joining_current_thread(self): 1622 current_thread = threading.current_thread() 1623 self.assertRaises(RuntimeError, current_thread.join); 1624 1625 def test_joining_inactive_thread(self): 1626 thread = threading.Thread() 1627 self.assertRaises(RuntimeError, thread.join) 1628 1629 def test_daemonize_active_thread(self): 1630 thread = threading.Thread() 1631 thread.start() 1632 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 1633 thread.join() 1634 1635 def test_releasing_unacquired_lock(self): 1636 lock = threading.Lock() 1637 self.assertRaises(RuntimeError, lock.release) 1638 1639 @requires_subprocess() 1640 def test_recursion_limit(self): 1641 # Issue 9670 1642 # test that excessive recursion within a non-main thread causes 1643 # an exception rather than crashing the interpreter on platforms 1644 # like Mac OS X or FreeBSD which have small default stack sizes 1645 # for threads 1646 script = """if True: 1647 import threading 1648 1649 def recurse(): 1650 return recurse() 1651 1652 def outer(): 1653 try: 1654 recurse() 1655 except RecursionError: 1656 pass 1657 1658 w = threading.Thread(target=outer) 1659 w.start() 1660 w.join() 1661 print('end of main thread') 1662 """ 1663 expected_output = "end of main thread\n" 1664 p = subprocess.Popen([sys.executable, "-c", script], 1665 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1666 stdout, stderr = p.communicate() 1667 data = stdout.decode().replace('\r', '') 1668 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) 1669 self.assertEqual(data, 
expected_output) 1670 1671 def test_print_exception(self): 1672 script = r"""if True: 1673 import threading 1674 import time 1675 1676 running = False 1677 def run(): 1678 global running 1679 running = True 1680 while running: 1681 time.sleep(0.01) 1682 1/0 1683 t = threading.Thread(target=run) 1684 t.start() 1685 while not running: 1686 time.sleep(0.01) 1687 running = False 1688 t.join() 1689 """ 1690 rc, out, err = assert_python_ok("-c", script) 1691 self.assertEqual(out, b'') 1692 err = err.decode() 1693 self.assertIn("Exception in thread", err) 1694 self.assertIn("Traceback (most recent call last):", err) 1695 self.assertIn("ZeroDivisionError", err) 1696 self.assertNotIn("Unhandled exception", err) 1697 1698 def test_print_exception_stderr_is_none_1(self): 1699 script = r"""if True: 1700 import sys 1701 import threading 1702 import time 1703 1704 running = False 1705 def run(): 1706 global running 1707 running = True 1708 while running: 1709 time.sleep(0.01) 1710 1/0 1711 t = threading.Thread(target=run) 1712 t.start() 1713 while not running: 1714 time.sleep(0.01) 1715 sys.stderr = None 1716 running = False 1717 t.join() 1718 """ 1719 rc, out, err = assert_python_ok("-c", script) 1720 self.assertEqual(out, b'') 1721 err = err.decode() 1722 self.assertIn("Exception in thread", err) 1723 self.assertIn("Traceback (most recent call last):", err) 1724 self.assertIn("ZeroDivisionError", err) 1725 self.assertNotIn("Unhandled exception", err) 1726 1727 def test_print_exception_stderr_is_none_2(self): 1728 script = r"""if True: 1729 import sys 1730 import threading 1731 import time 1732 1733 running = False 1734 def run(): 1735 global running 1736 running = True 1737 while running: 1738 time.sleep(0.01) 1739 1/0 1740 sys.stderr = None 1741 t = threading.Thread(target=run) 1742 t.start() 1743 while not running: 1744 time.sleep(0.01) 1745 running = False 1746 t.join() 1747 """ 1748 rc, out, err = assert_python_ok("-c", script) 1749 self.assertEqual(out, b'') 1750 
self.assertNotIn("Unhandled exception", err.decode()) 1751 1752 def test_print_exception_gh_102056(self): 1753 # This used to crash. See gh-102056. 1754 script = r"""if True: 1755 import time 1756 import threading 1757 import _thread 1758 1759 def f(): 1760 try: 1761 f() 1762 except RecursionError: 1763 f() 1764 1765 def g(): 1766 try: 1767 raise ValueError() 1768 except* ValueError: 1769 f() 1770 1771 def h(): 1772 time.sleep(1) 1773 _thread.interrupt_main() 1774 1775 t = threading.Thread(target=h) 1776 t.start() 1777 g() 1778 t.join() 1779 """ 1780 1781 assert_python_failure("-c", script) 1782 1783 def test_bare_raise_in_brand_new_thread(self): 1784 def bare_raise(): 1785 raise 1786 1787 class Issue27558(threading.Thread): 1788 exc = None 1789 1790 def run(self): 1791 try: 1792 bare_raise() 1793 except Exception as exc: 1794 self.exc = exc 1795 1796 thread = Issue27558() 1797 thread.start() 1798 thread.join() 1799 self.assertIsNotNone(thread.exc) 1800 self.assertIsInstance(thread.exc, RuntimeError) 1801 # explicitly break the reference cycle to not leak a dangling thread 1802 thread.exc = None 1803 1804 def test_multithread_modify_file_noerror(self): 1805 # See issue25872 1806 def modify_file(): 1807 with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp: 1808 fp.write(' ') 1809 traceback.format_stack() 1810 1811 self.addCleanup(os_helper.unlink, os_helper.TESTFN) 1812 threads = [ 1813 threading.Thread(target=modify_file) 1814 for i in range(100) 1815 ] 1816 for t in threads: 1817 t.start() 1818 t.join() 1819 1820 1821class ThreadRunFail(threading.Thread): 1822 def run(self): 1823 raise ValueError("run failed") 1824 1825 1826class ExceptHookTests(BaseTestCase): 1827 def setUp(self): 1828 restore_default_excepthook(self) 1829 super().setUp() 1830 1831 @force_not_colorized 1832 def test_excepthook(self): 1833 with support.captured_output("stderr") as stderr: 1834 thread = ThreadRunFail(name="excepthook thread") 1835 thread.start() 1836 thread.join() 1837 1838 
stderr = stderr.getvalue().strip() 1839 self.assertIn(f'Exception in thread {thread.name}:\n', stderr) 1840 self.assertIn('Traceback (most recent call last):\n', stderr) 1841 self.assertIn(' raise ValueError("run failed")', stderr) 1842 self.assertIn('ValueError: run failed', stderr) 1843 1844 @support.cpython_only 1845 @force_not_colorized 1846 def test_excepthook_thread_None(self): 1847 # threading.excepthook called with thread=None: log the thread 1848 # identifier in this case. 1849 with support.captured_output("stderr") as stderr: 1850 try: 1851 raise ValueError("bug") 1852 except Exception as exc: 1853 args = threading.ExceptHookArgs([*sys.exc_info(), None]) 1854 try: 1855 threading.excepthook(args) 1856 finally: 1857 # Explicitly break a reference cycle 1858 args = None 1859 1860 stderr = stderr.getvalue().strip() 1861 self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr) 1862 self.assertIn('Traceback (most recent call last):\n', stderr) 1863 self.assertIn(' raise ValueError("bug")', stderr) 1864 self.assertIn('ValueError: bug', stderr) 1865 1866 def test_system_exit(self): 1867 class ThreadExit(threading.Thread): 1868 def run(self): 1869 sys.exit(1) 1870 1871 # threading.excepthook() silently ignores SystemExit 1872 with support.captured_output("stderr") as stderr: 1873 thread = ThreadExit() 1874 thread.start() 1875 thread.join() 1876 1877 self.assertEqual(stderr.getvalue(), '') 1878 1879 def test_custom_excepthook(self): 1880 args = None 1881 1882 def hook(hook_args): 1883 nonlocal args 1884 args = hook_args 1885 1886 try: 1887 with support.swap_attr(threading, 'excepthook', hook): 1888 thread = ThreadRunFail() 1889 thread.start() 1890 thread.join() 1891 1892 self.assertEqual(args.exc_type, ValueError) 1893 self.assertEqual(str(args.exc_value), 'run failed') 1894 self.assertEqual(args.exc_traceback, args.exc_value.__traceback__) 1895 self.assertIs(args.thread, thread) 1896 finally: 1897 # Break reference cycle 1898 args = None 1899 1900 
def test_custom_excepthook_fail(self): 1901 def threading_hook(args): 1902 raise ValueError("threading_hook failed") 1903 1904 err_str = None 1905 1906 def sys_hook(exc_type, exc_value, exc_traceback): 1907 nonlocal err_str 1908 err_str = str(exc_value) 1909 1910 with support.swap_attr(threading, 'excepthook', threading_hook), \ 1911 support.swap_attr(sys, 'excepthook', sys_hook), \ 1912 support.captured_output('stderr') as stderr: 1913 thread = ThreadRunFail() 1914 thread.start() 1915 thread.join() 1916 1917 self.assertEqual(stderr.getvalue(), 1918 'Exception in threading.excepthook:\n') 1919 self.assertEqual(err_str, 'threading_hook failed') 1920 1921 def test_original_excepthook(self): 1922 def run_thread(): 1923 with support.captured_output("stderr") as output: 1924 thread = ThreadRunFail(name="excepthook thread") 1925 thread.start() 1926 thread.join() 1927 return output.getvalue() 1928 1929 def threading_hook(args): 1930 print("Running a thread failed", file=sys.stderr) 1931 1932 default_output = run_thread() 1933 with support.swap_attr(threading, 'excepthook', threading_hook): 1934 custom_hook_output = run_thread() 1935 threading.excepthook = threading.__excepthook__ 1936 recovered_output = run_thread() 1937 1938 self.assertEqual(default_output, recovered_output) 1939 self.assertNotEqual(default_output, custom_hook_output) 1940 self.assertEqual(custom_hook_output, "Running a thread failed\n") 1941 1942 1943class TimerTests(BaseTestCase): 1944 1945 def setUp(self): 1946 BaseTestCase.setUp(self) 1947 self.callback_args = [] 1948 self.callback_event = threading.Event() 1949 1950 def test_init_immutable_default_args(self): 1951 # Issue 17435: constructor defaults were mutable objects, they could be 1952 # mutated via the object attributes and affect other Timer objects. 
1953 timer1 = threading.Timer(0.01, self._callback_spy) 1954 timer1.start() 1955 self.callback_event.wait() 1956 timer1.args.append("blah") 1957 timer1.kwargs["foo"] = "bar" 1958 self.callback_event.clear() 1959 timer2 = threading.Timer(0.01, self._callback_spy) 1960 timer2.start() 1961 self.callback_event.wait() 1962 self.assertEqual(len(self.callback_args), 2) 1963 self.assertEqual(self.callback_args, [((), {}), ((), {})]) 1964 timer1.join() 1965 timer2.join() 1966 1967 def _callback_spy(self, *args, **kwargs): 1968 self.callback_args.append((args[:], kwargs.copy())) 1969 self.callback_event.set() 1970 1971class LockTests(lock_tests.LockTests): 1972 locktype = staticmethod(threading.Lock) 1973 1974class PyRLockTests(lock_tests.RLockTests): 1975 locktype = staticmethod(threading._PyRLock) 1976 1977@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') 1978class CRLockTests(lock_tests.RLockTests): 1979 locktype = staticmethod(threading._CRLock) 1980 1981 def test_signature(self): # gh-102029 1982 with warnings.catch_warnings(record=True) as warnings_log: 1983 threading.RLock() 1984 self.assertEqual(warnings_log, []) 1985 1986 arg_types = [ 1987 ((1,), {}), 1988 ((), {'a': 1}), 1989 ((1, 2), {'a': 1}), 1990 ] 1991 for args, kwargs in arg_types: 1992 with self.subTest(args=args, kwargs=kwargs): 1993 with self.assertWarns(DeprecationWarning): 1994 threading.RLock(*args, **kwargs) 1995 1996 # Subtypes with custom `__init__` are allowed (but, not recommended): 1997 class CustomRLock(self.locktype): 1998 def __init__(self, a, *, b) -> None: 1999 super().__init__() 2000 2001 with warnings.catch_warnings(record=True) as warnings_log: 2002 CustomRLock(1, b=2) 2003 self.assertEqual(warnings_log, []) 2004 2005class EventTests(lock_tests.EventTests): 2006 eventtype = staticmethod(threading.Event) 2007 2008class ConditionAsRLockTests(lock_tests.RLockTests): 2009 # Condition uses an RLock by default and exports its API. 
2010 locktype = staticmethod(threading.Condition) 2011 2012 def test_recursion_count(self): 2013 self.skipTest("Condition does not expose _recursion_count()") 2014 2015class ConditionTests(lock_tests.ConditionTests): 2016 condtype = staticmethod(threading.Condition) 2017 2018class SemaphoreTests(lock_tests.SemaphoreTests): 2019 semtype = staticmethod(threading.Semaphore) 2020 2021class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 2022 semtype = staticmethod(threading.BoundedSemaphore) 2023 2024class BarrierTests(lock_tests.BarrierTests): 2025 barriertype = staticmethod(threading.Barrier) 2026 2027 2028class MiscTestCase(unittest.TestCase): 2029 def test__all__(self): 2030 restore_default_excepthook(self) 2031 2032 extra = {"ThreadError"} 2033 not_exported = {'currentThread', 'activeCount'} 2034 support.check__all__(self, threading, ('threading', '_thread'), 2035 extra=extra, not_exported=not_exported) 2036 2037 2038class InterruptMainTests(unittest.TestCase): 2039 def check_interrupt_main_with_signal_handler(self, signum): 2040 def handler(signum, frame): 2041 1/0 2042 2043 old_handler = signal.signal(signum, handler) 2044 self.addCleanup(signal.signal, signum, old_handler) 2045 2046 with self.assertRaises(ZeroDivisionError): 2047 _thread.interrupt_main() 2048 2049 def check_interrupt_main_noerror(self, signum): 2050 handler = signal.getsignal(signum) 2051 try: 2052 # No exception should arise. 2053 signal.signal(signum, signal.SIG_IGN) 2054 _thread.interrupt_main(signum) 2055 2056 signal.signal(signum, signal.SIG_DFL) 2057 _thread.interrupt_main(signum) 2058 finally: 2059 # Restore original handler 2060 signal.signal(signum, handler) 2061 2062 @requires_gil_enabled("gh-118433: Flaky due to a longstanding bug") 2063 def test_interrupt_main_subthread(self): 2064 # Calling start_new_thread with a function that executes interrupt_main 2065 # should raise KeyboardInterrupt upon completion. 
2066 def call_interrupt(): 2067 _thread.interrupt_main() 2068 t = threading.Thread(target=call_interrupt) 2069 with self.assertRaises(KeyboardInterrupt): 2070 t.start() 2071 t.join() 2072 t.join() 2073 2074 def test_interrupt_main_mainthread(self): 2075 # Make sure that if interrupt_main is called in main thread that 2076 # KeyboardInterrupt is raised instantly. 2077 with self.assertRaises(KeyboardInterrupt): 2078 _thread.interrupt_main() 2079 2080 def test_interrupt_main_with_signal_handler(self): 2081 self.check_interrupt_main_with_signal_handler(signal.SIGINT) 2082 self.check_interrupt_main_with_signal_handler(signal.SIGTERM) 2083 2084 def test_interrupt_main_noerror(self): 2085 self.check_interrupt_main_noerror(signal.SIGINT) 2086 self.check_interrupt_main_noerror(signal.SIGTERM) 2087 2088 def test_interrupt_main_invalid_signal(self): 2089 self.assertRaises(ValueError, _thread.interrupt_main, -1) 2090 self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG) 2091 self.assertRaises(ValueError, _thread.interrupt_main, 1000000) 2092 2093 @threading_helper.reap_threads 2094 def test_can_interrupt_tight_loops(self): 2095 cont = [True] 2096 started = [False] 2097 interrupted = [False] 2098 2099 def worker(started, cont, interrupted): 2100 iterations = 100_000_000 2101 started[0] = True 2102 while cont[0]: 2103 if iterations: 2104 iterations -= 1 2105 else: 2106 return 2107 pass 2108 interrupted[0] = True 2109 2110 t = threading.Thread(target=worker,args=(started, cont, interrupted)) 2111 t.start() 2112 while not started[0]: 2113 pass 2114 cont[0] = False 2115 t.join() 2116 self.assertTrue(interrupted[0]) 2117 2118 2119class AtexitTests(unittest.TestCase): 2120 2121 def test_atexit_output(self): 2122 rc, out, err = assert_python_ok("-c", """if True: 2123 import threading 2124 2125 def run_last(): 2126 print('parrot') 2127 2128 threading._register_atexit(run_last) 2129 """) 2130 2131 self.assertFalse(err) 2132 self.assertEqual(out.strip(), b'parrot') 2133 2134 
def test_atexit_called_once(self): 2135 rc, out, err = assert_python_ok("-c", """if True: 2136 import threading 2137 from unittest.mock import Mock 2138 2139 mock = Mock() 2140 threading._register_atexit(mock) 2141 mock.assert_not_called() 2142 # force early shutdown to ensure it was called once 2143 threading._shutdown() 2144 mock.assert_called_once() 2145 """) 2146 2147 self.assertFalse(err) 2148 2149 def test_atexit_after_shutdown(self): 2150 # The only way to do this is by registering an atexit within 2151 # an atexit, which is intended to raise an exception. 2152 rc, out, err = assert_python_ok("-c", """if True: 2153 import threading 2154 2155 def func(): 2156 pass 2157 2158 def run_last(): 2159 threading._register_atexit(func) 2160 2161 threading._register_atexit(run_last) 2162 """) 2163 2164 self.assertTrue(err) 2165 self.assertIn("RuntimeError: can't register atexit after shutdown", 2166 err.decode()) 2167 2168 2169if __name__ == "__main__": 2170 unittest.main() 2171