• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Tests for the threading module.
3"""
4
5import test.support
6from test.support import threading_helper
7from test.support import verbose, cpython_only, os_helper
8from test.support.import_helper import import_module
9from test.support.script_helper import assert_python_ok, assert_python_failure
10
11import random
12import sys
13import _thread
14import threading
15import time
16import unittest
17import weakref
18import os
19import subprocess
20import signal
21import textwrap
22import traceback
23
24from unittest import mock
25from test import lock_tests
26from test import support
27
28
29# Between fork() and exec(), only async-safe functions are allowed (issues
30# #12316 and #11870), and fork() from a worker thread is known to trigger
31# problems with some operating systems (issue #3863): skip problematic tests
32# on platforms known to behave badly.
33platforms_to_skip = ('netbsd5', 'hp-ux11')
34
35# Is Python built with Py_DEBUG macro defined?
36Py_DEBUG = hasattr(sys, 'gettotalrefcount')
37
38
39def restore_default_excepthook(testcase):
40    testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
41    threading.excepthook = threading.__excepthook__
42
43
44# A trivial mutable counter.
45class Counter(object):
46    def __init__(self):
47        self.value = 0
48    def inc(self):
49        self.value += 1
50    def dec(self):
51        self.value -= 1
52    def get(self):
53        return self.value
54
55class TestThread(threading.Thread):
56    def __init__(self, name, testcase, sema, mutex, nrunning):
57        threading.Thread.__init__(self, name=name)
58        self.testcase = testcase
59        self.sema = sema
60        self.mutex = mutex
61        self.nrunning = nrunning
62
63    def run(self):
64        delay = random.random() / 10000.0
65        if verbose:
66            print('task %s will run for %.1f usec' %
67                  (self.name, delay * 1e6))
68
69        with self.sema:
70            with self.mutex:
71                self.nrunning.inc()
72                if verbose:
73                    print(self.nrunning.get(), 'tasks are running')
74                self.testcase.assertLessEqual(self.nrunning.get(), 3)
75
76            time.sleep(delay)
77            if verbose:
78                print('task', self.name, 'done')
79
80            with self.mutex:
81                self.nrunning.dec()
82                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
83                if verbose:
84                    print('%s is finished. %d tasks are running' %
85                          (self.name, self.nrunning.get()))
86
87
88class BaseTestCase(unittest.TestCase):
89    def setUp(self):
90        self._threads = threading_helper.threading_setup()
91
92    def tearDown(self):
93        threading_helper.threading_cleanup(*self._threads)
94        test.support.reap_children()
95
96
97class ThreadTests(BaseTestCase):
98
99    @cpython_only
100    def test_name(self):
101        def func(): pass
102
103        thread = threading.Thread(name="myname1")
104        self.assertEqual(thread.name, "myname1")
105
106        # Convert int name to str
107        thread = threading.Thread(name=123)
108        self.assertEqual(thread.name, "123")
109
110        # target name is ignored if name is specified
111        thread = threading.Thread(target=func, name="myname2")
112        self.assertEqual(thread.name, "myname2")
113
114        with mock.patch.object(threading, '_counter', return_value=2):
115            thread = threading.Thread(name="")
116            self.assertEqual(thread.name, "Thread-2")
117
118        with mock.patch.object(threading, '_counter', return_value=3):
119            thread = threading.Thread()
120            self.assertEqual(thread.name, "Thread-3")
121
122        with mock.patch.object(threading, '_counter', return_value=5):
123            thread = threading.Thread(target=func)
124            self.assertEqual(thread.name, "Thread-5 (func)")
125
126    @cpython_only
127    def test_disallow_instantiation(self):
128        # Ensure that the type disallows instantiation (bpo-43916)
129        lock = threading.Lock()
130        test.support.check_disallow_instantiation(self, type(lock))
131
132    # Create a bunch of threads, let each do some work, wait until all are
133    # done.
134    def test_various_ops(self):
135        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
136        # times about 1 second per clump).
137        NUMTASKS = 10
138
139        # no more than 3 of the 10 can run at once
140        sema = threading.BoundedSemaphore(value=3)
141        mutex = threading.RLock()
142        numrunning = Counter()
143
144        threads = []
145
146        for i in range(NUMTASKS):
147            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
148            threads.append(t)
149            self.assertIsNone(t.ident)
150            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
151            t.start()
152
153        if hasattr(threading, 'get_native_id'):
154            native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
155            self.assertNotIn(None, native_ids)
156            self.assertEqual(len(native_ids), NUMTASKS + 1)
157
158        if verbose:
159            print('waiting for all tasks to complete')
160        for t in threads:
161            t.join()
162            self.assertFalse(t.is_alive())
163            self.assertNotEqual(t.ident, 0)
164            self.assertIsNotNone(t.ident)
165            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
166        if verbose:
167            print('all tasks done')
168        self.assertEqual(numrunning.get(), 0)
169
170    def test_ident_of_no_threading_threads(self):
171        # The ident still must work for the main thread and dummy threads.
172        self.assertIsNotNone(threading.current_thread().ident)
173        def f():
174            ident.append(threading.current_thread().ident)
175            done.set()
176        done = threading.Event()
177        ident = []
178        with threading_helper.wait_threads_exit():
179            tid = _thread.start_new_thread(f, ())
180            done.wait()
181            self.assertEqual(ident[0], tid)
182        # Kill the "immortal" _DummyThread
183        del threading._active[ident[0]]
184
185    # run with a small(ish) thread stack size (256 KiB)
186    def test_various_ops_small_stack(self):
187        if verbose:
188            print('with 256 KiB thread stack size...')
189        try:
190            threading.stack_size(262144)
191        except _thread.error:
192            raise unittest.SkipTest(
193                'platform does not support changing thread stack size')
194        self.test_various_ops()
195        threading.stack_size(0)
196
197    # run with a large thread stack size (1 MiB)
198    def test_various_ops_large_stack(self):
199        if verbose:
200            print('with 1 MiB thread stack size...')
201        try:
202            threading.stack_size(0x100000)
203        except _thread.error:
204            raise unittest.SkipTest(
205                'platform does not support changing thread stack size')
206        self.test_various_ops()
207        threading.stack_size(0)
208
209    def test_foreign_thread(self):
210        # Check that a "foreign" thread can use the threading module.
211        def f(mutex):
212            # Calling current_thread() forces an entry for the foreign
213            # thread to get made in the threading._active map.
214            threading.current_thread()
215            mutex.release()
216
217        mutex = threading.Lock()
218        mutex.acquire()
219        with threading_helper.wait_threads_exit():
220            tid = _thread.start_new_thread(f, (mutex,))
221            # Wait for the thread to finish.
222            mutex.acquire()
223        self.assertIn(tid, threading._active)
224        self.assertIsInstance(threading._active[tid], threading._DummyThread)
225        #Issue 29376
226        self.assertTrue(threading._active[tid].is_alive())
227        self.assertRegex(repr(threading._active[tid]), '_DummyThread')
228        del threading._active[tid]
229
230    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
231    # exposed at the Python level.  This test relies on ctypes to get at it.
232    def test_PyThreadState_SetAsyncExc(self):
233        ctypes = import_module("ctypes")
234
235        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
236        set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
237
238        class AsyncExc(Exception):
239            pass
240
241        exception = ctypes.py_object(AsyncExc)
242
243        # First check it works when setting the exception from the same thread.
244        tid = threading.get_ident()
245        self.assertIsInstance(tid, int)
246        self.assertGreater(tid, 0)
247
248        try:
249            result = set_async_exc(tid, exception)
250            # The exception is async, so we might have to keep the VM busy until
251            # it notices.
252            while True:
253                pass
254        except AsyncExc:
255            pass
256        else:
257            # This code is unreachable but it reflects the intent. If we wanted
258            # to be smarter the above loop wouldn't be infinite.
259            self.fail("AsyncExc not raised")
260        try:
261            self.assertEqual(result, 1) # one thread state modified
262        except UnboundLocalError:
263            # The exception was raised too quickly for us to get the result.
264            pass
265
266        # `worker_started` is set by the thread when it's inside a try/except
267        # block waiting to catch the asynchronously set AsyncExc exception.
268        # `worker_saw_exception` is set by the thread upon catching that
269        # exception.
270        worker_started = threading.Event()
271        worker_saw_exception = threading.Event()
272
273        class Worker(threading.Thread):
274            def run(self):
275                self.id = threading.get_ident()
276                self.finished = False
277
278                try:
279                    while True:
280                        worker_started.set()
281                        time.sleep(0.1)
282                except AsyncExc:
283                    self.finished = True
284                    worker_saw_exception.set()
285
286        t = Worker()
287        t.daemon = True # so if this fails, we don't hang Python at shutdown
288        t.start()
289        if verbose:
290            print("    started worker thread")
291
292        # Try a thread id that doesn't make sense.
293        if verbose:
294            print("    trying nonsensical thread id")
295        result = set_async_exc(-1, exception)
296        self.assertEqual(result, 0)  # no thread states modified
297
298        # Now raise an exception in the worker thread.
299        if verbose:
300            print("    waiting for worker thread to get started")
301        ret = worker_started.wait()
302        self.assertTrue(ret)
303        if verbose:
304            print("    verifying worker hasn't exited")
305        self.assertFalse(t.finished)
306        if verbose:
307            print("    attempting to raise asynch exception in worker")
308        result = set_async_exc(t.id, exception)
309        self.assertEqual(result, 1) # one thread state modified
310        if verbose:
311            print("    waiting for worker to say it caught the exception")
312        worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
313        self.assertTrue(t.finished)
314        if verbose:
315            print("    all OK -- joining worker")
316        if t.finished:
317            t.join()
318        # else the thread is still running, and we have no way to kill it
319
320    def test_limbo_cleanup(self):
321        # Issue 7481: Failure to start thread should cleanup the limbo map.
322        def fail_new_thread(*args):
323            raise threading.ThreadError()
324        _start_new_thread = threading._start_new_thread
325        threading._start_new_thread = fail_new_thread
326        try:
327            t = threading.Thread(target=lambda: None)
328            self.assertRaises(threading.ThreadError, t.start)
329            self.assertFalse(
330                t in threading._limbo,
331                "Failed to cleanup _limbo map on failure of Thread.start().")
332        finally:
333            threading._start_new_thread = _start_new_thread
334
335    def test_finalize_running_thread(self):
336        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
337        # very late on python exit: on deallocation of a running thread for
338        # example.
339        import_module("ctypes")
340
341        rc, out, err = assert_python_failure("-c", """if 1:
342            import ctypes, sys, time, _thread
343
344            # This lock is used as a simple event variable.
345            ready = _thread.allocate_lock()
346            ready.acquire()
347
348            # Module globals are cleared before __del__ is run
349            # So we save the functions in class dict
350            class C:
351                ensure = ctypes.pythonapi.PyGILState_Ensure
352                release = ctypes.pythonapi.PyGILState_Release
353                def __del__(self):
354                    state = self.ensure()
355                    self.release(state)
356
357            def waitingThread():
358                x = C()
359                ready.release()
360                time.sleep(100)
361
362            _thread.start_new_thread(waitingThread, ())
363            ready.acquire()  # Be sure the other thread is waiting.
364            sys.exit(42)
365            """)
366        self.assertEqual(rc, 42)
367
368    def test_finalize_with_trace(self):
369        # Issue1733757
370        # Avoid a deadlock when sys.settrace steps into threading._shutdown
371        assert_python_ok("-c", """if 1:
372            import sys, threading
373
374            # A deadlock-killer, to prevent the
375            # testsuite to hang forever
376            def killer():
377                import os, time
378                time.sleep(2)
379                print('program blocked; aborting')
380                os._exit(2)
381            t = threading.Thread(target=killer)
382            t.daemon = True
383            t.start()
384
385            # This is the trace function
386            def func(frame, event, arg):
387                threading.current_thread()
388                return func
389
390            sys.settrace(func)
391            """)
392
393    def test_join_nondaemon_on_shutdown(self):
394        # Issue 1722344
395        # Raising SystemExit skipped threading._shutdown
396        rc, out, err = assert_python_ok("-c", """if 1:
397                import threading
398                from time import sleep
399
400                def child():
401                    sleep(1)
402                    # As a non-daemon thread we SHOULD wake up and nothing
403                    # should be torn down yet
404                    print("Woke up, sleep function is:", sleep)
405
406                threading.Thread(target=child).start()
407                raise SystemExit
408            """)
409        self.assertEqual(out.strip(),
410            b"Woke up, sleep function is: <built-in function sleep>")
411        self.assertEqual(err, b"")
412
413    def test_enumerate_after_join(self):
414        # Try hard to trigger #1703448: a thread is still returned in
415        # threading.enumerate() after it has been join()ed.
416        enum = threading.enumerate
417        old_interval = sys.getswitchinterval()
418        try:
419            for i in range(1, 100):
420                sys.setswitchinterval(i * 0.0002)
421                t = threading.Thread(target=lambda: None)
422                t.start()
423                t.join()
424                l = enum()
425                self.assertNotIn(t, l,
426                    "#1703448 triggered after %d trials: %s" % (i, l))
427        finally:
428            sys.setswitchinterval(old_interval)
429
430    def test_no_refcycle_through_target(self):
431        class RunSelfFunction(object):
432            def __init__(self, should_raise):
433                # The links in this refcycle from Thread back to self
434                # should be cleaned up when the thread completes.
435                self.should_raise = should_raise
436                self.thread = threading.Thread(target=self._run,
437                                               args=(self,),
438                                               kwargs={'yet_another':self})
439                self.thread.start()
440
441            def _run(self, other_ref, yet_another):
442                if self.should_raise:
443                    raise SystemExit
444
445        restore_default_excepthook(self)
446
447        cyclic_object = RunSelfFunction(should_raise=False)
448        weak_cyclic_object = weakref.ref(cyclic_object)
449        cyclic_object.thread.join()
450        del cyclic_object
451        self.assertIsNone(weak_cyclic_object(),
452                         msg=('%d references still around' %
453                              sys.getrefcount(weak_cyclic_object())))
454
455        raising_cyclic_object = RunSelfFunction(should_raise=True)
456        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
457        raising_cyclic_object.thread.join()
458        del raising_cyclic_object
459        self.assertIsNone(weak_raising_cyclic_object(),
460                         msg=('%d references still around' %
461                              sys.getrefcount(weak_raising_cyclic_object())))
462
463    def test_old_threading_api(self):
464        # Just a quick sanity check to make sure the old method names are
465        # still present
466        t = threading.Thread()
467        with self.assertWarnsRegex(DeprecationWarning,
468                                   r'get the daemon attribute'):
469            t.isDaemon()
470        with self.assertWarnsRegex(DeprecationWarning,
471                                   r'set the daemon attribute'):
472            t.setDaemon(True)
473        with self.assertWarnsRegex(DeprecationWarning,
474                                   r'get the name attribute'):
475            t.getName()
476        with self.assertWarnsRegex(DeprecationWarning,
477                                   r'set the name attribute'):
478            t.setName("name")
479
480        e = threading.Event()
481        with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
482            e.isSet()
483
484        cond = threading.Condition()
485        cond.acquire()
486        with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
487            cond.notifyAll()
488
489        with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
490            threading.activeCount()
491        with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
492            threading.currentThread()
493
494    def test_repr_daemon(self):
495        t = threading.Thread()
496        self.assertNotIn('daemon', repr(t))
497        t.daemon = True
498        self.assertIn('daemon', repr(t))
499
500    def test_daemon_param(self):
501        t = threading.Thread()
502        self.assertFalse(t.daemon)
503        t = threading.Thread(daemon=False)
504        self.assertFalse(t.daemon)
505        t = threading.Thread(daemon=True)
506        self.assertTrue(t.daemon)
507
508    @unittest.skipUnless(hasattr(os, 'fork'), 'needs os.fork()')
509    def test_fork_at_exit(self):
510        # bpo-42350: Calling os.fork() after threading._shutdown() must
511        # not log an error.
512        code = textwrap.dedent("""
513            import atexit
514            import os
515            import sys
516            from test.support import wait_process
517
518            # Import the threading module to register its "at fork" callback
519            import threading
520
521            def exit_handler():
522                pid = os.fork()
523                if not pid:
524                    print("child process ok", file=sys.stderr, flush=True)
525                    # child process
526                else:
527                    wait_process(pid, exitcode=0)
528
529            # exit_handler() will be called after threading._shutdown()
530            atexit.register(exit_handler)
531        """)
532        _, out, err = assert_python_ok("-c", code)
533        self.assertEqual(out, b'')
534        self.assertEqual(err.rstrip(), b'child process ok')
535
536    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
537    def test_dummy_thread_after_fork(self):
538        # Issue #14308: a dummy thread in the active list doesn't mess up
539        # the after-fork mechanism.
540        code = """if 1:
541            import _thread, threading, os, time
542
543            def background_thread(evt):
544                # Creates and registers the _DummyThread instance
545                threading.current_thread()
546                evt.set()
547                time.sleep(10)
548
549            evt = threading.Event()
550            _thread.start_new_thread(background_thread, (evt,))
551            evt.wait()
552            assert threading.active_count() == 2, threading.active_count()
553            if os.fork() == 0:
554                assert threading.active_count() == 1, threading.active_count()
555                os._exit(0)
556            else:
557                os.wait()
558        """
559        _, out, err = assert_python_ok("-c", code)
560        self.assertEqual(out, b'')
561        self.assertEqual(err, b'')
562
563    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
564    def test_is_alive_after_fork(self):
565        # Try hard to trigger #18418: is_alive() could sometimes be True on
566        # threads that vanished after a fork.
567        old_interval = sys.getswitchinterval()
568        self.addCleanup(sys.setswitchinterval, old_interval)
569
570        # Make the bug more likely to manifest.
571        test.support.setswitchinterval(1e-6)
572
573        for i in range(20):
574            t = threading.Thread(target=lambda: None)
575            t.start()
576            pid = os.fork()
577            if pid == 0:
578                os._exit(11 if t.is_alive() else 10)
579            else:
580                t.join()
581
582                support.wait_process(pid, exitcode=10)
583
584    def test_main_thread(self):
585        main = threading.main_thread()
586        self.assertEqual(main.name, 'MainThread')
587        self.assertEqual(main.ident, threading.current_thread().ident)
588        self.assertEqual(main.ident, threading.get_ident())
589
590        def f():
591            self.assertNotEqual(threading.main_thread().ident,
592                                threading.current_thread().ident)
593        th = threading.Thread(target=f)
594        th.start()
595        th.join()
596
597    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
598    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
599    def test_main_thread_after_fork(self):
600        code = """if 1:
601            import os, threading
602            from test import support
603
604            pid = os.fork()
605            if pid == 0:
606                main = threading.main_thread()
607                print(main.name)
608                print(main.ident == threading.current_thread().ident)
609                print(main.ident == threading.get_ident())
610            else:
611                support.wait_process(pid, exitcode=0)
612        """
613        _, out, err = assert_python_ok("-c", code)
614        data = out.decode().replace('\r', '')
615        self.assertEqual(err, b"")
616        self.assertEqual(data, "MainThread\nTrue\nTrue\n")
617
618    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
619    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
620    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
621    def test_main_thread_after_fork_from_nonmain_thread(self):
622        code = """if 1:
623            import os, threading, sys
624            from test import support
625
626            def func():
627                pid = os.fork()
628                if pid == 0:
629                    main = threading.main_thread()
630                    print(main.name)
631                    print(main.ident == threading.current_thread().ident)
632                    print(main.ident == threading.get_ident())
633                    # stdout is fully buffered because not a tty,
634                    # we have to flush before exit.
635                    sys.stdout.flush()
636                else:
637                    support.wait_process(pid, exitcode=0)
638
639            th = threading.Thread(target=func)
640            th.start()
641            th.join()
642        """
643        _, out, err = assert_python_ok("-c", code)
644        data = out.decode().replace('\r', '')
645        self.assertEqual(err, b"")
646        self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n")
647
648    def test_main_thread_during_shutdown(self):
649        # bpo-31516: current_thread() should still point to the main thread
650        # at shutdown
651        code = """if 1:
652            import gc, threading
653
654            main_thread = threading.current_thread()
655            assert main_thread is threading.main_thread()  # sanity check
656
657            class RefCycle:
658                def __init__(self):
659                    self.cycle = self
660
661                def __del__(self):
662                    print("GC:",
663                          threading.current_thread() is main_thread,
664                          threading.main_thread() is main_thread,
665                          threading.enumerate() == [main_thread])
666
667            RefCycle()
668            gc.collect()  # sanity check
669            x = RefCycle()
670        """
671        _, out, err = assert_python_ok("-c", code)
672        data = out.decode()
673        self.assertEqual(err, b"")
674        self.assertEqual(data.splitlines(),
675                         ["GC: True True True"] * 2)
676
677    def test_finalization_shutdown(self):
678        # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
679        # until Python thread states of all non-daemon threads get deleted.
680        #
681        # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
682        # test the finalization of the main interpreter.
683        code = """if 1:
684            import os
685            import threading
686            import time
687            import random
688
689            def random_sleep():
690                seconds = random.random() * 0.010
691                time.sleep(seconds)
692
693            class Sleeper:
694                def __del__(self):
695                    random_sleep()
696
697            tls = threading.local()
698
699            def f():
700                # Sleep a bit so that the thread is still running when
701                # Py_Finalize() is called.
702                random_sleep()
703                tls.x = Sleeper()
704                random_sleep()
705
706            threading.Thread(target=f).start()
707            random_sleep()
708        """
709        rc, out, err = assert_python_ok("-c", code)
710        self.assertEqual(err, b"")
711
712    def test_tstate_lock(self):
713        # Test an implementation detail of Thread objects.
714        started = _thread.allocate_lock()
715        finish = _thread.allocate_lock()
716        started.acquire()
717        finish.acquire()
718        def f():
719            started.release()
720            finish.acquire()
721            time.sleep(0.01)
722        # The tstate lock is None until the thread is started
723        t = threading.Thread(target=f)
724        self.assertIs(t._tstate_lock, None)
725        t.start()
726        started.acquire()
727        self.assertTrue(t.is_alive())
728        # The tstate lock can't be acquired when the thread is running
729        # (or suspended).
730        tstate_lock = t._tstate_lock
731        self.assertFalse(tstate_lock.acquire(timeout=0), False)
732        finish.release()
733        # When the thread ends, the state_lock can be successfully
734        # acquired.
735        self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False)
736        # But is_alive() is still True:  we hold _tstate_lock now, which
737        # prevents is_alive() from knowing the thread's end-of-life C code
738        # is done.
739        self.assertTrue(t.is_alive())
740        # Let is_alive() find out the C code is done.
741        tstate_lock.release()
742        self.assertFalse(t.is_alive())
743        # And verify the thread disposed of _tstate_lock.
744        self.assertIsNone(t._tstate_lock)
745        t.join()
746
747    def test_repr_stopped(self):
748        # Verify that "stopped" shows up in repr(Thread) appropriately.
749        started = _thread.allocate_lock()
750        finish = _thread.allocate_lock()
751        started.acquire()
752        finish.acquire()
753        def f():
754            started.release()
755            finish.acquire()
756        t = threading.Thread(target=f)
757        t.start()
758        started.acquire()
759        self.assertIn("started", repr(t))
760        finish.release()
761        # "stopped" should appear in the repr in a reasonable amount of time.
762        # Implementation detail:  as of this writing, that's trivially true
763        # if .join() is called, and almost trivially true if .is_alive() is
764        # called.  The detail we're testing here is that "stopped" shows up
765        # "all on its own".
766        LOOKING_FOR = "stopped"
767        for i in range(500):
768            if LOOKING_FOR in repr(t):
769                break
770            time.sleep(0.01)
771        self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
772        t.join()
773
774    def test_BoundedSemaphore_limit(self):
775        # BoundedSemaphore should raise ValueError if released too often.
776        for limit in range(1, 10):
777            bs = threading.BoundedSemaphore(limit)
778            threads = [threading.Thread(target=bs.acquire)
779                       for _ in range(limit)]
780            for t in threads:
781                t.start()
782            for t in threads:
783                t.join()
784            threads = [threading.Thread(target=bs.release)
785                       for _ in range(limit)]
786            for t in threads:
787                t.start()
788            for t in threads:
789                t.join()
790            self.assertRaises(ValueError, bs.release)
791
792    @cpython_only
793    def test_frame_tstate_tracing(self):
794        # Issue #14432: Crash when a generator is created in a C thread that is
795        # destroyed while the generator is still used. The issue was that a
796        # generator contains a frame, and the frame kept a reference to the
797        # Python state of the destroyed C thread. The crash occurs when a trace
798        # function is setup.
799
800        def noop_trace(frame, event, arg):
801            # no operation
802            return noop_trace
803
804        def generator():
805            while 1:
806                yield "generator"
807
808        def callback():
809            if callback.gen is None:
810                callback.gen = generator()
811            return next(callback.gen)
812        callback.gen = None
813
814        old_trace = sys.gettrace()
815        sys.settrace(noop_trace)
816        try:
817            # Install a trace function
818            threading.settrace(noop_trace)
819
820            # Create a generator in a C thread which exits after the call
821            import _testcapi
822            _testcapi.call_in_temporary_c_thread(callback)
823
824            # Call the generator in a different Python thread, check that the
825            # generator didn't keep a reference to the destroyed thread state
826            for test in range(3):
827                # The trace function is still called here
828                callback()
829        finally:
830            sys.settrace(old_trace)
831
832    def test_gettrace(self):
833        def noop_trace(frame, event, arg):
834            # no operation
835            return noop_trace
836        old_trace = threading.gettrace()
837        try:
838            threading.settrace(noop_trace)
839            trace_func = threading.gettrace()
840            self.assertEqual(noop_trace,trace_func)
841        finally:
842            threading.settrace(old_trace)
843
844    def test_getprofile(self):
845        def fn(*args): pass
846        old_profile = threading.getprofile()
847        try:
848            threading.setprofile(fn)
849            self.assertEqual(fn, threading.getprofile())
850        finally:
851            threading.setprofile(old_profile)
852
853    @cpython_only
854    def test_shutdown_locks(self):
855        for daemon in (False, True):
856            with self.subTest(daemon=daemon):
857                event = threading.Event()
858                thread = threading.Thread(target=event.wait, daemon=daemon)
859
860                # Thread.start() must add lock to _shutdown_locks,
861                # but only for non-daemon thread
862                thread.start()
863                tstate_lock = thread._tstate_lock
864                if not daemon:
865                    self.assertIn(tstate_lock, threading._shutdown_locks)
866                else:
867                    self.assertNotIn(tstate_lock, threading._shutdown_locks)
868
869                # unblock the thread and join it
870                event.set()
871                thread.join()
872
873                # Thread._stop() must remove tstate_lock from _shutdown_locks.
874                # Daemon threads must never add it to _shutdown_locks.
875                self.assertNotIn(tstate_lock, threading._shutdown_locks)
876
877    def test_locals_at_exit(self):
878        # bpo-19466: thread locals must not be deleted before destructors
879        # are called
880        rc, out, err = assert_python_ok("-c", """if 1:
881            import threading
882
883            class Atexit:
884                def __del__(self):
885                    print("thread_dict.atexit = %r" % thread_dict.atexit)
886
887            thread_dict = threading.local()
888            thread_dict.atexit = "value"
889
890            atexit = Atexit()
891        """)
892        self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
893
894    def test_boolean_target(self):
895        # bpo-41149: A thread that had a boolean value of False would not
896        # run, regardless of whether it was callable. The correct behaviour
897        # is for a thread to do nothing if its target is None, and to call
898        # the target otherwise.
899        class BooleanTarget(object):
900            def __init__(self):
901                self.ran = False
902            def __bool__(self):
903                return False
904            def __call__(self):
905                self.ran = True
906
907        target = BooleanTarget()
908        thread = threading.Thread(target=target)
909        thread.start()
910        thread.join()
911        self.assertTrue(target.ran)
912
913    def test_leak_without_join(self):
914        # bpo-37788: Test that a thread which is not joined explicitly
915        # does not leak. Test written for reference leak checks.
916        def noop(): pass
917        with threading_helper.wait_threads_exit():
918            threading.Thread(target=noop).start()
919            # Thread.join() is not called
920
921    @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)')
922    def test_debug_deprecation(self):
923        # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated
924        rc, out, err = assert_python_ok("-Wdefault", "-c", "pass",
925                                        PYTHONTHREADDEBUG="1")
926        msg = (b'DeprecationWarning: The threading debug '
927               b'(PYTHONTHREADDEBUG environment variable) '
928               b'is deprecated and will be removed in Python 3.12')
929        self.assertIn(msg, err)
930
931    def test_import_from_another_thread(self):
932        # bpo-1596321: If the threading module is first import from a thread
933        # different than the main thread, threading._shutdown() must handle
934        # this case without logging an error at Python exit.
935        code = textwrap.dedent('''
936            import _thread
937            import sys
938
939            event = _thread.allocate_lock()
940            event.acquire()
941
942            def import_threading():
943                import threading
944                event.release()
945
946            if 'threading' in sys.modules:
947                raise Exception('threading is already imported')
948
949            _thread.start_new_thread(import_threading, ())
950
951            # wait until the threading module is imported
952            event.acquire()
953            event.release()
954
955            if 'threading' not in sys.modules:
956                raise Exception('threading is not imported')
957
958            # don't wait until the thread completes
959        ''')
960        rc, out, err = assert_python_ok("-c", code)
961        self.assertEqual(out, b'')
962        self.assertEqual(err, b'')
963
964
965class ThreadJoinOnShutdown(BaseTestCase):
966
967    def _run_and_join(self, script):
968        script = """if 1:
969            import sys, os, time, threading
970
971            # a thread, which waits for the main program to terminate
972            def joiningfunc(mainthread):
973                mainthread.join()
974                print('end of thread')
975                # stdout is fully buffered because not a tty, we have to flush
976                # before exit.
977                sys.stdout.flush()
978        \n""" + script
979
980        rc, out, err = assert_python_ok("-c", script)
981        data = out.decode().replace('\r', '')
982        self.assertEqual(data, "end of main\nend of thread\n")
983
984    def test_1_join_on_shutdown(self):
985        # The usual case: on exit, wait for a non-daemon thread
986        script = """if 1:
987            import os
988            t = threading.Thread(target=joiningfunc,
989                                 args=(threading.current_thread(),))
990            t.start()
991            time.sleep(0.1)
992            print('end of main')
993            """
994        self._run_and_join(script)
995
996    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
997    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
998    def test_2_join_in_forked_process(self):
999        # Like the test above, but from a forked interpreter
1000        script = """if 1:
1001            from test import support
1002
1003            childpid = os.fork()
1004            if childpid != 0:
1005                # parent process
1006                support.wait_process(childpid, exitcode=0)
1007                sys.exit(0)
1008
1009            # child process
1010            t = threading.Thread(target=joiningfunc,
1011                                 args=(threading.current_thread(),))
1012            t.start()
1013            print('end of main')
1014            """
1015        self._run_and_join(script)
1016
1017    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
1018    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1019    def test_3_join_in_forked_from_thread(self):
1020        # Like the test above, but fork() was called from a worker thread
1021        # In the forked process, the main Thread object must be marked as stopped.
1022
1023        script = """if 1:
1024            from test import support
1025
1026            main_thread = threading.current_thread()
1027            def worker():
1028                childpid = os.fork()
1029                if childpid != 0:
1030                    # parent process
1031                    support.wait_process(childpid, exitcode=0)
1032                    sys.exit(0)
1033
1034                # child process
1035                t = threading.Thread(target=joiningfunc,
1036                                     args=(main_thread,))
1037                print('end of main')
1038                t.start()
1039                t.join() # Should not block: main_thread is already stopped
1040
1041            w = threading.Thread(target=worker)
1042            w.start()
1043            """
1044        self._run_and_join(script)
1045
1046    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1047    def test_4_daemon_threads(self):
1048        # Check that a daemon thread cannot crash the interpreter on shutdown
1049        # by manipulating internal structures that are being disposed of in
1050        # the main thread.
1051        script = """if True:
1052            import os
1053            import random
1054            import sys
1055            import time
1056            import threading
1057
1058            thread_has_run = set()
1059
1060            def random_io():
1061                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
1062                while True:
1063                    with open(os.__file__, 'rb') as in_f:
1064                        stuff = in_f.read(200)
1065                        with open(os.devnull, 'wb') as null_f:
1066                            null_f.write(stuff)
1067                            time.sleep(random.random() / 1995)
1068                    thread_has_run.add(threading.current_thread())
1069
1070            def main():
1071                count = 0
1072                for _ in range(40):
1073                    new_thread = threading.Thread(target=random_io)
1074                    new_thread.daemon = True
1075                    new_thread.start()
1076                    count += 1
1077                while len(thread_has_run) < count:
1078                    time.sleep(0.001)
1079                # Trigger process shutdown
1080                sys.exit(0)
1081
1082            main()
1083            """
1084        rc, out, err = assert_python_ok('-c', script)
1085        self.assertFalse(err)
1086
1087    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
1088    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1089    def test_reinit_tls_after_fork(self):
1090        # Issue #13817: fork() would deadlock in a multithreaded program with
1091        # the ad-hoc TLS implementation.
1092
1093        def do_fork_and_wait():
1094            # just fork a child process and wait it
1095            pid = os.fork()
1096            if pid > 0:
1097                support.wait_process(pid, exitcode=50)
1098            else:
1099                os._exit(50)
1100
1101        # start a bunch of threads that will fork() child processes
1102        threads = []
1103        for i in range(16):
1104            t = threading.Thread(target=do_fork_and_wait)
1105            threads.append(t)
1106            t.start()
1107
1108        for t in threads:
1109            t.join()
1110
1111    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
1112    def test_clear_threads_states_after_fork(self):
1113        # Issue #17094: check that threads states are cleared after fork()
1114
1115        # start a bunch of threads
1116        threads = []
1117        for i in range(16):
1118            t = threading.Thread(target=lambda : time.sleep(0.3))
1119            threads.append(t)
1120            t.start()
1121
1122        pid = os.fork()
1123        if pid == 0:
1124            # check that threads states have been cleared
1125            if len(sys._current_frames()) == 1:
1126                os._exit(51)
1127            else:
1128                os._exit(52)
1129        else:
1130            support.wait_process(pid, exitcode=51)
1131
1132        for t in threads:
1133            t.join()
1134
1135
1136class SubinterpThreadingTests(BaseTestCase):
1137    def pipe(self):
1138        r, w = os.pipe()
1139        self.addCleanup(os.close, r)
1140        self.addCleanup(os.close, w)
1141        if hasattr(os, 'set_blocking'):
1142            os.set_blocking(r, False)
1143        return (r, w)
1144
1145    def test_threads_join(self):
1146        # Non-daemon threads should be joined at subinterpreter shutdown
1147        # (issue #18808)
1148        r, w = self.pipe()
1149        code = textwrap.dedent(r"""
1150            import os
1151            import random
1152            import threading
1153            import time
1154
1155            def random_sleep():
1156                seconds = random.random() * 0.010
1157                time.sleep(seconds)
1158
1159            def f():
1160                # Sleep a bit so that the thread is still running when
1161                # Py_EndInterpreter is called.
1162                random_sleep()
1163                os.write(%d, b"x")
1164
1165            threading.Thread(target=f).start()
1166            random_sleep()
1167        """ % (w,))
1168        ret = test.support.run_in_subinterp(code)
1169        self.assertEqual(ret, 0)
1170        # The thread was joined properly.
1171        self.assertEqual(os.read(r, 1), b"x")
1172
1173    def test_threads_join_2(self):
1174        # Same as above, but a delay gets introduced after the thread's
1175        # Python code returned but before the thread state is deleted.
1176        # To achieve this, we register a thread-local object which sleeps
1177        # a bit when deallocated.
1178        r, w = self.pipe()
1179        code = textwrap.dedent(r"""
1180            import os
1181            import random
1182            import threading
1183            import time
1184
1185            def random_sleep():
1186                seconds = random.random() * 0.010
1187                time.sleep(seconds)
1188
1189            class Sleeper:
1190                def __del__(self):
1191                    random_sleep()
1192
1193            tls = threading.local()
1194
1195            def f():
1196                # Sleep a bit so that the thread is still running when
1197                # Py_EndInterpreter is called.
1198                random_sleep()
1199                tls.x = Sleeper()
1200                os.write(%d, b"x")
1201
1202            threading.Thread(target=f).start()
1203            random_sleep()
1204        """ % (w,))
1205        ret = test.support.run_in_subinterp(code)
1206        self.assertEqual(ret, 0)
1207        # The thread was joined properly.
1208        self.assertEqual(os.read(r, 1), b"x")
1209
1210    @cpython_only
1211    def test_daemon_threads_fatal_error(self):
1212        subinterp_code = f"""if 1:
1213            import os
1214            import threading
1215            import time
1216
1217            def f():
1218                # Make sure the daemon thread is still running when
1219                # Py_EndInterpreter is called.
1220                time.sleep({test.support.SHORT_TIMEOUT})
1221            threading.Thread(target=f, daemon=True).start()
1222            """
1223        script = r"""if 1:
1224            import _testcapi
1225
1226            _testcapi.run_in_subinterp(%r)
1227            """ % (subinterp_code,)
1228        with test.support.SuppressCrashReport():
1229            rc, out, err = assert_python_failure("-c", script)
1230        self.assertIn("Fatal Python error: Py_EndInterpreter: "
1231                      "not the last thread", err.decode())
1232
1233
1234class ThreadingExceptionTests(BaseTestCase):
1235    # A RuntimeError should be raised if Thread.start() is called
1236    # multiple times.
1237    def test_start_thread_again(self):
1238        thread = threading.Thread()
1239        thread.start()
1240        self.assertRaises(RuntimeError, thread.start)
1241        thread.join()
1242
1243    def test_joining_current_thread(self):
1244        current_thread = threading.current_thread()
1245        self.assertRaises(RuntimeError, current_thread.join);
1246
1247    def test_joining_inactive_thread(self):
1248        thread = threading.Thread()
1249        self.assertRaises(RuntimeError, thread.join)
1250
1251    def test_daemonize_active_thread(self):
1252        thread = threading.Thread()
1253        thread.start()
1254        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
1255        thread.join()
1256
1257    def test_releasing_unacquired_lock(self):
1258        lock = threading.Lock()
1259        self.assertRaises(RuntimeError, lock.release)
1260
1261    def test_recursion_limit(self):
1262        # Issue 9670
1263        # test that excessive recursion within a non-main thread causes
1264        # an exception rather than crashing the interpreter on platforms
1265        # like Mac OS X or FreeBSD which have small default stack sizes
1266        # for threads
1267        script = """if True:
1268            import threading
1269
1270            def recurse():
1271                return recurse()
1272
1273            def outer():
1274                try:
1275                    recurse()
1276                except RecursionError:
1277                    pass
1278
1279            w = threading.Thread(target=outer)
1280            w.start()
1281            w.join()
1282            print('end of main thread')
1283            """
1284        expected_output = "end of main thread\n"
1285        p = subprocess.Popen([sys.executable, "-c", script],
1286                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1287        stdout, stderr = p.communicate()
1288        data = stdout.decode().replace('\r', '')
1289        self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
1290        self.assertEqual(data, expected_output)
1291
1292    def test_print_exception(self):
1293        script = r"""if True:
1294            import threading
1295            import time
1296
1297            running = False
1298            def run():
1299                global running
1300                running = True
1301                while running:
1302                    time.sleep(0.01)
1303                1/0
1304            t = threading.Thread(target=run)
1305            t.start()
1306            while not running:
1307                time.sleep(0.01)
1308            running = False
1309            t.join()
1310            """
1311        rc, out, err = assert_python_ok("-c", script)
1312        self.assertEqual(out, b'')
1313        err = err.decode()
1314        self.assertIn("Exception in thread", err)
1315        self.assertIn("Traceback (most recent call last):", err)
1316        self.assertIn("ZeroDivisionError", err)
1317        self.assertNotIn("Unhandled exception", err)
1318
1319    def test_print_exception_stderr_is_none_1(self):
1320        script = r"""if True:
1321            import sys
1322            import threading
1323            import time
1324
1325            running = False
1326            def run():
1327                global running
1328                running = True
1329                while running:
1330                    time.sleep(0.01)
1331                1/0
1332            t = threading.Thread(target=run)
1333            t.start()
1334            while not running:
1335                time.sleep(0.01)
1336            sys.stderr = None
1337            running = False
1338            t.join()
1339            """
1340        rc, out, err = assert_python_ok("-c", script)
1341        self.assertEqual(out, b'')
1342        err = err.decode()
1343        self.assertIn("Exception in thread", err)
1344        self.assertIn("Traceback (most recent call last):", err)
1345        self.assertIn("ZeroDivisionError", err)
1346        self.assertNotIn("Unhandled exception", err)
1347
1348    def test_print_exception_stderr_is_none_2(self):
1349        script = r"""if True:
1350            import sys
1351            import threading
1352            import time
1353
1354            running = False
1355            def run():
1356                global running
1357                running = True
1358                while running:
1359                    time.sleep(0.01)
1360                1/0
1361            sys.stderr = None
1362            t = threading.Thread(target=run)
1363            t.start()
1364            while not running:
1365                time.sleep(0.01)
1366            running = False
1367            t.join()
1368            """
1369        rc, out, err = assert_python_ok("-c", script)
1370        self.assertEqual(out, b'')
1371        self.assertNotIn("Unhandled exception", err.decode())
1372
1373    def test_bare_raise_in_brand_new_thread(self):
1374        def bare_raise():
1375            raise
1376
1377        class Issue27558(threading.Thread):
1378            exc = None
1379
1380            def run(self):
1381                try:
1382                    bare_raise()
1383                except Exception as exc:
1384                    self.exc = exc
1385
1386        thread = Issue27558()
1387        thread.start()
1388        thread.join()
1389        self.assertIsNotNone(thread.exc)
1390        self.assertIsInstance(thread.exc, RuntimeError)
1391        # explicitly break the reference cycle to not leak a dangling thread
1392        thread.exc = None
1393
1394    def test_multithread_modify_file_noerror(self):
1395        # See issue25872
1396        def modify_file():
1397            with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
1398                fp.write(' ')
1399                traceback.format_stack()
1400
1401        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1402        threads = [
1403            threading.Thread(target=modify_file)
1404            for i in range(100)
1405        ]
1406        for t in threads:
1407            t.start()
1408            t.join()
1409
1410
1411class ThreadRunFail(threading.Thread):
1412    def run(self):
1413        raise ValueError("run failed")
1414
1415
1416class ExceptHookTests(BaseTestCase):
1417    def setUp(self):
1418        restore_default_excepthook(self)
1419        super().setUp()
1420
1421    def test_excepthook(self):
1422        with support.captured_output("stderr") as stderr:
1423            thread = ThreadRunFail(name="excepthook thread")
1424            thread.start()
1425            thread.join()
1426
1427        stderr = stderr.getvalue().strip()
1428        self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
1429        self.assertIn('Traceback (most recent call last):\n', stderr)
1430        self.assertIn('  raise ValueError("run failed")', stderr)
1431        self.assertIn('ValueError: run failed', stderr)
1432
1433    @support.cpython_only
1434    def test_excepthook_thread_None(self):
1435        # threading.excepthook called with thread=None: log the thread
1436        # identifier in this case.
1437        with support.captured_output("stderr") as stderr:
1438            try:
1439                raise ValueError("bug")
1440            except Exception as exc:
1441                args = threading.ExceptHookArgs([*sys.exc_info(), None])
1442                try:
1443                    threading.excepthook(args)
1444                finally:
1445                    # Explicitly break a reference cycle
1446                    args = None
1447
1448        stderr = stderr.getvalue().strip()
1449        self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
1450        self.assertIn('Traceback (most recent call last):\n', stderr)
1451        self.assertIn('  raise ValueError("bug")', stderr)
1452        self.assertIn('ValueError: bug', stderr)
1453
1454    def test_system_exit(self):
1455        class ThreadExit(threading.Thread):
1456            def run(self):
1457                sys.exit(1)
1458
1459        # threading.excepthook() silently ignores SystemExit
1460        with support.captured_output("stderr") as stderr:
1461            thread = ThreadExit()
1462            thread.start()
1463            thread.join()
1464
1465        self.assertEqual(stderr.getvalue(), '')
1466
1467    def test_custom_excepthook(self):
1468        args = None
1469
1470        def hook(hook_args):
1471            nonlocal args
1472            args = hook_args
1473
1474        try:
1475            with support.swap_attr(threading, 'excepthook', hook):
1476                thread = ThreadRunFail()
1477                thread.start()
1478                thread.join()
1479
1480            self.assertEqual(args.exc_type, ValueError)
1481            self.assertEqual(str(args.exc_value), 'run failed')
1482            self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
1483            self.assertIs(args.thread, thread)
1484        finally:
1485            # Break reference cycle
1486            args = None
1487
1488    def test_custom_excepthook_fail(self):
1489        def threading_hook(args):
1490            raise ValueError("threading_hook failed")
1491
1492        err_str = None
1493
1494        def sys_hook(exc_type, exc_value, exc_traceback):
1495            nonlocal err_str
1496            err_str = str(exc_value)
1497
1498        with support.swap_attr(threading, 'excepthook', threading_hook), \
1499             support.swap_attr(sys, 'excepthook', sys_hook), \
1500             support.captured_output('stderr') as stderr:
1501            thread = ThreadRunFail()
1502            thread.start()
1503            thread.join()
1504
1505        self.assertEqual(stderr.getvalue(),
1506                         'Exception in threading.excepthook:\n')
1507        self.assertEqual(err_str, 'threading_hook failed')
1508
1509    def test_original_excepthook(self):
1510        def run_thread():
1511            with support.captured_output("stderr") as output:
1512                thread = ThreadRunFail(name="excepthook thread")
1513                thread.start()
1514                thread.join()
1515            return output.getvalue()
1516
1517        def threading_hook(args):
1518            print("Running a thread failed", file=sys.stderr)
1519
1520        default_output = run_thread()
1521        with support.swap_attr(threading, 'excepthook', threading_hook):
1522            custom_hook_output = run_thread()
1523            threading.excepthook = threading.__excepthook__
1524            recovered_output = run_thread()
1525
1526        self.assertEqual(default_output, recovered_output)
1527        self.assertNotEqual(default_output, custom_hook_output)
1528        self.assertEqual(custom_hook_output, "Running a thread failed\n")
1529
1530
1531class TimerTests(BaseTestCase):
1532
1533    def setUp(self):
1534        BaseTestCase.setUp(self)
1535        self.callback_args = []
1536        self.callback_event = threading.Event()
1537
1538    def test_init_immutable_default_args(self):
1539        # Issue 17435: constructor defaults were mutable objects, they could be
1540        # mutated via the object attributes and affect other Timer objects.
1541        timer1 = threading.Timer(0.01, self._callback_spy)
1542        timer1.start()
1543        self.callback_event.wait()
1544        timer1.args.append("blah")
1545        timer1.kwargs["foo"] = "bar"
1546        self.callback_event.clear()
1547        timer2 = threading.Timer(0.01, self._callback_spy)
1548        timer2.start()
1549        self.callback_event.wait()
1550        self.assertEqual(len(self.callback_args), 2)
1551        self.assertEqual(self.callback_args, [((), {}), ((), {})])
1552        timer1.join()
1553        timer2.join()
1554
1555    def _callback_spy(self, *args, **kwargs):
1556        self.callback_args.append((args[:], kwargs.copy()))
1557        self.callback_event.set()
1558
1559class LockTests(lock_tests.LockTests):
1560    locktype = staticmethod(threading.Lock)
1561
1562class PyRLockTests(lock_tests.RLockTests):
1563    locktype = staticmethod(threading._PyRLock)
1564
1565@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1566class CRLockTests(lock_tests.RLockTests):
1567    locktype = staticmethod(threading._CRLock)
1568
1569class EventTests(lock_tests.EventTests):
1570    eventtype = staticmethod(threading.Event)
1571
1572class ConditionAsRLockTests(lock_tests.RLockTests):
1573    # Condition uses an RLock by default and exports its API.
1574    locktype = staticmethod(threading.Condition)
1575
1576class ConditionTests(lock_tests.ConditionTests):
1577    condtype = staticmethod(threading.Condition)
1578
1579class SemaphoreTests(lock_tests.SemaphoreTests):
1580    semtype = staticmethod(threading.Semaphore)
1581
1582class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
1583    semtype = staticmethod(threading.BoundedSemaphore)
1584
1585class BarrierTests(lock_tests.BarrierTests):
1586    barriertype = staticmethod(threading.Barrier)
1587
1588
1589class MiscTestCase(unittest.TestCase):
1590    def test__all__(self):
1591        restore_default_excepthook(self)
1592
1593        extra = {"ThreadError"}
1594        not_exported = {'currentThread', 'activeCount'}
1595        support.check__all__(self, threading, ('threading', '_thread'),
1596                             extra=extra, not_exported=not_exported)
1597
1598
1599class InterruptMainTests(unittest.TestCase):
1600    def check_interrupt_main_with_signal_handler(self, signum):
1601        def handler(signum, frame):
1602            1/0
1603
1604        old_handler = signal.signal(signum, handler)
1605        self.addCleanup(signal.signal, signum, old_handler)
1606
1607        with self.assertRaises(ZeroDivisionError):
1608            _thread.interrupt_main()
1609
1610    def check_interrupt_main_noerror(self, signum):
1611        handler = signal.getsignal(signum)
1612        try:
1613            # No exception should arise.
1614            signal.signal(signum, signal.SIG_IGN)
1615            _thread.interrupt_main(signum)
1616
1617            signal.signal(signum, signal.SIG_DFL)
1618            _thread.interrupt_main(signum)
1619        finally:
1620            # Restore original handler
1621            signal.signal(signum, handler)
1622
1623    def test_interrupt_main_subthread(self):
1624        # Calling start_new_thread with a function that executes interrupt_main
1625        # should raise KeyboardInterrupt upon completion.
1626        def call_interrupt():
1627            _thread.interrupt_main()
1628        t = threading.Thread(target=call_interrupt)
1629        with self.assertRaises(KeyboardInterrupt):
1630            t.start()
1631            t.join()
1632        t.join()
1633
1634    def test_interrupt_main_mainthread(self):
1635        # Make sure that if interrupt_main is called in main thread that
1636        # KeyboardInterrupt is raised instantly.
1637        with self.assertRaises(KeyboardInterrupt):
1638            _thread.interrupt_main()
1639
1640    def test_interrupt_main_with_signal_handler(self):
1641        self.check_interrupt_main_with_signal_handler(signal.SIGINT)
1642        self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
1643
1644    def test_interrupt_main_noerror(self):
1645        self.check_interrupt_main_noerror(signal.SIGINT)
1646        self.check_interrupt_main_noerror(signal.SIGTERM)
1647
1648    def test_interrupt_main_invalid_signal(self):
1649        self.assertRaises(ValueError, _thread.interrupt_main, -1)
1650        self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
1651        self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
1652
1653    @threading_helper.reap_threads
1654    def test_can_interrupt_tight_loops(self):
1655        cont = [True]
1656        started = [False]
1657        interrupted = [False]
1658
1659        def worker(started, cont, interrupted):
1660            iterations = 100_000_000
1661            started[0] = True
1662            while cont[0]:
1663                if iterations:
1664                    iterations -= 1
1665                else:
1666                    return
1667                pass
1668            interrupted[0] = True
1669
1670        t = threading.Thread(target=worker,args=(started, cont, interrupted))
1671        t.start()
1672        while not started[0]:
1673            pass
1674        cont[0] = False
1675        t.join()
1676        self.assertTrue(interrupted[0])
1677
1678
1679class AtexitTests(unittest.TestCase):
1680
1681    def test_atexit_output(self):
1682        rc, out, err = assert_python_ok("-c", """if True:
1683            import threading
1684
1685            def run_last():
1686                print('parrot')
1687
1688            threading._register_atexit(run_last)
1689        """)
1690
1691        self.assertFalse(err)
1692        self.assertEqual(out.strip(), b'parrot')
1693
1694    def test_atexit_called_once(self):
1695        rc, out, err = assert_python_ok("-c", """if True:
1696            import threading
1697            from unittest.mock import Mock
1698
1699            mock = Mock()
1700            threading._register_atexit(mock)
1701            mock.assert_not_called()
1702            # force early shutdown to ensure it was called once
1703            threading._shutdown()
1704            mock.assert_called_once()
1705        """)
1706
1707        self.assertFalse(err)
1708
1709    def test_atexit_after_shutdown(self):
1710        # The only way to do this is by registering an atexit within
1711        # an atexit, which is intended to raise an exception.
1712        rc, out, err = assert_python_ok("-c", """if True:
1713            import threading
1714
1715            def func():
1716                pass
1717
1718            def run_last():
1719                threading._register_atexit(func)
1720
1721            threading._register_atexit(run_last)
1722        """)
1723
1724        self.assertTrue(err)
1725        self.assertIn("RuntimeError: can't register atexit after shutdown",
1726                err.decode())
1727
1728
1729if __name__ == "__main__":
1730    unittest.main()
1731