1import os 2import unittest 3import random 4from test import support 5import _thread as thread 6import time 7import sys 8import weakref 9 10from test import lock_tests 11 12NUMTASKS = 10 13NUMTRIPS = 3 14POLL_SLEEP = 0.010 # seconds = 10 ms 15 16_print_mutex = thread.allocate_lock() 17 18def verbose_print(arg): 19 """Helper function for printing out debugging output.""" 20 if support.verbose: 21 with _print_mutex: 22 print(arg) 23 24 25class BasicThreadTest(unittest.TestCase): 26 27 def setUp(self): 28 self.done_mutex = thread.allocate_lock() 29 self.done_mutex.acquire() 30 self.running_mutex = thread.allocate_lock() 31 self.random_mutex = thread.allocate_lock() 32 self.created = 0 33 self.running = 0 34 self.next_ident = 0 35 36 key = support.threading_setup() 37 self.addCleanup(support.threading_cleanup, *key) 38 39 40class ThreadRunningTests(BasicThreadTest): 41 42 def newtask(self): 43 with self.running_mutex: 44 self.next_ident += 1 45 verbose_print("creating task %s" % self.next_ident) 46 thread.start_new_thread(self.task, (self.next_ident,)) 47 self.created += 1 48 self.running += 1 49 50 def task(self, ident): 51 with self.random_mutex: 52 delay = random.random() / 10000.0 53 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) 54 time.sleep(delay) 55 verbose_print("task %s done" % ident) 56 with self.running_mutex: 57 self.running -= 1 58 if self.created == NUMTASKS and self.running == 0: 59 self.done_mutex.release() 60 61 def test_starting_threads(self): 62 with support.wait_threads_exit(): 63 # Basic test for thread creation. 64 for i in range(NUMTASKS): 65 self.newtask() 66 verbose_print("waiting for tasks to complete...") 67 self.done_mutex.acquire() 68 verbose_print("all tasks done") 69 70 def test_stack_size(self): 71 # Various stack size tests. 72 self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") 73 74 thread.stack_size(0) 75 self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") 76 77 @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') 78 def test_nt_and_posix_stack_size(self): 79 try: 80 thread.stack_size(4096) 81 except ValueError: 82 verbose_print("caught expected ValueError setting " 83 "stack_size(4096)") 84 except thread.error: 85 self.skipTest("platform does not support changing thread stack " 86 "size") 87 88 fail_msg = "stack_size(%d) failed - should succeed" 89 for tss in (262144, 0x100000, 0): 90 thread.stack_size(tss) 91 self.assertEqual(thread.stack_size(), tss, fail_msg % tss) 92 verbose_print("successfully set stack_size(%d)" % tss) 93 94 for tss in (262144, 0x100000): 95 verbose_print("trying stack_size = (%d)" % tss) 96 self.next_ident = 0 97 self.created = 0 98 with support.wait_threads_exit(): 99 for i in range(NUMTASKS): 100 self.newtask() 101 102 verbose_print("waiting for all tasks to complete") 103 self.done_mutex.acquire() 104 verbose_print("all tasks done") 105 106 thread.stack_size(0) 107 108 def test__count(self): 109 # Test the _count() function. 110 orig = thread._count() 111 mut = thread.allocate_lock() 112 mut.acquire() 113 started = [] 114 115 def task(): 116 started.append(None) 117 mut.acquire() 118 mut.release() 119 120 with support.wait_threads_exit(): 121 thread.start_new_thread(task, ()) 122 while not started: 123 time.sleep(POLL_SLEEP) 124 self.assertEqual(thread._count(), orig + 1) 125 # Allow the task to finish. 126 mut.release() 127 # The only reliable way to be sure that the thread ended from the 128 # interpreter's point of view is to wait for the function object to be 129 # destroyed. 130 done = [] 131 wr = weakref.ref(task, lambda _: done.append(None)) 132 del task 133 while not done: 134 time.sleep(POLL_SLEEP) 135 self.assertEqual(thread._count(), orig) 136 137 def test_save_exception_state_on_error(self): 138 # See issue #14474 139 def task(): 140 started.release() 141 raise SyntaxError 142 def mywrite(self, *args): 143 try: 144 raise ValueError 145 except ValueError: 146 pass 147 real_write(self, *args) 148 started = thread.allocate_lock() 149 with support.captured_output("stderr") as stderr: 150 real_write = stderr.write 151 stderr.write = mywrite 152 started.acquire() 153 with support.wait_threads_exit(): 154 thread.start_new_thread(task, ()) 155 started.acquire() 156 self.assertIn("Traceback", stderr.getvalue()) 157 158 159class Barrier: 160 def __init__(self, num_threads): 161 self.num_threads = num_threads 162 self.waiting = 0 163 self.checkin_mutex = thread.allocate_lock() 164 self.checkout_mutex = thread.allocate_lock() 165 self.checkout_mutex.acquire() 166 167 def enter(self): 168 self.checkin_mutex.acquire() 169 self.waiting = self.waiting + 1 170 if self.waiting == self.num_threads: 171 self.waiting = self.num_threads - 1 172 self.checkout_mutex.release() 173 return 174 self.checkin_mutex.release() 175 176 self.checkout_mutex.acquire() 177 self.waiting = self.waiting - 1 178 if self.waiting == 0: 179 self.checkin_mutex.release() 180 return 181 self.checkout_mutex.release() 182 183 184class BarrierTest(BasicThreadTest): 185 186 def test_barrier(self): 187 with support.wait_threads_exit(): 188 self.bar = Barrier(NUMTASKS) 189 self.running = NUMTASKS 190 for i in range(NUMTASKS): 191 thread.start_new_thread(self.task2, (i,)) 192 verbose_print("waiting for tasks to end") 193 self.done_mutex.acquire() 194 verbose_print("tasks done") 195 196 def task2(self, ident): 197 for i in range(NUMTRIPS): 198 if ident == 0: 199 # give it a good chance to enter the next 200 # barrier before the others are all out 201 # of the current one 202 delay = 0 203 else: 204 with self.random_mutex: 205 delay = random.random() / 10000.0 206 verbose_print("task %s will run for %sus" % 207 (ident, round(delay * 1e6))) 208 time.sleep(delay) 209 verbose_print("task %s entering %s" % (ident, i)) 210 self.bar.enter() 211 verbose_print("task %s leaving barrier" % ident) 212 with self.running_mutex: 213 self.running -= 1 214 # Must release mutex before releasing done, else the main thread can 215 # exit and set mutex to None as part of global teardown; then 216 # mutex.release() raises AttributeError. 217 finished = self.running == 0 218 if finished: 219 self.done_mutex.release() 220 221class LockTests(lock_tests.LockTests): 222 locktype = thread.allocate_lock 223 224 225class TestForkInThread(unittest.TestCase): 226 def setUp(self): 227 self.read_fd, self.write_fd = os.pipe() 228 229 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') 230 @support.reap_threads 231 def test_forkinthread(self): 232 status = "not set" 233 234 def thread1(): 235 nonlocal status 236 237 # fork in a thread 238 pid = os.fork() 239 if pid == 0: 240 # child 241 try: 242 os.close(self.read_fd) 243 os.write(self.write_fd, b"OK") 244 finally: 245 os._exit(0) 246 else: 247 # parent 248 os.close(self.write_fd) 249 pid, status = os.waitpid(pid, 0) 250 251 with support.wait_threads_exit(): 252 thread.start_new_thread(thread1, ()) 253 self.assertEqual(os.read(self.read_fd, 2), b"OK", 254 "Unable to fork() in thread") 255 self.assertEqual(status, 0) 256 257 def tearDown(self): 258 try: 259 os.close(self.read_fd) 260 except OSError: 261 pass 262 263 try: 264 os.close(self.write_fd) 265 except OSError: 266 pass 267 268 269if __name__ == "__main__": 270 unittest.main() 271