• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Tests for the threading module.
3"""
4
5import test.support
6from test.support import threading_helper, requires_subprocess, requires_gil_enabled
7from test.support import verbose, cpython_only, os_helper
8from test.support.import_helper import import_module
9from test.support.script_helper import assert_python_ok, assert_python_failure
10from test.support import force_not_colorized
11
12import random
13import sys
14import _thread
15import threading
16import time
17import unittest
18import weakref
19import os
20import subprocess
21import signal
22import textwrap
23import traceback
24import warnings
25
26from unittest import mock
27from test import lock_tests
28from test import support
29
30try:
31    from test.support import interpreters
32except ImportError:
33    interpreters = None
34
35threading_helper.requires_working_threading(module=True)
36
37# Between fork() and exec(), only async-safe functions are allowed (issues
38# #12316 and #11870), and fork() from a worker thread is known to trigger
39# problems with some operating systems (issue #3863): skip problematic tests
40# on platforms known to behave badly.
41platforms_to_skip = ('netbsd5', 'hp-ux11')
42
43
44def skip_unless_reliable_fork(test):
45    if not support.has_fork_support:
46        return unittest.skip("requires working os.fork()")(test)
47    if sys.platform in platforms_to_skip:
48        return unittest.skip("due to known OS bug related to thread+fork")(test)
49    if support.HAVE_ASAN_FORK_BUG:
50        return unittest.skip("libasan has a pthread_create() dead lock related to thread+fork")(test)
51    if support.check_sanitizer(thread=True):
52        return unittest.skip("TSAN doesn't support threads after fork")(test)
53    return test
54
55
56def requires_subinterpreters(meth):
57    """Decorator to skip a test if subinterpreters are not supported."""
58    return unittest.skipIf(interpreters is None,
59                           'subinterpreters required')(meth)
60
61
62def restore_default_excepthook(testcase):
63    testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook)
64    threading.excepthook = threading.__excepthook__
65
66
67# A trivial mutable counter.
68class Counter(object):
69    def __init__(self):
70        self.value = 0
71    def inc(self):
72        self.value += 1
73    def dec(self):
74        self.value -= 1
75    def get(self):
76        return self.value
77
78class TestThread(threading.Thread):
79    def __init__(self, name, testcase, sema, mutex, nrunning):
80        threading.Thread.__init__(self, name=name)
81        self.testcase = testcase
82        self.sema = sema
83        self.mutex = mutex
84        self.nrunning = nrunning
85
86    def run(self):
87        delay = random.random() / 10000.0
88        if verbose:
89            print('task %s will run for %.1f usec' %
90                  (self.name, delay * 1e6))
91
92        with self.sema:
93            with self.mutex:
94                self.nrunning.inc()
95                if verbose:
96                    print(self.nrunning.get(), 'tasks are running')
97                self.testcase.assertLessEqual(self.nrunning.get(), 3)
98
99            time.sleep(delay)
100            if verbose:
101                print('task', self.name, 'done')
102
103            with self.mutex:
104                self.nrunning.dec()
105                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
106                if verbose:
107                    print('%s is finished. %d tasks are running' %
108                          (self.name, self.nrunning.get()))
109
110
111class BaseTestCase(unittest.TestCase):
112    def setUp(self):
113        self._threads = threading_helper.threading_setup()
114
115    def tearDown(self):
116        threading_helper.threading_cleanup(*self._threads)
117        test.support.reap_children()
118
119
120class ThreadTests(BaseTestCase):
121    maxDiff = 9999
122
123    @cpython_only
124    def test_name(self):
125        def func(): pass
126
127        thread = threading.Thread(name="myname1")
128        self.assertEqual(thread.name, "myname1")
129
130        # Convert int name to str
131        thread = threading.Thread(name=123)
132        self.assertEqual(thread.name, "123")
133
134        # target name is ignored if name is specified
135        thread = threading.Thread(target=func, name="myname2")
136        self.assertEqual(thread.name, "myname2")
137
138        with mock.patch.object(threading, '_counter', return_value=2):
139            thread = threading.Thread(name="")
140            self.assertEqual(thread.name, "Thread-2")
141
142        with mock.patch.object(threading, '_counter', return_value=3):
143            thread = threading.Thread()
144            self.assertEqual(thread.name, "Thread-3")
145
146        with mock.patch.object(threading, '_counter', return_value=5):
147            thread = threading.Thread(target=func)
148            self.assertEqual(thread.name, "Thread-5 (func)")
149
150    def test_args_argument(self):
151        # bpo-45735: Using list or tuple as *args* in constructor could
152        # achieve the same effect.
153        num_list = [1]
154        num_tuple = (1,)
155
156        str_list = ["str"]
157        str_tuple = ("str",)
158
159        list_in_tuple = ([1],)
160        tuple_in_list = [(1,)]
161
162        test_cases = (
163            (num_list, lambda arg: self.assertEqual(arg, 1)),
164            (num_tuple, lambda arg: self.assertEqual(arg, 1)),
165            (str_list, lambda arg: self.assertEqual(arg, "str")),
166            (str_tuple, lambda arg: self.assertEqual(arg, "str")),
167            (list_in_tuple, lambda arg: self.assertEqual(arg, [1])),
168            (tuple_in_list, lambda arg: self.assertEqual(arg, (1,)))
169        )
170
171        for args, target in test_cases:
172            with self.subTest(target=target, args=args):
173                t = threading.Thread(target=target, args=args)
174                t.start()
175                t.join()
176
177    def test_lock_no_args(self):
178        threading.Lock()  # works
179        self.assertRaises(TypeError, threading.Lock, 1)
180        self.assertRaises(TypeError, threading.Lock, a=1)
181        self.assertRaises(TypeError, threading.Lock, 1, 2, a=1, b=2)
182
183    def test_lock_no_subclass(self):
184        # Intentionally disallow subclasses of threading.Lock because they have
185        # never been allowed, so why start now just because the type is public?
186        with self.assertRaises(TypeError):
187            class MyLock(threading.Lock): pass
188
189    def test_lock_or_none(self):
190        import types
191        self.assertIsInstance(threading.Lock | None, types.UnionType)
192
193    # Create a bunch of threads, let each do some work, wait until all are
194    # done.
195    def test_various_ops(self):
196        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
197        # times about 1 second per clump).
198        NUMTASKS = 10
199
200        # no more than 3 of the 10 can run at once
201        sema = threading.BoundedSemaphore(value=3)
202        mutex = threading.RLock()
203        numrunning = Counter()
204
205        threads = []
206
207        for i in range(NUMTASKS):
208            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
209            threads.append(t)
210            self.assertIsNone(t.ident)
211            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
212            t.start()
213
214        if hasattr(threading, 'get_native_id'):
215            native_ids = set(t.native_id for t in threads) | {threading.get_native_id()}
216            self.assertNotIn(None, native_ids)
217            self.assertEqual(len(native_ids), NUMTASKS + 1)
218
219        if verbose:
220            print('waiting for all tasks to complete')
221        for t in threads:
222            t.join()
223            self.assertFalse(t.is_alive())
224            self.assertNotEqual(t.ident, 0)
225            self.assertIsNotNone(t.ident)
226            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
227        if verbose:
228            print('all tasks done')
229        self.assertEqual(numrunning.get(), 0)
230
231    def test_ident_of_no_threading_threads(self):
232        # The ident still must work for the main thread and dummy threads.
233        self.assertIsNotNone(threading.current_thread().ident)
234        def f():
235            ident.append(threading.current_thread().ident)
236            done.set()
237        done = threading.Event()
238        ident = []
239        with threading_helper.wait_threads_exit():
240            tid = _thread.start_new_thread(f, ())
241            done.wait()
242            self.assertEqual(ident[0], tid)
243
244    # run with a small(ish) thread stack size (256 KiB)
245    def test_various_ops_small_stack(self):
246        if verbose:
247            print('with 256 KiB thread stack size...')
248        try:
249            threading.stack_size(262144)
250        except _thread.error:
251            raise unittest.SkipTest(
252                'platform does not support changing thread stack size')
253        self.test_various_ops()
254        threading.stack_size(0)
255
256    # run with a large thread stack size (1 MiB)
257    def test_various_ops_large_stack(self):
258        if verbose:
259            print('with 1 MiB thread stack size...')
260        try:
261            threading.stack_size(0x100000)
262        except _thread.error:
263            raise unittest.SkipTest(
264                'platform does not support changing thread stack size')
265        self.test_various_ops()
266        threading.stack_size(0)
267
268    def test_foreign_thread(self):
269        # Check that a "foreign" thread can use the threading module.
270        dummy_thread = None
271        error = None
272        def f(mutex):
273            try:
274                nonlocal dummy_thread
275                nonlocal error
276                # Calling current_thread() forces an entry for the foreign
277                # thread to get made in the threading._active map.
278                dummy_thread = threading.current_thread()
279                tid = dummy_thread.ident
280                self.assertIn(tid, threading._active)
281                self.assertIsInstance(dummy_thread, threading._DummyThread)
282                self.assertIs(threading._active.get(tid), dummy_thread)
283                # gh-29376
284                self.assertTrue(
285                    dummy_thread.is_alive(),
286                    'Expected _DummyThread to be considered alive.'
287                )
288                self.assertIn('_DummyThread', repr(dummy_thread))
289            except BaseException as e:
290                error = e
291            finally:
292                mutex.release()
293
294        mutex = threading.Lock()
295        mutex.acquire()
296        with threading_helper.wait_threads_exit():
297            tid = _thread.start_new_thread(f, (mutex,))
298            # Wait for the thread to finish.
299            mutex.acquire()
300        if error is not None:
301            raise error
302        self.assertEqual(tid, dummy_thread.ident)
303        # Issue gh-106236:
304        with self.assertRaises(RuntimeError):
305            dummy_thread.join()
306        dummy_thread._started.clear()
307        with self.assertRaises(RuntimeError):
308            dummy_thread.is_alive()
309        # Busy wait for the following condition: after the thread dies, the
310        # related dummy thread must be removed from threading._active.
311        timeout = 5
312        timeout_at = time.monotonic() + timeout
313        while time.monotonic() < timeout_at:
314            if threading._active.get(dummy_thread.ident) is not dummy_thread:
315                break
316            time.sleep(.1)
317        else:
318            self.fail('It was expected that the created threading._DummyThread was removed from threading._active.')
319
320    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
321    # exposed at the Python level.  This test relies on ctypes to get at it.
322    def test_PyThreadState_SetAsyncExc(self):
323        ctypes = import_module("ctypes")
324
325        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
326        set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
327
328        class AsyncExc(Exception):
329            pass
330
331        exception = ctypes.py_object(AsyncExc)
332
333        # First check it works when setting the exception from the same thread.
334        tid = threading.get_ident()
335        self.assertIsInstance(tid, int)
336        self.assertGreater(tid, 0)
337
338        try:
339            result = set_async_exc(tid, exception)
340            # The exception is async, so we might have to keep the VM busy until
341            # it notices.
342            while True:
343                pass
344        except AsyncExc:
345            pass
346        else:
347            # This code is unreachable but it reflects the intent. If we wanted
348            # to be smarter the above loop wouldn't be infinite.
349            self.fail("AsyncExc not raised")
350        try:
351            self.assertEqual(result, 1) # one thread state modified
352        except UnboundLocalError:
353            # The exception was raised too quickly for us to get the result.
354            pass
355
356        # `worker_started` is set by the thread when it's inside a try/except
357        # block waiting to catch the asynchronously set AsyncExc exception.
358        # `worker_saw_exception` is set by the thread upon catching that
359        # exception.
360        worker_started = threading.Event()
361        worker_saw_exception = threading.Event()
362
363        class Worker(threading.Thread):
364            def run(self):
365                self.id = threading.get_ident()
366                self.finished = False
367
368                try:
369                    while True:
370                        worker_started.set()
371                        time.sleep(0.1)
372                except AsyncExc:
373                    self.finished = True
374                    worker_saw_exception.set()
375
376        t = Worker()
377        t.daemon = True # so if this fails, we don't hang Python at shutdown
378        t.start()
379        if verbose:
380            print("    started worker thread")
381
382        # Try a thread id that doesn't make sense.
383        if verbose:
384            print("    trying nonsensical thread id")
385        result = set_async_exc(-1, exception)
386        self.assertEqual(result, 0)  # no thread states modified
387
388        # Now raise an exception in the worker thread.
389        if verbose:
390            print("    waiting for worker thread to get started")
391        ret = worker_started.wait()
392        self.assertTrue(ret)
393        if verbose:
394            print("    verifying worker hasn't exited")
395        self.assertFalse(t.finished)
396        if verbose:
397            print("    attempting to raise asynch exception in worker")
398        result = set_async_exc(t.id, exception)
399        self.assertEqual(result, 1) # one thread state modified
400        if verbose:
401            print("    waiting for worker to say it caught the exception")
402        worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT)
403        self.assertTrue(t.finished)
404        if verbose:
405            print("    all OK -- joining worker")
406        if t.finished:
407            t.join()
408        # else the thread is still running, and we have no way to kill it
409
410    def test_limbo_cleanup(self):
411        # Issue 7481: Failure to start thread should cleanup the limbo map.
412        def fail_new_thread(*args, **kwargs):
413            raise threading.ThreadError()
414        _start_joinable_thread = threading._start_joinable_thread
415        threading._start_joinable_thread = fail_new_thread
416        try:
417            t = threading.Thread(target=lambda: None)
418            self.assertRaises(threading.ThreadError, t.start)
419            self.assertFalse(
420                t in threading._limbo,
421                "Failed to cleanup _limbo map on failure of Thread.start().")
422        finally:
423            threading._start_joinable_thread = _start_joinable_thread
424
425    def test_finalize_running_thread(self):
426        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
427        # very late on python exit: on deallocation of a running thread for
428        # example.
429        if support.check_sanitizer(thread=True):
430            # the thread running `time.sleep(100)` below will still be alive
431            # at process exit
432            self.skipTest("TSAN would report thread leak")
433        import_module("ctypes")
434
435        rc, out, err = assert_python_failure("-c", """if 1:
436            import ctypes, sys, time, _thread
437
438            # This lock is used as a simple event variable.
439            ready = _thread.allocate_lock()
440            ready.acquire()
441
442            # Module globals are cleared before __del__ is run
443            # So we save the functions in class dict
444            class C:
445                ensure = ctypes.pythonapi.PyGILState_Ensure
446                release = ctypes.pythonapi.PyGILState_Release
447                def __del__(self):
448                    state = self.ensure()
449                    self.release(state)
450
451            def waitingThread():
452                x = C()
453                ready.release()
454                time.sleep(100)
455
456            _thread.start_new_thread(waitingThread, ())
457            ready.acquire()  # Be sure the other thread is waiting.
458            sys.exit(42)
459            """)
460        self.assertEqual(rc, 42)
461
462    def test_finalize_with_trace(self):
463        # Issue1733757
464        # Avoid a deadlock when sys.settrace steps into threading._shutdown
465        if support.check_sanitizer(thread=True):
466            # the thread running `time.sleep(2)` below will still be alive
467            # at process exit
468            self.skipTest("TSAN would report thread leak")
469
470        assert_python_ok("-c", """if 1:
471            import sys, threading
472
473            # A deadlock-killer, to prevent the
474            # testsuite to hang forever
475            def killer():
476                import os, time
477                time.sleep(2)
478                print('program blocked; aborting')
479                os._exit(2)
480            t = threading.Thread(target=killer)
481            t.daemon = True
482            t.start()
483
484            # This is the trace function
485            def func(frame, event, arg):
486                threading.current_thread()
487                return func
488
489            sys.settrace(func)
490            """)
491
492    def test_join_nondaemon_on_shutdown(self):
493        # Issue 1722344
494        # Raising SystemExit skipped threading._shutdown
495        rc, out, err = assert_python_ok("-c", """if 1:
496                import threading
497                from time import sleep
498
499                def child():
500                    sleep(1)
501                    # As a non-daemon thread we SHOULD wake up and nothing
502                    # should be torn down yet
503                    print("Woke up, sleep function is:", sleep)
504
505                threading.Thread(target=child).start()
506                raise SystemExit
507            """)
508        self.assertEqual(out.strip(),
509            b"Woke up, sleep function is: <built-in function sleep>")
510        self.assertEqual(err, b"")
511
512    def test_enumerate_after_join(self):
513        # Try hard to trigger #1703448: a thread is still returned in
514        # threading.enumerate() after it has been join()ed.
515        enum = threading.enumerate
516        old_interval = sys.getswitchinterval()
517        try:
518            for i in range(1, 100):
519                support.setswitchinterval(i * 0.0002)
520                t = threading.Thread(target=lambda: None)
521                t.start()
522                t.join()
523                l = enum()
524                self.assertNotIn(t, l,
525                    "#1703448 triggered after %d trials: %s" % (i, l))
526        finally:
527            sys.setswitchinterval(old_interval)
528
529    def test_join_from_multiple_threads(self):
530        # Thread.join() should be thread-safe
531        errors = []
532
533        def worker():
534            time.sleep(0.005)
535
536        def joiner(thread):
537            try:
538                thread.join()
539            except Exception as e:
540                errors.append(e)
541
542        for N in range(2, 20):
543            threads = [threading.Thread(target=worker)]
544            for i in range(N):
545                threads.append(threading.Thread(target=joiner,
546                                                args=(threads[0],)))
547            for t in threads:
548                t.start()
549            time.sleep(0.01)
550            for t in threads:
551                t.join()
552            if errors:
553                raise errors[0]
554
555    def test_join_with_timeout(self):
556        lock = _thread.allocate_lock()
557        lock.acquire()
558
559        def worker():
560            lock.acquire()
561
562        thread = threading.Thread(target=worker)
563        thread.start()
564        thread.join(timeout=0.01)
565        assert thread.is_alive()
566        lock.release()
567        thread.join()
568        assert not thread.is_alive()
569
570    def test_no_refcycle_through_target(self):
571        class RunSelfFunction(object):
572            def __init__(self, should_raise):
573                # The links in this refcycle from Thread back to self
574                # should be cleaned up when the thread completes.
575                self.should_raise = should_raise
576                self.thread = threading.Thread(target=self._run,
577                                               args=(self,),
578                                               kwargs={'yet_another':self})
579                self.thread.start()
580
581            def _run(self, other_ref, yet_another):
582                if self.should_raise:
583                    raise SystemExit
584
585        restore_default_excepthook(self)
586
587        cyclic_object = RunSelfFunction(should_raise=False)
588        weak_cyclic_object = weakref.ref(cyclic_object)
589        cyclic_object.thread.join()
590        del cyclic_object
591        self.assertIsNone(weak_cyclic_object(),
592                         msg=('%d references still around' %
593                              sys.getrefcount(weak_cyclic_object())))
594
595        raising_cyclic_object = RunSelfFunction(should_raise=True)
596        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
597        raising_cyclic_object.thread.join()
598        del raising_cyclic_object
599        self.assertIsNone(weak_raising_cyclic_object(),
600                         msg=('%d references still around' %
601                              sys.getrefcount(weak_raising_cyclic_object())))
602
603    def test_old_threading_api(self):
604        # Just a quick sanity check to make sure the old method names are
605        # still present
606        t = threading.Thread()
607        with self.assertWarnsRegex(DeprecationWarning,
608                                   r'get the daemon attribute'):
609            t.isDaemon()
610        with self.assertWarnsRegex(DeprecationWarning,
611                                   r'set the daemon attribute'):
612            t.setDaemon(True)
613        with self.assertWarnsRegex(DeprecationWarning,
614                                   r'get the name attribute'):
615            t.getName()
616        with self.assertWarnsRegex(DeprecationWarning,
617                                   r'set the name attribute'):
618            t.setName("name")
619
620        e = threading.Event()
621        with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'):
622            e.isSet()
623
624        cond = threading.Condition()
625        cond.acquire()
626        with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'):
627            cond.notifyAll()
628
629        with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'):
630            threading.activeCount()
631        with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'):
632            threading.currentThread()
633
634    def test_repr_daemon(self):
635        t = threading.Thread()
636        self.assertNotIn('daemon', repr(t))
637        t.daemon = True
638        self.assertIn('daemon', repr(t))
639
640    def test_daemon_param(self):
641        t = threading.Thread()
642        self.assertFalse(t.daemon)
643        t = threading.Thread(daemon=False)
644        self.assertFalse(t.daemon)
645        t = threading.Thread(daemon=True)
646        self.assertTrue(t.daemon)
647
648    @skip_unless_reliable_fork
649    def test_dummy_thread_after_fork(self):
650        # Issue #14308: a dummy thread in the active list doesn't mess up
651        # the after-fork mechanism.
652        code = """if 1:
653            import _thread, threading, os, time, warnings
654
655            def background_thread(evt):
656                # Creates and registers the _DummyThread instance
657                threading.current_thread()
658                evt.set()
659                time.sleep(10)
660
661            evt = threading.Event()
662            _thread.start_new_thread(background_thread, (evt,))
663            evt.wait()
664            assert threading.active_count() == 2, threading.active_count()
665            with warnings.catch_warnings(record=True) as ws:
666                warnings.filterwarnings(
667                        "always", category=DeprecationWarning)
668                if os.fork() == 0:
669                    assert threading.active_count() == 1, threading.active_count()
670                    os._exit(0)
671                else:
672                    assert ws[0].category == DeprecationWarning, ws[0]
673                    assert 'fork' in str(ws[0].message), ws[0]
674                    os.wait()
675        """
676        _, out, err = assert_python_ok("-c", code)
677        self.assertEqual(out, b'')
678        self.assertEqual(err, b'')
679
680    @skip_unless_reliable_fork
681    def test_is_alive_after_fork(self):
682        # Try hard to trigger #18418: is_alive() could sometimes be True on
683        # threads that vanished after a fork.
684        old_interval = sys.getswitchinterval()
685        self.addCleanup(sys.setswitchinterval, old_interval)
686
687        # Make the bug more likely to manifest.
688        test.support.setswitchinterval(1e-6)
689
690        for i in range(20):
691            t = threading.Thread(target=lambda: None)
692            t.start()
693            # Ignore the warning about fork with threads.
694            with warnings.catch_warnings(category=DeprecationWarning,
695                                         action="ignore"):
696                if (pid := os.fork()) == 0:
697                    os._exit(11 if t.is_alive() else 10)
698                else:
699                    t.join()
700
701                    support.wait_process(pid, exitcode=10)
702
703    def test_main_thread(self):
704        main = threading.main_thread()
705        self.assertEqual(main.name, 'MainThread')
706        self.assertEqual(main.ident, threading.current_thread().ident)
707        self.assertEqual(main.ident, threading.get_ident())
708
709        def f():
710            self.assertNotEqual(threading.main_thread().ident,
711                                threading.current_thread().ident)
712        th = threading.Thread(target=f)
713        th.start()
714        th.join()
715
716    @skip_unless_reliable_fork
717    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
718    def test_main_thread_after_fork(self):
719        code = """if 1:
720            import os, threading
721            from test import support
722
723            ident = threading.get_ident()
724            pid = os.fork()
725            if pid == 0:
726                print("current ident", threading.get_ident() == ident)
727                main = threading.main_thread()
728                print("main", main.name)
729                print("main ident", main.ident == ident)
730                print("current is main", threading.current_thread() is main)
731            else:
732                support.wait_process(pid, exitcode=0)
733        """
734        _, out, err = assert_python_ok("-c", code)
735        data = out.decode().replace('\r', '')
736        self.assertEqual(err, b"")
737        self.assertEqual(data,
738                         "current ident True\n"
739                         "main MainThread\n"
740                         "main ident True\n"
741                         "current is main True\n")
742
743    @skip_unless_reliable_fork
744    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
745    def test_main_thread_after_fork_from_nonmain_thread(self):
746        code = """if 1:
747            import os, threading, sys, warnings
748            from test import support
749
750            def func():
751                ident = threading.get_ident()
752                with warnings.catch_warnings(record=True) as ws:
753                    warnings.filterwarnings(
754                            "always", category=DeprecationWarning)
755                    pid = os.fork()
756                    if pid == 0:
757                        print("current ident", threading.get_ident() == ident)
758                        main = threading.main_thread()
759                        print("main", main.name, type(main).__name__)
760                        print("main ident", main.ident == ident)
761                        print("current is main", threading.current_thread() is main)
762                        # stdout is fully buffered because not a tty,
763                        # we have to flush before exit.
764                        sys.stdout.flush()
765                    else:
766                        assert ws[0].category == DeprecationWarning, ws[0]
767                        assert 'fork' in str(ws[0].message), ws[0]
768                        support.wait_process(pid, exitcode=0)
769
770            th = threading.Thread(target=func)
771            th.start()
772            th.join()
773        """
774        _, out, err = assert_python_ok("-c", code)
775        data = out.decode().replace('\r', '')
776        self.assertEqual(err.decode('utf-8'), "")
777        self.assertEqual(data,
778                         "current ident True\n"
779                         "main Thread-1 (func) Thread\n"
780                         "main ident True\n"
781                         "current is main True\n"
782                         )
783
784    @skip_unless_reliable_fork
785    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
786    def test_main_thread_after_fork_from_foreign_thread(self, create_dummy=False):
787        code = """if 1:
788            import os, threading, sys, traceback, _thread
789            from test import support
790
791            def func(lock):
792                ident = threading.get_ident()
793                if %s:
794                    # call current_thread() before fork to allocate DummyThread
795                    current = threading.current_thread()
796                    print("current", current.name, type(current).__name__)
797                print("ident in _active", ident in threading._active)
798                # flush before fork, so child won't flush it again
799                sys.stdout.flush()
800                pid = os.fork()
801                if pid == 0:
802                    print("current ident", threading.get_ident() == ident)
803                    main = threading.main_thread()
804                    print("main", main.name, type(main).__name__)
805                    print("main ident", main.ident == ident)
806                    print("current is main", threading.current_thread() is main)
807                    print("_dangling", [t.name for t in list(threading._dangling)])
808                    # stdout is fully buffered because not a tty,
809                    # we have to flush before exit.
810                    sys.stdout.flush()
811                    try:
812                        threading._shutdown()
813                        os._exit(0)
814                    except:
815                        traceback.print_exc()
816                        sys.stderr.flush()
817                        os._exit(1)
818                else:
819                    try:
820                        support.wait_process(pid, exitcode=0)
821                    except Exception:
822                        # avoid 'could not acquire lock for
823                        # <_io.BufferedWriter name='<stderr>'> at interpreter shutdown,'
824                        traceback.print_exc()
825                        sys.stderr.flush()
826                    finally:
827                        lock.release()
828
829            join_lock = _thread.allocate_lock()
830            join_lock.acquire()
831            th = _thread.start_new_thread(func, (join_lock,))
832            join_lock.acquire()
833        """ % create_dummy
834        # "DeprecationWarning: This process is multi-threaded, use of fork()
835        # may lead to deadlocks in the child"
836        _, out, err = assert_python_ok("-W", "ignore::DeprecationWarning", "-c", code)
837        data = out.decode().replace('\r', '')
838        self.assertEqual(err.decode(), "")
839        self.assertEqual(data,
840                         ("current Dummy-1 _DummyThread\n" if create_dummy else "") +
841                         f"ident in _active {create_dummy!s}\n" +
842                         "current ident True\n"
843                         "main MainThread _MainThread\n"
844                         "main ident True\n"
845                         "current is main True\n"
846                         "_dangling ['MainThread']\n")
847
848    def test_main_thread_after_fork_from_dummy_thread(self, create_dummy=False):
849        self.test_main_thread_after_fork_from_foreign_thread(create_dummy=True)
850
851    def test_main_thread_during_shutdown(self):
852        # bpo-31516: current_thread() should still point to the main thread
853        # at shutdown
854        code = """if 1:
855            import gc, threading
856
857            main_thread = threading.current_thread()
858            assert main_thread is threading.main_thread()  # sanity check
859
860            class RefCycle:
861                def __init__(self):
862                    self.cycle = self
863
864                def __del__(self):
865                    print("GC:",
866                          threading.current_thread() is main_thread,
867                          threading.main_thread() is main_thread,
868                          threading.enumerate() == [main_thread])
869
870            RefCycle()
871            gc.collect()  # sanity check
872            x = RefCycle()
873        """
874        _, out, err = assert_python_ok("-c", code)
875        data = out.decode()
876        self.assertEqual(err, b"")
877        self.assertEqual(data.splitlines(),
878                         ["GC: True True True"] * 2)
879
880    def test_finalization_shutdown(self):
881        # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
882        # until Python thread states of all non-daemon threads get deleted.
883        #
884        # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
885        # test the finalization of the main interpreter.
886        code = """if 1:
887            import os
888            import threading
889            import time
890            import random
891
892            def random_sleep():
893                seconds = random.random() * 0.010
894                time.sleep(seconds)
895
896            class Sleeper:
897                def __del__(self):
898                    random_sleep()
899
900            tls = threading.local()
901
902            def f():
903                # Sleep a bit so that the thread is still running when
904                # Py_Finalize() is called.
905                random_sleep()
906                tls.x = Sleeper()
907                random_sleep()
908
909            threading.Thread(target=f).start()
910            random_sleep()
911        """
912        rc, out, err = assert_python_ok("-c", code)
913        self.assertEqual(err, b"")
914
915    def test_repr_stopped(self):
916        # Verify that "stopped" shows up in repr(Thread) appropriately.
917        started = _thread.allocate_lock()
918        finish = _thread.allocate_lock()
919        started.acquire()
920        finish.acquire()
921        def f():
922            started.release()
923            finish.acquire()
924        t = threading.Thread(target=f)
925        t.start()
926        started.acquire()
927        self.assertIn("started", repr(t))
928        finish.release()
929        # "stopped" should appear in the repr in a reasonable amount of time.
930        # Implementation detail:  as of this writing, that's trivially true
931        # if .join() is called, and almost trivially true if .is_alive() is
932        # called.  The detail we're testing here is that "stopped" shows up
933        # "all on its own".
934        LOOKING_FOR = "stopped"
935        for i in range(500):
936            if LOOKING_FOR in repr(t):
937                break
938            time.sleep(0.01)
939        self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
940        t.join()
941
942    def test_BoundedSemaphore_limit(self):
943        # BoundedSemaphore should raise ValueError if released too often.
944        for limit in range(1, 10):
945            bs = threading.BoundedSemaphore(limit)
946            threads = [threading.Thread(target=bs.acquire)
947                       for _ in range(limit)]
948            for t in threads:
949                t.start()
950            for t in threads:
951                t.join()
952            threads = [threading.Thread(target=bs.release)
953                       for _ in range(limit)]
954            for t in threads:
955                t.start()
956            for t in threads:
957                t.join()
958            self.assertRaises(ValueError, bs.release)
959
960    @cpython_only
961    def test_frame_tstate_tracing(self):
962        _testcapi = import_module("_testcapi")
963        # Issue #14432: Crash when a generator is created in a C thread that is
964        # destroyed while the generator is still used. The issue was that a
965        # generator contains a frame, and the frame kept a reference to the
966        # Python state of the destroyed C thread. The crash occurs when a trace
967        # function is setup.
968
969        def noop_trace(frame, event, arg):
970            # no operation
971            return noop_trace
972
973        def generator():
974            while 1:
975                yield "generator"
976
977        def callback():
978            if callback.gen is None:
979                callback.gen = generator()
980            return next(callback.gen)
981        callback.gen = None
982
983        old_trace = sys.gettrace()
984        sys.settrace(noop_trace)
985        try:
986            # Install a trace function
987            threading.settrace(noop_trace)
988
989            # Create a generator in a C thread which exits after the call
990            _testcapi.call_in_temporary_c_thread(callback)
991
992            # Call the generator in a different Python thread, check that the
993            # generator didn't keep a reference to the destroyed thread state
994            for test in range(3):
995                # The trace function is still called here
996                callback()
997        finally:
998            sys.settrace(old_trace)
999            threading.settrace(old_trace)
1000
1001    def test_gettrace(self):
1002        def noop_trace(frame, event, arg):
1003            # no operation
1004            return noop_trace
1005        old_trace = threading.gettrace()
1006        try:
1007            threading.settrace(noop_trace)
1008            trace_func = threading.gettrace()
1009            self.assertEqual(noop_trace,trace_func)
1010        finally:
1011            threading.settrace(old_trace)
1012
1013    def test_gettrace_all_threads(self):
1014        def fn(*args): pass
1015        old_trace = threading.gettrace()
1016        first_check = threading.Event()
1017        second_check = threading.Event()
1018
1019        trace_funcs = []
1020        def checker():
1021            trace_funcs.append(sys.gettrace())
1022            first_check.set()
1023            second_check.wait()
1024            trace_funcs.append(sys.gettrace())
1025
1026        try:
1027            t = threading.Thread(target=checker)
1028            t.start()
1029            first_check.wait()
1030            threading.settrace_all_threads(fn)
1031            second_check.set()
1032            t.join()
1033            self.assertEqual(trace_funcs, [None, fn])
1034            self.assertEqual(threading.gettrace(), fn)
1035            self.assertEqual(sys.gettrace(), fn)
1036        finally:
1037            threading.settrace_all_threads(old_trace)
1038
1039        self.assertEqual(threading.gettrace(), old_trace)
1040        self.assertEqual(sys.gettrace(), old_trace)
1041
1042    def test_getprofile(self):
1043        def fn(*args): pass
1044        old_profile = threading.getprofile()
1045        try:
1046            threading.setprofile(fn)
1047            self.assertEqual(fn, threading.getprofile())
1048        finally:
1049            threading.setprofile(old_profile)
1050
1051    def test_getprofile_all_threads(self):
1052        def fn(*args): pass
1053        old_profile = threading.getprofile()
1054        first_check = threading.Event()
1055        second_check = threading.Event()
1056
1057        profile_funcs = []
1058        def checker():
1059            profile_funcs.append(sys.getprofile())
1060            first_check.set()
1061            second_check.wait()
1062            profile_funcs.append(sys.getprofile())
1063
1064        try:
1065            t = threading.Thread(target=checker)
1066            t.start()
1067            first_check.wait()
1068            threading.setprofile_all_threads(fn)
1069            second_check.set()
1070            t.join()
1071            self.assertEqual(profile_funcs, [None, fn])
1072            self.assertEqual(threading.getprofile(), fn)
1073            self.assertEqual(sys.getprofile(), fn)
1074        finally:
1075            threading.setprofile_all_threads(old_profile)
1076
1077        self.assertEqual(threading.getprofile(), old_profile)
1078        self.assertEqual(sys.getprofile(), old_profile)
1079
1080    def test_locals_at_exit(self):
1081        # bpo-19466: thread locals must not be deleted before destructors
1082        # are called
1083        rc, out, err = assert_python_ok("-c", """if 1:
1084            import threading
1085
1086            class Atexit:
1087                def __del__(self):
1088                    print("thread_dict.atexit = %r" % thread_dict.atexit)
1089
1090            thread_dict = threading.local()
1091            thread_dict.atexit = "value"
1092
1093            atexit = Atexit()
1094        """)
1095        self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'")
1096
1097    def test_boolean_target(self):
1098        # bpo-41149: A thread that had a boolean value of False would not
1099        # run, regardless of whether it was callable. The correct behaviour
1100        # is for a thread to do nothing if its target is None, and to call
1101        # the target otherwise.
1102        class BooleanTarget(object):
1103            def __init__(self):
1104                self.ran = False
1105            def __bool__(self):
1106                return False
1107            def __call__(self):
1108                self.ran = True
1109
1110        target = BooleanTarget()
1111        thread = threading.Thread(target=target)
1112        thread.start()
1113        thread.join()
1114        self.assertTrue(target.ran)
1115
1116    def test_leak_without_join(self):
1117        # bpo-37788: Test that a thread which is not joined explicitly
1118        # does not leak. Test written for reference leak checks.
1119        def noop(): pass
1120        with threading_helper.wait_threads_exit():
1121            threading.Thread(target=noop).start()
1122            # Thread.join() is not called
1123
1124    def test_import_from_another_thread(self):
1125        # bpo-1596321: If the threading module is first import from a thread
1126        # different than the main thread, threading._shutdown() must handle
1127        # this case without logging an error at Python exit.
1128        code = textwrap.dedent('''
1129            import _thread
1130            import sys
1131
1132            event = _thread.allocate_lock()
1133            event.acquire()
1134
1135            def import_threading():
1136                import threading
1137                event.release()
1138
1139            if 'threading' in sys.modules:
1140                raise Exception('threading is already imported')
1141
1142            _thread.start_new_thread(import_threading, ())
1143
1144            # wait until the threading module is imported
1145            event.acquire()
1146            event.release()
1147
1148            if 'threading' not in sys.modules:
1149                raise Exception('threading is not imported')
1150
1151            # don't wait until the thread completes
1152        ''')
1153        rc, out, err = assert_python_ok("-c", code)
1154        self.assertEqual(out, b'')
1155        self.assertEqual(err, b'')
1156
1157    def test_start_new_thread_at_finalization(self):
1158        code = """if 1:
1159            import _thread
1160
1161            def f():
1162                print("shouldn't be printed")
1163
1164            class AtFinalization:
1165                def __del__(self):
1166                    print("OK")
1167                    _thread.start_new_thread(f, ())
1168            at_finalization = AtFinalization()
1169        """
1170        _, out, err = assert_python_ok("-c", code)
1171        self.assertEqual(out.strip(), b"OK")
1172        self.assertIn(b"can't create new thread at interpreter shutdown", err)
1173
1174    def test_start_new_thread_failed(self):
1175        # gh-109746: if Python fails to start newly created thread
1176        # due to failure of underlying PyThread_start_new_thread() call,
1177        # its state should be removed from interpreter' thread states list
1178        # to avoid its double cleanup
1179        try:
1180            from resource import setrlimit, RLIMIT_NPROC
1181        except ImportError as err:
1182            self.skipTest(err)  # RLIMIT_NPROC is specific to Linux and BSD
1183        code = """if 1:
1184            import resource
1185            import _thread
1186
1187            def f():
1188                print("shouldn't be printed")
1189
1190            limits = resource.getrlimit(resource.RLIMIT_NPROC)
1191            [_, hard] = limits
1192            resource.setrlimit(resource.RLIMIT_NPROC, (0, hard))
1193
1194            try:
1195                _thread.start_new_thread(f, ())
1196            except RuntimeError:
1197                print('ok')
1198            else:
1199                print('!skip!')
1200        """
1201        _, out, err = assert_python_ok("-u", "-c", code)
1202        out = out.strip()
1203        if b'!skip!' in out:
1204            self.skipTest('RLIMIT_NPROC had no effect; probably superuser')
1205        self.assertEqual(out, b'ok')
1206        self.assertEqual(err, b'')
1207
1208
1209class ThreadJoinOnShutdown(BaseTestCase):
1210
1211    def _run_and_join(self, script):
1212        script = """if 1:
1213            import sys, os, time, threading
1214
1215            # a thread, which waits for the main program to terminate
1216            def joiningfunc(mainthread):
1217                mainthread.join()
1218                print('end of thread')
1219                # stdout is fully buffered because not a tty, we have to flush
1220                # before exit.
1221                sys.stdout.flush()
1222        \n""" + script
1223
1224        rc, out, err = assert_python_ok("-c", script)
1225        data = out.decode().replace('\r', '')
1226        self.assertEqual(data, "end of main\nend of thread\n")
1227
1228    def test_1_join_on_shutdown(self):
1229        # The usual case: on exit, wait for a non-daemon thread
1230        script = """if 1:
1231            import os
1232            t = threading.Thread(target=joiningfunc,
1233                                 args=(threading.current_thread(),))
1234            t.start()
1235            time.sleep(0.1)
1236            print('end of main')
1237            """
1238        self._run_and_join(script)
1239
1240    @skip_unless_reliable_fork
1241    def test_2_join_in_forked_process(self):
1242        # Like the test above, but from a forked interpreter
1243        script = """if 1:
1244            from test import support
1245
1246            childpid = os.fork()
1247            if childpid != 0:
1248                # parent process
1249                support.wait_process(childpid, exitcode=0)
1250                sys.exit(0)
1251
1252            # child process
1253            t = threading.Thread(target=joiningfunc,
1254                                 args=(threading.current_thread(),))
1255            t.start()
1256            print('end of main')
1257            """
1258        self._run_and_join(script)
1259
1260    @skip_unless_reliable_fork
1261    def test_3_join_in_forked_from_thread(self):
1262        # Like the test above, but fork() was called from a worker thread
1263        # In the forked process, the main Thread object must be marked as stopped.
1264
1265        script = """if 1:
1266            from test import support
1267
1268            main_thread = threading.current_thread()
1269            def worker():
1270                childpid = os.fork()
1271                if childpid != 0:
1272                    # parent process
1273                    support.wait_process(childpid, exitcode=0)
1274                    sys.exit(0)
1275
1276                # child process
1277                t = threading.Thread(target=joiningfunc,
1278                                     args=(main_thread,))
1279                print('end of main')
1280                t.start()
1281                t.join() # Should not block: main_thread is already stopped
1282
1283            w = threading.Thread(target=worker)
1284            w.start()
1285            """
1286        self._run_and_join(script)
1287
1288    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
1289    def test_4_daemon_threads(self):
1290        # Check that a daemon thread cannot crash the interpreter on shutdown
1291        # by manipulating internal structures that are being disposed of in
1292        # the main thread.
1293        if support.check_sanitizer(thread=True):
1294            # some of the threads running `random_io` below will still be alive
1295            # at process exit
1296            self.skipTest("TSAN would report thread leak")
1297
1298        script = """if True:
1299            import os
1300            import random
1301            import sys
1302            import time
1303            import threading
1304
1305            thread_has_run = set()
1306
1307            def random_io():
1308                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
1309                import test.test_threading as mod
1310                while True:
1311                    with open(mod.__file__, 'rb') as in_f:
1312                        stuff = in_f.read(200)
1313                        with open(os.devnull, 'wb') as null_f:
1314                            null_f.write(stuff)
1315                            time.sleep(random.random() / 1995)
1316                    thread_has_run.add(threading.current_thread())
1317
1318            def main():
1319                count = 0
1320                for _ in range(40):
1321                    new_thread = threading.Thread(target=random_io)
1322                    new_thread.daemon = True
1323                    new_thread.start()
1324                    count += 1
1325                while len(thread_has_run) < count:
1326                    time.sleep(0.001)
1327                # Trigger process shutdown
1328                sys.exit(0)
1329
1330            main()
1331            """
1332        rc, out, err = assert_python_ok('-c', script)
1333        self.assertFalse(err)
1334
1335    def test_thread_from_thread(self):
1336        script = """if True:
1337            import threading
1338            import time
1339
1340            def thread2():
1341                time.sleep(0.05)
1342                print("OK")
1343
1344            def thread1():
1345                time.sleep(0.05)
1346                t2 = threading.Thread(target=thread2)
1347                t2.start()
1348
1349            t = threading.Thread(target=thread1)
1350            t.start()
1351            # do not join() -- the interpreter waits for non-daemon threads to
1352            # finish.
1353            """
1354        rc, out, err = assert_python_ok('-c', script)
1355        self.assertEqual(err, b"")
1356        self.assertEqual(out.strip(), b"OK")
1357        self.assertEqual(rc, 0)
1358
1359    @skip_unless_reliable_fork
1360    def test_reinit_tls_after_fork(self):
1361        # Issue #13817: fork() would deadlock in a multithreaded program with
1362        # the ad-hoc TLS implementation.
1363
1364        def do_fork_and_wait():
1365            # just fork a child process and wait it
1366            pid = os.fork()
1367            if pid > 0:
1368                support.wait_process(pid, exitcode=50)
1369            else:
1370                os._exit(50)
1371
1372        # Ignore the warning about fork with threads.
1373        with warnings.catch_warnings(category=DeprecationWarning,
1374                                     action="ignore"):
1375            # start a bunch of threads that will fork() child processes
1376            threads = []
1377            for i in range(16):
1378                t = threading.Thread(target=do_fork_and_wait)
1379                threads.append(t)
1380                t.start()
1381
1382            for t in threads:
1383                t.join()
1384
1385    @skip_unless_reliable_fork
1386    def test_clear_threads_states_after_fork(self):
1387        # Issue #17094: check that threads states are cleared after fork()
1388
1389        # start a bunch of threads
1390        threads = []
1391        for i in range(16):
1392            t = threading.Thread(target=lambda : time.sleep(0.3))
1393            threads.append(t)
1394            t.start()
1395
1396        try:
1397            # Ignore the warning about fork with threads.
1398            with warnings.catch_warnings(category=DeprecationWarning,
1399                                         action="ignore"):
1400                pid = os.fork()
1401                if pid == 0:
1402                    # check that threads states have been cleared
1403                    if len(sys._current_frames()) == 1:
1404                        os._exit(51)
1405                    else:
1406                        os._exit(52)
1407                else:
1408                    support.wait_process(pid, exitcode=51)
1409        finally:
1410            for t in threads:
1411                t.join()
1412
1413
1414class SubinterpThreadingTests(BaseTestCase):
1415    def pipe(self):
1416        r, w = os.pipe()
1417        self.addCleanup(os.close, r)
1418        self.addCleanup(os.close, w)
1419        if hasattr(os, 'set_blocking'):
1420            os.set_blocking(r, False)
1421        return (r, w)
1422
1423    def test_threads_join(self):
1424        # Non-daemon threads should be joined at subinterpreter shutdown
1425        # (issue #18808)
1426        r, w = self.pipe()
1427        code = textwrap.dedent(r"""
1428            import os
1429            import random
1430            import threading
1431            import time
1432
1433            def random_sleep():
1434                seconds = random.random() * 0.010
1435                time.sleep(seconds)
1436
1437            def f():
1438                # Sleep a bit so that the thread is still running when
1439                # Py_EndInterpreter is called.
1440                random_sleep()
1441                os.write(%d, b"x")
1442
1443            threading.Thread(target=f).start()
1444            random_sleep()
1445        """ % (w,))
1446        ret = test.support.run_in_subinterp(code)
1447        self.assertEqual(ret, 0)
1448        # The thread was joined properly.
1449        self.assertEqual(os.read(r, 1), b"x")
1450
1451    def test_threads_join_2(self):
1452        # Same as above, but a delay gets introduced after the thread's
1453        # Python code returned but before the thread state is deleted.
1454        # To achieve this, we register a thread-local object which sleeps
1455        # a bit when deallocated.
1456        r, w = self.pipe()
1457        code = textwrap.dedent(r"""
1458            import os
1459            import random
1460            import threading
1461            import time
1462
1463            def random_sleep():
1464                seconds = random.random() * 0.010
1465                time.sleep(seconds)
1466
1467            class Sleeper:
1468                def __del__(self):
1469                    random_sleep()
1470
1471            tls = threading.local()
1472
1473            def f():
1474                # Sleep a bit so that the thread is still running when
1475                # Py_EndInterpreter is called.
1476                random_sleep()
1477                tls.x = Sleeper()
1478                os.write(%d, b"x")
1479
1480            threading.Thread(target=f).start()
1481            random_sleep()
1482        """ % (w,))
1483        ret = test.support.run_in_subinterp(code)
1484        self.assertEqual(ret, 0)
1485        # The thread was joined properly.
1486        self.assertEqual(os.read(r, 1), b"x")
1487
1488    @requires_subinterpreters
1489    def test_threads_join_with_no_main(self):
1490        r_interp, w_interp = self.pipe()
1491
1492        INTERP = b'I'
1493        FINI = b'F'
1494        DONE = b'D'
1495
1496        interp = interpreters.create()
1497        interp.exec(f"""if True:
1498            import os
1499            import threading
1500            import time
1501
1502            done = False
1503
1504            def notify_fini():
1505                global done
1506                done = True
1507                os.write({w_interp}, {FINI!r})
1508                t.join()
1509            threading._register_atexit(notify_fini)
1510
1511            def task():
1512                while not done:
1513                    time.sleep(0.1)
1514                os.write({w_interp}, {DONE!r})
1515            t = threading.Thread(target=task)
1516            t.start()
1517
1518            os.write({w_interp}, {INTERP!r})
1519            """)
1520        interp.close()
1521
1522        self.assertEqual(os.read(r_interp, 1), INTERP)
1523        self.assertEqual(os.read(r_interp, 1), FINI)
1524        self.assertEqual(os.read(r_interp, 1), DONE)
1525
1526    @cpython_only
1527    def test_daemon_threads_fatal_error(self):
1528        import_module("_testcapi")
1529        subinterp_code = f"""if 1:
1530            import os
1531            import threading
1532            import time
1533
1534            def f():
1535                # Make sure the daemon thread is still running when
1536                # Py_EndInterpreter is called.
1537                time.sleep({test.support.SHORT_TIMEOUT})
1538            threading.Thread(target=f, daemon=True).start()
1539            """
1540        script = r"""if 1:
1541            import _testcapi
1542
1543            _testcapi.run_in_subinterp(%r)
1544            """ % (subinterp_code,)
1545        with test.support.SuppressCrashReport():
1546            rc, out, err = assert_python_failure("-c", script)
1547        self.assertIn("Fatal Python error: Py_EndInterpreter: "
1548                      "not the last thread", err.decode())
1549
1550    def _check_allowed(self, before_start='', *,
1551                       allowed=True,
1552                       daemon_allowed=True,
1553                       daemon=False,
1554                       ):
1555        import_module("_testinternalcapi")
1556        subinterp_code = textwrap.dedent(f"""
1557            import test.support
1558            import threading
1559            def func():
1560                print('this should not have run!')
1561            t = threading.Thread(target=func, daemon={daemon})
1562            {before_start}
1563            t.start()
1564            """)
1565        check_multi_interp_extensions = bool(support.Py_GIL_DISABLED)
1566        script = textwrap.dedent(f"""
1567            import test.support
1568            test.support.run_in_subinterp_with_config(
1569                {subinterp_code!r},
1570                use_main_obmalloc=True,
1571                allow_fork=True,
1572                allow_exec=True,
1573                allow_threads={allowed},
1574                allow_daemon_threads={daemon_allowed},
1575                check_multi_interp_extensions={check_multi_interp_extensions},
1576                own_gil=False,
1577            )
1578            """)
1579        with test.support.SuppressCrashReport():
1580            _, _, err = assert_python_ok("-c", script)
1581        return err.decode()
1582
1583    @cpython_only
1584    def test_threads_not_allowed(self):
1585        err = self._check_allowed(
1586            allowed=False,
1587            daemon_allowed=False,
1588            daemon=False,
1589        )
1590        self.assertIn('RuntimeError', err)
1591
1592    @cpython_only
1593    def test_daemon_threads_not_allowed(self):
1594        with self.subTest('via Thread()'):
1595            err = self._check_allowed(
1596                allowed=True,
1597                daemon_allowed=False,
1598                daemon=True,
1599            )
1600            self.assertIn('RuntimeError', err)
1601
1602        with self.subTest('via Thread.daemon setter'):
1603            err = self._check_allowed(
1604                't.daemon = True',
1605                allowed=True,
1606                daemon_allowed=False,
1607                daemon=False,
1608            )
1609            self.assertIn('RuntimeError', err)
1610
1611
1612class ThreadingExceptionTests(BaseTestCase):
1613    # A RuntimeError should be raised if Thread.start() is called
1614    # multiple times.
1615    def test_start_thread_again(self):
1616        thread = threading.Thread()
1617        thread.start()
1618        self.assertRaises(RuntimeError, thread.start)
1619        thread.join()
1620
1621    def test_joining_current_thread(self):
1622        current_thread = threading.current_thread()
1623        self.assertRaises(RuntimeError, current_thread.join);
1624
1625    def test_joining_inactive_thread(self):
1626        thread = threading.Thread()
1627        self.assertRaises(RuntimeError, thread.join)
1628
1629    def test_daemonize_active_thread(self):
1630        thread = threading.Thread()
1631        thread.start()
1632        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
1633        thread.join()
1634
1635    def test_releasing_unacquired_lock(self):
1636        lock = threading.Lock()
1637        self.assertRaises(RuntimeError, lock.release)
1638
1639    @requires_subprocess()
1640    def test_recursion_limit(self):
1641        # Issue 9670
1642        # test that excessive recursion within a non-main thread causes
1643        # an exception rather than crashing the interpreter on platforms
1644        # like Mac OS X or FreeBSD which have small default stack sizes
1645        # for threads
1646        script = """if True:
1647            import threading
1648
1649            def recurse():
1650                return recurse()
1651
1652            def outer():
1653                try:
1654                    recurse()
1655                except RecursionError:
1656                    pass
1657
1658            w = threading.Thread(target=outer)
1659            w.start()
1660            w.join()
1661            print('end of main thread')
1662            """
1663        expected_output = "end of main thread\n"
1664        p = subprocess.Popen([sys.executable, "-c", script],
1665                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1666        stdout, stderr = p.communicate()
1667        data = stdout.decode().replace('\r', '')
1668        self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
1669        self.assertEqual(data, expected_output)
1670
1671    def test_print_exception(self):
1672        script = r"""if True:
1673            import threading
1674            import time
1675
1676            running = False
1677            def run():
1678                global running
1679                running = True
1680                while running:
1681                    time.sleep(0.01)
1682                1/0
1683            t = threading.Thread(target=run)
1684            t.start()
1685            while not running:
1686                time.sleep(0.01)
1687            running = False
1688            t.join()
1689            """
1690        rc, out, err = assert_python_ok("-c", script)
1691        self.assertEqual(out, b'')
1692        err = err.decode()
1693        self.assertIn("Exception in thread", err)
1694        self.assertIn("Traceback (most recent call last):", err)
1695        self.assertIn("ZeroDivisionError", err)
1696        self.assertNotIn("Unhandled exception", err)
1697
1698    def test_print_exception_stderr_is_none_1(self):
1699        script = r"""if True:
1700            import sys
1701            import threading
1702            import time
1703
1704            running = False
1705            def run():
1706                global running
1707                running = True
1708                while running:
1709                    time.sleep(0.01)
1710                1/0
1711            t = threading.Thread(target=run)
1712            t.start()
1713            while not running:
1714                time.sleep(0.01)
1715            sys.stderr = None
1716            running = False
1717            t.join()
1718            """
1719        rc, out, err = assert_python_ok("-c", script)
1720        self.assertEqual(out, b'')
1721        err = err.decode()
1722        self.assertIn("Exception in thread", err)
1723        self.assertIn("Traceback (most recent call last):", err)
1724        self.assertIn("ZeroDivisionError", err)
1725        self.assertNotIn("Unhandled exception", err)
1726
1727    def test_print_exception_stderr_is_none_2(self):
1728        script = r"""if True:
1729            import sys
1730            import threading
1731            import time
1732
1733            running = False
1734            def run():
1735                global running
1736                running = True
1737                while running:
1738                    time.sleep(0.01)
1739                1/0
1740            sys.stderr = None
1741            t = threading.Thread(target=run)
1742            t.start()
1743            while not running:
1744                time.sleep(0.01)
1745            running = False
1746            t.join()
1747            """
1748        rc, out, err = assert_python_ok("-c", script)
1749        self.assertEqual(out, b'')
1750        self.assertNotIn("Unhandled exception", err.decode())
1751
1752    def test_print_exception_gh_102056(self):
1753        # This used to crash. See gh-102056.
1754        script = r"""if True:
1755            import time
1756            import threading
1757            import _thread
1758
1759            def f():
1760                try:
1761                    f()
1762                except RecursionError:
1763                    f()
1764
1765            def g():
1766                try:
1767                    raise ValueError()
1768                except* ValueError:
1769                    f()
1770
1771            def h():
1772                time.sleep(1)
1773                _thread.interrupt_main()
1774
1775            t = threading.Thread(target=h)
1776            t.start()
1777            g()
1778            t.join()
1779            """
1780
1781        assert_python_failure("-c", script)
1782
1783    def test_bare_raise_in_brand_new_thread(self):
1784        def bare_raise():
1785            raise
1786
1787        class Issue27558(threading.Thread):
1788            exc = None
1789
1790            def run(self):
1791                try:
1792                    bare_raise()
1793                except Exception as exc:
1794                    self.exc = exc
1795
1796        thread = Issue27558()
1797        thread.start()
1798        thread.join()
1799        self.assertIsNotNone(thread.exc)
1800        self.assertIsInstance(thread.exc, RuntimeError)
1801        # explicitly break the reference cycle to not leak a dangling thread
1802        thread.exc = None
1803
1804    def test_multithread_modify_file_noerror(self):
1805        # See issue25872
1806        def modify_file():
1807            with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp:
1808                fp.write(' ')
1809                traceback.format_stack()
1810
1811        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
1812        threads = [
1813            threading.Thread(target=modify_file)
1814            for i in range(100)
1815        ]
1816        for t in threads:
1817            t.start()
1818            t.join()
1819
1820
1821class ThreadRunFail(threading.Thread):
1822    def run(self):
1823        raise ValueError("run failed")
1824
1825
1826class ExceptHookTests(BaseTestCase):
1827    def setUp(self):
1828        restore_default_excepthook(self)
1829        super().setUp()
1830
1831    @force_not_colorized
1832    def test_excepthook(self):
1833        with support.captured_output("stderr") as stderr:
1834            thread = ThreadRunFail(name="excepthook thread")
1835            thread.start()
1836            thread.join()
1837
1838        stderr = stderr.getvalue().strip()
1839        self.assertIn(f'Exception in thread {thread.name}:\n', stderr)
1840        self.assertIn('Traceback (most recent call last):\n', stderr)
1841        self.assertIn('  raise ValueError("run failed")', stderr)
1842        self.assertIn('ValueError: run failed', stderr)
1843
1844    @support.cpython_only
1845    @force_not_colorized
1846    def test_excepthook_thread_None(self):
1847        # threading.excepthook called with thread=None: log the thread
1848        # identifier in this case.
1849        with support.captured_output("stderr") as stderr:
1850            try:
1851                raise ValueError("bug")
1852            except Exception as exc:
1853                args = threading.ExceptHookArgs([*sys.exc_info(), None])
1854                try:
1855                    threading.excepthook(args)
1856                finally:
1857                    # Explicitly break a reference cycle
1858                    args = None
1859
1860        stderr = stderr.getvalue().strip()
1861        self.assertIn(f'Exception in thread {threading.get_ident()}:\n', stderr)
1862        self.assertIn('Traceback (most recent call last):\n', stderr)
1863        self.assertIn('  raise ValueError("bug")', stderr)
1864        self.assertIn('ValueError: bug', stderr)
1865
1866    def test_system_exit(self):
1867        class ThreadExit(threading.Thread):
1868            def run(self):
1869                sys.exit(1)
1870
1871        # threading.excepthook() silently ignores SystemExit
1872        with support.captured_output("stderr") as stderr:
1873            thread = ThreadExit()
1874            thread.start()
1875            thread.join()
1876
1877        self.assertEqual(stderr.getvalue(), '')
1878
1879    def test_custom_excepthook(self):
1880        args = None
1881
1882        def hook(hook_args):
1883            nonlocal args
1884            args = hook_args
1885
1886        try:
1887            with support.swap_attr(threading, 'excepthook', hook):
1888                thread = ThreadRunFail()
1889                thread.start()
1890                thread.join()
1891
1892            self.assertEqual(args.exc_type, ValueError)
1893            self.assertEqual(str(args.exc_value), 'run failed')
1894            self.assertEqual(args.exc_traceback, args.exc_value.__traceback__)
1895            self.assertIs(args.thread, thread)
1896        finally:
1897            # Break reference cycle
1898            args = None
1899
1900    def test_custom_excepthook_fail(self):
1901        def threading_hook(args):
1902            raise ValueError("threading_hook failed")
1903
1904        err_str = None
1905
1906        def sys_hook(exc_type, exc_value, exc_traceback):
1907            nonlocal err_str
1908            err_str = str(exc_value)
1909
1910        with support.swap_attr(threading, 'excepthook', threading_hook), \
1911             support.swap_attr(sys, 'excepthook', sys_hook), \
1912             support.captured_output('stderr') as stderr:
1913            thread = ThreadRunFail()
1914            thread.start()
1915            thread.join()
1916
1917        self.assertEqual(stderr.getvalue(),
1918                         'Exception in threading.excepthook:\n')
1919        self.assertEqual(err_str, 'threading_hook failed')
1920
1921    def test_original_excepthook(self):
1922        def run_thread():
1923            with support.captured_output("stderr") as output:
1924                thread = ThreadRunFail(name="excepthook thread")
1925                thread.start()
1926                thread.join()
1927            return output.getvalue()
1928
1929        def threading_hook(args):
1930            print("Running a thread failed", file=sys.stderr)
1931
1932        default_output = run_thread()
1933        with support.swap_attr(threading, 'excepthook', threading_hook):
1934            custom_hook_output = run_thread()
1935            threading.excepthook = threading.__excepthook__
1936            recovered_output = run_thread()
1937
1938        self.assertEqual(default_output, recovered_output)
1939        self.assertNotEqual(default_output, custom_hook_output)
1940        self.assertEqual(custom_hook_output, "Running a thread failed\n")
1941
1942
1943class TimerTests(BaseTestCase):
1944
1945    def setUp(self):
1946        BaseTestCase.setUp(self)
1947        self.callback_args = []
1948        self.callback_event = threading.Event()
1949
1950    def test_init_immutable_default_args(self):
1951        # Issue 17435: constructor defaults were mutable objects, they could be
1952        # mutated via the object attributes and affect other Timer objects.
1953        timer1 = threading.Timer(0.01, self._callback_spy)
1954        timer1.start()
1955        self.callback_event.wait()
1956        timer1.args.append("blah")
1957        timer1.kwargs["foo"] = "bar"
1958        self.callback_event.clear()
1959        timer2 = threading.Timer(0.01, self._callback_spy)
1960        timer2.start()
1961        self.callback_event.wait()
1962        self.assertEqual(len(self.callback_args), 2)
1963        self.assertEqual(self.callback_args, [((), {}), ((), {})])
1964        timer1.join()
1965        timer2.join()
1966
1967    def _callback_spy(self, *args, **kwargs):
1968        self.callback_args.append((args[:], kwargs.copy()))
1969        self.callback_event.set()
1970
1971class LockTests(lock_tests.LockTests):
1972    locktype = staticmethod(threading.Lock)
1973
1974class PyRLockTests(lock_tests.RLockTests):
1975    locktype = staticmethod(threading._PyRLock)
1976
1977@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1978class CRLockTests(lock_tests.RLockTests):
1979    locktype = staticmethod(threading._CRLock)
1980
1981    def test_signature(self):  # gh-102029
1982        with warnings.catch_warnings(record=True) as warnings_log:
1983            threading.RLock()
1984        self.assertEqual(warnings_log, [])
1985
1986        arg_types = [
1987            ((1,), {}),
1988            ((), {'a': 1}),
1989            ((1, 2), {'a': 1}),
1990        ]
1991        for args, kwargs in arg_types:
1992            with self.subTest(args=args, kwargs=kwargs):
1993                with self.assertWarns(DeprecationWarning):
1994                    threading.RLock(*args, **kwargs)
1995
1996        # Subtypes with custom `__init__` are allowed (but, not recommended):
1997        class CustomRLock(self.locktype):
1998            def __init__(self, a, *, b) -> None:
1999                super().__init__()
2000
2001        with warnings.catch_warnings(record=True) as warnings_log:
2002            CustomRLock(1, b=2)
2003        self.assertEqual(warnings_log, [])
2004
2005class EventTests(lock_tests.EventTests):
2006    eventtype = staticmethod(threading.Event)
2007
2008class ConditionAsRLockTests(lock_tests.RLockTests):
2009    # Condition uses an RLock by default and exports its API.
2010    locktype = staticmethod(threading.Condition)
2011
2012    def test_recursion_count(self):
2013        self.skipTest("Condition does not expose _recursion_count()")
2014
2015class ConditionTests(lock_tests.ConditionTests):
2016    condtype = staticmethod(threading.Condition)
2017
2018class SemaphoreTests(lock_tests.SemaphoreTests):
2019    semtype = staticmethod(threading.Semaphore)
2020
2021class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
2022    semtype = staticmethod(threading.BoundedSemaphore)
2023
2024class BarrierTests(lock_tests.BarrierTests):
2025    barriertype = staticmethod(threading.Barrier)
2026
2027
2028class MiscTestCase(unittest.TestCase):
2029    def test__all__(self):
2030        restore_default_excepthook(self)
2031
2032        extra = {"ThreadError"}
2033        not_exported = {'currentThread', 'activeCount'}
2034        support.check__all__(self, threading, ('threading', '_thread'),
2035                             extra=extra, not_exported=not_exported)
2036
2037
2038class InterruptMainTests(unittest.TestCase):
2039    def check_interrupt_main_with_signal_handler(self, signum):
2040        def handler(signum, frame):
2041            1/0
2042
2043        old_handler = signal.signal(signum, handler)
2044        self.addCleanup(signal.signal, signum, old_handler)
2045
2046        with self.assertRaises(ZeroDivisionError):
2047            _thread.interrupt_main()
2048
2049    def check_interrupt_main_noerror(self, signum):
2050        handler = signal.getsignal(signum)
2051        try:
2052            # No exception should arise.
2053            signal.signal(signum, signal.SIG_IGN)
2054            _thread.interrupt_main(signum)
2055
2056            signal.signal(signum, signal.SIG_DFL)
2057            _thread.interrupt_main(signum)
2058        finally:
2059            # Restore original handler
2060            signal.signal(signum, handler)
2061
2062    @requires_gil_enabled("gh-118433: Flaky due to a longstanding bug")
2063    def test_interrupt_main_subthread(self):
2064        # Calling start_new_thread with a function that executes interrupt_main
2065        # should raise KeyboardInterrupt upon completion.
2066        def call_interrupt():
2067            _thread.interrupt_main()
2068        t = threading.Thread(target=call_interrupt)
2069        with self.assertRaises(KeyboardInterrupt):
2070            t.start()
2071            t.join()
2072        t.join()
2073
2074    def test_interrupt_main_mainthread(self):
2075        # Make sure that if interrupt_main is called in main thread that
2076        # KeyboardInterrupt is raised instantly.
2077        with self.assertRaises(KeyboardInterrupt):
2078            _thread.interrupt_main()
2079
2080    def test_interrupt_main_with_signal_handler(self):
2081        self.check_interrupt_main_with_signal_handler(signal.SIGINT)
2082        self.check_interrupt_main_with_signal_handler(signal.SIGTERM)
2083
2084    def test_interrupt_main_noerror(self):
2085        self.check_interrupt_main_noerror(signal.SIGINT)
2086        self.check_interrupt_main_noerror(signal.SIGTERM)
2087
2088    def test_interrupt_main_invalid_signal(self):
2089        self.assertRaises(ValueError, _thread.interrupt_main, -1)
2090        self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG)
2091        self.assertRaises(ValueError, _thread.interrupt_main, 1000000)
2092
2093    @threading_helper.reap_threads
2094    def test_can_interrupt_tight_loops(self):
2095        cont = [True]
2096        started = [False]
2097        interrupted = [False]
2098
2099        def worker(started, cont, interrupted):
2100            iterations = 100_000_000
2101            started[0] = True
2102            while cont[0]:
2103                if iterations:
2104                    iterations -= 1
2105                else:
2106                    return
2107                pass
2108            interrupted[0] = True
2109
2110        t = threading.Thread(target=worker,args=(started, cont, interrupted))
2111        t.start()
2112        while not started[0]:
2113            pass
2114        cont[0] = False
2115        t.join()
2116        self.assertTrue(interrupted[0])
2117
2118
2119class AtexitTests(unittest.TestCase):
2120
2121    def test_atexit_output(self):
2122        rc, out, err = assert_python_ok("-c", """if True:
2123            import threading
2124
2125            def run_last():
2126                print('parrot')
2127
2128            threading._register_atexit(run_last)
2129        """)
2130
2131        self.assertFalse(err)
2132        self.assertEqual(out.strip(), b'parrot')
2133
2134    def test_atexit_called_once(self):
2135        rc, out, err = assert_python_ok("-c", """if True:
2136            import threading
2137            from unittest.mock import Mock
2138
2139            mock = Mock()
2140            threading._register_atexit(mock)
2141            mock.assert_not_called()
2142            # force early shutdown to ensure it was called once
2143            threading._shutdown()
2144            mock.assert_called_once()
2145        """)
2146
2147        self.assertFalse(err)
2148
2149    def test_atexit_after_shutdown(self):
2150        # The only way to do this is by registering an atexit within
2151        # an atexit, which is intended to raise an exception.
2152        rc, out, err = assert_python_ok("-c", """if True:
2153            import threading
2154
2155            def func():
2156                pass
2157
2158            def run_last():
2159                threading._register_atexit(func)
2160
2161            threading._register_atexit(run_last)
2162        """)
2163
2164        self.assertTrue(err)
2165        self.assertIn("RuntimeError: can't register atexit after shutdown",
2166                err.decode())
2167
2168
2169if __name__ == "__main__":
2170    unittest.main()
2171