• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Very rudimentary test of threading module
2
3import test.test_support
4from test.test_support import verbose, cpython_only
5from test.script_helper import assert_python_ok
6
7import random
8import re
9import sys
10thread = test.test_support.import_module('thread')
11threading = test.test_support.import_module('threading')
12import time
13import unittest
14import weakref
15import os
16import subprocess
17try:
18    import _testcapi
19except ImportError:
20    _testcapi = None
21
22from test import lock_tests
23
24# A trivial mutable counter.
25class Counter(object):
26    def __init__(self):
27        self.value = 0
28    def inc(self):
29        self.value += 1
30    def dec(self):
31        self.value -= 1
32    def get(self):
33        return self.value
34
35class TestThread(threading.Thread):
36    def __init__(self, name, testcase, sema, mutex, nrunning):
37        threading.Thread.__init__(self, name=name)
38        self.testcase = testcase
39        self.sema = sema
40        self.mutex = mutex
41        self.nrunning = nrunning
42
43    def run(self):
44        delay = random.random() / 10000.0
45        if verbose:
46            print 'task %s will run for %.1f usec' % (
47                self.name, delay * 1e6)
48
49        with self.sema:
50            with self.mutex:
51                self.nrunning.inc()
52                if verbose:
53                    print self.nrunning.get(), 'tasks are running'
54                self.testcase.assertLessEqual(self.nrunning.get(), 3)
55
56            time.sleep(delay)
57            if verbose:
58                print 'task', self.name, 'done'
59
60            with self.mutex:
61                self.nrunning.dec()
62                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
63                if verbose:
64                    print '%s is finished. %d tasks are running' % (
65                        self.name, self.nrunning.get())
66
67class BaseTestCase(unittest.TestCase):
68    def setUp(self):
69        self._threads = test.test_support.threading_setup()
70
71    def tearDown(self):
72        test.test_support.threading_cleanup(*self._threads)
73        test.test_support.reap_children()
74
75
76class ThreadTests(BaseTestCase):
77
78    # Create a bunch of threads, let each do some work, wait until all are
79    # done.
80    def test_various_ops(self):
81        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
82        # times about 1 second per clump).
83        NUMTASKS = 10
84
85        # no more than 3 of the 10 can run at once
86        sema = threading.BoundedSemaphore(value=3)
87        mutex = threading.RLock()
88        numrunning = Counter()
89
90        threads = []
91
92        for i in range(NUMTASKS):
93            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
94            threads.append(t)
95            self.assertIsNone(t.ident)
96            self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
97            t.start()
98
99        if verbose:
100            print 'waiting for all tasks to complete'
101        for t in threads:
102            t.join(NUMTASKS)
103            self.assertFalse(t.is_alive())
104            self.assertNotEqual(t.ident, 0)
105            self.assertIsNotNone(t.ident)
106            self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
107        if verbose:
108            print 'all tasks done'
109        self.assertEqual(numrunning.get(), 0)
110
111    def test_ident_of_no_threading_threads(self):
112        # The ident still must work for the main thread and dummy threads.
113        self.assertIsNotNone(threading.currentThread().ident)
114        def f():
115            ident.append(threading.currentThread().ident)
116            done.set()
117        done = threading.Event()
118        ident = []
119        thread.start_new_thread(f, ())
120        done.wait()
121        self.assertIsNotNone(ident[0])
122        # Kill the "immortal" _DummyThread
123        del threading._active[ident[0]]
124
125    # run with a small(ish) thread stack size (256kB)
126    def test_various_ops_small_stack(self):
127        if verbose:
128            print 'with 256kB thread stack size...'
129        try:
130            threading.stack_size(262144)
131        except thread.error:
132            self.skipTest('platform does not support changing thread stack size')
133        self.test_various_ops()
134        threading.stack_size(0)
135
136    # run with a large thread stack size (1MB)
137    def test_various_ops_large_stack(self):
138        if verbose:
139            print 'with 1MB thread stack size...'
140        try:
141            threading.stack_size(0x100000)
142        except thread.error:
143            self.skipTest('platform does not support changing thread stack size')
144        self.test_various_ops()
145        threading.stack_size(0)
146
147    def test_foreign_thread(self):
148        # Check that a "foreign" thread can use the threading module.
149        def f(mutex):
150            # Calling current_thread() forces an entry for the foreign
151            # thread to get made in the threading._active map.
152            threading.current_thread()
153            mutex.release()
154
155        mutex = threading.Lock()
156        mutex.acquire()
157        tid = thread.start_new_thread(f, (mutex,))
158        # Wait for the thread to finish.
159        mutex.acquire()
160        self.assertIn(tid, threading._active)
161        self.assertIsInstance(threading._active[tid], threading._DummyThread)
162        del threading._active[tid]
163
164    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
165    # exposed at the Python level.  This test relies on ctypes to get at it.
166    def test_PyThreadState_SetAsyncExc(self):
167        try:
168            import ctypes
169        except ImportError:
170            self.skipTest('requires ctypes')
171
172        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
173
174        class AsyncExc(Exception):
175            pass
176
177        exception = ctypes.py_object(AsyncExc)
178
179        # First check it works when setting the exception from the same thread.
180        tid = thread.get_ident()
181
182        try:
183            result = set_async_exc(ctypes.c_long(tid), exception)
184            # The exception is async, so we might have to keep the VM busy until
185            # it notices.
186            while True:
187                pass
188        except AsyncExc:
189            pass
190        else:
191            # This code is unreachable but it reflects the intent. If we wanted
192            # to be smarter the above loop wouldn't be infinite.
193            self.fail("AsyncExc not raised")
194        try:
195            self.assertEqual(result, 1) # one thread state modified
196        except UnboundLocalError:
197            # The exception was raised too quickly for us to get the result.
198            pass
199
200        # `worker_started` is set by the thread when it's inside a try/except
201        # block waiting to catch the asynchronously set AsyncExc exception.
202        # `worker_saw_exception` is set by the thread upon catching that
203        # exception.
204        worker_started = threading.Event()
205        worker_saw_exception = threading.Event()
206
207        class Worker(threading.Thread):
208            def run(self):
209                self.id = thread.get_ident()
210                self.finished = False
211
212                try:
213                    while True:
214                        worker_started.set()
215                        time.sleep(0.1)
216                except AsyncExc:
217                    self.finished = True
218                    worker_saw_exception.set()
219
220        t = Worker()
221        t.daemon = True # so if this fails, we don't hang Python at shutdown
222        t.start()
223        if verbose:
224            print "    started worker thread"
225
226        # Try a thread id that doesn't make sense.
227        if verbose:
228            print "    trying nonsensical thread id"
229        result = set_async_exc(ctypes.c_long(-1), exception)
230        self.assertEqual(result, 0)  # no thread states modified
231
232        # Now raise an exception in the worker thread.
233        if verbose:
234            print "    waiting for worker thread to get started"
235        ret = worker_started.wait()
236        self.assertTrue(ret)
237        if verbose:
238            print "    verifying worker hasn't exited"
239        self.assertFalse(t.finished)
240        if verbose:
241            print "    attempting to raise asynch exception in worker"
242        result = set_async_exc(ctypes.c_long(t.id), exception)
243        self.assertEqual(result, 1) # one thread state modified
244        if verbose:
245            print "    waiting for worker to say it caught the exception"
246        worker_saw_exception.wait(timeout=10)
247        self.assertTrue(t.finished)
248        if verbose:
249            print "    all OK -- joining worker"
250        if t.finished:
251            t.join()
252        # else the thread is still running, and we have no way to kill it
253
254    def test_limbo_cleanup(self):
255        # Issue 7481: Failure to start thread should cleanup the limbo map.
256        def fail_new_thread(*args):
257            raise thread.error()
258        _start_new_thread = threading._start_new_thread
259        threading._start_new_thread = fail_new_thread
260        try:
261            t = threading.Thread(target=lambda: None)
262            self.assertRaises(thread.error, t.start)
263            self.assertFalse(
264                t in threading._limbo,
265                "Failed to cleanup _limbo map on failure of Thread.start().")
266        finally:
267            threading._start_new_thread = _start_new_thread
268
269    def test_finalize_runnning_thread(self):
270        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
271        # very late on python exit: on deallocation of a running thread for
272        # example.
273        try:
274            import ctypes
275        except ImportError:
276            self.skipTest('requires ctypes')
277
278        rc = subprocess.call([sys.executable, "-c", """if 1:
279            import ctypes, sys, time, thread
280
281            # This lock is used as a simple event variable.
282            ready = thread.allocate_lock()
283            ready.acquire()
284
285            # Module globals are cleared before __del__ is run
286            # So we save the functions in class dict
287            class C:
288                ensure = ctypes.pythonapi.PyGILState_Ensure
289                release = ctypes.pythonapi.PyGILState_Release
290                def __del__(self):
291                    state = self.ensure()
292                    self.release(state)
293
294            def waitingThread():
295                x = C()
296                ready.release()
297                time.sleep(100)
298
299            thread.start_new_thread(waitingThread, ())
300            ready.acquire()  # Be sure the other thread is waiting.
301            sys.exit(42)
302            """])
303        self.assertEqual(rc, 42)
304
305    def test_finalize_with_trace(self):
306        # Issue1733757
307        # Avoid a deadlock when sys.settrace steps into threading._shutdown
308        p = subprocess.Popen([sys.executable, "-c", """if 1:
309            import sys, threading
310
311            # A deadlock-killer, to prevent the
312            # testsuite to hang forever
313            def killer():
314                import os, time
315                time.sleep(2)
316                print 'program blocked; aborting'
317                os._exit(2)
318            t = threading.Thread(target=killer)
319            t.daemon = True
320            t.start()
321
322            # This is the trace function
323            def func(frame, event, arg):
324                threading.current_thread()
325                return func
326
327            sys.settrace(func)
328            """],
329            stdout=subprocess.PIPE,
330            stderr=subprocess.PIPE)
331        self.addCleanup(p.stdout.close)
332        self.addCleanup(p.stderr.close)
333        stdout, stderr = p.communicate()
334        rc = p.returncode
335        self.assertFalse(rc == 2, "interpreted was blocked")
336        self.assertTrue(rc == 0,
337                        "Unexpected error: " + repr(stderr))
338
339    def test_join_nondaemon_on_shutdown(self):
340        # Issue 1722344
341        # Raising SystemExit skipped threading._shutdown
342        p = subprocess.Popen([sys.executable, "-c", """if 1:
343                import threading
344                from time import sleep
345
346                def child():
347                    sleep(1)
348                    # As a non-daemon thread we SHOULD wake up and nothing
349                    # should be torn down yet
350                    print "Woke up, sleep function is:", sleep
351
352                threading.Thread(target=child).start()
353                raise SystemExit
354            """],
355            stdout=subprocess.PIPE,
356            stderr=subprocess.PIPE)
357        self.addCleanup(p.stdout.close)
358        self.addCleanup(p.stderr.close)
359        stdout, stderr = p.communicate()
360        self.assertEqual(stdout.strip(),
361            "Woke up, sleep function is: <built-in function sleep>")
362        stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
363        self.assertEqual(stderr, "")
364
365    def test_enumerate_after_join(self):
366        # Try hard to trigger #1703448: a thread is still returned in
367        # threading.enumerate() after it has been join()ed.
368        enum = threading.enumerate
369        old_interval = sys.getcheckinterval()
370        try:
371            for i in xrange(1, 100):
372                # Try a couple times at each thread-switching interval
373                # to get more interleavings.
374                sys.setcheckinterval(i // 5)
375                t = threading.Thread(target=lambda: None)
376                t.start()
377                t.join()
378                l = enum()
379                self.assertNotIn(t, l,
380                    "#1703448 triggered after %d trials: %s" % (i, l))
381        finally:
382            sys.setcheckinterval(old_interval)
383
384    def test_no_refcycle_through_target(self):
385        class RunSelfFunction(object):
386            def __init__(self, should_raise):
387                # The links in this refcycle from Thread back to self
388                # should be cleaned up when the thread completes.
389                self.should_raise = should_raise
390                self.thread = threading.Thread(target=self._run,
391                                               args=(self,),
392                                               kwargs={'yet_another':self})
393                self.thread.start()
394
395            def _run(self, other_ref, yet_another):
396                if self.should_raise:
397                    raise SystemExit
398
399        cyclic_object = RunSelfFunction(should_raise=False)
400        weak_cyclic_object = weakref.ref(cyclic_object)
401        cyclic_object.thread.join()
402        del cyclic_object
403        self.assertEqual(None, weak_cyclic_object(),
404                         msg=('%d references still around' %
405                              sys.getrefcount(weak_cyclic_object())))
406
407        raising_cyclic_object = RunSelfFunction(should_raise=True)
408        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
409        raising_cyclic_object.thread.join()
410        del raising_cyclic_object
411        self.assertEqual(None, weak_raising_cyclic_object(),
412                         msg=('%d references still around' %
413                              sys.getrefcount(weak_raising_cyclic_object())))
414
415    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
416    def test_dummy_thread_after_fork(self):
417        # Issue #14308: a dummy thread in the active list doesn't mess up
418        # the after-fork mechanism.
419        code = """if 1:
420            import thread, threading, os, time
421
422            def background_thread(evt):
423                # Creates and registers the _DummyThread instance
424                threading.current_thread()
425                evt.set()
426                time.sleep(10)
427
428            evt = threading.Event()
429            thread.start_new_thread(background_thread, (evt,))
430            evt.wait()
431            assert threading.active_count() == 2, threading.active_count()
432            if os.fork() == 0:
433                assert threading.active_count() == 1, threading.active_count()
434                os._exit(0)
435            else:
436                os.wait()
437        """
438        _, out, err = assert_python_ok("-c", code)
439        self.assertEqual(out, '')
440        self.assertEqual(err, '')
441
442    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
443    def test_is_alive_after_fork(self):
444        # Try hard to trigger #18418: is_alive() could sometimes be True on
445        # threads that vanished after a fork.
446        old_interval = sys.getcheckinterval()
447
448        # Make the bug more likely to manifest.
449        sys.setcheckinterval(10)
450
451        try:
452            for i in range(20):
453                t = threading.Thread(target=lambda: None)
454                t.start()
455                pid = os.fork()
456                if pid == 0:
457                    os._exit(1 if t.is_alive() else 0)
458                else:
459                    t.join()
460                    pid, status = os.waitpid(pid, 0)
461                    self.assertEqual(0, status)
462        finally:
463            sys.setcheckinterval(old_interval)
464
465    def test_BoundedSemaphore_limit(self):
466        # BoundedSemaphore should raise ValueError if released too often.
467        for limit in range(1, 10):
468            bs = threading.BoundedSemaphore(limit)
469            threads = [threading.Thread(target=bs.acquire)
470                       for _ in range(limit)]
471            for t in threads:
472                t.start()
473            for t in threads:
474                t.join()
475            threads = [threading.Thread(target=bs.release)
476                       for _ in range(limit)]
477            for t in threads:
478                t.start()
479            for t in threads:
480                t.join()
481            self.assertRaises(ValueError, bs.release)
482
483class ThreadJoinOnShutdown(BaseTestCase):
484
485    # Between fork() and exec(), only async-safe functions are allowed (issues
486    # #12316 and #11870), and fork() from a worker thread is known to trigger
487    # problems with some operating systems (issue #3863): skip problematic tests
488    # on platforms known to behave badly.
489    platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
490                         'os2emx')
491
492    def _run_and_join(self, script):
493        script = """if 1:
494            import sys, os, time, threading
495
496            # a thread, which waits for the main program to terminate
497            def joiningfunc(mainthread):
498                mainthread.join()
499                print 'end of thread'
500        \n""" + script
501
502        p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
503        rc = p.wait()
504        data = p.stdout.read().replace('\r', '')
505        p.stdout.close()
506        self.assertEqual(data, "end of main\nend of thread\n")
507        self.assertFalse(rc == 2, "interpreter was blocked")
508        self.assertTrue(rc == 0, "Unexpected error")
509
510    def test_1_join_on_shutdown(self):
511        # The usual case: on exit, wait for a non-daemon thread
512        script = """if 1:
513            import os
514            t = threading.Thread(target=joiningfunc,
515                                 args=(threading.current_thread(),))
516            t.start()
517            time.sleep(0.1)
518            print 'end of main'
519            """
520        self._run_and_join(script)
521
522
523    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
524    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
525    def test_2_join_in_forked_process(self):
526        # Like the test above, but from a forked interpreter
527        script = """if 1:
528            childpid = os.fork()
529            if childpid != 0:
530                os.waitpid(childpid, 0)
531                sys.exit(0)
532
533            t = threading.Thread(target=joiningfunc,
534                                 args=(threading.current_thread(),))
535            t.start()
536            print 'end of main'
537            """
538        self._run_and_join(script)
539
540    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
541    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
542    def test_3_join_in_forked_from_thread(self):
543        # Like the test above, but fork() was called from a worker thread
544        # In the forked process, the main Thread object must be marked as stopped.
545        script = """if 1:
546            main_thread = threading.current_thread()
547            def worker():
548                childpid = os.fork()
549                if childpid != 0:
550                    os.waitpid(childpid, 0)
551                    sys.exit(0)
552
553                t = threading.Thread(target=joiningfunc,
554                                     args=(main_thread,))
555                print 'end of main'
556                t.start()
557                t.join() # Should not block: main_thread is already stopped
558
559            w = threading.Thread(target=worker)
560            w.start()
561            """
562        self._run_and_join(script)
563
564    def assertScriptHasOutput(self, script, expected_output):
565        p = subprocess.Popen([sys.executable, "-c", script],
566                             stdout=subprocess.PIPE)
567        rc = p.wait()
568        data = p.stdout.read().decode().replace('\r', '')
569        self.assertEqual(rc, 0, "Unexpected error")
570        self.assertEqual(data, expected_output)
571
572    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
573    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
574    def test_4_joining_across_fork_in_worker_thread(self):
575        # There used to be a possible deadlock when forking from a child
576        # thread.  See http://bugs.python.org/issue6643.
577
578        # The script takes the following steps:
579        # - The main thread in the parent process starts a new thread and then
580        #   tries to join it.
581        # - The join operation acquires the Lock inside the thread's _block
582        #   Condition.  (See threading.py:Thread.join().)
583        # - We stub out the acquire method on the condition to force it to wait
584        #   until the child thread forks.  (See LOCK ACQUIRED HERE)
585        # - The child thread forks.  (See LOCK HELD and WORKER THREAD FORKS
586        #   HERE)
587        # - The main thread of the parent process enters Condition.wait(),
588        #   which releases the lock on the child thread.
589        # - The child process returns.  Without the necessary fix, when the
590        #   main thread of the child process (which used to be the child thread
591        #   in the parent process) attempts to exit, it will try to acquire the
592        #   lock in the Thread._block Condition object and hang, because the
593        #   lock was held across the fork.
594
595        script = """if 1:
596            import os, time, threading
597
598            finish_join = False
599            start_fork = False
600
601            def worker():
602                # Wait until this thread's lock is acquired before forking to
603                # create the deadlock.
604                global finish_join
605                while not start_fork:
606                    time.sleep(0.01)
607                # LOCK HELD: Main thread holds lock across this call.
608                childpid = os.fork()
609                finish_join = True
610                if childpid != 0:
611                    # Parent process just waits for child.
612                    os.waitpid(childpid, 0)
613                # Child process should just return.
614
615            w = threading.Thread(target=worker)
616
617            # Stub out the private condition variable's lock acquire method.
618            # This acquires the lock and then waits until the child has forked
619            # before returning, which will release the lock soon after.  If
620            # someone else tries to fix this test case by acquiring this lock
621            # before forking instead of resetting it, the test case will
622            # deadlock when it shouldn't.
623            condition = w._block
624            orig_acquire = condition.acquire
625            call_count_lock = threading.Lock()
626            call_count = 0
627            def my_acquire():
628                global call_count
629                global start_fork
630                orig_acquire()  # LOCK ACQUIRED HERE
631                start_fork = True
632                if call_count == 0:
633                    while not finish_join:
634                        time.sleep(0.01)  # WORKER THREAD FORKS HERE
635                with call_count_lock:
636                    call_count += 1
637            condition.acquire = my_acquire
638
639            w.start()
640            w.join()
641            print('end of main')
642            """
643        self.assertScriptHasOutput(script, "end of main\n")
644
645    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
646    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
647    def test_5_clear_waiter_locks_to_avoid_crash(self):
648        # Check that a spawned thread that forks doesn't segfault on certain
649        # platforms, namely OS X.  This used to happen if there was a waiter
650        # lock in the thread's condition variable's waiters list.  Even though
651        # we know the lock will be held across the fork, it is not safe to
652        # release locks held across forks on all platforms, so releasing the
653        # waiter lock caused a segfault on OS X.  Furthermore, since locks on
654        # OS X are (as of this writing) implemented with a mutex + condition
655        # variable instead of a semaphore, while we know that the Python-level
656        # lock will be acquired, we can't know if the internal mutex will be
657        # acquired at the time of the fork.
658
659        script = """if True:
660            import os, time, threading
661
662            start_fork = False
663
664            def worker():
665                # Wait until the main thread has attempted to join this thread
666                # before continuing.
667                while not start_fork:
668                    time.sleep(0.01)
669                childpid = os.fork()
670                if childpid != 0:
671                    # Parent process just waits for child.
672                    (cpid, rc) = os.waitpid(childpid, 0)
673                    assert cpid == childpid
674                    assert rc == 0
675                    print('end of worker thread')
676                else:
677                    # Child process should just return.
678                    pass
679
680            w = threading.Thread(target=worker)
681
682            # Stub out the private condition variable's _release_save method.
683            # This releases the condition's lock and flips the global that
684            # causes the worker to fork.  At this point, the problematic waiter
685            # lock has been acquired once by the waiter and has been put onto
686            # the waiters list.
687            condition = w._block
688            orig_release_save = condition._release_save
689            def my_release_save():
690                global start_fork
691                orig_release_save()
692                # Waiter lock held here, condition lock released.
693                start_fork = True
694            condition._release_save = my_release_save
695
696            w.start()
697            w.join()
698            print('end of main thread')
699            """
700        output = "end of worker thread\nend of main thread\n"
701        self.assertScriptHasOutput(script, output)
702
703    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
704    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
705    def test_reinit_tls_after_fork(self):
706        # Issue #13817: fork() would deadlock in a multithreaded program with
707        # the ad-hoc TLS implementation.
708
709        def do_fork_and_wait():
710            # just fork a child process and wait it
711            pid = os.fork()
712            if pid > 0:
713                os.waitpid(pid, 0)
714            else:
715                os._exit(0)
716
717        # start a bunch of threads that will fork() child processes
718        threads = []
719        for i in range(16):
720            t = threading.Thread(target=do_fork_and_wait)
721            threads.append(t)
722            t.start()
723
724        for t in threads:
725            t.join()
726
727    @cpython_only
728    @unittest.skipIf(_testcapi is None, "need _testcapi module")
729    def test_frame_tstate_tracing(self):
730        # Issue #14432: Crash when a generator is created in a C thread that is
731        # destroyed while the generator is still used. The issue was that a
732        # generator contains a frame, and the frame kept a reference to the
733        # Python state of the destroyed C thread. The crash occurs when a trace
734        # function is setup.
735
736        def noop_trace(frame, event, arg):
737            # no operation
738            return noop_trace
739
740        def generator():
741            while 1:
742                yield "generator"
743
744        def callback():
745            if callback.gen is None:
746                callback.gen = generator()
747            return next(callback.gen)
748        callback.gen = None
749
750        old_trace = sys.gettrace()
751        sys.settrace(noop_trace)
752        try:
753            # Install a trace function
754            threading.settrace(noop_trace)
755
756            # Create a generator in a C thread which exits after the call
757            _testcapi.call_in_temporary_c_thread(callback)
758
759            # Call the generator in a different Python thread, check that the
760            # generator didn't keep a reference to the destroyed thread state
761            for test in range(3):
762                # The trace function is still called here
763                callback()
764        finally:
765            sys.settrace(old_trace)
766
767
768class ThreadingExceptionTests(BaseTestCase):
769    # A RuntimeError should be raised if Thread.start() is called
770    # multiple times.
771    def test_start_thread_again(self):
772        thread = threading.Thread()
773        thread.start()
774        self.assertRaises(RuntimeError, thread.start)
775
776    def test_joining_current_thread(self):
777        current_thread = threading.current_thread()
778        self.assertRaises(RuntimeError, current_thread.join);
779
780    def test_joining_inactive_thread(self):
781        thread = threading.Thread()
782        self.assertRaises(RuntimeError, thread.join)
783
784    def test_daemonize_active_thread(self):
785        thread = threading.Thread()
786        thread.start()
787        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
788
789    def test_print_exception(self):
790        script = r"""if 1:
791            import threading
792            import time
793
794            running = False
795            def run():
796                global running
797                running = True
798                while running:
799                    time.sleep(0.01)
800                1.0/0.0
801            t = threading.Thread(target=run)
802            t.start()
803            while not running:
804                time.sleep(0.01)
805            running = False
806            t.join()
807            """
808        rc, out, err = assert_python_ok("-c", script)
809        self.assertEqual(out, '')
810        self.assertIn("Exception in thread", err)
811        self.assertIn("Traceback (most recent call last):", err)
812        self.assertIn("ZeroDivisionError", err)
813        self.assertNotIn("Unhandled exception", err)
814
815    def test_print_exception_stderr_is_none_1(self):
816        script = r"""if 1:
817            import sys
818            import threading
819            import time
820
821            running = False
822            def run():
823                global running
824                running = True
825                while running:
826                    time.sleep(0.01)
827                1.0/0.0
828            t = threading.Thread(target=run)
829            t.start()
830            while not running:
831                time.sleep(0.01)
832            sys.stderr = None
833            running = False
834            t.join()
835            """
836        rc, out, err = assert_python_ok("-c", script)
837        self.assertEqual(out, '')
838        self.assertIn("Exception in thread", err)
839        self.assertIn("Traceback (most recent call last):", err)
840        self.assertIn("ZeroDivisionError", err)
841        self.assertNotIn("Unhandled exception", err)
842
843    def test_print_exception_stderr_is_none_2(self):
844        script = r"""if 1:
845            import sys
846            import threading
847            import time
848
849            running = False
850            def run():
851                global running
852                running = True
853                while running:
854                    time.sleep(0.01)
855                1.0/0.0
856            sys.stderr = None
857            t = threading.Thread(target=run)
858            t.start()
859            while not running:
860                time.sleep(0.01)
861            running = False
862            t.join()
863            """
864        rc, out, err = assert_python_ok("-c", script)
865        self.assertEqual(out, '')
866        self.assertNotIn("Unhandled exception", err)
867
868
869class LockTests(lock_tests.LockTests):
870    locktype = staticmethod(threading.Lock)
871
872class RLockTests(lock_tests.RLockTests):
873    locktype = staticmethod(threading.RLock)
874
875class EventTests(lock_tests.EventTests):
876    eventtype = staticmethod(threading.Event)
877
878class ConditionAsRLockTests(lock_tests.RLockTests):
879    # Condition uses an RLock by default and exports its API.
880    locktype = staticmethod(threading.Condition)
881
882class ConditionTests(lock_tests.ConditionTests):
883    condtype = staticmethod(threading.Condition)
884
885class SemaphoreTests(lock_tests.SemaphoreTests):
886    semtype = staticmethod(threading.Semaphore)
887
888class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
889    semtype = staticmethod(threading.BoundedSemaphore)
890
891    @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
892    def test_recursion_limit(self):
893        # Issue 9670
894        # test that excessive recursion within a non-main thread causes
895        # an exception rather than crashing the interpreter on platforms
896        # like Mac OS X or FreeBSD which have small default stack sizes
897        # for threads
898        script = """if True:
899            import threading
900
901            def recurse():
902                return recurse()
903
904            def outer():
905                try:
906                    recurse()
907                except RuntimeError:
908                    pass
909
910            w = threading.Thread(target=outer)
911            w.start()
912            w.join()
913            print('end of main thread')
914            """
915        expected_output = "end of main thread\n"
916        p = subprocess.Popen([sys.executable, "-c", script],
917                             stdout=subprocess.PIPE)
918        stdout, stderr = p.communicate()
919        data = stdout.decode().replace('\r', '')
920        self.assertEqual(p.returncode, 0, "Unexpected error")
921        self.assertEqual(data, expected_output)
922
923def test_main():
924    test.test_support.run_unittest(LockTests, RLockTests, EventTests,
925                                   ConditionAsRLockTests, ConditionTests,
926                                   SemaphoreTests, BoundedSemaphoreTests,
927                                   ThreadTests,
928                                   ThreadJoinOnShutdown,
929                                   ThreadingExceptionTests,
930                                   )
931
932if __name__ == "__main__":
933    test_main()
934