1# Very rudimentary test of threading module 2 3import test.test_support 4from test.test_support import verbose 5import random 6import re 7import sys 8thread = test.test_support.import_module('thread') 9threading = test.test_support.import_module('threading') 10import time 11import unittest 12import weakref 13import os 14import subprocess 15 16from test import lock_tests 17 18# A trivial mutable counter. 19class Counter(object): 20 def __init__(self): 21 self.value = 0 22 def inc(self): 23 self.value += 1 24 def dec(self): 25 self.value -= 1 26 def get(self): 27 return self.value 28 29class TestThread(threading.Thread): 30 def __init__(self, name, testcase, sema, mutex, nrunning): 31 threading.Thread.__init__(self, name=name) 32 self.testcase = testcase 33 self.sema = sema 34 self.mutex = mutex 35 self.nrunning = nrunning 36 37 def run(self): 38 delay = random.random() / 10000.0 39 if verbose: 40 print 'task %s will run for %.1f usec' % ( 41 self.name, delay * 1e6) 42 43 with self.sema: 44 with self.mutex: 45 self.nrunning.inc() 46 if verbose: 47 print self.nrunning.get(), 'tasks are running' 48 self.testcase.assertTrue(self.nrunning.get() <= 3) 49 50 time.sleep(delay) 51 if verbose: 52 print 'task', self.name, 'done' 53 54 with self.mutex: 55 self.nrunning.dec() 56 self.testcase.assertTrue(self.nrunning.get() >= 0) 57 if verbose: 58 print '%s is finished. %d tasks are running' % ( 59 self.name, self.nrunning.get()) 60 61class BaseTestCase(unittest.TestCase): 62 def setUp(self): 63 self._threads = test.test_support.threading_setup() 64 65 def tearDown(self): 66 test.test_support.threading_cleanup(*self._threads) 67 test.test_support.reap_children() 68 69 70class ThreadTests(BaseTestCase): 71 72 # Create a bunch of threads, let each do some work, wait until all are 73 # done. 74 def test_various_ops(self): 75 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 76 # times about 1 second per clump). 77 NUMTASKS = 10 78 79 # no more than 3 of the 10 can run at once 80 sema = threading.BoundedSemaphore(value=3) 81 mutex = threading.RLock() 82 numrunning = Counter() 83 84 threads = [] 85 86 for i in range(NUMTASKS): 87 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 88 threads.append(t) 89 self.assertEqual(t.ident, None) 90 self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t))) 91 t.start() 92 93 if verbose: 94 print 'waiting for all tasks to complete' 95 for t in threads: 96 t.join(NUMTASKS) 97 self.assertTrue(not t.is_alive()) 98 self.assertNotEqual(t.ident, 0) 99 self.assertFalse(t.ident is None) 100 self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t))) 101 if verbose: 102 print 'all tasks done' 103 self.assertEqual(numrunning.get(), 0) 104 105 def test_ident_of_no_threading_threads(self): 106 # The ident still must work for the main thread and dummy threads. 107 self.assertFalse(threading.currentThread().ident is None) 108 def f(): 109 ident.append(threading.currentThread().ident) 110 done.set() 111 done = threading.Event() 112 ident = [] 113 thread.start_new_thread(f, ()) 114 done.wait() 115 self.assertFalse(ident[0] is None) 116 # Kill the "immortal" _DummyThread 117 del threading._active[ident[0]] 118 119 # run with a small(ish) thread stack size (256kB) 120 def test_various_ops_small_stack(self): 121 if verbose: 122 print 'with 256kB thread stack size...' 123 try: 124 threading.stack_size(262144) 125 except thread.error: 126 if verbose: 127 print 'platform does not support changing thread stack size' 128 return 129 self.test_various_ops() 130 threading.stack_size(0) 131 132 # run with a large thread stack size (1MB) 133 def test_various_ops_large_stack(self): 134 if verbose: 135 print 'with 1MB thread stack size...' 136 try: 137 threading.stack_size(0x100000) 138 except thread.error: 139 if verbose: 140 print 'platform does not support changing thread stack size' 141 return 142 self.test_various_ops() 143 threading.stack_size(0) 144 145 def test_foreign_thread(self): 146 # Check that a "foreign" thread can use the threading module. 147 def f(mutex): 148 # Calling current_thread() forces an entry for the foreign 149 # thread to get made in the threading._active map. 150 threading.current_thread() 151 mutex.release() 152 153 mutex = threading.Lock() 154 mutex.acquire() 155 tid = thread.start_new_thread(f, (mutex,)) 156 # Wait for the thread to finish. 157 mutex.acquire() 158 self.assertIn(tid, threading._active) 159 self.assertIsInstance(threading._active[tid], threading._DummyThread) 160 del threading._active[tid] 161 162 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 163 # exposed at the Python level. This test relies on ctypes to get at it. 164 def test_PyThreadState_SetAsyncExc(self): 165 try: 166 import ctypes 167 except ImportError: 168 if verbose: 169 print "test_PyThreadState_SetAsyncExc can't import ctypes" 170 return # can't do anything 171 172 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 173 174 class AsyncExc(Exception): 175 pass 176 177 exception = ctypes.py_object(AsyncExc) 178 179 # First check it works when setting the exception from the same thread. 180 tid = thread.get_ident() 181 182 try: 183 result = set_async_exc(ctypes.c_long(tid), exception) 184 # The exception is async, so we might have to keep the VM busy until 185 # it notices. 186 while True: 187 pass 188 except AsyncExc: 189 pass 190 else: 191 # This code is unreachable but it reflects the intent. If we wanted 192 # to be smarter the above loop wouldn't be infinite. 193 self.fail("AsyncExc not raised") 194 try: 195 self.assertEqual(result, 1) # one thread state modified 196 except UnboundLocalError: 197 # The exception was raised too quickly for us to get the result. 198 pass 199 200 # `worker_started` is set by the thread when it's inside a try/except 201 # block waiting to catch the asynchronously set AsyncExc exception. 202 # `worker_saw_exception` is set by the thread upon catching that 203 # exception. 204 worker_started = threading.Event() 205 worker_saw_exception = threading.Event() 206 207 class Worker(threading.Thread): 208 def run(self): 209 self.id = thread.get_ident() 210 self.finished = False 211 212 try: 213 while True: 214 worker_started.set() 215 time.sleep(0.1) 216 except AsyncExc: 217 self.finished = True 218 worker_saw_exception.set() 219 220 t = Worker() 221 t.daemon = True # so if this fails, we don't hang Python at shutdown 222 t.start() 223 if verbose: 224 print " started worker thread" 225 226 # Try a thread id that doesn't make sense. 227 if verbose: 228 print " trying nonsensical thread id" 229 result = set_async_exc(ctypes.c_long(-1), exception) 230 self.assertEqual(result, 0) # no thread states modified 231 232 # Now raise an exception in the worker thread. 233 if verbose: 234 print " waiting for worker thread to get started" 235 ret = worker_started.wait() 236 self.assertTrue(ret) 237 if verbose: 238 print " verifying worker hasn't exited" 239 self.assertTrue(not t.finished) 240 if verbose: 241 print " attempting to raise asynch exception in worker" 242 result = set_async_exc(ctypes.c_long(t.id), exception) 243 self.assertEqual(result, 1) # one thread state modified 244 if verbose: 245 print " waiting for worker to say it caught the exception" 246 worker_saw_exception.wait(timeout=10) 247 self.assertTrue(t.finished) 248 if verbose: 249 print " all OK -- joining worker" 250 if t.finished: 251 t.join() 252 # else the thread is still running, and we have no way to kill it 253 254 def test_limbo_cleanup(self): 255 # Issue 7481: Failure to start thread should cleanup the limbo map. 256 def fail_new_thread(*args): 257 raise thread.error() 258 _start_new_thread = threading._start_new_thread 259 threading._start_new_thread = fail_new_thread 260 try: 261 t = threading.Thread(target=lambda: None) 262 self.assertRaises(thread.error, t.start) 263 self.assertFalse( 264 t in threading._limbo, 265 "Failed to cleanup _limbo map on failure of Thread.start().") 266 finally: 267 threading._start_new_thread = _start_new_thread 268 269 def test_finalize_runnning_thread(self): 270 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 271 # very late on python exit: on deallocation of a running thread for 272 # example. 273 try: 274 import ctypes 275 except ImportError: 276 if verbose: 277 print("test_finalize_with_runnning_thread can't import ctypes") 278 return # can't do anything 279 280 rc = subprocess.call([sys.executable, "-c", """if 1: 281 import ctypes, sys, time, thread 282 283 # This lock is used as a simple event variable. 284 ready = thread.allocate_lock() 285 ready.acquire() 286 287 # Module globals are cleared before __del__ is run 288 # So we save the functions in class dict 289 class C: 290 ensure = ctypes.pythonapi.PyGILState_Ensure 291 release = ctypes.pythonapi.PyGILState_Release 292 def __del__(self): 293 state = self.ensure() 294 self.release(state) 295 296 def waitingThread(): 297 x = C() 298 ready.release() 299 time.sleep(100) 300 301 thread.start_new_thread(waitingThread, ()) 302 ready.acquire() # Be sure the other thread is waiting. 303 sys.exit(42) 304 """]) 305 self.assertEqual(rc, 42) 306 307 def test_finalize_with_trace(self): 308 # Issue1733757 309 # Avoid a deadlock when sys.settrace steps into threading._shutdown 310 p = subprocess.Popen([sys.executable, "-c", """if 1: 311 import sys, threading 312 313 # A deadlock-killer, to prevent the 314 # testsuite to hang forever 315 def killer(): 316 import os, time 317 time.sleep(2) 318 print 'program blocked; aborting' 319 os._exit(2) 320 t = threading.Thread(target=killer) 321 t.daemon = True 322 t.start() 323 324 # This is the trace function 325 def func(frame, event, arg): 326 threading.current_thread() 327 return func 328 329 sys.settrace(func) 330 """], 331 stdout=subprocess.PIPE, 332 stderr=subprocess.PIPE) 333 self.addCleanup(p.stdout.close) 334 self.addCleanup(p.stderr.close) 335 stdout, stderr = p.communicate() 336 rc = p.returncode 337 self.assertFalse(rc == 2, "interpreted was blocked") 338 self.assertTrue(rc == 0, 339 "Unexpected error: " + repr(stderr)) 340 341 def test_join_nondaemon_on_shutdown(self): 342 # Issue 1722344 343 # Raising SystemExit skipped threading._shutdown 344 p = subprocess.Popen([sys.executable, "-c", """if 1: 345 import threading 346 from time import sleep 347 348 def child(): 349 sleep(1) 350 # As a non-daemon thread we SHOULD wake up and nothing 351 # should be torn down yet 352 print "Woke up, sleep function is:", sleep 353 354 threading.Thread(target=child).start() 355 raise SystemExit 356 """], 357 stdout=subprocess.PIPE, 358 stderr=subprocess.PIPE) 359 self.addCleanup(p.stdout.close) 360 self.addCleanup(p.stderr.close) 361 stdout, stderr = p.communicate() 362 self.assertEqual(stdout.strip(), 363 "Woke up, sleep function is: <built-in function sleep>") 364 stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip() 365 self.assertEqual(stderr, "") 366 367 def test_enumerate_after_join(self): 368 # Try hard to trigger #1703448: a thread is still returned in 369 # threading.enumerate() after it has been join()ed. 370 enum = threading.enumerate 371 old_interval = sys.getcheckinterval() 372 try: 373 for i in xrange(1, 100): 374 # Try a couple times at each thread-switching interval 375 # to get more interleavings. 376 sys.setcheckinterval(i // 5) 377 t = threading.Thread(target=lambda: None) 378 t.start() 379 t.join() 380 l = enum() 381 self.assertNotIn(t, l, 382 "#1703448 triggered after %d trials: %s" % (i, l)) 383 finally: 384 sys.setcheckinterval(old_interval) 385 386 def test_no_refcycle_through_target(self): 387 class RunSelfFunction(object): 388 def __init__(self, should_raise): 389 # The links in this refcycle from Thread back to self 390 # should be cleaned up when the thread completes. 391 self.should_raise = should_raise 392 self.thread = threading.Thread(target=self._run, 393 args=(self,), 394 kwargs={'yet_another':self}) 395 self.thread.start() 396 397 def _run(self, other_ref, yet_another): 398 if self.should_raise: 399 raise SystemExit 400 401 cyclic_object = RunSelfFunction(should_raise=False) 402 weak_cyclic_object = weakref.ref(cyclic_object) 403 cyclic_object.thread.join() 404 del cyclic_object 405 self.assertEqual(None, weak_cyclic_object(), 406 msg=('%d references still around' % 407 sys.getrefcount(weak_cyclic_object()))) 408 409 raising_cyclic_object = RunSelfFunction(should_raise=True) 410 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 411 raising_cyclic_object.thread.join() 412 del raising_cyclic_object 413 self.assertEqual(None, weak_raising_cyclic_object(), 414 msg=('%d references still around' % 415 sys.getrefcount(weak_raising_cyclic_object()))) 416 417 418class ThreadJoinOnShutdown(BaseTestCase): 419 420 def _run_and_join(self, script): 421 script = """if 1: 422 import sys, os, time, threading 423 424 # a thread, which waits for the main program to terminate 425 def joiningfunc(mainthread): 426 mainthread.join() 427 print 'end of thread' 428 \n""" + script 429 430 p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE) 431 rc = p.wait() 432 data = p.stdout.read().replace('\r', '') 433 p.stdout.close() 434 self.assertEqual(data, "end of main\nend of thread\n") 435 self.assertFalse(rc == 2, "interpreter was blocked") 436 self.assertTrue(rc == 0, "Unexpected error") 437 438 def test_1_join_on_shutdown(self): 439 # The usual case: on exit, wait for a non-daemon thread 440 script = """if 1: 441 import os 442 t = threading.Thread(target=joiningfunc, 443 args=(threading.current_thread(),)) 444 t.start() 445 time.sleep(0.1) 446 print 'end of main' 447 """ 448 self._run_and_join(script) 449 450 451 def test_2_join_in_forked_process(self): 452 # Like the test above, but from a forked interpreter 453 import os 454 if not hasattr(os, 'fork'): 455 return 456 script = """if 1: 457 childpid = os.fork() 458 if childpid != 0: 459 os.waitpid(childpid, 0) 460 sys.exit(0) 461 462 t = threading.Thread(target=joiningfunc, 463 args=(threading.current_thread(),)) 464 t.start() 465 print 'end of main' 466 """ 467 self._run_and_join(script) 468 469 def test_3_join_in_forked_from_thread(self): 470 # Like the test above, but fork() was called from a worker thread 471 # In the forked process, the main Thread object must be marked as stopped. 472 import os 473 if not hasattr(os, 'fork'): 474 return 475 # Skip platforms with known problems forking from a worker thread. 476 # See http://bugs.python.org/issue3863. 477 if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5', 478 'os2emx'): 479 print >>sys.stderr, ('Skipping test_3_join_in_forked_from_thread' 480 ' due to known OS bugs on'), sys.platform 481 return 482 script = """if 1: 483 main_thread = threading.current_thread() 484 def worker(): 485 childpid = os.fork() 486 if childpid != 0: 487 os.waitpid(childpid, 0) 488 sys.exit(0) 489 490 t = threading.Thread(target=joiningfunc, 491 args=(main_thread,)) 492 print 'end of main' 493 t.start() 494 t.join() # Should not block: main_thread is already stopped 495 496 w = threading.Thread(target=worker) 497 w.start() 498 """ 499 self._run_and_join(script) 500 501 def assertScriptHasOutput(self, script, expected_output): 502 p = subprocess.Popen([sys.executable, "-c", script], 503 stdout=subprocess.PIPE) 504 rc = p.wait() 505 data = p.stdout.read().decode().replace('\r', '') 506 self.assertEqual(rc, 0, "Unexpected error") 507 self.assertEqual(data, expected_output) 508 509 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 510 def test_4_joining_across_fork_in_worker_thread(self): 511 # There used to be a possible deadlock when forking from a child 512 # thread. See http://bugs.python.org/issue6643. 513 514 # Skip platforms with known problems forking from a worker thread. 515 # See http://bugs.python.org/issue3863. 516 if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): 517 raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) 518 519 # The script takes the following steps: 520 # - The main thread in the parent process starts a new thread and then 521 # tries to join it. 522 # - The join operation acquires the Lock inside the thread's _block 523 # Condition. (See threading.py:Thread.join().) 524 # - We stub out the acquire method on the condition to force it to wait 525 # until the child thread forks. (See LOCK ACQUIRED HERE) 526 # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS 527 # HERE) 528 # - The main thread of the parent process enters Condition.wait(), 529 # which releases the lock on the child thread. 530 # - The child process returns. Without the necessary fix, when the 531 # main thread of the child process (which used to be the child thread 532 # in the parent process) attempts to exit, it will try to acquire the 533 # lock in the Thread._block Condition object and hang, because the 534 # lock was held across the fork. 535 536 script = """if 1: 537 import os, time, threading 538 539 finish_join = False 540 start_fork = False 541 542 def worker(): 543 # Wait until this thread's lock is acquired before forking to 544 # create the deadlock. 545 global finish_join 546 while not start_fork: 547 time.sleep(0.01) 548 # LOCK HELD: Main thread holds lock across this call. 549 childpid = os.fork() 550 finish_join = True 551 if childpid != 0: 552 # Parent process just waits for child. 553 os.waitpid(childpid, 0) 554 # Child process should just return. 555 556 w = threading.Thread(target=worker) 557 558 # Stub out the private condition variable's lock acquire method. 559 # This acquires the lock and then waits until the child has forked 560 # before returning, which will release the lock soon after. If 561 # someone else tries to fix this test case by acquiring this lock 562 # before forking instead of resetting it, the test case will 563 # deadlock when it shouldn't. 564 condition = w._block 565 orig_acquire = condition.acquire 566 call_count_lock = threading.Lock() 567 call_count = 0 568 def my_acquire(): 569 global call_count 570 global start_fork 571 orig_acquire() # LOCK ACQUIRED HERE 572 start_fork = True 573 if call_count == 0: 574 while not finish_join: 575 time.sleep(0.01) # WORKER THREAD FORKS HERE 576 with call_count_lock: 577 call_count += 1 578 condition.acquire = my_acquire 579 580 w.start() 581 w.join() 582 print('end of main') 583 """ 584 self.assertScriptHasOutput(script, "end of main\n") 585 586 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 587 def test_5_clear_waiter_locks_to_avoid_crash(self): 588 # Check that a spawned thread that forks doesn't segfault on certain 589 # platforms, namely OS X. This used to happen if there was a waiter 590 # lock in the thread's condition variable's waiters list. Even though 591 # we know the lock will be held across the fork, it is not safe to 592 # release locks held across forks on all platforms, so releasing the 593 # waiter lock caused a segfault on OS X. Furthermore, since locks on 594 # OS X are (as of this writing) implemented with a mutex + condition 595 # variable instead of a semaphore, while we know that the Python-level 596 # lock will be acquired, we can't know if the internal mutex will be 597 # acquired at the time of the fork. 598 599 # Skip platforms with known problems forking from a worker thread. 600 # See http://bugs.python.org/issue3863. 601 if sys.platform in ('freebsd4', 'freebsd5', 'freebsd6', 'os2emx'): 602 raise unittest.SkipTest('due to known OS bugs on ' + sys.platform) 603 script = """if True: 604 import os, time, threading 605 606 start_fork = False 607 608 def worker(): 609 # Wait until the main thread has attempted to join this thread 610 # before continuing. 611 while not start_fork: 612 time.sleep(0.01) 613 childpid = os.fork() 614 if childpid != 0: 615 # Parent process just waits for child. 616 (cpid, rc) = os.waitpid(childpid, 0) 617 assert cpid == childpid 618 assert rc == 0 619 print('end of worker thread') 620 else: 621 # Child process should just return. 622 pass 623 624 w = threading.Thread(target=worker) 625 626 # Stub out the private condition variable's _release_save method. 627 # This releases the condition's lock and flips the global that 628 # causes the worker to fork. At this point, the problematic waiter 629 # lock has been acquired once by the waiter and has been put onto 630 # the waiters list. 631 condition = w._block 632 orig_release_save = condition._release_save 633 def my_release_save(): 634 global start_fork 635 orig_release_save() 636 # Waiter lock held here, condition lock released. 637 start_fork = True 638 condition._release_save = my_release_save 639 640 w.start() 641 w.join() 642 print('end of main thread') 643 """ 644 output = "end of worker thread\nend of main thread\n" 645 self.assertScriptHasOutput(script, output) 646 647 648class ThreadingExceptionTests(BaseTestCase): 649 # A RuntimeError should be raised if Thread.start() is called 650 # multiple times. 651 def test_start_thread_again(self): 652 thread = threading.Thread() 653 thread.start() 654 self.assertRaises(RuntimeError, thread.start) 655 656 def test_joining_current_thread(self): 657 current_thread = threading.current_thread() 658 self.assertRaises(RuntimeError, current_thread.join); 659 660 def test_joining_inactive_thread(self): 661 thread = threading.Thread() 662 self.assertRaises(RuntimeError, thread.join) 663 664 def test_daemonize_active_thread(self): 665 thread = threading.Thread() 666 thread.start() 667 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 668 669 670class LockTests(lock_tests.LockTests): 671 locktype = staticmethod(threading.Lock) 672 673class RLockTests(lock_tests.RLockTests): 674 locktype = staticmethod(threading.RLock) 675 676class EventTests(lock_tests.EventTests): 677 eventtype = staticmethod(threading.Event) 678 679class ConditionAsRLockTests(lock_tests.RLockTests): 680 # An Condition uses an RLock by default and exports its API. 681 locktype = staticmethod(threading.Condition) 682 683class ConditionTests(lock_tests.ConditionTests): 684 condtype = staticmethod(threading.Condition) 685 686class SemaphoreTests(lock_tests.SemaphoreTests): 687 semtype = staticmethod(threading.Semaphore) 688 689class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 690 semtype = staticmethod(threading.BoundedSemaphore) 691 692 @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem') 693 def test_recursion_limit(self): 694 # Issue 9670 695 # test that excessive recursion within a non-main thread causes 696 # an exception rather than crashing the interpreter on platforms 697 # like Mac OS X or FreeBSD which have small default stack sizes 698 # for threads 699 script = """if True: 700 import threading 701 702 def recurse(): 703 return recurse() 704 705 def outer(): 706 try: 707 recurse() 708 except RuntimeError: 709 pass 710 711 w = threading.Thread(target=outer) 712 w.start() 713 w.join() 714 print('end of main thread') 715 """ 716 expected_output = "end of main thread\n" 717 p = subprocess.Popen([sys.executable, "-c", script], 718 stdout=subprocess.PIPE) 719 stdout, stderr = p.communicate() 720 data = stdout.decode().replace('\r', '') 721 self.assertEqual(p.returncode, 0, "Unexpected error") 722 self.assertEqual(data, expected_output) 723 724def test_main(): 725 test.test_support.run_unittest(LockTests, RLockTests, EventTests, 726 ConditionAsRLockTests, ConditionTests, 727 SemaphoreTests, BoundedSemaphoreTests, 728 ThreadTests, 729 ThreadJoinOnShutdown, 730 ThreadingExceptionTests, 731 ) 732 733if __name__ == "__main__": 734 test_main() 735