• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""PyUnit testing that threads honor our signal semantics"""
2
3import unittest
4import signal
5import os
6import sys
7from test.support import threading_helper
8import _thread as thread
9import time
10
# Every test in this module depends on POSIX signals, so skip the whole
# module on Windows.
if sys.platform.startswith('win'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

# PID of this process; helper threads pass it to os.kill() to signal the
# process they are running in.
process_pid = os.getpid()
# Hand-off lock: released by send_signals() once both signals were raised.
signalled_all = thread.allocate_lock()

# True when locks are built on a pthread mutex + condition variable; the
# lock-interruption tests are skipped in that configuration.
USING_PTHREAD_COND = (
    (sys.thread_info.name, sys.thread_info.lock) == ('pthread', 'mutex+cond')
)
19
def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    The handlers are installed in that order.  Returns a 3-tuple holding the
    previously installed handlers, in the same order, so the result can be
    passed back to this function to restore them.
    """
    installs = (
        (signal.SIGUSR1, for_usr1),
        (signal.SIGUSR2, for_usr2),
        (signal.SIGALRM, for_alrm),
    )
    return tuple(signal.signal(signum, handler)
                 for signum, handler in installs)
25
26
# The shared signal handler: it only records that the signal occurred and
# which thread ran the handler.
def handle_signals(sig, frame):
    """Count signal *sig* on the blackboard and note the handling thread."""
    entry = signal_blackboard[sig]
    entry['tripped'] += 1
    entry['tripped_by'] = thread.get_ident()
32
# Entry point for the helper thread started by the signal tests.
def send_signals():
    """Raise SIGUSR1 then SIGUSR2 in this thread and release signalled_all."""
    # We use `raise_signal` rather than `kill` because:
    #   * It verifies that a signal delivered to a background thread still has
    #     its Python-level handler called on the main thread.
    #   * It ensures the signal is handled before the thread exits.
    for signum in (signal.SIGUSR1, signal.SIGUSR2):
        signal.raise_signal(signum)
    # Tell the waiting thread that both signals have been raised.
    signalled_all.release()
42
43
@threading_helper.requires_working_threading()
class ThreadSignals(unittest.TestCase):
    """Tests for the interaction between signals, threads and lock acquires.

    These tests use module-level state defined next to this class
    (``signal_blackboard``, ``signalled_all``, ``send_signals``,
    ``process_pid`` and ``USING_PTHREAD_COND``).
    """

    def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send itself two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        try:
            with threading_helper.wait_threads_exit():
                self.spawnSignallingThread()
                # Blocks until send_signals() releases the lock, i.e. until
                # the thread has raised both signals.
                signalled_all.acquire()

            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR1]['tripped_by'],
                             thread.get_ident())
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped'], 1)
            self.assertEqual(signal_blackboard[signal.SIGUSR2]['tripped_by'],
                             thread.get_ident())
        finally:
            # Always hand the module-level lock back, even when an assertion
            # fails; otherwise a later run in the same process would block
            # forever on the first acquire() above.
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start a thread that runs the module-level send_signals()."""
        thread.start_new_thread(send_signals, ())

    def alarm_interrupt(self, sig, frame):
        """SIGALRM handler used by the interruption tests.

        Raises KeyboardInterrupt so that SIGALRM mimics a SIGINT.
        """
        raise KeyboardInterrupt

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            lock = thread.allocate_lock()
            lock.acquire()
            # Deliver SIGALRM after 1 second, well before the 5 second
            # acquire timeout below.
            signal.alarm(1)
            t1 = time.monotonic()
            self.assertRaises(KeyboardInterrupt, lock.acquire, timeout=5)
            dt = time.monotonic() - t1
            # Checking that KeyboardInterrupt was raised is not sufficient.
            # We want to assert that lock.acquire() was interrupted because
            # of the signal, not that the signal handler was called immediately
            # after timeout return of lock.acquire() (which can fool assertRaises).
            self.assertLess(dt, 3.0)
        finally:
            # Cancel any pending alarm before restoring the old handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            rlock = thread.RLock()
            # For reentrant locks, the initial acquisition must be in another
            # thread.
            def other_thread():
                rlock.acquire()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while rlock.acquire(blocking=False):
                    rlock.release()
                    time.sleep(0.01)
                # Deliver SIGALRM after 1 second, well before the 5 second
                # acquire timeout below.
                signal.alarm(1)
                t1 = time.monotonic()
                self.assertRaises(KeyboardInterrupt, rlock.acquire, timeout=5)
                dt = time.monotonic() - t1
                # See rationale above in test_lock_acquire_interruption
                self.assertLess(dt, 3.0)
        finally:
            # Cancel any pending alarm before restoring the old handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    def acquire_retries_on_intr(self, lock):
        """Check that a blocking acquire of *lock* is retried after a signal.

        *lock* is first acquired by a helper thread, which then sends SIGUSR1
        to the process while the main thread is blocked in ``lock.acquire()``
        and finally releases the lock.  The main thread's acquire must both
        see the handler run and succeed.
        """
        self.sig_recvd = False
        # The first parameter is named ``signum`` so that it does not shadow
        # the ``signal`` module.
        def my_handler(signum, frame):
            self.sig_recvd = True

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def other_thread():
                # Acquire the lock in a non-main thread, so this test works for
                # RLocks.
                lock.acquire()
                # Wait until the main thread is blocked in the lock acquire, and
                # then wake it up with this.
                time.sleep(0.5)
                os.kill(process_pid, signal.SIGUSR1)
                # Let the main thread take the interrupt, handle it, and retry
                # the lock acquisition.  Then we'll let it run.
                time.sleep(0.5)
                lock.release()

            with threading_helper.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                while lock.acquire(blocking=False):
                    lock.release()
                    time.sleep(0.01)
                result = lock.acquire()  # Block while we receive a signal.
                self.assertTrue(self.sig_recvd)
                self.assertTrue(result)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)

    def test_lock_acquire_retries_on_intr(self):
        # Run the retry-after-signal scenario with a plain lock.
        self.acquire_retries_on_intr(thread.allocate_lock())

    def test_rlock_acquire_retries_on_intr(self):
        # Run the retry-after-signal scenario with a reentrant lock.
        self.acquire_retries_on_intr(thread.RLock())

    def test_interrupted_timed_acquire(self):
        # Test to make sure we recompute lock acquisition timeouts when we
        # receive a signal.  Check this by repeatedly interrupting a lock
        # acquire in the main thread, and make sure that the lock acquire times
        # out after the right amount of time.
        # NOTE: this test only behaves as expected if C signals get delivered
        # to the main thread.  Otherwise lock.acquire() itself doesn't get
        # interrupted and the test trivially succeeds.
        self.start = None
        self.end = None
        self.sigs_recvd = 0
        # `done` is released by the signalling thread once it has sent all of
        # its signals.
        done = thread.allocate_lock()
        done.acquire()
        # `lock` is held for the whole test, so the timed acquire below can
        # only end by timing out.
        lock = thread.allocate_lock()
        lock.acquire()
        def my_handler(signum, frame):
            self.sigs_recvd += 1
        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def timed_acquire():
                self.start = time.monotonic()
                lock.acquire(timeout=0.5)
                self.end = time.monotonic()
            def send_signals():
                # 40 signals spaced 0.02 seconds apart.
                for _ in range(40):
                    time.sleep(0.02)
                    os.kill(process_pid, signal.SIGUSR1)
                done.release()

            with threading_helper.wait_threads_exit():
                # Send the signals from the non-main thread, since the main thread
                # is the only one that can process signals.
                thread.start_new_thread(send_signals, ())
                timed_acquire()
                # Wait for thread to finish
                done.acquire()
                # This allows for some timing and scheduling imprecision
                self.assertLess(self.end - self.start, 2.0)
                self.assertGreater(self.end - self.start, 0.3)
                # If the signal is received several times before PyErr_CheckSignals()
                # is called, the handler will get called less than 40 times. Just
                # check it's been called at least once.
                self.assertGreater(self.sigs_recvd, 0)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)
224
def setUpModule():
    """Reset the signal blackboard and install the shared signal handler.

    handle_signals is installed for SIGUSR1, SIGUSR2 and SIGALRM; the handlers
    that were in place before are restored by a module cleanup.
    """
    global signal_blackboard

    # One fresh, independent counter record per signal of interest.
    signal_blackboard = {
        signum: {'tripped': 0, 'tripped_by': 0}
        for signum in (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM)
    }

    previous_handlers = registerSignals(handle_signals,
                                        handle_signals,
                                        handle_signals)
    unittest.addModuleCleanup(registerSignals, *previous_handlers)
234
235
# Allow running this test module directly as a script.
if __name__ == '__main__':
    unittest.main()
238