• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""PyUnit testing that threads honor our signal semantics"""
2
3import unittest
4import signal
5import os
6import sys
7from test import support
8import _thread as thread
9import time
10
# Skip the whole module on Windows platforms.
if sys.platform.startswith('win'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)

# PID that the helper threads pass to os.kill() to signal this process.
process_pid = os.getpid()
# Lock that send_signals() releases once it has sent its signals.
signalled_all = thread.allocate_lock()

# True when the thread implementation is pthread with mutex+cond locks;
# the lock-interruption tests are skipped in that configuration.
USING_PTHREAD_COND = (
    (sys.thread_info.name, sys.thread_info.lock) == ('pthread', 'mutex+cond')
)
19
def registerSignals(for_usr1, for_usr2, for_alrm):
    """Install handlers for SIGUSR1, SIGUSR2 and SIGALRM.

    Returns a 3-tuple with the handlers that were previously installed, in
    the same (SIGUSR1, SIGUSR2, SIGALRM) order, so the result can be passed
    back to this function to restore them.
    """
    wanted = (
        (signal.SIGUSR1, for_usr1),
        (signal.SIGUSR2, for_usr2),
        (signal.SIGALRM, for_alrm),
    )
    # signal.signal() returns the handler it replaces.
    return tuple(signal.signal(signum, handler) for signum, handler in wanted)
25
26
# The signal handler. Just note that the signal occurred and
# which thread handled it.
def handle_signals(sig, frame):
    """Record one delivery of *sig* in the module-level signal_blackboard.

    Increments the signal's 'tripped' counter and stores the identifier of
    the thread running this handler under 'tripped_by'.
    """
    entry = signal_blackboard[sig]
    entry['tripped'] += 1
    entry['tripped_by'] = thread.get_ident()
32
33# a function that will be spawned as a separate thread.
def send_signals():
    """Send SIGUSR1 then SIGUSR2 to this process and release signalled_all.

    Meant to run in a separate thread; releasing the lock tells the waiter
    that both os.kill() calls have returned.
    """
    for signum in (signal.SIGUSR1, signal.SIGUSR2):
        os.kill(process_pid, signum)
    signalled_all.release()
38
class ThreadSignals(unittest.TestCase):
    """Tests for how signals interact with threads and lock acquisition.

    test_signals reads the module-level ``signal_blackboard``, which
    test_main() creates (and whose handlers it installs) before running this
    class.  The remaining tests install their own temporary handlers and
    restore the previous ones when they finish.
    """

    def test_signals(self):
        # Test signal handling semantics of threads.
        # We spawn a thread, have the thread send two signals, and
        # wait for it to finish. Check that we got both signals
        # and that they were run by the main thread.
        signalled_all.acquire()
        try:
            with support.wait_threads_exit():
                self.spawnSignallingThread()
                # Blocks until send_signals() has released the lock, i.e.
                # until both of its os.kill() calls have returned.
                signalled_all.acquire()

            usr1 = signal_blackboard[signal.SIGUSR1]
            usr2 = signal_blackboard[signal.SIGUSR2]

            # the signals that we asked the kernel to send
            # will come back, but we don't know when.
            # (it might even be after the thread exits
            # and might be out of order.)  If we haven't seen
            # the signals yet, send yet another signal and
            # wait for it to return.
            if usr1['tripped'] == 0 or usr2['tripped'] == 0:
                try:
                    signal.alarm(1)
                    signal.pause()
                finally:
                    signal.alarm(0)

            self.assertEqual(usr1['tripped'], 1)
            self.assertEqual(usr1['tripped_by'], thread.get_ident())
            self.assertEqual(usr2['tripped'], 1)
            self.assertEqual(usr2['tripped_by'], thread.get_ident())
        finally:
            # Release even when an assertion fails: the lock is module-level,
            # so leaving it held would make a later run of this test block
            # forever on its first acquire().
            signalled_all.release()

    def spawnSignallingThread(self):
        """Start the module-level send_signals() in a new thread."""
        thread.start_new_thread(send_signals, ())

    def alarm_interrupt(self, sig, frame):
        """Signal handler that raises KeyboardInterrupt."""
        raise KeyboardInterrupt

    @staticmethod
    def _wait_until_held(lock):
        """Poll until *lock* can no longer be acquired without blocking."""
        while lock.acquire(blocking=False):
            lock.release()
            time.sleep(0.01)

    def _assert_acquire_interrupted(self, acquire):
        """Check that a 5 second timed *acquire* is cut short by SIGALRM.

        The caller must already have installed alarm_interrupt as the SIGALRM
        handler and is responsible for cancelling the alarm afterwards.
        """
        signal.alarm(1)
        t1 = time.monotonic()
        self.assertRaises(KeyboardInterrupt, acquire, timeout=5)
        dt = time.monotonic() - t1
        # Checking that KeyboardInterrupt was raised is not sufficient.
        # We want to assert that the acquire was interrupted because
        # of the signal, not that the signal handler was called immediately
        # after timeout return of the acquire (which can fool assertRaises).
        self.assertLess(dt, 3.0)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_lock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            lock = thread.allocate_lock()
            lock.acquire()
            self._assert_acquire_interrupted(lock.acquire)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    @unittest.skipIf(USING_PTHREAD_COND,
                     'POSIX condition variables cannot be interrupted')
    @unittest.skipIf(sys.platform.startswith('linux') and
                     not sys.thread_info.version,
                     'Issue 34004: musl does not allow interruption of locks '
                     'by signals.')
    # Issue #20564: sem_timedwait() cannot be interrupted on OpenBSD
    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     'lock cannot be interrupted on OpenBSD')
    def test_rlock_acquire_interruption(self):
        # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck
        # in a deadlock.
        # XXX this test can fail when the legacy (non-semaphore) implementation
        # of locks is used in thread_pthread.h, see issue #11223.
        oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
        try:
            rlock = thread.RLock()
            # For reentrant locks, the initial acquisition must be in another
            # thread.
            def other_thread():
                rlock.acquire()

            with support.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                self._wait_until_held(rlock)
                # See rationale in _assert_acquire_interrupted
                self._assert_acquire_interrupted(rlock.acquire)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, oldalrm)

    def acquire_retries_on_intr(self, lock):
        """Check that a blocking lock.acquire() is retried after a signal.

        *lock* is acquired by a helper thread, which then sends SIGUSR1 to
        this process while the calling thread is blocked in lock.acquire().
        The acquire must still succeed once the helper releases the lock, and
        the temporary SIGUSR1 handler must have run.
        """
        self.sig_recvd = False
        # The first parameter is named signum so that it does not shadow the
        # signal module inside the handler.
        def my_handler(signum, frame):
            self.sig_recvd = True

        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def other_thread():
                # Acquire the lock in a non-main thread, so this test works for
                # RLocks.
                lock.acquire()
                # Wait until the main thread is blocked in the lock acquire, and
                # then wake it up with this.
                time.sleep(0.5)
                os.kill(process_pid, signal.SIGUSR1)
                # Let the main thread take the interrupt, handle it, and retry
                # the lock acquisition.  Then we'll let it run.
                time.sleep(0.5)
                lock.release()

            with support.wait_threads_exit():
                thread.start_new_thread(other_thread, ())
                # Wait until we can't acquire it without blocking...
                self._wait_until_held(lock)
                result = lock.acquire()  # Block while we receive a signal.
                self.assertTrue(self.sig_recvd)
                self.assertTrue(result)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)

    def test_lock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.allocate_lock())

    def test_rlock_acquire_retries_on_intr(self):
        self.acquire_retries_on_intr(thread.RLock())

    def test_interrupted_timed_acquire(self):
        # Test to make sure we recompute lock acquisition timeouts when we
        # receive a signal.  Check this by repeatedly interrupting a lock
        # acquire in the main thread, and make sure that the lock acquire times
        # out after the right amount of time.
        # NOTE: this test only behaves as expected if C signals get delivered
        # to the main thread.  Otherwise lock.acquire() itself doesn't get
        # interrupted and the test trivially succeeds.
        self.start = None
        self.end = None
        self.sigs_recvd = 0
        # Held until the signalling thread has sent all of its signals.
        done = thread.allocate_lock()
        done.acquire()
        # Held for the whole test, so the timed acquire below can only time out.
        lock = thread.allocate_lock()
        lock.acquire()
        def my_handler(signum, frame):
            self.sigs_recvd += 1
        old_handler = signal.signal(signal.SIGUSR1, my_handler)
        try:
            def timed_acquire():
                self.start = time.monotonic()
                lock.acquire(timeout=0.5)
                self.end = time.monotonic()
            # Named differently from the module-level send_signals() to avoid
            # shadowing it.
            def send_repeated_signals():
                for _ in range(40):
                    time.sleep(0.02)
                    os.kill(process_pid, signal.SIGUSR1)
                done.release()

            with support.wait_threads_exit():
                # Send the signals from the non-main thread, since the main thread
                # is the only one that can process signals.
                thread.start_new_thread(send_repeated_signals, ())
                timed_acquire()
                # Wait for thread to finish
                done.acquire()
                elapsed = self.end - self.start
                # This allows for some timing and scheduling imprecision
                self.assertLess(elapsed, 2.0)
                self.assertGreater(elapsed, 0.3)
                # If the signal is received several times before PyErr_CheckSignals()
                # is called, the handler will get called less than 40 times. Just
                # check it's been called at least once.
                self.assertGreater(self.sigs_recvd, 0)
        finally:
            signal.signal(signal.SIGUSR1, old_handler)
231
232
def test_main():
    """Run ThreadSignals with the recording signal handlers installed.

    Creates a fresh module-level signal_blackboard, installs handle_signals
    for SIGUSR1, SIGUSR2 and SIGALRM, runs the test class, and finally
    restores the handlers that were installed before.
    """
    global signal_blackboard

    watched = (signal.SIGUSR1, signal.SIGUSR2, signal.SIGALRM)
    # One independent record per signal, so counters are never shared.
    signal_blackboard = {signum: {'tripped': 0, 'tripped_by': 0}
                         for signum in watched}

    previous = registerSignals(handle_signals, handle_signals, handle_signals)
    try:
        support.run_unittest(ThreadSignals)
    finally:
        registerSignals(*previous)
245
# Run the tests via test_main() when this file is executed as a script.
if __name__ == '__main__':
    test_main()
248