1import os 2import unittest 3import random 4from test import support 5from test.support import threading_helper 6import _thread as thread 7import time 8import weakref 9 10from test import lock_tests 11 12NUMTASKS = 10 13NUMTRIPS = 3 14POLL_SLEEP = 0.010 # seconds = 10 ms 15 16_print_mutex = thread.allocate_lock() 17 18def verbose_print(arg): 19 """Helper function for printing out debugging output.""" 20 if support.verbose: 21 with _print_mutex: 22 print(arg) 23 24 25class BasicThreadTest(unittest.TestCase): 26 27 def setUp(self): 28 self.done_mutex = thread.allocate_lock() 29 self.done_mutex.acquire() 30 self.running_mutex = thread.allocate_lock() 31 self.random_mutex = thread.allocate_lock() 32 self.created = 0 33 self.running = 0 34 self.next_ident = 0 35 36 key = threading_helper.threading_setup() 37 self.addCleanup(threading_helper.threading_cleanup, *key) 38 39 40class ThreadRunningTests(BasicThreadTest): 41 42 def newtask(self): 43 with self.running_mutex: 44 self.next_ident += 1 45 verbose_print("creating task %s" % self.next_ident) 46 thread.start_new_thread(self.task, (self.next_ident,)) 47 self.created += 1 48 self.running += 1 49 50 def task(self, ident): 51 with self.random_mutex: 52 delay = random.random() / 10000.0 53 verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) 54 time.sleep(delay) 55 verbose_print("task %s done" % ident) 56 with self.running_mutex: 57 self.running -= 1 58 if self.created == NUMTASKS and self.running == 0: 59 self.done_mutex.release() 60 61 def test_starting_threads(self): 62 with threading_helper.wait_threads_exit(): 63 # Basic test for thread creation. 64 for i in range(NUMTASKS): 65 self.newtask() 66 verbose_print("waiting for tasks to complete...") 67 self.done_mutex.acquire() 68 verbose_print("all tasks done") 69 70 def test_stack_size(self): 71 # Various stack size tests. 72 self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") 73 74 thread.stack_size(0) 75 self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") 76 77 @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') 78 def test_nt_and_posix_stack_size(self): 79 try: 80 thread.stack_size(4096) 81 except ValueError: 82 verbose_print("caught expected ValueError setting " 83 "stack_size(4096)") 84 except thread.error: 85 self.skipTest("platform does not support changing thread stack " 86 "size") 87 88 fail_msg = "stack_size(%d) failed - should succeed" 89 for tss in (262144, 0x100000, 0): 90 thread.stack_size(tss) 91 self.assertEqual(thread.stack_size(), tss, fail_msg % tss) 92 verbose_print("successfully set stack_size(%d)" % tss) 93 94 for tss in (262144, 0x100000): 95 verbose_print("trying stack_size = (%d)" % tss) 96 self.next_ident = 0 97 self.created = 0 98 with threading_helper.wait_threads_exit(): 99 for i in range(NUMTASKS): 100 self.newtask() 101 102 verbose_print("waiting for all tasks to complete") 103 self.done_mutex.acquire() 104 verbose_print("all tasks done") 105 106 thread.stack_size(0) 107 108 def test__count(self): 109 # Test the _count() function. 110 orig = thread._count() 111 mut = thread.allocate_lock() 112 mut.acquire() 113 started = [] 114 115 def task(): 116 started.append(None) 117 mut.acquire() 118 mut.release() 119 120 with threading_helper.wait_threads_exit(): 121 thread.start_new_thread(task, ()) 122 while not started: 123 time.sleep(POLL_SLEEP) 124 self.assertEqual(thread._count(), orig + 1) 125 # Allow the task to finish. 126 mut.release() 127 # The only reliable way to be sure that the thread ended from the 128 # interpreter's point of view is to wait for the function object to be 129 # destroyed. 130 done = [] 131 wr = weakref.ref(task, lambda _: done.append(None)) 132 del task 133 while not done: 134 time.sleep(POLL_SLEEP) 135 support.gc_collect() # For PyPy or other GCs. 136 self.assertEqual(thread._count(), orig) 137 138 def test_unraisable_exception(self): 139 def task(): 140 started.release() 141 raise ValueError("task failed") 142 143 started = thread.allocate_lock() 144 with support.catch_unraisable_exception() as cm: 145 with threading_helper.wait_threads_exit(): 146 started.acquire() 147 thread.start_new_thread(task, ()) 148 started.acquire() 149 150 self.assertEqual(str(cm.unraisable.exc_value), "task failed") 151 self.assertIs(cm.unraisable.object, task) 152 self.assertEqual(cm.unraisable.err_msg, 153 "Exception ignored in thread started by") 154 self.assertIsNotNone(cm.unraisable.exc_traceback) 155 156 157class Barrier: 158 def __init__(self, num_threads): 159 self.num_threads = num_threads 160 self.waiting = 0 161 self.checkin_mutex = thread.allocate_lock() 162 self.checkout_mutex = thread.allocate_lock() 163 self.checkout_mutex.acquire() 164 165 def enter(self): 166 self.checkin_mutex.acquire() 167 self.waiting = self.waiting + 1 168 if self.waiting == self.num_threads: 169 self.waiting = self.num_threads - 1 170 self.checkout_mutex.release() 171 return 172 self.checkin_mutex.release() 173 174 self.checkout_mutex.acquire() 175 self.waiting = self.waiting - 1 176 if self.waiting == 0: 177 self.checkin_mutex.release() 178 return 179 self.checkout_mutex.release() 180 181 182class BarrierTest(BasicThreadTest): 183 184 def test_barrier(self): 185 with threading_helper.wait_threads_exit(): 186 self.bar = Barrier(NUMTASKS) 187 self.running = NUMTASKS 188 for i in range(NUMTASKS): 189 thread.start_new_thread(self.task2, (i,)) 190 verbose_print("waiting for tasks to end") 191 self.done_mutex.acquire() 192 verbose_print("tasks done") 193 194 def task2(self, ident): 195 for i in range(NUMTRIPS): 196 if ident == 0: 197 # give it a good chance to enter the next 198 # barrier before the others are all out 199 # of the current one 200 delay = 0 201 else: 202 with self.random_mutex: 203 delay = random.random() / 10000.0 204 verbose_print("task %s will run for %sus" % 205 (ident, round(delay * 1e6))) 206 time.sleep(delay) 207 verbose_print("task %s entering %s" % (ident, i)) 208 self.bar.enter() 209 verbose_print("task %s leaving barrier" % ident) 210 with self.running_mutex: 211 self.running -= 1 212 # Must release mutex before releasing done, else the main thread can 213 # exit and set mutex to None as part of global teardown; then 214 # mutex.release() raises AttributeError. 215 finished = self.running == 0 216 if finished: 217 self.done_mutex.release() 218 219class LockTests(lock_tests.LockTests): 220 locktype = thread.allocate_lock 221 222 223class TestForkInThread(unittest.TestCase): 224 def setUp(self): 225 self.read_fd, self.write_fd = os.pipe() 226 227 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') 228 @threading_helper.reap_threads 229 def test_forkinthread(self): 230 pid = None 231 232 def fork_thread(read_fd, write_fd): 233 nonlocal pid 234 235 # fork in a thread 236 pid = os.fork() 237 if pid: 238 # parent process 239 return 240 241 # child process 242 try: 243 os.close(read_fd) 244 os.write(write_fd, b"OK") 245 finally: 246 os._exit(0) 247 248 with threading_helper.wait_threads_exit(): 249 thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd)) 250 self.assertEqual(os.read(self.read_fd, 2), b"OK") 251 os.close(self.write_fd) 252 253 self.assertIsNotNone(pid) 254 support.wait_process(pid, exitcode=0) 255 256 def tearDown(self): 257 try: 258 os.close(self.read_fd) 259 except OSError: 260 pass 261 262 try: 263 os.close(self.write_fd) 264 except OSError: 265 pass 266 267 268if __name__ == "__main__": 269 unittest.main() 270