1"""PyUnit testing that threads honor our signal semantics""" 2 3import unittest 4import signal 5import os 6import sys 7from test.support import threading_helper 8import _thread as thread 9import time 10 11if (sys.platform[:3] == 'win'): 12 raise unittest.SkipTest("Can't test signal on %s" % sys.platform) 13 14process_pid = os.getpid() 15signalled_all=thread.allocate_lock() 16 17USING_PTHREAD_COND = (sys.thread_info.name == 'pthread' 18 and sys.thread_info.lock == 'mutex+cond') 19 20def registerSignals(for_usr1, for_usr2, for_alrm): 21 usr1 = signal.signal(signal.SIGUSR1, for_usr1) 22 usr2 = signal.signal(signal.SIGUSR2, for_usr2) 23 alrm = signal.signal(signal.SIGALRM, for_alrm) 24 return usr1, usr2, alrm 25 26 27# The signal handler. Just note that the signal occurred and 28# from who. 29def handle_signals(sig,frame): 30 signal_blackboard[sig]['tripped'] += 1 31 signal_blackboard[sig]['tripped_by'] = thread.get_ident() 32 33# a function that will be spawned as a separate thread. 34def send_signals(): 35 # We use `raise_signal` rather than `kill` because: 36 # * It verifies that a signal delivered to a background thread still has 37 # its Python-level handler called on the main thread. 38 # * It ensures the signal is handled before the thread exits. 39 signal.raise_signal(signal.SIGUSR1) 40 signal.raise_signal(signal.SIGUSR2) 41 signalled_all.release() 42 43 44@threading_helper.requires_working_threading() 45class ThreadSignals(unittest.TestCase): 46 47 def test_signals(self): 48 with threading_helper.wait_threads_exit(): 49 # Test signal handling semantics of threads. 50 # We spawn a thread, have the thread send itself two signals, and 51 # wait for it to finish. Check that we got both signals 52 # and that they were run by the main thread. 53 signalled_all.acquire() 54 self.spawnSignallingThread() 55 signalled_all.acquire() 56 57 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1) 58 self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'], 59 thread.get_ident()) 60 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped'], 1) 61 self.assertEqual( signal_blackboard[signal.SIGUSR2]['tripped_by'], 62 thread.get_ident()) 63 signalled_all.release() 64 65 def spawnSignallingThread(self): 66 thread.start_new_thread(send_signals, ()) 67 68 def alarm_interrupt(self, sig, frame): 69 raise KeyboardInterrupt 70 71 @unittest.skipIf(USING_PTHREAD_COND, 72 'POSIX condition variables cannot be interrupted') 73 @unittest.skipIf(sys.platform.startswith('linux') and 74 not sys.thread_info.version, 75 'Issue 34004: musl does not allow interruption of locks ' 76 'by signals.') 77 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD 78 @unittest.skipIf(sys.platform.startswith('openbsd'), 79 'lock cannot be interrupted on OpenBSD') 80 def test_lock_acquire_interruption(self): 81 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck 82 # in a deadlock. 83 # XXX this test can fail when the legacy (non-semaphore) implementation 84 # of locks is used in thread_pthread.h, see issue #11223. 85 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 86 try: 87 lock = thread.allocate_lock() 88 lock.acquire() 89 signal.alarm(1) 90 t1 = time.monotonic() 91 self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5) 92 dt = time.monotonic() - t1 93 # Checking that KeyboardInterrupt was raised is not sufficient. 94 # We want to assert that lock.acquire() was interrupted because 95 # of the signal, not that the signal handler was called immediately 96 # after timeout return of lock.acquire() (which can fool assertRaises). 97 self.assertLess(dt, 3.0) 98 finally: 99 signal.alarm(0) 100 signal.signal(signal.SIGALRM, oldalrm) 101 102 @unittest.skipIf(USING_PTHREAD_COND, 103 'POSIX condition variables cannot be interrupted') 104 @unittest.skipIf(sys.platform.startswith('linux') and 105 not sys.thread_info.version, 106 'Issue 34004: musl does not allow interruption of locks ' 107 'by signals.') 108 # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD 109 @unittest.skipIf(sys.platform.startswith('openbsd'), 110 'lock cannot be interrupted on OpenBSD') 111 def test_rlock_acquire_interruption(self): 112 # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck 113 # in a deadlock. 114 # XXX this test can fail when the legacy (non-semaphore) implementation 115 # of locks is used in thread_pthread.h, see issue #11223. 116 oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) 117 try: 118 rlock = thread.RLock() 119 # For reentrant locks, the initial acquisition must be in another 120 # thread. 121 def other_thread(): 122 rlock.acquire() 123 124 with threading_helper.wait_threads_exit(): 125 thread.start_new_thread(other_thread, ()) 126 # Wait until we can't acquire it without blocking... 127 while rlock.acquire(blocking=False): 128 rlock.release() 129 time.sleep(0.01) 130 signal.alarm(1) 131 t1 = time.monotonic() 132 self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5) 133 dt = time.monotonic() - t1 134 # See rationale above in test_lock_acquire_interruption 135 self.assertLess(dt, 3.0) 136 finally: 137 signal.alarm(0) 138 signal.signal(signal.SIGALRM, oldalrm) 139 140 def acquire_retries_on_intr(self, lock): 141 self.sig_recvd = False 142 def my_handler(signal, frame): 143 self.sig_recvd = True 144 145 old_handler = signal.signal(signal.SIGUSR1, my_handler) 146 try: 147 def other_thread(): 148 # Acquire the lock in a non-main thread, so this test works for 149 # RLocks. 150 lock.acquire() 151 # Wait until the main thread is blocked in the lock acquire, and 152 # then wake it up with this. 153 time.sleep(0.5) 154 os.kill(process_pid, signal.SIGUSR1) 155 # Let the main thread take the interrupt, handle it, and retry 156 # the lock acquisition. Then we'll let it run. 157 time.sleep(0.5) 158 lock.release() 159 160 with threading_helper.wait_threads_exit(): 161 thread.start_new_thread(other_thread, ()) 162 # Wait until we can't acquire it without blocking... 163 while lock.acquire(blocking=False): 164 lock.release() 165 time.sleep(0.01) 166 result = lock.acquire() # Block while we receive a signal. 167 self.assertTrue(self.sig_recvd) 168 self.assertTrue(result) 169 finally: 170 signal.signal(signal.SIGUSR1, old_handler) 171 172 def test_lock_acquire_retries_on_intr(self): 173 self.acquire_retries_on_intr(thread.allocate_lock()) 174 175 def test_rlock_acquire_retries_on_intr(self): 176 self.acquire_retries_on_intr(thread.RLock()) 177 178 def test_interrupted_timed_acquire(self): 179 # Test to make sure we recompute lock acquisition timeouts when we 180 # receive a signal. Check this by repeatedly interrupting a lock 181 # acquire in the main thread, and make sure that the lock acquire times 182 # out after the right amount of time. 183 # NOTE: this test only behaves as expected if C signals get delivered 184 # to the main thread. Otherwise lock.acquire() itself doesn't get 185 # interrupted and the test trivially succeeds. 186 self.start = None 187 self.end = None 188 self.sigs_recvd = 0 189 done = thread.allocate_lock() 190 done.acquire() 191 lock = thread.allocate_lock() 192 lock.acquire() 193 def my_handler(signum, frame): 194 self.sigs_recvd += 1 195 old_handler = signal.signal(signal.SIGUSR1, my_handler) 196 try: 197 def timed_acquire(): 198 self.start = time.monotonic() 199 lock.acquire(timeout=0.5) 200 self.end = time.monotonic() 201 def send_signals(): 202 for _ in range(40): 203 time.sleep(0.02) 204 os.kill(process_pid, signal.SIGUSR1) 205 done.release() 206 207 with threading_helper.wait_threads_exit(): 208 # Send the signals from the non-main thread, since the main thread 209 # is the only one that can process signals. 210 thread.start_new_thread(send_signals, ()) 211 timed_acquire() 212 # Wait for thread to finish 213 done.acquire() 214 # This allows for some timing and scheduling imprecision 215 self.assertLess(self.end - self.start, 2.0) 216 self.assertGreater(self.end - self.start, 0.3) 217 # If the signal is received several times before PyErr_CheckSignals() 218 # is called, the handler will get called less than 40 times. Just 219 # check it's been called at least once. 220 self.assertGreater(self.sigs_recvd, 0) 221 finally: 222 signal.signal(signal.SIGUSR1, old_handler) 223 224 225def setUpModule(): 226 global signal_blackboard 227 228 signal_blackboard = { signal.SIGUSR1 : {'tripped': 0, 'tripped_by': 0 }, 229 signal.SIGUSR2 : {'tripped': 0, 'tripped_by': 0 }, 230 signal.SIGALRM : {'tripped': 0, 'tripped_by': 0 } } 231 232 oldsigs = registerSignals(handle_signals, handle_signals, handle_signals) 233 unittest.addModuleCleanup(registerSignals, *oldsigs) 234 235 236if __name__ == '__main__': 237 unittest.main() 238