• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import unittest
3import random
4from test import support
5thread = support.import_module('thread')
6import time
7import sys
8import weakref
9
10from test import lock_tests
11
# Number of worker threads each thread-spawning test starts.
NUMTASKS = 10
# Number of times each worker passes through the barrier in the barrier test.
NUMTRIPS = 3


# Held around the print in verbose_print().
_print_mutex = thread.allocate_lock()
17
def verbose_print(arg):
    """Print *arg* as debugging output, but only in verbose test runs.

    The print is done while holding ``_print_mutex``.
    """
    if not support.verbose:
        return
    with _print_mutex:
        # Parenthesised form: prints the same thing as ``print arg``.
        print(arg)
23
24
25
class BasicThreadTest(unittest.TestCase):
    """Common fixture: a set of locks and task counters for thread tests."""

    def setUp(self):
        # Plain locks, created unlocked.
        self.running_mutex = thread.allocate_lock()
        self.random_mutex = thread.allocate_lock()

        # done_mutex starts out held; a second acquire() on it therefore
        # blocks until some other thread releases it.
        done = thread.allocate_lock()
        done.acquire()
        self.done_mutex = done

        # Task bookkeeping, all starting from zero.
        self.created = 0
        self.running = 0
        self.next_ident = 0

        # Register threading_cleanup with whatever threading_setup returned.
        saved_state = support.threading_setup()
        self.addCleanup(support.threading_cleanup, *saved_state)
39
40
class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the low-level thread module."""

    def newtask(self):
        """Start one thread running task() and update the counters.

        The whole step runs under running_mutex.  task() needs the same lock
        before it decrements ``running``, so it cannot observe the counters
        before ``created`` and ``running`` have been incremented here.
        """
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Thread body: sleep for a short random delay, then check out.

        The thread that brings ``running`` down to zero after all NUMTASKS
        tasks have been created releases done_mutex.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        with support.wait_threads_exit():
            # Basic test for thread creation.
            for i in range(NUMTASKS):
                self.newtask()
            verbose_print("waiting for tasks to complete...")
            # Blocks until the last task releases done_mutex.
            self.done_mutex.acquire()
            verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "os2", "posix"), 'test meant for nt, os2, and posix')
    def test_nt_and_posix_stack_size(self):
        # Probe with a small size first: a ValueError is tolerated, while
        # thread.error means the stack size cannot be changed at all.
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # From here on the stack size is being modified.  Put the default
        # back even when an assertion fails, so that a failure in this test
        # does not leave a non-default stack size behind for later tests.
        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            # Start a full batch of tasks under each non-default stack size.
            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Reset the bookkeeping that task() checks for completion.
                self.next_ident = 0
                self.created = 0
                with support.wait_threads_exit():
                    for i in range(NUMTASKS):
                        self.newtask()

                    verbose_print("waiting for all tasks to complete")
                    self.done_mutex.acquire()
                    verbose_print("all tasks done")
        finally:
            thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []

        def task():
            started.append(None)
            # Blocks here until the main thread releases mut.
            mut.acquire()
            mut.release()

        with support.wait_threads_exit():
            thread.start_new_thread(task, ())
            # Poll until the new thread has actually begun running task().
            while not started:
                time.sleep(0.01)
            self.assertEqual(thread._count(), orig + 1)
            # Allow the task to finish.
            mut.release()
            # The only reliable way to be sure that the thread ended from the
            # interpreter's point of view is to wait for the function object to be
            # destroyed.
            done = []
            # 'wr' is never read, but the reference must be kept alive so
            # that its callback fires when 'task' is destroyed.
            wr = weakref.ref(task, lambda _: done.append(None))
            del task
            while not done:
                time.sleep(0.01)
            self.assertEqual(thread._count(), orig)

    def test_save_exception_state_on_error(self):
        # See issue #14474
        def task():
            started.release()
            raise SyntaxError
        def mywrite(self, *args):
            # Raise and handle an unrelated exception before delegating to
            # the real write.
            try:
                raise ValueError
            except ValueError:
                pass
            real_write(self, *args)
        started = thread.allocate_lock()
        with support.captured_output("stderr") as stderr:
            real_write = stderr.write
            stderr.write = mywrite
            started.acquire()
            with support.wait_threads_exit():
                thread.start_new_thread(task, ())
                # Blocks until task() has released the lock, i.e. has started.
                started.acquire()
        self.assertIn("Traceback", stderr.getvalue())
159
160
class Barrier:
    """Rendezvous point for ``num_threads`` threads, built from two locks.

    ``checkin_mutex`` guards arrivals and ``checkout_mutex`` guards
    departures; ``waiting`` counts the threads that have arrived but not
    yet left.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        # The checkout lock starts out held, so arrivals block on it until
        # the last thread to arrive releases it.
        checkout = thread.allocate_lock()
        checkout.acquire()
        self.checkout_mutex = checkout

    def enter(self):
        """Check in; all but the last arrival block until the last arrives."""
        checkin = self.checkin_mutex
        checkout = self.checkout_mutex

        # Arrival phase.
        checkin.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last one in: leave without counting itself among the waiters,
            # keep checkin held and open checkout for the others.
            self.waiting = self.num_threads - 1
            checkout.release()
            return
        checkin.release()

        # Departure phase: waiters leave one at a time through checkout.
        checkout.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out: reopen checkin and leave checkout held.
            checkin.release()
        else:
            # Pass the checkout lock on to the next waiter.
            checkout.release()
184
185
class BarrierTest(BasicThreadTest):
    """Sends NUMTASKS threads through a shared Barrier NUMTRIPS times each."""

    def test_barrier(self):
        with support.wait_threads_exit():
            self.running = NUMTASKS
            self.bar = Barrier(NUMTASKS)
            for ident in range(NUMTASKS):
                thread.start_new_thread(self.task2, (ident,))
            verbose_print("waiting for tasks to end")
            # Blocks until the last task2() releases done_mutex.
            self.done_mutex.acquire()
            verbose_print("tasks done")

    def task2(self, ident):
        """Thread body: enter the barrier NUMTRIPS times, then check out."""
        for trip in range(NUMTRIPS):
            # Task 0 never sleeps: give it a good chance to enter the next
            # barrier before the others are all out of the current one.
            delay = 0
            if ident != 0:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, trip))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)

        with self.running_mutex:
            self.running -= 1
            # Only record the outcome here.  running_mutex must be released
            # before done_mutex is, else the main thread can exit and set
            # mutex to None as part of global teardown; then mutex.release()
            # raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()
222
223
class LockTests(lock_tests.LockTests):
    # Reuses the tests inherited from lock_tests.LockTests with
    # thread.allocate_lock as the lock type (presumably the factory the
    # inherited tests call to create locks -- see lock_tests).
    locktype = thread.allocate_lock
226
227
class TestForkInThread(unittest.TestCase):
    """Checks that os.fork() works when called from a started thread."""

    def setUp(self):
        # Pipe over which the forked child reports back to the test.
        self.read_fd, self.write_fd = os.pipe()

    @unittest.skipIf(sys.platform.startswith('win'),
                     "This test is only appropriate for POSIX-like systems.")
    @support.reap_threads
    def test_forkinthread(self):
        # Mutable holder so the nested function can hand back the status.
        result = {'status': None}

        def thread1():
            try:
                pid = os.fork() # fork in a thread
            except RuntimeError:
                sys.exit(0) # exit the child

            if pid != 0:
                # Parent: drop the write end and collect the child's status.
                os.close(self.write_fd)
                _, status = os.waitpid(pid, 0)
                result['status'] = status
                return

            # Child: report success through the pipe.
            os.close(self.read_fd)
            os.write(self.write_fd, "OK")
            # Exiting the thread normally in the child process can leave
            # any additional threads (such as the one started by
            # importing _tkinter) still running, and this can prevent
            # the half-zombie child process from being cleaned up. See
            # Issue #26456.
            os._exit(0)

        with support.wait_threads_exit():
            thread.start_new_thread(thread1, ())
            self.assertEqual(os.read(self.read_fd, 2), "OK",
                             "Unable to fork() in thread")
        self.assertEqual(result['status'], 0)

    def tearDown(self):
        # Either end may already have been closed by the test itself, so a
        # failing close is deliberately ignored.
        for fd in (self.read_fd, self.write_fd):
            try:
                os.close(fd)
            except OSError:
                pass
273
274
def test_main():
    """Run this module's concrete test classes via support.run_unittest()."""
    test_classes = (
        ThreadRunningTests,
        BarrierTest,
        LockTests,
        TestForkInThread,
    )
    support.run_unittest(*test_classes)
278
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
281