"""Tests for the low-level ``thread`` module (Python 2)."""

import os
import unittest
import random
from test import support
thread = support.import_module('thread')
import time
import sys
import weakref

from test import lock_tests

# Number of worker threads started by the thread-creation and barrier tests.
NUMTASKS = 10
# Number of times each worker passes through the barrier in BarrierTest.
NUMTRIPS = 3


# Serialises debug output so lines from different threads do not interleave.
_print_mutex = thread.allocate_lock()


def verbose_print(arg):
    """Helper function for printing out debugging output."""
    if support.verbose:
        with _print_mutex:
            # A parenthesised single argument is still the Python 2 print
            # statement, so the output is unchanged.
            print(arg)


class BasicThreadTest(unittest.TestCase):
    """Common fixture: the locks and counters shared by the worker threads."""

    def setUp(self):
        # done_mutex starts out held, so a test that acquires it again blocks
        # until a worker thread releases it.
        self.done_mutex = thread.allocate_lock()
        self.done_mutex.acquire()
        # Guards the created/running/next_ident counters.
        self.running_mutex = thread.allocate_lock()
        # Guards calls into the random module.
        self.random_mutex = thread.allocate_lock()
        self.created = 0
        self.running = 0
        self.next_ident = 0

        key = support.threading_setup()
        self.addCleanup(support.threading_cleanup, *key)


class ThreadRunningTests(BasicThreadTest):
    """Thread creation, stack size, _count() and exception-state tests."""

    def newtask(self):
        """Start one worker thread running task() and update the counters."""
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Worker body: sleep for a short random delay, then check out.

        The worker that brings ``running`` back to zero after all NUMTASKS
        workers have been created releases ``done_mutex``.
        """
        with self.random_mutex:
            # random.random() is in [0, 1), so the delay is below 100us.
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        with support.wait_threads_exit():
            # Basic test for thread creation.
            for _ in range(NUMTASKS):
                self.newtask()
            verbose_print("waiting for tasks to complete...")
            self.done_mutex.acquire()
            verbose_print("all tasks done")

    def test_stack_size(self):
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0,
                         "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "os2", "posix"),
                     'test meant for nt, os2, and posix')
    def test_nt_and_posix_stack_size(self):
        # Probe with 4096 first: a ValueError is tolerated, while thread.error
        # means the platform cannot change the stack size at all.
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                          "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        fail_msg = "stack_size(%d) failed - should succeed"
        for tss in (262144, 0x100000, 0):
            thread.stack_size(tss)
            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
            verbose_print("successfully set stack_size(%d)" % tss)

        for tss in (262144, 0x100000):
            verbose_print("trying stack_size = (%d)" % tss)
            # Reset the counters so task() releases done_mutex again once this
            # round's NUMTASKS workers have all finished.
            self.next_ident = 0
            self.created = 0
            with support.wait_threads_exit():
                for _ in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")

        # Restore the default stack size for the tests that follow.
        thread.stack_size(0)

    def test__count(self):
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []

        def task():
            started.append(None)
            # Blocks until the main thread releases mut below.
            mut.acquire()
            mut.release()

        with support.wait_threads_exit():
            thread.start_new_thread(task, ())
            while not started:
                time.sleep(0.01)
            self.assertEqual(thread._count(), orig + 1)
            # Allow the task to finish.
            mut.release()
            # The only reliable way to be sure that the thread ended from the
            # interpreter's point of view is to wait for the function object
            # to be destroyed.
            done = []
            # Keep `wr` referenced: the callback only fires while the weakref
            # object itself is still alive.
            wr = weakref.ref(task, lambda _: done.append(None))
            del task
            while not done:
                time.sleep(0.01)
            self.assertEqual(thread._count(), orig)

    def test_save_exception_state_on_error(self):
        # See issue #14474
        def task():
            started.release()
            raise SyntaxError

        def mywrite(self, *args):
            # Installed as an instance attribute below, so `self` receives the
            # first positional argument of the write() call.  Raising and
            # handling an exception here must not disturb the traceback that
            # is being reported for the thread.
            try:
                raise ValueError
            except ValueError:
                pass
            real_write(self, *args)

        started = thread.allocate_lock()
        with support.captured_output("stderr") as stderr:
            real_write = stderr.write
            stderr.write = mywrite
            started.acquire()
            with support.wait_threads_exit():
                thread.start_new_thread(task, ())
                # Blocks until task() has started and released the lock.
                started.acquire()
        self.assertIn("Traceback", stderr.getvalue())


class Barrier(object):
    """Reusable barrier for ``num_threads`` threads, built from two locks.

    ``checkin_mutex`` admits threads into the barrier and ``checkout_mutex``
    lets them leave.  The locks are handed from one thread to another, so
    they are acquired and released explicitly instead of through ``with``.
    """

    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Nobody may leave until the last thread has arrived.
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last one in: leave checkin locked so no thread can start the
            # next round yet, and open the exit for the waiting threads.
            self.waiting = self.num_threads - 1
            self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last one out: leave checkout locked for the next round and
            # re-open checkin.
            self.checkin_mutex.release()
            return
        # Pass the exit on to the next waiting thread.
        self.checkout_mutex.release()


class BarrierTest(BasicThreadTest):
    """Runs NUMTASKS threads through a Barrier NUMTRIPS times each."""

    def test_barrier(self):
        with support.wait_threads_exit():
            self.bar = Barrier(NUMTASKS)
            self.running = NUMTASKS
            for i in range(NUMTASKS):
                thread.start_new_thread(self.task2, (i,))
            verbose_print("waiting for tasks to end")
            self.done_mutex.acquire()
            verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: pass the barrier NUMTRIPS times, then check out."""
        for i in range(NUMTRIPS):
            if ident == 0:
                # give it a good chance to enter the next
                # barrier before the others are all out
                # of the current one
                delay = 0
            else:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            verbose_print("task %s will run for %sus" %
                          (ident, round(delay * 1e6)))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, i))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)
        with self.running_mutex:
            self.running -= 1
            # Must release mutex before releasing done, else the main thread
            # can exit and set mutex to None as part of global teardown; then
            # mutex.release() raises AttributeError.
            finished = self.running == 0
        if finished:
            self.done_mutex.release()


class LockTests(lock_tests.LockTests):
    """Runs the shared lock test-suite against thread.allocate_lock."""
    locktype = thread.allocate_lock


class TestForkInThread(unittest.TestCase):
    """Checks that os.fork() works when called from a non-main thread."""

    def setUp(self):
        self.read_fd, self.write_fd = os.pipe()

    @unittest.skipIf(sys.platform.startswith('win'),
                     "This test is only appropriate for POSIX-like systems.")
    @support.reap_threads
    def test_forkinthread(self):
        # Mutable container so the nested function can report the child's
        # exit status back to the test.
        non_local = {'status': None}

        def thread1():
            try:
                pid = os.fork()  # fork in a thread
            except RuntimeError:
                sys.exit(0)  # exit the child

            if pid == 0:  # child
                os.close(self.read_fd)
                os.write(self.write_fd, "OK")
                # Exiting the thread normally in the child process can leave
                # any additional threads (such as the one started by
                # importing _tkinter) still running, and this can prevent
                # the half-zombie child process from being cleaned up. See
                # Issue #26456.
                os._exit(0)
            else:  # parent
                os.close(self.write_fd)
                pid, status = os.waitpid(pid, 0)
                non_local['status'] = status

        with support.wait_threads_exit():
            thread.start_new_thread(thread1, ())
            self.assertEqual(os.read(self.read_fd, 2), "OK",
                             "Unable to fork() in thread")
        self.assertEqual(non_local['status'], 0)

    def tearDown(self):
        # Best-effort cleanup: thread1() may already have closed either end
        # of the pipe, in which case os.close() raises OSError.
        try:
            os.close(self.read_fd)
        except OSError:
            pass

        try:
            os.close(self.write_fd)
        except OSError:
            pass


def test_main():
    """Run every test class in this module through test.support."""
    support.run_unittest(ThreadRunningTests, BarrierTest, LockTests,
                         TestForkInThread)


if __name__ == "__main__":
    test_main()