1# Very rudimentary test of threading module 2 3import test.test_support 4from test.test_support import verbose, cpython_only 5from test.script_helper import assert_python_ok 6 7import random 8import re 9import sys 10thread = test.test_support.import_module('thread') 11threading = test.test_support.import_module('threading') 12import time 13import unittest 14import weakref 15import os 16import subprocess 17try: 18 import _testcapi 19except ImportError: 20 _testcapi = None 21 22from test import lock_tests 23 24# A trivial mutable counter. 25class Counter(object): 26 def __init__(self): 27 self.value = 0 28 def inc(self): 29 self.value += 1 30 def dec(self): 31 self.value -= 1 32 def get(self): 33 return self.value 34 35class TestThread(threading.Thread): 36 def __init__(self, name, testcase, sema, mutex, nrunning): 37 threading.Thread.__init__(self, name=name) 38 self.testcase = testcase 39 self.sema = sema 40 self.mutex = mutex 41 self.nrunning = nrunning 42 43 def run(self): 44 delay = random.random() / 10000.0 45 if verbose: 46 print 'task %s will run for %.1f usec' % ( 47 self.name, delay * 1e6) 48 49 with self.sema: 50 with self.mutex: 51 self.nrunning.inc() 52 if verbose: 53 print self.nrunning.get(), 'tasks are running' 54 self.testcase.assertLessEqual(self.nrunning.get(), 3) 55 56 time.sleep(delay) 57 if verbose: 58 print 'task', self.name, 'done' 59 60 with self.mutex: 61 self.nrunning.dec() 62 self.testcase.assertGreaterEqual(self.nrunning.get(), 0) 63 if verbose: 64 print '%s is finished. %d tasks are running' % ( 65 self.name, self.nrunning.get()) 66 67class BaseTestCase(unittest.TestCase): 68 def setUp(self): 69 self._threads = test.test_support.threading_setup() 70 71 def tearDown(self): 72 test.test_support.threading_cleanup(*self._threads) 73 test.test_support.reap_children() 74 75 76class ThreadTests(BaseTestCase): 77 78 # Create a bunch of threads, let each do some work, wait until all are 79 # done. 80 def test_various_ops(self): 81 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 82 # times about 1 second per clump). 83 NUMTASKS = 10 84 85 # no more than 3 of the 10 can run at once 86 sema = threading.BoundedSemaphore(value=3) 87 mutex = threading.RLock() 88 numrunning = Counter() 89 90 threads = [] 91 92 for i in range(NUMTASKS): 93 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 94 threads.append(t) 95 self.assertIsNone(t.ident) 96 self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$') 97 t.start() 98 99 if verbose: 100 print 'waiting for all tasks to complete' 101 for t in threads: 102 t.join(NUMTASKS) 103 self.assertFalse(t.is_alive()) 104 self.assertNotEqual(t.ident, 0) 105 self.assertIsNotNone(t.ident) 106 self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$') 107 if verbose: 108 print 'all tasks done' 109 self.assertEqual(numrunning.get(), 0) 110 111 def test_ident_of_no_threading_threads(self): 112 # The ident still must work for the main thread and dummy threads. 113 self.assertIsNotNone(threading.currentThread().ident) 114 def f(): 115 ident.append(threading.currentThread().ident) 116 done.set() 117 done = threading.Event() 118 ident = [] 119 thread.start_new_thread(f, ()) 120 done.wait() 121 self.assertIsNotNone(ident[0]) 122 # Kill the "immortal" _DummyThread 123 del threading._active[ident[0]] 124 125 # run with a small(ish) thread stack size (256kB) 126 def test_various_ops_small_stack(self): 127 if verbose: 128 print 'with 256kB thread stack size...' 129 try: 130 threading.stack_size(262144) 131 except thread.error: 132 self.skipTest('platform does not support changing thread stack size') 133 self.test_various_ops() 134 threading.stack_size(0) 135 136 # run with a large thread stack size (1MB) 137 def test_various_ops_large_stack(self): 138 if verbose: 139 print 'with 1MB thread stack size...' 140 try: 141 threading.stack_size(0x100000) 142 except thread.error: 143 self.skipTest('platform does not support changing thread stack size') 144 self.test_various_ops() 145 threading.stack_size(0) 146 147 def test_foreign_thread(self): 148 # Check that a "foreign" thread can use the threading module. 149 def f(mutex): 150 # Calling current_thread() forces an entry for the foreign 151 # thread to get made in the threading._active map. 152 threading.current_thread() 153 mutex.release() 154 155 mutex = threading.Lock() 156 mutex.acquire() 157 tid = thread.start_new_thread(f, (mutex,)) 158 # Wait for the thread to finish. 159 mutex.acquire() 160 self.assertIn(tid, threading._active) 161 self.assertIsInstance(threading._active[tid], threading._DummyThread) 162 del threading._active[tid] 163 164 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 165 # exposed at the Python level. This test relies on ctypes to get at it. 166 def test_PyThreadState_SetAsyncExc(self): 167 try: 168 import ctypes 169 except ImportError: 170 self.skipTest('requires ctypes') 171 172 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 173 174 class AsyncExc(Exception): 175 pass 176 177 exception = ctypes.py_object(AsyncExc) 178 179 # First check it works when setting the exception from the same thread. 180 tid = thread.get_ident() 181 182 try: 183 result = set_async_exc(ctypes.c_long(tid), exception) 184 # The exception is async, so we might have to keep the VM busy until 185 # it notices. 186 while True: 187 pass 188 except AsyncExc: 189 pass 190 else: 191 # This code is unreachable but it reflects the intent. If we wanted 192 # to be smarter the above loop wouldn't be infinite. 193 self.fail("AsyncExc not raised") 194 try: 195 self.assertEqual(result, 1) # one thread state modified 196 except UnboundLocalError: 197 # The exception was raised too quickly for us to get the result. 198 pass 199 200 # `worker_started` is set by the thread when it's inside a try/except 201 # block waiting to catch the asynchronously set AsyncExc exception. 202 # `worker_saw_exception` is set by the thread upon catching that 203 # exception. 204 worker_started = threading.Event() 205 worker_saw_exception = threading.Event() 206 207 class Worker(threading.Thread): 208 def run(self): 209 self.id = thread.get_ident() 210 self.finished = False 211 212 try: 213 while True: 214 worker_started.set() 215 time.sleep(0.1) 216 except AsyncExc: 217 self.finished = True 218 worker_saw_exception.set() 219 220 t = Worker() 221 t.daemon = True # so if this fails, we don't hang Python at shutdown 222 t.start() 223 if verbose: 224 print " started worker thread" 225 226 # Try a thread id that doesn't make sense. 227 if verbose: 228 print " trying nonsensical thread id" 229 result = set_async_exc(ctypes.c_long(-1), exception) 230 self.assertEqual(result, 0) # no thread states modified 231 232 # Now raise an exception in the worker thread. 233 if verbose: 234 print " waiting for worker thread to get started" 235 ret = worker_started.wait() 236 self.assertTrue(ret) 237 if verbose: 238 print " verifying worker hasn't exited" 239 self.assertFalse(t.finished) 240 if verbose: 241 print " attempting to raise asynch exception in worker" 242 result = set_async_exc(ctypes.c_long(t.id), exception) 243 self.assertEqual(result, 1) # one thread state modified 244 if verbose: 245 print " waiting for worker to say it caught the exception" 246 worker_saw_exception.wait(timeout=10) 247 self.assertTrue(t.finished) 248 if verbose: 249 print " all OK -- joining worker" 250 if t.finished: 251 t.join() 252 # else the thread is still running, and we have no way to kill it 253 254 def test_limbo_cleanup(self): 255 # Issue 7481: Failure to start thread should cleanup the limbo map. 256 def fail_new_thread(*args): 257 raise thread.error() 258 _start_new_thread = threading._start_new_thread 259 threading._start_new_thread = fail_new_thread 260 try: 261 t = threading.Thread(target=lambda: None) 262 self.assertRaises(thread.error, t.start) 263 self.assertFalse( 264 t in threading._limbo, 265 "Failed to cleanup _limbo map on failure of Thread.start().") 266 finally: 267 threading._start_new_thread = _start_new_thread 268 269 def test_finalize_runnning_thread(self): 270 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 271 # very late on python exit: on deallocation of a running thread for 272 # example. 273 try: 274 import ctypes 275 except ImportError: 276 self.skipTest('requires ctypes') 277 278 rc = subprocess.call([sys.executable, "-c", """if 1: 279 import ctypes, sys, time, thread 280 281 # This lock is used as a simple event variable. 282 ready = thread.allocate_lock() 283 ready.acquire() 284 285 # Module globals are cleared before __del__ is run 286 # So we save the functions in class dict 287 class C: 288 ensure = ctypes.pythonapi.PyGILState_Ensure 289 release = ctypes.pythonapi.PyGILState_Release 290 def __del__(self): 291 state = self.ensure() 292 self.release(state) 293 294 def waitingThread(): 295 x = C() 296 ready.release() 297 time.sleep(100) 298 299 thread.start_new_thread(waitingThread, ()) 300 ready.acquire() # Be sure the other thread is waiting. 301 sys.exit(42) 302 """]) 303 self.assertEqual(rc, 42) 304 305 def test_finalize_with_trace(self): 306 # Issue1733757 307 # Avoid a deadlock when sys.settrace steps into threading._shutdown 308 p = subprocess.Popen([sys.executable, "-c", """if 1: 309 import sys, threading 310 311 # A deadlock-killer, to prevent the 312 # testsuite to hang forever 313 def killer(): 314 import os, time 315 time.sleep(2) 316 print 'program blocked; aborting' 317 os._exit(2) 318 t = threading.Thread(target=killer) 319 t.daemon = True 320 t.start() 321 322 # This is the trace function 323 def func(frame, event, arg): 324 threading.current_thread() 325 return func 326 327 sys.settrace(func) 328 """], 329 stdout=subprocess.PIPE, 330 stderr=subprocess.PIPE) 331 self.addCleanup(p.stdout.close) 332 self.addCleanup(p.stderr.close) 333 stdout, stderr = p.communicate() 334 rc = p.returncode 335 self.assertFalse(rc == 2, "interpreted was blocked") 336 self.assertTrue(rc == 0, 337 "Unexpected error: " + repr(stderr)) 338 339 def test_join_nondaemon_on_shutdown(self): 340 # Issue 1722344 341 # Raising SystemExit skipped threading._shutdown 342 p = subprocess.Popen([sys.executable, "-c", """if 1: 343 import threading 344 from time import sleep 345 346 def child(): 347 sleep(1) 348 # As a non-daemon thread we SHOULD wake up and nothing 349 # should be torn down yet 350 print "Woke up, sleep function is:", sleep 351 352 threading.Thread(target=child).start() 353 raise SystemExit 354 """], 355 stdout=subprocess.PIPE, 356 stderr=subprocess.PIPE) 357 self.addCleanup(p.stdout.close) 358 self.addCleanup(p.stderr.close) 359 stdout, stderr = p.communicate() 360 self.assertEqual(stdout.strip(), 361 "Woke up, sleep function is: <built-in function sleep>") 362 stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip() 363 self.assertEqual(stderr, "") 364 365 def test_enumerate_after_join(self): 366 # Try hard to trigger #1703448: a thread is still returned in 367 # threading.enumerate() after it has been join()ed. 368 enum = threading.enumerate 369 old_interval = sys.getcheckinterval() 370 try: 371 for i in xrange(1, 100): 372 # Try a couple times at each thread-switching interval 373 # to get more interleavings. 374 sys.setcheckinterval(i // 5) 375 t = threading.Thread(target=lambda: None) 376 t.start() 377 t.join() 378 l = enum() 379 self.assertNotIn(t, l, 380 "#1703448 triggered after %d trials: %s" % (i, l)) 381 finally: 382 sys.setcheckinterval(old_interval) 383 384 def test_no_refcycle_through_target(self): 385 class RunSelfFunction(object): 386 def __init__(self, should_raise): 387 # The links in this refcycle from Thread back to self 388 # should be cleaned up when the thread completes. 389 self.should_raise = should_raise 390 self.thread = threading.Thread(target=self._run, 391 args=(self,), 392 kwargs={'yet_another':self}) 393 self.thread.start() 394 395 def _run(self, other_ref, yet_another): 396 if self.should_raise: 397 raise SystemExit 398 399 cyclic_object = RunSelfFunction(should_raise=False) 400 weak_cyclic_object = weakref.ref(cyclic_object) 401 cyclic_object.thread.join() 402 del cyclic_object 403 self.assertEqual(None, weak_cyclic_object(), 404 msg=('%d references still around' % 405 sys.getrefcount(weak_cyclic_object()))) 406 407 raising_cyclic_object = RunSelfFunction(should_raise=True) 408 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 409 raising_cyclic_object.thread.join() 410 del raising_cyclic_object 411 self.assertEqual(None, weak_raising_cyclic_object(), 412 msg=('%d references still around' % 413 sys.getrefcount(weak_raising_cyclic_object()))) 414 415 @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') 416 def test_dummy_thread_after_fork(self): 417 # Issue #14308: a dummy thread in the active list doesn't mess up 418 # the after-fork mechanism. 419 code = """if 1: 420 import thread, threading, os, time 421 422 def background_thread(evt): 423 # Creates and registers the _DummyThread instance 424 threading.current_thread() 425 evt.set() 426 time.sleep(10) 427 428 evt = threading.Event() 429 thread.start_new_thread(background_thread, (evt,)) 430 evt.wait() 431 assert threading.active_count() == 2, threading.active_count() 432 if os.fork() == 0: 433 assert threading.active_count() == 1, threading.active_count() 434 os._exit(0) 435 else: 436 os.wait() 437 """ 438 _, out, err = assert_python_ok("-c", code) 439 self.assertEqual(out, '') 440 self.assertEqual(err, '') 441 442 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 443 def test_is_alive_after_fork(self): 444 # Try hard to trigger #18418: is_alive() could sometimes be True on 445 # threads that vanished after a fork. 446 old_interval = sys.getcheckinterval() 447 448 # Make the bug more likely to manifest. 449 sys.setcheckinterval(10) 450 451 try: 452 for i in range(20): 453 t = threading.Thread(target=lambda: None) 454 t.start() 455 pid = os.fork() 456 if pid == 0: 457 os._exit(1 if t.is_alive() else 0) 458 else: 459 t.join() 460 pid, status = os.waitpid(pid, 0) 461 self.assertEqual(0, status) 462 finally: 463 sys.setcheckinterval(old_interval) 464 465 def test_BoundedSemaphore_limit(self): 466 # BoundedSemaphore should raise ValueError if released too often. 467 for limit in range(1, 10): 468 bs = threading.BoundedSemaphore(limit) 469 threads = [threading.Thread(target=bs.acquire) 470 for _ in range(limit)] 471 for t in threads: 472 t.start() 473 for t in threads: 474 t.join() 475 threads = [threading.Thread(target=bs.release) 476 for _ in range(limit)] 477 for t in threads: 478 t.start() 479 for t in threads: 480 t.join() 481 self.assertRaises(ValueError, bs.release) 482 483class ThreadJoinOnShutdown(BaseTestCase): 484 485 # Between fork() and exec(), only async-safe functions are allowed (issues 486 # #12316 and #11870), and fork() from a worker thread is known to trigger 487 # problems with some operating systems (issue #3863): skip problematic tests 488 # on platforms known to behave badly. 489 platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', 490 'os2emx') 491 492 def _run_and_join(self, script): 493 script = """if 1: 494 import sys, os, time, threading 495 496 # a thread, which waits for the main program to terminate 497 def joiningfunc(mainthread): 498 mainthread.join() 499 print 'end of thread' 500 \n""" + script 501 502 p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) 503 rc = p.wait() 504 data = p.stdout.read().replace('\r', '') 505 p.stdout.close() 506 self.assertEqual(data, "end of main\nend of thread\n") 507 self.assertFalse(rc == 2, "interpreter was blocked") 508 self.assertTrue(rc == 0, "Unexpected error") 509 510 def test_1_join_on_shutdown(self): 511 # The usual case: on exit, wait for a non-daemon thread 512 script = """if 1: 513 import os 514 t = threading.Thread(target=joiningfunc, 515 args=(threading.current_thread(),)) 516 t.start() 517 time.sleep(0.1) 518 print 'end of main' 519 """ 520 self._run_and_join(script) 521 522 523 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 524 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 525 def test_2_join_in_forked_process(self): 526 # Like the test above, but from a forked interpreter 527 script = """if 1: 528 childpid = os.fork() 529 if childpid != 0: 530 os.waitpid(childpid, 0) 531 sys.exit(0) 532 533 t = threading.Thread(target=joiningfunc, 534 args=(threading.current_thread(),)) 535 t.start() 536 print 'end of main' 537 """ 538 self._run_and_join(script) 539 540 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 541 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 542 def test_3_join_in_forked_from_thread(self): 543 # Like the test above, but fork() was called from a worker thread 544 # In the forked process, the main Thread object must be marked as stopped. 545 script = """if 1: 546 main_thread = threading.current_thread() 547 def worker(): 548 childpid = os.fork() 549 if childpid != 0: 550 os.waitpid(childpid, 0) 551 sys.exit(0) 552 553 t = threading.Thread(target=joiningfunc, 554 args=(main_thread,)) 555 print 'end of main' 556 t.start() 557 t.join() # Should not block: main_thread is already stopped 558 559 w = threading.Thread(target=worker) 560 w.start() 561 """ 562 self._run_and_join(script) 563 564 def assertScriptHasOutput(self, script, expected_output): 565 p = subprocess.Popen([sys.executable, "-c", script], 566 stdout=subprocess.PIPE) 567 rc = p.wait() 568 data = p.stdout.read().decode().replace('\r', '') 569 self.assertEqual(rc, 0, "Unexpected error") 570 self.assertEqual(data, expected_output) 571 572 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 573 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 574 def test_4_joining_across_fork_in_worker_thread(self): 575 # There used to be a possible deadlock when forking from a child 576 # thread. See http://bugs.python.org/issue6643. 577 578 # The script takes the following steps: 579 # - The main thread in the parent process starts a new thread and then 580 # tries to join it. 581 # - The join operation acquires the Lock inside the thread's _block 582 # Condition. (See threading.py:Thread.join().) 583 # - We stub out the acquire method on the condition to force it to wait 584 # until the child thread forks. (See LOCK ACQUIRED HERE) 585 # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS 586 # HERE) 587 # - The main thread of the parent process enters Condition.wait(), 588 # which releases the lock on the child thread. 589 # - The child process returns. Without the necessary fix, when the 590 # main thread of the child process (which used to be the child thread 591 # in the parent process) attempts to exit, it will try to acquire the 592 # lock in the Thread._block Condition object and hang, because the 593 # lock was held across the fork. 594 595 script = """if 1: 596 import os, time, threading 597 598 finish_join = False 599 start_fork = False 600 601 def worker(): 602 # Wait until this thread's lock is acquired before forking to 603 # create the deadlock. 604 global finish_join 605 while not start_fork: 606 time.sleep(0.01) 607 # LOCK HELD: Main thread holds lock across this call. 608 childpid = os.fork() 609 finish_join = True 610 if childpid != 0: 611 # Parent process just waits for child. 612 os.waitpid(childpid, 0) 613 # Child process should just return. 614 615 w = threading.Thread(target=worker) 616 617 # Stub out the private condition variable's lock acquire method. 618 # This acquires the lock and then waits until the child has forked 619 # before returning, which will release the lock soon after. If 620 # someone else tries to fix this test case by acquiring this lock 621 # before forking instead of resetting it, the test case will 622 # deadlock when it shouldn't. 623 condition = w._block 624 orig_acquire = condition.acquire 625 call_count_lock = threading.Lock() 626 call_count = 0 627 def my_acquire(): 628 global call_count 629 global start_fork 630 orig_acquire() # LOCK ACQUIRED HERE 631 start_fork = True 632 if call_count == 0: 633 while not finish_join: 634 time.sleep(0.01) # WORKER THREAD FORKS HERE 635 with call_count_lock: 636 call_count += 1 637 condition.acquire = my_acquire 638 639 w.start() 640 w.join() 641 print('end of main') 642 """ 643 self.assertScriptHasOutput(script, "end of main\n") 644 645 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 646 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 647 def test_5_clear_waiter_locks_to_avoid_crash(self): 648 # Check that a spawned thread that forks doesn't segfault on certain 649 # platforms, namely OS X. This used to happen if there was a waiter 650 # lock in the thread's condition variable's waiters list. Even though 651 # we know the lock will be held across the fork, it is not safe to 652 # release locks held across forks on all platforms, so releasing the 653 # waiter lock caused a segfault on OS X. Furthermore, since locks on 654 # OS X are (as of this writing) implemented with a mutex + condition 655 # variable instead of a semaphore, while we know that the Python-level 656 # lock will be acquired, we can't know if the internal mutex will be 657 # acquired at the time of the fork. 658 659 script = """if True: 660 import os, time, threading 661 662 start_fork = False 663 664 def worker(): 665 # Wait until the main thread has attempted to join this thread 666 # before continuing. 667 while not start_fork: 668 time.sleep(0.01) 669 childpid = os.fork() 670 if childpid != 0: 671 # Parent process just waits for child. 672 (cpid, rc) = os.waitpid(childpid, 0) 673 assert cpid == childpid 674 assert rc == 0 675 print('end of worker thread') 676 else: 677 # Child process should just return. 678 pass 679 680 w = threading.Thread(target=worker) 681 682 # Stub out the private condition variable's _release_save method. 683 # This releases the condition's lock and flips the global that 684 # causes the worker to fork. At this point, the problematic waiter 685 # lock has been acquired once by the waiter and has been put onto 686 # the waiters list. 687 condition = w._block 688 orig_release_save = condition._release_save 689 def my_release_save(): 690 global start_fork 691 orig_release_save() 692 # Waiter lock held here, condition lock released. 693 start_fork = True 694 condition._release_save = my_release_save 695 696 w.start() 697 w.join() 698 print('end of main thread') 699 """ 700 output = "end of worker thread\nend of main thread\n" 701 self.assertScriptHasOutput(script, output) 702 703 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 704 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 705 def test_reinit_tls_after_fork(self): 706 # Issue #13817: fork() would deadlock in a multithreaded program with 707 # the ad-hoc TLS implementation. 708 709 def do_fork_and_wait(): 710 # just fork a child process and wait it 711 pid = os.fork() 712 if pid > 0: 713 os.waitpid(pid, 0) 714 else: 715 os._exit(0) 716 717 # start a bunch of threads that will fork() child processes 718 threads = [] 719 for i in range(16): 720 t = threading.Thread(target=do_fork_and_wait) 721 threads.append(t) 722 t.start() 723 724 for t in threads: 725 t.join() 726 727 @cpython_only 728 @unittest.skipIf(_testcapi is None, "need _testcapi module") 729 def test_frame_tstate_tracing(self): 730 # Issue #14432: Crash when a generator is created in a C thread that is 731 # destroyed while the generator is still used. The issue was that a 732 # generator contains a frame, and the frame kept a reference to the 733 # Python state of the destroyed C thread. The crash occurs when a trace 734 # function is setup. 735 736 def noop_trace(frame, event, arg): 737 # no operation 738 return noop_trace 739 740 def generator(): 741 while 1: 742 yield "generator" 743 744 def callback(): 745 if callback.gen is None: 746 callback.gen = generator() 747 return next(callback.gen) 748 callback.gen = None 749 750 old_trace = sys.gettrace() 751 sys.settrace(noop_trace) 752 try: 753 # Install a trace function 754 threading.settrace(noop_trace) 755 756 # Create a generator in a C thread which exits after the call 757 _testcapi.call_in_temporary_c_thread(callback) 758 759 # Call the generator in a different Python thread, check that the 760 # generator didn't keep a reference to the destroyed thread state 761 for test in range(3): 762 # The trace function is still called here 763 callback() 764 finally: 765 sys.settrace(old_trace) 766 767 768class ThreadingExceptionTests(BaseTestCase): 769 # A RuntimeError should be raised if Thread.start() is called 770 # multiple times. 771 def test_start_thread_again(self): 772 thread = threading.Thread() 773 thread.start() 774 self.assertRaises(RuntimeError, thread.start) 775 776 def test_joining_current_thread(self): 777 current_thread = threading.current_thread() 778 self.assertRaises(RuntimeError, current_thread.join); 779 780 def test_joining_inactive_thread(self): 781 thread = threading.Thread() 782 self.assertRaises(RuntimeError, thread.join) 783 784 def test_daemonize_active_thread(self): 785 thread = threading.Thread() 786 thread.start() 787 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 788 789 def test_print_exception(self): 790 script = r"""if 1: 791 import threading 792 import time 793 794 running = False 795 def run(): 796 global running 797 running = True 798 while running: 799 time.sleep(0.01) 800 1.0/0.0 801 t = threading.Thread(target=run) 802 t.start() 803 while not running: 804 time.sleep(0.01) 805 running = False 806 t.join() 807 """ 808 rc, out, err = assert_python_ok("-c", script) 809 self.assertEqual(out, '') 810 self.assertIn("Exception in thread", err) 811 self.assertIn("Traceback (most recent call last):", err) 812 self.assertIn("ZeroDivisionError", err) 813 self.assertNotIn("Unhandled exception", err) 814 815 def test_print_exception_stderr_is_none_1(self): 816 script = r"""if 1: 817 import sys 818 import threading 819 import time 820 821 running = False 822 def run(): 823 global running 824 running = True 825 while running: 826 time.sleep(0.01) 827 1.0/0.0 828 t = threading.Thread(target=run) 829 t.start() 830 while not running: 831 time.sleep(0.01) 832 sys.stderr = None 833 running = False 834 t.join() 835 """ 836 rc, out, err = assert_python_ok("-c", script) 837 self.assertEqual(out, '') 838 self.assertIn("Exception in thread", err) 839 self.assertIn("Traceback (most recent call last):", err) 840 self.assertIn("ZeroDivisionError", err) 841 self.assertNotIn("Unhandled exception", err) 842 843 def test_print_exception_stderr_is_none_2(self): 844 script = r"""if 1: 845 import sys 846 import threading 847 import time 848 849 running = False 850 def run(): 851 global running 852 running = True 853 while running: 854 time.sleep(0.01) 855 1.0/0.0 856 sys.stderr = None 857 t = threading.Thread(target=run) 858 t.start() 859 while not running: 860 time.sleep(0.01) 861 running = False 862 t.join() 863 """ 864 rc, out, err = assert_python_ok("-c", script) 865 self.assertEqual(out, '') 866 self.assertNotIn("Unhandled exception", err) 867 868 869class LockTests(lock_tests.LockTests): 870 locktype = staticmethod(threading.Lock) 871 872class RLockTests(lock_tests.RLockTests): 873 locktype = staticmethod(threading.RLock) 874 875class EventTests(lock_tests.EventTests): 876 eventtype = staticmethod(threading.Event) 877 878class ConditionAsRLockTests(lock_tests.RLockTests): 879 # Condition uses an RLock by default and exports its API. 880 locktype = staticmethod(threading.Condition) 881 882class ConditionTests(lock_tests.ConditionTests): 883 condtype = staticmethod(threading.Condition) 884 885class SemaphoreTests(lock_tests.SemaphoreTests): 886 semtype = staticmethod(threading.Semaphore) 887 888class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 889 semtype = staticmethod(threading.BoundedSemaphore) 890 891 @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') 892 def test_recursion_limit(self): 893 # Issue 9670 894 # test that excessive recursion within a non-main thread causes 895 # an exception rather than crashing the interpreter on platforms 896 # like Mac OS X or FreeBSD which have small default stack sizes 897 # for threads 898 script = """if True: 899 import threading 900 901 def recurse(): 902 return recurse() 903 904 def outer(): 905 try: 906 recurse() 907 except RuntimeError: 908 pass 909 910 w = threading.Thread(target=outer) 911 w.start() 912 w.join() 913 print('end of main thread') 914 """ 915 expected_output = "end of main thread\n" 916 p = subprocess.Popen([sys.executable, "-c", script], 917 stdout=subprocess.PIPE) 918 stdout, stderr = p.communicate() 919 data = stdout.decode().replace('\r', '') 920 self.assertEqual(p.returncode, 0, "Unexpected error") 921 self.assertEqual(data, expected_output) 922 923def test_main(): 924 test.test_support.run_unittest(LockTests, RLockTests, EventTests, 925 ConditionAsRLockTests, ConditionTests, 926 SemaphoreTests, BoundedSemaphoreTests, 927 ThreadTests, 928 ThreadJoinOnShutdown, 929 ThreadingExceptionTests, 930 ) 931 932if __name__ == "__main__": 933 test_main() 934