• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import unittest
3import random
4from test import support
5from test.support import threading_helper
6import _thread as thread
7import time
8import warnings
9import weakref
10
11from test import lock_tests
12
# NOTE(review): presumably skips this whole module on platforms where threads
# cannot be started -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)

# Number of worker threads started by the thread-creation and barrier tests.
NUMTASKS = 10
# Number of times each barrier worker passes through the barrier.
NUMTRIPS = 3

# Serializes the print() calls made by verbose_print() from several threads.
_print_mutex = thread.allocate_lock()
19
def verbose_print(arg):
    """Print *arg* for debugging, but only when the test run is verbose.

    The print happens while holding the module-wide print lock, so calls
    made from different threads are serialized.
    """
    if not support.verbose:
        return
    with _print_mutex:
        print(arg)
25
26
27class BasicThreadTest(unittest.TestCase):
28
29    def setUp(self):
30        self.done_mutex = thread.allocate_lock()
31        self.done_mutex.acquire()
32        self.running_mutex = thread.allocate_lock()
33        self.random_mutex = thread.allocate_lock()
34        self.created = 0
35        self.running = 0
36        self.next_ident = 0
37
38        key = threading_helper.threading_setup()
39        self.addCleanup(threading_helper.threading_cleanup, *key)
40
41
class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the low-level ``_thread`` module.

    Test methods are described with ``#`` comments rather than docstrings so
    that unittest keeps reporting them by method name.
    """

    def newtask(self):
        """Start one thread running ``task`` and update the counters."""
        # The counters are updated while running_mutex is held; task() takes
        # the same lock before touching them, so it cannot observe a
        # half-updated state.
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Thread body: sleep briefly, then check out of the running count.

        The thread that brings ``running`` to 0 after all NUMTASKS threads
        have been created releases ``done_mutex`` to wake the waiting test.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        with threading_helper.wait_threads_exit():
            # Basic test for thread creation.
            for i in range(NUMTASKS):
                self.newtask()
            verbose_print("waiting for tasks to complete...")
            # Blocks until the last task releases done_mutex.
            self.done_mutex.acquire()
            verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
    def test_nt_and_posix_stack_size(self):
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # Restore the default stack size even if an assertion below fails,
        # so a failure here cannot leak a non-default size into other tests.
        self.addCleanup(thread.stack_size, 0)

        fail_msg = "stack_size(%d) failed - should succeed"
        for tss in (262144, 0x100000, 0):
            thread.stack_size(tss)
            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
            verbose_print("successfully set stack_size(%d)" % tss)

        for tss in (262144, 0x100000):
            verbose_print("trying stack_size = (%d)" % tss)
            # Reset the bookkeeping so task() detects completion of this
            # round of NUMTASKS threads.
            self.next_ident = 0
            self.created = 0
            with threading_helper.wait_threads_exit():
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []

        def task():
            started.append(None)
            # Blocks until the main thread releases mut.
            mut.acquire()
            mut.release()

        with threading_helper.wait_threads_exit():
            thread.start_new_thread(task, ())
            # Wait until the thread has actually begun running task().
            for _ in support.sleeping_retry(support.LONG_TIMEOUT):
                if started:
                    break
            self.assertEqual(thread._count(), orig + 1)

            # Allow the task to finish.
            mut.release()

            # The only reliable way to be sure that the thread ended from the
            # interpreter's point of view is to wait for the function object to
            # be destroyed.
            done = []
            wr = weakref.ref(task, lambda _: done.append(None))
            del task

            for _ in support.sleeping_retry(support.LONG_TIMEOUT):
                if done:
                    break
                support.gc_collect()  # For PyPy or other GCs.
            self.assertEqual(thread._count(), orig)

    def test_unraisable_exception(self):
        # An exception escaping the thread function must be reported through
        # the unraisable hook.
        def task():
            started.release()
            raise ValueError("task failed")

        started = thread.allocate_lock()
        with support.catch_unraisable_exception() as cm:
            with threading_helper.wait_threads_exit():
                started.acquire()
                thread.start_new_thread(task, ())
                # Second acquire blocks until task() has started.
                started.acquire()

            self.assertEqual(str(cm.unraisable.exc_value), "task failed")
            self.assertIsNone(cm.unraisable.object)
            self.assertEqual(cm.unraisable.err_msg,
                             f"Exception ignored in thread started by {task!r}")
            self.assertIsNotNone(cm.unraisable.exc_traceback)

    def test_join_thread(self):
        finished = []

        def task():
            time.sleep(0.05)
            finished.append(thread.get_ident())

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(task)
            handle.join()
            self.assertEqual(len(finished), 1)
            self.assertEqual(handle.ident, finished[0])

    def test_join_thread_already_exited(self):
        def task():
            pass

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(task)
            # Give the (empty) task time to finish before joining it.
            time.sleep(0.05)
            handle.join()

    def test_join_several_times(self):
        def task():
            pass

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(task)
            handle.join()
            # Subsequent join() calls should succeed
            handle.join()

    def test_joinable_not_joined(self):
        handle_destroyed = thread.allocate_lock()
        handle_destroyed.acquire()

        def task():
            handle_destroyed.acquire()

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(task)
            # Drop the handle without joining while the thread is still
            # blocked, then let the thread finish.
            del handle
            handle_destroyed.release()

    def test_join_from_self(self):
        errors = []
        handles = []
        start_joinable_thread_returned = thread.allocate_lock()
        start_joinable_thread_returned.acquire()
        task_tried_to_join = thread.allocate_lock()
        task_tried_to_join.acquire()

        def task():
            # Wait until the main thread has stored the handle.
            start_joinable_thread_returned.acquire()
            try:
                handles[0].join()
            except Exception as e:
                errors.append(e)
            finally:
                task_tried_to_join.release()

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(task)
            handles.append(handle)
            start_joinable_thread_returned.release()
            # Can still join after joining failed in other thread
            task_tried_to_join.acquire()
            handle.join()

        # Use a unittest assertion rather than a bare ``assert`` so the
        # check is not stripped when Python runs with -O.
        self.assertEqual(len(errors), 1)
        with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
            raise errors[0]

    def test_join_then_self_join(self):
        # make sure we can't deadlock in the following scenario with
        # threads t0 and t1 (see comment in `ThreadHandle_join()` for more
        # details):
        #
        # - t0 joins t1
        # - t1 self joins
        def make_lock():
            # Return a new lock that is already held.
            lock = thread.allocate_lock()
            lock.acquire()
            return lock

        error = None
        self_joiner_handle = None
        self_joiner_started = make_lock()
        self_joiner_barrier = make_lock()
        def self_joiner():
            nonlocal error

            self_joiner_started.release()
            self_joiner_barrier.acquire()

            try:
                self_joiner_handle.join()
            except Exception as e:
                error = e

        joiner_started = make_lock()
        def joiner():
            joiner_started.release()
            self_joiner_handle.join()

        with threading_helper.wait_threads_exit():
            self_joiner_handle = thread.start_joinable_thread(self_joiner)
            # Wait for the self-joining thread to start
            self_joiner_started.acquire()

            # Start the thread that joins the self-joiner
            joiner_handle = thread.start_joinable_thread(joiner)

            # Wait for the joiner to start
            joiner_started.acquire()

            # Not great, but I don't think there's a deterministic way to make
            # sure that the self-joining thread has been joined.
            time.sleep(0.1)

            # Unblock the self-joiner
            self_joiner_barrier.release()

            self_joiner_handle.join()
            joiner_handle.join()

            with self.assertRaisesRegex(RuntimeError, "Cannot join current thread"):
                raise error

    def test_join_with_timeout(self):
        lock = thread.allocate_lock()
        lock.acquire()

        def thr():
            lock.acquire()

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(thr)
            # The thread is still blocked on the lock, so this join times out.
            handle.join(0.1)
            self.assertFalse(handle.is_done())
            lock.release()
            handle.join()
            self.assertTrue(handle.is_done())

    def test_join_unstarted(self):
        handle = thread._ThreadHandle()
        with self.assertRaisesRegex(RuntimeError, "thread not started"):
            handle.join()

    def test_set_done_unstarted(self):
        handle = thread._ThreadHandle()
        with self.assertRaisesRegex(RuntimeError, "thread not started"):
            handle._set_done()

    def test_start_duplicate_handle(self):
        lock = thread.allocate_lock()
        lock.acquire()

        def func():
            lock.acquire()

        handle = thread._ThreadHandle()
        with threading_helper.wait_threads_exit():
            thread.start_joinable_thread(func, handle=handle)
            # Reusing a handle that already has a thread must be rejected.
            with self.assertRaisesRegex(RuntimeError, "thread already started"):
                thread.start_joinable_thread(func, handle=handle)
            lock.release()
            handle.join()

    def test_start_with_none_handle(self):
        def func():
            pass

        with threading_helper.wait_threads_exit():
            handle = thread.start_joinable_thread(func, handle=None)
            handle.join()
339
340
class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    Threads check in one at a time under ``checkin_mutex``; the last one to
    arrive opens ``checkout_mutex`` and the threads then leave one by one,
    the final leaver re-opening check-in for the next round.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        # The checkout gate starts closed (held).
        gate = thread.allocate_lock()
        gate.acquire()
        self.checkout_mutex = gate

    def enter(self):
        """Block until ``num_threads`` threads have called ``enter()``."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        last_to_arrive = self.waiting == self.num_threads

        if not last_to_arrive:
            # Let the next thread check in, then wait for the gate.
            self.checkin_mutex.release()
            self.checkout_mutex.acquire()
            self.waiting -= 1
            if self.waiting:
                # Others are still inside: pass the gate on to them.
                self.checkout_mutex.release()
            else:
                # Last one out: keep the gate closed, re-open check-in.
                self.checkin_mutex.release()
            return

        # Last arrival: it leaves immediately (so one fewer is waiting) and
        # opens the gate, keeping check-in closed until everyone is out.
        self.waiting = self.num_threads - 1
        self.checkout_mutex.release()
364
365
class BarrierTest(BasicThreadTest):
    """Exercise the lock-based ``Barrier`` with NUMTASKS worker threads."""

    def test_barrier(self):
        with threading_helper.wait_threads_exit():
            self.running = NUMTASKS
            self.bar = Barrier(NUMTASKS)
            for ident in range(NUMTASKS):
                thread.start_new_thread(self.task2, (ident,))
            verbose_print("waiting for tasks to end")
            # Released by the last worker to finish its trips.
            self.done_mutex.acquire()
            verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: pass through the barrier NUMTRIPS times, then check out."""
        for trip in range(NUMTRIPS):
            # Task 0 never sleeps: give it a good chance to enter the next
            # barrier before the others are all out of the current one.
            delay = 0
            if ident != 0:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, trip))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)

        with self.running_mutex:
            self.running -= 1
            last_out = self.running == 0
        # Must release running_mutex before releasing done_mutex, else the
        # main thread can exit and set the mutex to None as part of global
        # teardown; then mutex.release() raises AttributeError.
        if last_out:
            self.done_mutex.release()
402
class LockTests(lock_tests.LockTests):
    """Bind the shared ``lock_tests.LockTests`` suite to ``_thread`` locks."""
    # Lock factory for the inherited tests.
    # NOTE(review): presumably the base class calls this to create each lock
    # under test -- confirm in test/lock_tests.py.
    locktype = thread.allocate_lock
405
406
407class TestForkInThread(unittest.TestCase):
408    def setUp(self):
409        self.read_fd, self.write_fd = os.pipe()
410
411    @support.requires_fork()
412    @threading_helper.reap_threads
413    def test_forkinthread(self):
414        pid = None
415
416        def fork_thread(read_fd, write_fd):
417            nonlocal pid
418
419            # Ignore the warning about fork with threads.
420            with warnings.catch_warnings(category=DeprecationWarning,
421                                         action="ignore"):
422                # fork in a thread (DANGER, undefined per POSIX)
423                if (pid := os.fork()):
424                    # parent process
425                    return
426
427            # child process
428            try:
429                os.close(read_fd)
430                os.write(write_fd, b"OK")
431            finally:
432                os._exit(0)
433
434        with threading_helper.wait_threads_exit():
435            thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
436            self.assertEqual(os.read(self.read_fd, 2), b"OK")
437            os.close(self.write_fd)
438
439        self.assertIsNotNone(pid)
440        support.wait_process(pid, exitcode=0)
441
442    def tearDown(self):
443        try:
444            os.close(self.read_fd)
445        except OSError:
446            pass
447
448        try:
449            os.close(self.write_fd)
450        except OSError:
451            pass
452
453
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
456