"""
Tests for the threading module.
"""

import test.support
from test.support import (verbose, import_module, cpython_only,
                          requires_type_collecting)
from test.support.script_helper import assert_python_ok, assert_python_failure

import random
import sys
_thread = import_module('_thread')
threading = import_module('threading')
import time
import unittest
import weakref
import os
import subprocess

from test import lock_tests
from test import support


# Between fork() and exec(), only async-safe functions are allowed (issues
# #12316 and #11870), and fork() from a worker thread is known to trigger
# problems with some operating systems (issue #3863): skip problematic tests
# on platforms known to behave badly.
platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
                     'hp-ux11')


# A trivial mutable counter.
class Counter(object):
    def __init__(self):
        self.value = 0
    def inc(self):
        self.value += 1
    def dec(self):
        self.value -= 1
    def get(self):
        return self.value

# Worker used by ThreadTests.test_various_ops(): `sema` bounds how many
# workers run at once, `mutex` guards the shared `nrunning` Counter, and
# assertions are reported through `testcase`.
class TestThread(threading.Thread):
    def __init__(self, name, testcase, sema, mutex, nrunning):
        threading.Thread.__init__(self, name=name)
        self.testcase = testcase
        self.sema = sema
        self.mutex = mutex
        self.nrunning = nrunning

    def run(self):
        delay = random.random() / 10000.0
        if verbose:
            print('task %s will run for %.1f usec' %
                  (self.name, delay * 1e6))

        with self.sema:
            with self.mutex:
                self.nrunning.inc()
                if verbose:
                    print(self.nrunning.get(), 'tasks are running')
                # The caller's semaphore admits at most 3 workers.
                self.testcase.assertLessEqual(self.nrunning.get(), 3)

            time.sleep(delay)
            if verbose:
                print('task', self.name, 'done')

            with self.mutex:
                self.nrunning.dec()
                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                if verbose:
                    print('%s is finished. %d tasks are running' %
                          (self.name, self.nrunning.get()))


# Common fixture: snapshot the threading state before each test and hand it
# back to test.support for cleanup (threads and child processes) afterwards.
class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._threads = test.support.threading_setup()

    def tearDown(self):
        test.support.threading_cleanup(*self._threads)
        test.support.reap_children()


class ThreadTests(BaseTestCase):

    # Create a bunch of threads, let each do some work, wait until all are
    # done.
    def test_various_ops(self):
        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
        # times about 1 second per clump).
        NUMTASKS = 10

        # no more than 3 of the 10 can run at once
        sema = threading.BoundedSemaphore(value=3)
        mutex = threading.RLock()
        numrunning = Counter()

        threads = []

        for i in range(NUMTASKS):
            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
            threads.append(t)
            self.assertIsNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
            t.start()

        if verbose:
            print('waiting for all tasks to complete')
        for t in threads:
            t.join()
            self.assertFalse(t.is_alive())
            self.assertNotEqual(t.ident, 0)
            self.assertIsNotNone(t.ident)
            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
        if verbose:
            print('all tasks done')
        self.assertEqual(numrunning.get(), 0)

    def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.currentThread().ident)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertIsNotNone(ident[0])
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
    def test_various_ops_small_stack(self):
        if verbose:
            print('with 256kB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Restore the default stack size even if the inner test fails,
            # so the setting cannot leak into later tests.
            threading.stack_size(0)

    # run with a large thread stack size (1MB)
    def test_various_ops_large_stack(self):
        if verbose:
            print('with 1MB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        try:
            self.test_various_ops()
        finally:
            # Restore the default stack size even if the inner test fails,
            # so the setting cannot leak into later tests.
            threading.stack_size(0)

    def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        #Issue 29376
        self.assertTrue(threading._active[tid].is_alive())
        self.assertRegex(repr(threading._active[tid]), '_DummyThread')
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
    def test_PyThreadState_SetAsyncExc(self):
        ctypes = import_module("ctypes")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc

        class AsyncExc(Exception):
            pass

        exception = ctypes.py_object(AsyncExc)

        # First check it works when setting the exception from the same thread.
        tid = threading.get_ident()

        try:
            result = set_async_exc(ctypes.c_long(tid), exception)
            # The exception is async, so we might have to keep the VM busy until
            # it notices.
            while True:
                pass
        except AsyncExc:
            pass
        else:
            # This code is unreachable but it reflects the intent. If we wanted
            # to be smarter the above loop wouldn't be infinite.
            self.fail("AsyncExc not raised")
        try:
            self.assertEqual(result, 1) # one thread state modified
        except UnboundLocalError:
            # The exception was raised too quickly for us to get the result.
            pass

        # `worker_started` is set by the thread when it's inside a try/except
        # block waiting to catch the asynchronously set AsyncExc exception.
        # `worker_saw_exception` is set by the thread upon catching that
        # exception.
        worker_started = threading.Event()
        worker_saw_exception = threading.Event()

        class Worker(threading.Thread):
            def run(self):
                self.id = threading.get_ident()
                self.finished = False

                try:
                    while True:
                        worker_started.set()
                        time.sleep(0.1)
                except AsyncExc:
                    self.finished = True
                    worker_saw_exception.set()

        t = Worker()
        t.daemon = True # so if this fails, we don't hang Python at shutdown
        t.start()
        if verbose:
            print("    started worker thread")

        # Try a thread id that doesn't make sense.
        if verbose:
            print("    trying nonsensical thread id")
        result = set_async_exc(ctypes.c_long(-1), exception)
        self.assertEqual(result, 0)  # no thread states modified

        # Now raise an exception in the worker thread.
        if verbose:
            print("    waiting for worker thread to get started")
        ret = worker_started.wait()
        self.assertTrue(ret)
        if verbose:
            print("    verifying worker hasn't exited")
        self.assertFalse(t.finished)
        if verbose:
            print("    attempting to raise asynch exception in worker")
        result = set_async_exc(ctypes.c_long(t.id), exception)
        self.assertEqual(result, 1) # one thread state modified
        if verbose:
            print("    waiting for worker to say it caught the exception")
        worker_saw_exception.wait(timeout=10)
        self.assertTrue(t.finished)
        if verbose:
            print("    all OK -- joining worker")
        if t.finished:
            t.join()
        # else the thread is still running, and we have no way to kill it

    def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertNotIn(
                t, threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread

    def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        import_module("ctypes")

        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)

    def test_finalize_with_trace(self):
        # Issue1733757
        # Avoid a deadlock when sys.settrace steps into threading._shutdown
        assert_python_ok("-c", """if 1:
            import sys, threading

            # A deadlock-killer, to prevent the
            # testsuite to hang forever
            def killer():
                import os, time
                time.sleep(2)
                print('program blocked; aborting')
                os._exit(2)
            t = threading.Thread(target=killer)
            t.daemon = True
            t.start()

            # This is the trace function
            def func(frame, event, arg):
                threading.current_thread()
                return func

            sys.settrace(func)
            """)

    def test_join_nondaemon_on_shutdown(self):
        # Issue 1722344
        # Raising SystemExit skipped threading._shutdown
        rc, out, err = assert_python_ok("-c", """if 1:
                import threading
                from time import sleep

                def child():
                    sleep(1)
                    # As a non-daemon thread we SHOULD wake up and nothing
                    # should be torn down yet
                    print("Woke up, sleep function is:", sleep)

                threading.Thread(target=child).start()
                raise SystemExit
            """)
        self.assertEqual(out.strip(),
            b"Woke up, sleep function is: <built-in function sleep>")
        self.assertEqual(err, b"")

    def test_enumerate_after_join(self):
        # Try hard to trigger #1703448: a thread is still returned in
        # threading.enumerate() after it has been join()ed.
        enum = threading.enumerate
        old_interval = sys.getswitchinterval()
        try:
            for i in range(1, 100):
                sys.setswitchinterval(i * 0.0002)
                t = threading.Thread(target=lambda: None)
                t.start()
                t.join()
                l = enum()
                self.assertNotIn(t, l,
                    "#1703448 triggered after %d trials: %s" % (i, l))
        finally:
            sys.setswitchinterval(old_interval)

    def test_no_refcycle_through_target(self):
        # A finished thread must not keep its target/args/kwargs alive,
        # whether the target returned normally or raised SystemExit.
        class RunSelfFunction(object):
            def __init__(self, should_raise):
                # The links in this refcycle from Thread back to self
                # should be cleaned up when the thread completes.
                self.should_raise = should_raise
                self.thread = threading.Thread(target=self._run,
                                               args=(self,),
                                               kwargs={'yet_another':self})
                self.thread.start()

            def _run(self, other_ref, yet_another):
                if self.should_raise:
                    raise SystemExit

        cyclic_object = RunSelfFunction(should_raise=False)
        weak_cyclic_object = weakref.ref(cyclic_object)
        cyclic_object.thread.join()
        del cyclic_object
        self.assertIsNone(weak_cyclic_object(),
                          msg=('%d references still around' %
                               sys.getrefcount(weak_cyclic_object())))

        raising_cyclic_object = RunSelfFunction(should_raise=True)
        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
        raising_cyclic_object.thread.join()
        del raising_cyclic_object
        self.assertIsNone(weak_raising_cyclic_object(),
                          msg=('%d references still around' %
                               sys.getrefcount(weak_raising_cyclic_object())))

    def test_old_threading_api(self):
        # Just a quick sanity check to make sure the old method names are
        # still present
        t = threading.Thread()
        t.isDaemon()
        t.setDaemon(True)
        t.getName()
        t.setName("name")
        t.isAlive()
        e = threading.Event()
        e.isSet()
        threading.activeCount()

    def test_repr_daemon(self):
        # 'daemon' appears in the repr only once the flag is set.
        t = threading.Thread()
        self.assertNotIn('daemon', repr(t))
        t.daemon = True
        self.assertIn('daemon', repr(t))

    def test_deamon_param(self):
        # The `daemon` constructor argument is reflected by the attribute.
        t = threading.Thread()
        self.assertFalse(t.daemon)
        t = threading.Thread(daemon=False)
        self.assertFalse(t.daemon)
        t = threading.Thread(daemon=True)
        self.assertTrue(t.daemon)

    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
    def test_dummy_thread_after_fork(self):
        # Issue #14308: a dummy thread in the active list doesn't mess up
        # the after-fork mechanism.
        code = """if 1:
            import _thread, threading, os, time

            def background_thread(evt):
                # Creates and registers the _DummyThread instance
                threading.current_thread()
                evt.set()
                time.sleep(10)

            evt = threading.Event()
            _thread.start_new_thread(background_thread, (evt,))
            evt.wait()
            assert threading.active_count() == 2, threading.active_count()
            if os.fork() == 0:
                assert threading.active_count() == 1, threading.active_count()
                os._exit(0)
            else:
                os.wait()
        """
        _, out, err = assert_python_ok("-c", code)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_is_alive_after_fork(self):
        # Try hard to trigger #18418: is_alive() could sometimes be True on
        # threads that vanished after a fork.
        old_interval = sys.getswitchinterval()
        self.addCleanup(sys.setswitchinterval, old_interval)

        # Make the bug more likely to manifest.
        test.support.setswitchinterval(1e-6)

        for i in range(20):
            t = threading.Thread(target=lambda: None)
            t.start()
            self.addCleanup(t.join)
            pid = os.fork()
            if pid == 0:
                os._exit(1 if t.is_alive() else 0)
            else:
                pid, status = os.waitpid(pid, 0)
                self.assertEqual(0, status)

    def test_main_thread(self):
        # main_thread() is the current thread here, and differs from
        # current_thread() inside a freshly started thread.
        main = threading.main_thread()
        self.assertEqual(main.name, 'MainThread')
        self.assertEqual(main.ident, threading.current_thread().ident)
        self.assertEqual(main.ident, threading.get_ident())

        def f():
            self.assertNotEqual(threading.main_thread().ident,
                                threading.current_thread().ident)
        th = threading.Thread(target=f)
        th.start()
        th.join()

    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    def test_main_thread_after_fork(self):
        # In a child forked from the main thread, main_thread() is still
        # the current thread and keeps its name.
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "MainThread\nTrue\nTrue\n")

    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
    def test_main_thread_after_fork_from_nonmain_thread(self):
        # In a child forked from a worker thread, that worker is what
        # main_thread() reports.
        code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
        _, out, err = assert_python_ok("-c", code)
        data = out.decode().replace('\r', '')
        self.assertEqual(err, b"")
        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")

    def test_tstate_lock(self):
        # Test an implementation detail of Thread objects.
        started = _thread.allocate_lock()
        finish = _thread.allocate_lock()
        started.acquire()
        finish.acquire()
        def f():
            started.release()
            finish.acquire()
            time.sleep(0.01)
        # The tstate lock is None until the thread is started
        t = threading.Thread(target=f)
        self.assertIs(t._tstate_lock, None)
        t.start()
        started.acquire()
        self.assertTrue(t.is_alive())
        # The tstate lock can't be acquired when the thread is running
        # (or suspended).
        tstate_lock = t._tstate_lock
        self.assertFalse(tstate_lock.acquire(timeout=0))
        finish.release()
        # When the thread ends, the state_lock can be successfully
        # acquired.
        self.assertTrue(tstate_lock.acquire(timeout=5))
        # But is_alive() is still True:  we hold _tstate_lock now, which
        # prevents is_alive() from knowing the thread's end-of-life C code
        # is done.
        self.assertTrue(t.is_alive())
        # Let is_alive() find out the C code is done.
        tstate_lock.release()
        self.assertFalse(t.is_alive())
        # And verify the thread disposed of _tstate_lock.
        self.assertIsNone(t._tstate_lock)

    def test_repr_stopped(self):
        # Verify that "stopped" shows up in repr(Thread) appropriately.
        started = _thread.allocate_lock()
        finish = _thread.allocate_lock()
        started.acquire()
        finish.acquire()
        def f():
            started.release()
            finish.acquire()
        t = threading.Thread(target=f)
        t.start()
        started.acquire()
        self.assertIn("started", repr(t))
        finish.release()
        # "stopped" should appear in the repr in a reasonable amount of time.
        # Implementation detail:  as of this writing, that's trivially true
        # if .join() is called, and almost trivially true if .is_alive() is
        # called.  The detail we're testing here is that "stopped" shows up
        # "all on its own".
        LOOKING_FOR = "stopped"
        for i in range(500):
            if LOOKING_FOR in repr(t):
                break
            time.sleep(0.01)
        self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds

    def test_BoundedSemaphore_limit(self):
        # BoundedSemaphore should raise ValueError if released too often.
        for limit in range(1, 10):
            bs = threading.BoundedSemaphore(limit)
            threads = [threading.Thread(target=bs.acquire)
                       for _ in range(limit)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            threads = [threading.Thread(target=bs.release)
                       for _ in range(limit)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            self.assertRaises(ValueError, bs.release)

    @cpython_only
    def test_frame_tstate_tracing(self):
        # Issue #14432: Crash when a generator is created in a C thread that is
        # destroyed while the generator is still used. The issue was that a
        # generator contains a frame, and the frame kept a reference to the
        # Python state of the destroyed C thread. The crash occurs when a trace
        # function is setup.

        def noop_trace(frame, event, arg):
            # no operation
            return noop_trace

        def generator():
            while 1:
                yield "generator"

        def callback():
            if callback.gen is None:
                callback.gen = generator()
            return next(callback.gen)
        callback.gen = None

        old_trace = sys.gettrace()
        sys.settrace(noop_trace)
        try:
            # Install a trace function
            threading.settrace(noop_trace)

            # Create a generator in a C thread which exits after the call
            import _testcapi
            _testcapi.call_in_temporary_c_thread(callback)

            # Call the generator in a different Python thread, check that the
            # generator didn't keep a reference to the destroyed thread state
            for _ in range(3):
                # The trace function is still called here
                callback()
        finally:
            sys.settrace(old_trace)
            # Also undo threading.settrace() so that threads started by
            # later tests are not traced with noop_trace.
            threading.settrace(old_trace)


class ThreadJoinOnShutdown(BaseTestCase):

    def _run_and_join(self, script):
        # Run `script` in a child interpreter after a prelude that defines
        # joiningfunc(), and check the order of the two expected lines.
        script = """if 1:
            import sys, os, time, threading

            # a thread, which waits for the main program to terminate
            def joiningfunc(mainthread):
                mainthread.join()
                print('end of thread')
                # stdout is fully buffered because not a tty, we have to flush
                # before exit.
                sys.stdout.flush()
        \n""" + script

        rc, out, err = assert_python_ok("-c", script)
        data = out.decode().replace('\r', '')
        self.assertEqual(data, "end of main\nend of thread\n")

    def test_1_join_on_shutdown(self):
        # The usual case: on exit, wait for a non-daemon thread
        script = """if 1:
            import os
            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            time.sleep(0.1)
            print('end of main')
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_2_join_in_forked_process(self):
        # Like the test above, but from a forked interpreter
        script = """if 1:
            childpid = os.fork()
            if childpid != 0:
                os.waitpid(childpid, 0)
                sys.exit(0)

            t = threading.Thread(target=joiningfunc,
                                 args=(threading.current_thread(),))
            t.start()
            print('end of main')
            """
        self._run_and_join(script)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_3_join_in_forked_from_thread(self):
        # Like the test above, but fork() was called from a worker thread
        # In the forked process, the main Thread object must be marked as stopped.

        script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
        self._run_and_join(script)

    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_4_daemon_threads(self):
        # Check that a daemon thread cannot crash the interpreter on shutdown
        # by manipulating internal structures that are being disposed of in
        # the main thread.
        script = """if True:
            import os
            import random
            import sys
            import time
            import threading

            thread_has_run = set()

            def random_io():
                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
                while True:
                    in_f = open(os.__file__, 'rb')
                    stuff = in_f.read(200)
                    null_f = open(os.devnull, 'wb')
                    null_f.write(stuff)
                    time.sleep(random.random() / 1995)
                    null_f.close()
                    in_f.close()
                    thread_has_run.add(threading.current_thread())

            def main():
                count = 0
                for _ in range(40):
                    new_thread = threading.Thread(target=random_io)
                    new_thread.daemon = True
                    new_thread.start()
                    count += 1
                while len(thread_has_run) < count:
                    time.sleep(0.001)
                # Trigger process shutdown
                sys.exit(0)

            main()
            """
        rc, out, err = assert_python_ok('-c', script)
        self.assertFalse(err)

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
    def test_reinit_tls_after_fork(self):
        # Issue #13817: fork() would deadlock in a multithreaded program with
        # the ad-hoc TLS implementation.

        def do_fork_and_wait():
            # just fork a child process and wait it
            pid = os.fork()
            if pid > 0:
                os.waitpid(pid, 0)
            else:
                os._exit(0)

        # start a bunch of threads that will fork() child processes
        threads = []
        for i in range(16):
            t = threading.Thread(target=do_fork_and_wait)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
    def test_clear_threads_states_after_fork(self):
        # Issue #17094: check that threads states are cleared after fork()

        # start a bunch of threads
        threads = []
        for i in range(16):
            t = threading.Thread(target=lambda : time.sleep(0.3))
            threads.append(t)
            t.start()

        pid = os.fork()
        if pid == 0:
            # check that threads states have been cleared
            if len(sys._current_frames()) == 1:
                os._exit(0)
            else:
                os._exit(1)
        else:
            _, status = os.waitpid(pid, 0)
            self.assertEqual(0, status)

        for t in threads:
            t.join()


class SubinterpThreadingTests(BaseTestCase):

    def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    def test_threads_join_2(self):
        # Same as above, but a delay gets introduced after the thread's
        # Python code returned but before the thread state is deleted.
        # To achieve this, we register a thread-local object which sleeps
        # a bit when deallocated.
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            class Sleeper:
                def __del__(self):
                    time.sleep(0.05)

            tls = threading.local()

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                tls.x = Sleeper()
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")

    @cpython_only
    def test_daemon_threads_fatal_error(self):
        # Ending a subinterpreter while one of its daemon threads is still
        # running must abort with the "not the last thread" fatal error.
        subinterp_code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            """
        script = r"""if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            """ % (subinterp_code,)
        with test.support.SuppressCrashReport():
            rc, out, err = assert_python_failure("-c", script)
        self.assertIn("Fatal Python error: Py_EndInterpreter: "
                      "not the last thread", err.decode())


class ThreadingExceptionTests(BaseTestCase):
    # A RuntimeError should be raised if Thread.start() is called
    # multiple times.
    def test_start_thread_again(self):
        thread = threading.Thread()
        thread.start()
        self.assertRaises(RuntimeError, thread.start)
        # Do not leave the started thread dangling for tearDown().
        thread.join()

    def test_joining_current_thread(self):
        # A thread cannot join itself.
        current_thread = threading.current_thread()
        self.assertRaises(RuntimeError, current_thread.join)

    def test_joining_inactive_thread(self):
        # A thread that was never started cannot be joined.
        thread = threading.Thread()
        self.assertRaises(RuntimeError, thread.join)

    def test_daemonize_active_thread(self):
        # The daemon flag cannot be changed once the thread was started.
        thread = threading.Thread()
        thread.start()
        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
        # Do not leave the started thread dangling for tearDown().
        thread.join()

    def test_releasing_unacquired_lock(self):
        # Releasing a lock that is not held is an error.
        lock = threading.Lock()
        self.assertRaises(RuntimeError, lock.release)

    @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
                         'test macosx problem')
    def test_recursion_limit(self):
        # Issue 9670
        # test that excessive recursion within a non-main thread causes
        # an exception rather than crashing the interpreter on platforms
        # like Mac OS X or FreeBSD which have small default stack sizes
        # for threads
        script = """if True:
            import threading

            def recurse():
                return recurse()

            def outer():
                try:
                    recurse()
                except RecursionError:
                    pass

            w = threading.Thread(target=outer)
            w.start()
            w.join()
            print('end of main thread')
            """
        expected_output = "end of main thread\n"
        p = subprocess.Popen([sys.executable, "-c", script],
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        data = stdout.decode().replace('\r', '')
        self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
        self.assertEqual(data, expected_output)

    def test_print_exception(self):
        # An exception escaping a thread's target is reported on stderr
        # with a traceback.
        script = r"""if True:
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        err = err.decode()
        self.assertIn("Exception in thread", err)
        self.assertIn("Traceback (most recent call last):", err)
        self.assertIn("ZeroDivisionError", err)
        self.assertNotIn("Unhandled exception", err)

    @requires_type_collecting
    def test_print_exception_stderr_is_none_1(self):
        # Same as test_print_exception, but sys.stderr is set to None
        # after the thread was started.
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            sys.stderr = None
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        err = err.decode()
        self.assertIn("Exception in thread", err)
        self.assertIn("Traceback (most recent call last):", err)
        self.assertIn("ZeroDivisionError", err)
        self.assertNotIn("Unhandled exception", err)

    def test_print_exception_stderr_is_none_2(self):
        # Here sys.stderr is already None when the thread is created; only
        # the absence of "Unhandled exception" is checked.
        script = r"""if True:
            import sys
            import threading
            import time

            running = False
            def run():
                global running
                running = True
                while running:
                    time.sleep(0.01)
                1/0
            sys.stderr = None
            t = threading.Thread(target=run)
            t.start()
            while not running:
                time.sleep(0.01)
            running = False
            t.join()
            """
        rc, out, err = assert_python_ok("-c", script)
        self.assertEqual(out, b'')
        self.assertNotIn("Unhandled exception", err.decode())

    def test_bare_raise_in_brand_new_thread(self):
        # A bare `raise` with no active exception inside a new thread must
        # surface as RuntimeError.
        def bare_raise():
            raise

        class Issue27558(threading.Thread):
            exc = None

            def run(self):
                try:
                    bare_raise()
                except Exception as exc:
                    self.exc = exc

        thread = Issue27558()
        thread.start()
        thread.join()
        self.assertIsNotNone(thread.exc)
        self.assertIsInstance(thread.exc, RuntimeError)
        # Drop the stored exception: its traceback refers to run()'s frame
        # and therefore back to the thread object itself.
        thread.exc = None

class TimerTests(BaseTestCase):

    def setUp(self):
        BaseTestCase.setUp(self)
        self.callback_args = []
        self.callback_event = threading.Event()

    def test_init_immutable_default_args(self):
        # Issue 17435: constructor defaults were mutable objects, they could be
        # mutated via the object attributes and affect other Timer objects.
        timer1 = threading.Timer(0.01, self._callback_spy)
        timer1.start()
        self.callback_event.wait()
        timer1.args.append("blah")
        timer1.kwargs["foo"] = "bar"
        self.callback_event.clear()
        timer2 = threading.Timer(0.01, self._callback_spy)
        timer2.start()
        self.callback_event.wait()
        self.assertEqual(len(self.callback_args), 2)
        self.assertEqual(self.callback_args, [((), {}), ((), {})])
        # Both callbacks have fired; wait for the timer threads to finish so
        # none is left dangling for tearDown().
        timer1.join()
        timer2.join()

    def _callback_spy(self, *args, **kwargs):
        # Record a copy of the arguments the timer passed, then wake the test.
        self.callback_args.append((args[:], kwargs.copy()))
        self.callback_event.set()

# The classes below run the generic suites from test.lock_tests against the
# primitives exported by threading.
class LockTests(lock_tests.LockTests):
    locktype = staticmethod(threading.Lock)

class PyRLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading._PyRLock)

@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
class CRLockTests(lock_tests.RLockTests):
    locktype = staticmethod(threading._CRLock)

class EventTests(lock_tests.EventTests):
    eventtype = staticmethod(threading.Event)

class ConditionAsRLockTests(lock_tests.RLockTests):
    # Condition uses an RLock by default and exports its API.
    locktype = staticmethod(threading.Condition)

class ConditionTests(lock_tests.ConditionTests):
    condtype = staticmethod(threading.Condition)

class SemaphoreTests(lock_tests.SemaphoreTests):
    semtype = staticmethod(threading.Semaphore)

class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
    semtype = staticmethod(threading.BoundedSemaphore)

class BarrierTests(lock_tests.BarrierTests):
    barriertype = staticmethod(threading.Barrier)

class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # threading.__all__ must match the public names of threading and
        # _thread, apart from the listed exceptions.
        extra = {"ThreadError"}
        blacklist = {'currentThread', 'activeCount'}
        support.check__all__(self, threading, ('threading', '_thread'),
                             extra=extra, blacklist=blacklist)

if __name__ == "__main__":
    unittest.main()