"""Tests for the low-level ``_thread`` module."""

import os
import unittest
import random
from test import support
import _thread as thread
import time
import weakref

from test import lock_tests

NUMTASKS = 10
NUMTRIPS = 3
POLL_SLEEP = 0.010 # seconds = 10 ms

# Serializes debug output so lines printed by different threads do not mix.
_print_mutex = thread.allocate_lock()

def verbose_print(arg):
    """Helper function for printing out debugging output."""
    if support.verbose:
        with _print_mutex:
            print(arg)


class BasicThreadTest(unittest.TestCase):
    """Common fixture: bookkeeping counters and the locks that guard them."""

    def setUp(self):
        """Create the per-test locks and counters and register cleanup."""
        # done_mutex starts out held; a worker thread releases it to signal
        # the main thread that all tasks have finished.
        self.done_mutex = thread.allocate_lock()
        self.done_mutex.acquire()
        # Guards the created/running/next_ident counters.
        self.running_mutex = thread.allocate_lock()
        # Serializes calls into the random module from worker threads.
        self.random_mutex = thread.allocate_lock()
        self.created = 0
        self.running = 0
        self.next_ident = 0

        key = support.threading_setup()
        self.addCleanup(support.threading_cleanup, *key)


class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the ``_thread`` API."""

    def newtask(self):
        """Start one worker thread running task() and update the counters."""
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Worker body: sleep for a short random time, then check out.

        The thread that observes that all NUMTASKS tasks were created and
        none is running any more releases done_mutex.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        with support.wait_threads_exit():
            # Basic test for thread creation.
            for _ in range(NUMTASKS):
                self.newtask()
            verbose_print("waiting for tasks to complete...")
            self.done_mutex.acquire()
            verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
    def test_nt_and_posix_stack_size(self):
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                          "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # The stack size is process-wide state.  Restore the default even if
        # an assertion below fails, so that later tests (test_stack_size
        # checks that the initial size is 0) are not affected.
        try:
            fail_msg = "stack_size(%d) failed - should succeed"
            for tss in (262144, 0x100000, 0):
                thread.stack_size(tss)
                self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
                verbose_print("successfully set stack_size(%d)" % tss)

            for tss in (262144, 0x100000):
                verbose_print("trying stack_size = (%d)" % tss)
                # Reset the bookkeeping so task() signals completion again
                # once this batch of NUMTASKS threads has finished.
                self.next_ident = 0
                self.created = 0
                with support.wait_threads_exit():
                    for _ in range(NUMTASKS):
                        self.newtask()

                    verbose_print("waiting for all tasks to complete")
                    self.done_mutex.acquire()
                    verbose_print("all tasks done")
        finally:
            thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []

        def task():
            started.append(None)
            # Block until the main thread lets the task finish.
            mut.acquire()
            mut.release()

        with support.wait_threads_exit():
            thread.start_new_thread(task, ())
            while not started:
                time.sleep(POLL_SLEEP)
            self.assertEqual(thread._count(), orig + 1)
            # Allow the task to finish.
            mut.release()
            # The only reliable way to be sure that the thread ended from the
            # interpreter's point of view is to wait for the function object to be
            # destroyed.
            done = []
            # Keep a reference to the weakref object itself: its callback only
            # fires while the weakref is alive.
            wr = weakref.ref(task, lambda _: done.append(None))
            del task
            while not done:
                time.sleep(POLL_SLEEP)
            self.assertEqual(thread._count(), orig)

    def test_unraisable_exception(self):
        def task():
            started.release()
            raise ValueError("task failed")

        started = thread.allocate_lock()
        with support.catch_unraisable_exception() as cm:
            with support.wait_threads_exit():
                started.acquire()
                thread.start_new_thread(task, ())
                # Blocks until task() has released the lock, i.e. has started.
                started.acquire()

            self.assertEqual(str(cm.unraisable.exc_value), "task failed")
            self.assertIs(cm.unraisable.object, task)
            self.assertEqual(cm.unraisable.err_msg,
                             "Exception ignored in thread started by")
            self.assertIsNotNone(cm.unraisable.exc_traceback)


class Barrier:
    """Reusable barrier for a fixed number of threads, built on two locks.

    checkin_mutex admits threads into the barrier; checkout_mutex lets them
    leave again.  Both locks are deliberately handed from one thread to
    another (acquired by one thread, released by a different one), which is
    why enter() uses explicit acquire()/release() calls instead of ``with``.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Nobody may leave until the last thread has checked in.
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until num_threads threads have called enter()."""
        # Check-in phase.
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last thread to arrive: it leaves right away, so only
            # num_threads - 1 threads remain to check out.  checkin_mutex
            # stays held so that no thread can enter the next round before
            # this one has drained.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        # Check-out phase.
        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last thread to leave: reopen check-in for the next round and
            # keep checkout_mutex held, as it was after __init__().
            self.checkin_mutex.release()
            return
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Runs NUMTASKS threads through a shared Barrier NUMTRIPS times."""

    def test_barrier(self):
        with support.wait_threads_exit():
            self.bar = Barrier(NUMTASKS)
            self.running = NUMTASKS
            for i in range(NUMTASKS):
                thread.start_new_thread(self.task2, (i,))
            verbose_print("waiting for tasks to end")
            self.done_mutex.acquire()
            verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: pass the barrier NUMTRIPS times, then check out."""
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread can
            # exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()

class LockTests(lock_tests.LockTests):
    """Run the shared lock test suite against ``_thread.allocate_lock``."""
    locktype = thread.allocate_lock


class TestForkInThread(unittest.TestCase):
    """Check that os.fork() called from a non-main thread works."""

    def setUp(self):
        """Create the pipe the child uses to report back to the parent."""
        # An attribute is set to None once its descriptor has been closed, so
        # that tearDown() never closes the same descriptor number twice.
        self.read_fd, self.write_fd = os.pipe()

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
    @support.reap_threads
    def test_forkinthread(self):
        pid = None

        def fork_thread(read_fd, write_fd):
            nonlocal pid

            # fork in a thread
            pid = os.fork()
            if pid:
                # parent process
                return

            # child process
            try:
                os.close(read_fd)
                os.write(write_fd, b"OK")
            finally:
                # Leave the child immediately, whatever happened above, so it
                # never returns into the test runner.
                os._exit(0)

        with support.wait_threads_exit():
            thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
            self.assertEqual(os.read(self.read_fd, 2), b"OK")
            os.close(self.write_fd)
            # Already closed: make sure tearDown() does not close it again,
            # since the descriptor number may have been reused by then.
            self.write_fd = None

        self.assertIsNotNone(pid)
        support.wait_process(pid, exitcode=0)

    def tearDown(self):
        """Close whichever pipe ends are still open (best effort)."""
        for fd in (self.read_fd, self.write_fd):
            if fd is None:
                continue
            try:
                os.close(fd)
            except OSError:
                pass


if __name__ == "__main__":
    unittest.main()