• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import unittest
3import random
4from test import support
5import _thread as thread
6import time
7import sys
8import weakref
9
10from test import lock_tests
11
12NUMTASKS = 10
13NUMTRIPS = 3
14POLL_SLEEP = 0.010 # seconds = 10 ms
15
16_print_mutex = thread.allocate_lock()
17
18def verbose_print(arg):
19    """Helper function for printing out debugging output."""
20    if support.verbose:
21        with _print_mutex:
22            print(arg)
23
24
25class BasicThreadTest(unittest.TestCase):
26
27    def setUp(self):
28        self.done_mutex = thread.allocate_lock()
29        self.done_mutex.acquire()
30        self.running_mutex = thread.allocate_lock()
31        self.random_mutex = thread.allocate_lock()
32        self.created = 0
33        self.running = 0
34        self.next_ident = 0
35
36        key = support.threading_setup()
37        self.addCleanup(support.threading_cleanup, *key)
38
39
40class ThreadRunningTests(BasicThreadTest):
41
42    def newtask(self):
43        with self.running_mutex:
44            self.next_ident += 1
45            verbose_print("creating task %s" % self.next_ident)
46            thread.start_new_thread(self.task, (self.next_ident,))
47            self.created += 1
48            self.running += 1
49
50    def task(self, ident):
51        with self.random_mutex:
52            delay = random.random() / 10000.0
53        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
54        time.sleep(delay)
55        verbose_print("task %s done" % ident)
56        with self.running_mutex:
57            self.running -= 1
58            if self.created == NUMTASKS and self.running == 0:
59                self.done_mutex.release()
60
61    def test_starting_threads(self):
62        with support.wait_threads_exit():
63            # Basic test for thread creation.
64            for i in range(NUMTASKS):
65                self.newtask()
66            verbose_print("waiting for tasks to complete...")
67            self.done_mutex.acquire()
68            verbose_print("all tasks done")
69
70    def test_stack_size(self):
71        # Various stack size tests.
72        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
73
74        thread.stack_size(0)
75        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
76
77    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
78    def test_nt_and_posix_stack_size(self):
79        try:
80            thread.stack_size(4096)
81        except ValueError:
82            verbose_print("caught expected ValueError setting "
83                            "stack_size(4096)")
84        except thread.error:
85            self.skipTest("platform does not support changing thread stack "
86                          "size")
87
88        fail_msg = "stack_size(%d) failed - should succeed"
89        for tss in (262144, 0x100000, 0):
90            thread.stack_size(tss)
91            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
92            verbose_print("successfully set stack_size(%d)" % tss)
93
94        for tss in (262144, 0x100000):
95            verbose_print("trying stack_size = (%d)" % tss)
96            self.next_ident = 0
97            self.created = 0
98            with support.wait_threads_exit():
99                for i in range(NUMTASKS):
100                    self.newtask()
101
102                verbose_print("waiting for all tasks to complete")
103                self.done_mutex.acquire()
104                verbose_print("all tasks done")
105
106        thread.stack_size(0)
107
108    def test__count(self):
109        # Test the _count() function.
110        orig = thread._count()
111        mut = thread.allocate_lock()
112        mut.acquire()
113        started = []
114
115        def task():
116            started.append(None)
117            mut.acquire()
118            mut.release()
119
120        with support.wait_threads_exit():
121            thread.start_new_thread(task, ())
122            while not started:
123                time.sleep(POLL_SLEEP)
124            self.assertEqual(thread._count(), orig + 1)
125            # Allow the task to finish.
126            mut.release()
127            # The only reliable way to be sure that the thread ended from the
128            # interpreter's point of view is to wait for the function object to be
129            # destroyed.
130            done = []
131            wr = weakref.ref(task, lambda _: done.append(None))
132            del task
133            while not done:
134                time.sleep(POLL_SLEEP)
135            self.assertEqual(thread._count(), orig)
136
137    def test_save_exception_state_on_error(self):
138        # See issue #14474
139        def task():
140            started.release()
141            raise SyntaxError
142        def mywrite(self, *args):
143            try:
144                raise ValueError
145            except ValueError:
146                pass
147            real_write(self, *args)
148        started = thread.allocate_lock()
149        with support.captured_output("stderr") as stderr:
150            real_write = stderr.write
151            stderr.write = mywrite
152            started.acquire()
153            with support.wait_threads_exit():
154                thread.start_new_thread(task, ())
155                started.acquire()
156        self.assertIn("Traceback", stderr.getvalue())
157
158
159class Barrier:
160    def __init__(self, num_threads):
161        self.num_threads = num_threads
162        self.waiting = 0
163        self.checkin_mutex  = thread.allocate_lock()
164        self.checkout_mutex = thread.allocate_lock()
165        self.checkout_mutex.acquire()
166
167    def enter(self):
168        self.checkin_mutex.acquire()
169        self.waiting = self.waiting + 1
170        if self.waiting == self.num_threads:
171            self.waiting = self.num_threads - 1
172            self.checkout_mutex.release()
173            return
174        self.checkin_mutex.release()
175
176        self.checkout_mutex.acquire()
177        self.waiting = self.waiting - 1
178        if self.waiting == 0:
179            self.checkin_mutex.release()
180            return
181        self.checkout_mutex.release()
182
183
184class BarrierTest(BasicThreadTest):
185
186    def test_barrier(self):
187        with support.wait_threads_exit():
188            self.bar = Barrier(NUMTASKS)
189            self.running = NUMTASKS
190            for i in range(NUMTASKS):
191                thread.start_new_thread(self.task2, (i,))
192            verbose_print("waiting for tasks to end")
193            self.done_mutex.acquire()
194            verbose_print("tasks done")
195
196    def task2(self, ident):
197        for i in range(NUMTRIPS):
198            if ident == 0:
199                # give it a good chance to enter the next
200                # barrier before the others are all out
201                # of the current one
202                delay = 0
203            else:
204                with self.random_mutex:
205                    delay = random.random() / 10000.0
206            verbose_print("task %s will run for %sus" %
207                          (ident, round(delay * 1e6)))
208            time.sleep(delay)
209            verbose_print("task %s entering %s" % (ident, i))
210            self.bar.enter()
211            verbose_print("task %s leaving barrier" % ident)
212        with self.running_mutex:
213            self.running -= 1
214            # Must release mutex before releasing done, else the main thread can
215            # exit and set mutex to None as part of global teardown; then
216            # mutex.release() raises AttributeError.
217            finished = self.running == 0
218        if finished:
219            self.done_mutex.release()
220
221class LockTests(lock_tests.LockTests):
222    locktype = thread.allocate_lock
223
224
225class TestForkInThread(unittest.TestCase):
226    def setUp(self):
227        self.read_fd, self.write_fd = os.pipe()
228
229    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
230    @support.reap_threads
231    def test_forkinthread(self):
232        status = "not set"
233
234        def thread1():
235            nonlocal status
236
237            # fork in a thread
238            pid = os.fork()
239            if pid == 0:
240                # child
241                try:
242                    os.close(self.read_fd)
243                    os.write(self.write_fd, b"OK")
244                finally:
245                    os._exit(0)
246            else:
247                # parent
248                os.close(self.write_fd)
249                pid, status = os.waitpid(pid, 0)
250
251        with support.wait_threads_exit():
252            thread.start_new_thread(thread1, ())
253            self.assertEqual(os.read(self.read_fd, 2), b"OK",
254                             "Unable to fork() in thread")
255        self.assertEqual(status, 0)
256
257    def tearDown(self):
258        try:
259            os.close(self.read_fd)
260        except OSError:
261            pass
262
263        try:
264            os.close(self.write_fd)
265        except OSError:
266            pass
267
268
269if __name__ == "__main__":
270    unittest.main()
271