• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import enum
2import errno
3import functools
4import inspect
5import os
6import random
7import signal
8import socket
9import statistics
10import subprocess
11import sys
12import threading
13import time
14import unittest
15from test import support
16from test.support import (
17    is_apple, is_apple_mobile, os_helper, threading_helper
18)
19from test.support.script_helper import assert_python_ok, spawn_python
20try:
21    import _testcapi
22except ImportError:
23    _testcapi = None
24
25
26class GenericTests(unittest.TestCase):
27
28    def test_enums(self):
29        for name in dir(signal):
30            sig = getattr(signal, name)
31            if name in {'SIG_DFL', 'SIG_IGN'}:
32                self.assertIsInstance(sig, signal.Handlers)
33            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
34                self.assertIsInstance(sig, signal.Sigmasks)
35            elif name.startswith('SIG') and not name.startswith('SIG_'):
36                self.assertIsInstance(sig, signal.Signals)
37            elif name.startswith('CTRL_'):
38                self.assertIsInstance(sig, signal.Signals)
39                self.assertEqual(sys.platform, "win32")
40
41        CheckedSignals = enum._old_convert_(
42                enum.IntEnum, 'Signals', 'signal',
43                lambda name:
44                    name.isupper()
45                    and (name.startswith('SIG') and not name.startswith('SIG_'))
46                    or name.startswith('CTRL_'),
47                source=signal,
48                )
49        enum._test_simple_enum(CheckedSignals, signal.Signals)
50
51        CheckedHandlers = enum._old_convert_(
52                enum.IntEnum, 'Handlers', 'signal',
53                lambda name: name in ('SIG_DFL', 'SIG_IGN'),
54                source=signal,
55                )
56        enum._test_simple_enum(CheckedHandlers, signal.Handlers)
57
58        Sigmasks = getattr(signal, 'Sigmasks', None)
59        if Sigmasks is not None:
60            CheckedSigmasks = enum._old_convert_(
61                    enum.IntEnum, 'Sigmasks', 'signal',
62                    lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
63                    source=signal,
64                    )
65            enum._test_simple_enum(CheckedSigmasks, Sigmasks)
66
67    def test_functions_module_attr(self):
68        # Issue #27718: If __all__ is not defined all non-builtin functions
69        # should have correct __module__ to be displayed by pydoc.
70        for name in dir(signal):
71            value = getattr(signal, name)
72            if inspect.isroutine(value) and not inspect.isbuiltin(value):
73                self.assertEqual(value.__module__, 'signal')
74
75
76@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
77class PosixTests(unittest.TestCase):
78    def trivial_signal_handler(self, *args):
79        pass
80
81    def create_handler_with_partial(self, argument):
82        return functools.partial(self.trivial_signal_handler, argument)
83
84    def test_out_of_range_signal_number_raises_error(self):
85        self.assertRaises(ValueError, signal.getsignal, 4242)
86
87        self.assertRaises(ValueError, signal.signal, 4242,
88                          self.trivial_signal_handler)
89
90        self.assertRaises(ValueError, signal.strsignal, 4242)
91
92    def test_setting_signal_handler_to_none_raises_error(self):
93        self.assertRaises(TypeError, signal.signal,
94                          signal.SIGUSR1, None)
95
96    def test_getsignal(self):
97        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
98        self.assertIsInstance(hup, signal.Handlers)
99        self.assertEqual(signal.getsignal(signal.SIGHUP),
100                         self.trivial_signal_handler)
101        signal.signal(signal.SIGHUP, hup)
102        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
103
104    def test_no_repr_is_called_on_signal_handler(self):
105        # See https://github.com/python/cpython/issues/112559.
106
107        class MyArgument:
108            def __init__(self):
109                self.repr_count = 0
110
111            def __repr__(self):
112                self.repr_count += 1
113                return super().__repr__()
114
115        argument = MyArgument()
116        self.assertEqual(0, argument.repr_count)
117
118        handler = self.create_handler_with_partial(argument)
119        hup = signal.signal(signal.SIGHUP, handler)
120        self.assertIsInstance(hup, signal.Handlers)
121        self.assertEqual(signal.getsignal(signal.SIGHUP), handler)
122        signal.signal(signal.SIGHUP, hup)
123        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
124        self.assertEqual(0, argument.repr_count)
125
126    @unittest.skipIf(sys.platform.startswith("netbsd"),
127                     "gh-124083: strsignal is not supported on NetBSD")
128    def test_strsignal(self):
129        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
130        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
131        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
132
133    # Issue 3864, unknown if this affects earlier versions of freebsd also
134    def test_interprocess_signal(self):
135        dirname = os.path.dirname(__file__)
136        script = os.path.join(dirname, 'signalinterproctester.py')
137        assert_python_ok(script)
138
139    @unittest.skipUnless(
140        hasattr(signal, "valid_signals"),
141        "requires signal.valid_signals"
142    )
143    def test_valid_signals(self):
144        s = signal.valid_signals()
145        self.assertIsInstance(s, set)
146        self.assertIn(signal.Signals.SIGINT, s)
147        self.assertIn(signal.Signals.SIGALRM, s)
148        self.assertNotIn(0, s)
149        self.assertNotIn(signal.NSIG, s)
150        self.assertLess(len(s), signal.NSIG)
151
152        # gh-91145: Make sure that all SIGxxx constants exposed by the Python
153        # signal module have a number in the [0; signal.NSIG-1] range.
154        for name in dir(signal):
155            if not name.startswith("SIG"):
156                continue
157            if name in {"SIG_IGN", "SIG_DFL"}:
158                # SIG_IGN and SIG_DFL are pointers
159                continue
160            with self.subTest(name=name):
161                signum = getattr(signal, name)
162                self.assertGreaterEqual(signum, 0)
163                self.assertLess(signum, signal.NSIG)
164
165    @unittest.skipUnless(sys.executable, "sys.executable required.")
166    @support.requires_subprocess()
167    def test_keyboard_interrupt_exit_code(self):
168        """KeyboardInterrupt triggers exit via SIGINT."""
169        process = subprocess.run(
170                [sys.executable, "-c",
171                 "import os, signal, time\n"
172                 "os.kill(os.getpid(), signal.SIGINT)\n"
173                 "for _ in range(999): time.sleep(0.01)"],
174                stderr=subprocess.PIPE)
175        self.assertIn(b"KeyboardInterrupt", process.stderr)
176        self.assertEqual(process.returncode, -signal.SIGINT)
177        # Caveat: The exit code is insufficient to guarantee we actually died
178        # via a signal.  POSIX shells do more than look at the 8 bit value.
179        # Writing an automation friendly test of an interactive shell
180        # to confirm that our process died via a SIGINT proved too complex.
181
182
183@unittest.skipUnless(sys.platform == "win32", "Windows specific")
184class WindowsSignalTests(unittest.TestCase):
185
186    def test_valid_signals(self):
187        s = signal.valid_signals()
188        self.assertIsInstance(s, set)
189        self.assertGreaterEqual(len(s), 6)
190        self.assertIn(signal.Signals.SIGINT, s)
191        self.assertNotIn(0, s)
192        self.assertNotIn(signal.NSIG, s)
193        self.assertLess(len(s), signal.NSIG)
194
195    def test_issue9324(self):
196        # Updated for issue #10003, adding SIGBREAK
197        handler = lambda x, y: None
198        checked = set()
199        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
200                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
201                    signal.SIGTERM):
202            # Set and then reset a handler for signals that work on windows.
203            # Issue #18396, only for signals without a C-level handler.
204            if signal.getsignal(sig) is not None:
205                signal.signal(sig, signal.signal(sig, handler))
206                checked.add(sig)
207        # Issue #18396: Ensure the above loop at least tested *something*
208        self.assertTrue(checked)
209
210        with self.assertRaises(ValueError):
211            signal.signal(-1, handler)
212
213        with self.assertRaises(ValueError):
214            signal.signal(7, handler)
215
216    @unittest.skipUnless(sys.executable, "sys.executable required.")
217    @support.requires_subprocess()
218    def test_keyboard_interrupt_exit_code(self):
219        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
220        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
221        # as that requires setting up a console control handler in a child
222        # in its own process group.  Doable, but quite complicated.  (see
223        # @eryksun on https://github.com/python/cpython/pull/11862)
224        process = subprocess.run(
225                [sys.executable, "-c", "raise KeyboardInterrupt"],
226                stderr=subprocess.PIPE)
227        self.assertIn(b"KeyboardInterrupt", process.stderr)
228        STATUS_CONTROL_C_EXIT = 0xC000013A
229        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
230
231
232class WakeupFDTests(unittest.TestCase):
233
234    def test_invalid_call(self):
235        # First parameter is positional-only
236        with self.assertRaises(TypeError):
237            signal.set_wakeup_fd(signum=signal.SIGINT)
238
239        # warn_on_full_buffer is a keyword-only parameter
240        with self.assertRaises(TypeError):
241            signal.set_wakeup_fd(signal.SIGINT, False)
242
243    def test_invalid_fd(self):
244        fd = os_helper.make_bad_fd()
245        self.assertRaises((ValueError, OSError),
246                          signal.set_wakeup_fd, fd)
247
248    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
249    def test_invalid_socket(self):
250        sock = socket.socket()
251        fd = sock.fileno()
252        sock.close()
253        self.assertRaises((ValueError, OSError),
254                          signal.set_wakeup_fd, fd)
255
256    # Emscripten does not support fstat on pipes yet.
257    # https://github.com/emscripten-core/emscripten/issues/16414
258    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
259    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
260    def test_set_wakeup_fd_result(self):
261        r1, w1 = os.pipe()
262        self.addCleanup(os.close, r1)
263        self.addCleanup(os.close, w1)
264        r2, w2 = os.pipe()
265        self.addCleanup(os.close, r2)
266        self.addCleanup(os.close, w2)
267
268        if hasattr(os, 'set_blocking'):
269            os.set_blocking(w1, False)
270            os.set_blocking(w2, False)
271
272        signal.set_wakeup_fd(w1)
273        self.assertEqual(signal.set_wakeup_fd(w2), w1)
274        self.assertEqual(signal.set_wakeup_fd(-1), w2)
275        self.assertEqual(signal.set_wakeup_fd(-1), -1)
276
277    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
278    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
279    def test_set_wakeup_fd_socket_result(self):
280        sock1 = socket.socket()
281        self.addCleanup(sock1.close)
282        sock1.setblocking(False)
283        fd1 = sock1.fileno()
284
285        sock2 = socket.socket()
286        self.addCleanup(sock2.close)
287        sock2.setblocking(False)
288        fd2 = sock2.fileno()
289
290        signal.set_wakeup_fd(fd1)
291        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
292        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
293        self.assertEqual(signal.set_wakeup_fd(-1), -1)
294
295    # On Windows, files are always blocking and Windows does not provide a
296    # function to test if a socket is in non-blocking mode.
297    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
298    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
299    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
300    def test_set_wakeup_fd_blocking(self):
301        rfd, wfd = os.pipe()
302        self.addCleanup(os.close, rfd)
303        self.addCleanup(os.close, wfd)
304
305        # fd must be non-blocking
306        os.set_blocking(wfd, True)
307        with self.assertRaises(ValueError) as cm:
308            signal.set_wakeup_fd(wfd)
309        self.assertEqual(str(cm.exception),
310                         "the fd %s must be in non-blocking mode" % wfd)
311
312        # non-blocking is ok
313        os.set_blocking(wfd, False)
314        signal.set_wakeup_fd(wfd)
315        signal.set_wakeup_fd(-1)
316
317
318@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
319class WakeupSignalTests(unittest.TestCase):
320    @unittest.skipIf(_testcapi is None, 'need _testcapi')
321    def check_wakeup(self, test_body, *signals, ordered=True):
322        # use a subprocess to have only one thread
323        code = """if 1:
324        import _testcapi
325        import os
326        import signal
327        import struct
328
329        signals = {!r}
330
331        def handler(signum, frame):
332            pass
333
334        def check_signum(signals):
335            data = os.read(read, len(signals)+1)
336            raised = struct.unpack('%uB' % len(data), data)
337            if not {!r}:
338                raised = set(raised)
339                signals = set(signals)
340            if raised != signals:
341                raise Exception("%r != %r" % (raised, signals))
342
343        {}
344
345        signal.signal(signal.SIGALRM, handler)
346        read, write = os.pipe()
347        os.set_blocking(write, False)
348        signal.set_wakeup_fd(write)
349
350        test()
351        check_signum(signals)
352
353        os.close(read)
354        os.close(write)
355        """.format(tuple(map(int, signals)), ordered, test_body)
356
357        assert_python_ok('-c', code)
358
359    @unittest.skipIf(_testcapi is None, 'need _testcapi')
360    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
361    def test_wakeup_write_error(self):
362        # Issue #16105: write() errors in the C signal handler should not
363        # pass silently.
364        # Use a subprocess to have only one thread.
365        code = """if 1:
366        import _testcapi
367        import errno
368        import os
369        import signal
370        import sys
371        from test.support import captured_stderr
372
373        def handler(signum, frame):
374            1/0
375
376        signal.signal(signal.SIGALRM, handler)
377        r, w = os.pipe()
378        os.set_blocking(r, False)
379
380        # Set wakeup_fd a read-only file descriptor to trigger the error
381        signal.set_wakeup_fd(r)
382        try:
383            with captured_stderr() as err:
384                signal.raise_signal(signal.SIGALRM)
385        except ZeroDivisionError:
386            # An ignored exception should have been printed out on stderr
387            err = err.getvalue()
388            if ('Exception ignored when trying to write to the signal wakeup fd'
389                not in err):
390                raise AssertionError(err)
391            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
392                raise AssertionError(err)
393        else:
394            raise AssertionError("ZeroDivisionError not raised")
395
396        os.close(r)
397        os.close(w)
398        """
399        r, w = os.pipe()
400        try:
401            os.write(r, b'x')
402        except OSError:
403            pass
404        else:
405            self.skipTest("OS doesn't report write() error on the read end of a pipe")
406        finally:
407            os.close(r)
408            os.close(w)
409
410        assert_python_ok('-c', code)
411
412    def test_wakeup_fd_early(self):
413        self.check_wakeup("""def test():
414            import select
415            import time
416
417            TIMEOUT_FULL = 10
418            TIMEOUT_HALF = 5
419
420            class InterruptSelect(Exception):
421                pass
422
423            def handler(signum, frame):
424                raise InterruptSelect
425            signal.signal(signal.SIGALRM, handler)
426
427            signal.alarm(1)
428
429            # We attempt to get a signal during the sleep,
430            # before select is called
431            try:
432                select.select([], [], [], TIMEOUT_FULL)
433            except InterruptSelect:
434                pass
435            else:
436                raise Exception("select() was not interrupted")
437
438            before_time = time.monotonic()
439            select.select([read], [], [], TIMEOUT_FULL)
440            after_time = time.monotonic()
441            dt = after_time - before_time
442            if dt >= TIMEOUT_HALF:
443                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
444        """, signal.SIGALRM)
445
446    def test_wakeup_fd_during(self):
447        self.check_wakeup("""def test():
448            import select
449            import time
450
451            TIMEOUT_FULL = 10
452            TIMEOUT_HALF = 5
453
454            class InterruptSelect(Exception):
455                pass
456
457            def handler(signum, frame):
458                raise InterruptSelect
459            signal.signal(signal.SIGALRM, handler)
460
461            signal.alarm(1)
462            before_time = time.monotonic()
463            # We attempt to get a signal during the select call
464            try:
465                select.select([read], [], [], TIMEOUT_FULL)
466            except InterruptSelect:
467                pass
468            else:
469                raise Exception("select() was not interrupted")
470            after_time = time.monotonic()
471            dt = after_time - before_time
472            if dt >= TIMEOUT_HALF:
473                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
474        """, signal.SIGALRM)
475
476    def test_signum(self):
477        self.check_wakeup("""def test():
478            signal.signal(signal.SIGUSR1, handler)
479            signal.raise_signal(signal.SIGUSR1)
480            signal.raise_signal(signal.SIGALRM)
481        """, signal.SIGUSR1, signal.SIGALRM)
482
483    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
484                         'need signal.pthread_sigmask()')
485    def test_pending(self):
486        self.check_wakeup("""def test():
487            signum1 = signal.SIGUSR1
488            signum2 = signal.SIGUSR2
489
490            signal.signal(signum1, handler)
491            signal.signal(signum2, handler)
492
493            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
494            signal.raise_signal(signum1)
495            signal.raise_signal(signum2)
496            # Unblocking the 2 signals calls the C signal handler twice
497            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
498        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
499
500
501@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
502class WakeupSocketSignalTests(unittest.TestCase):
503
504    @unittest.skipIf(_testcapi is None, 'need _testcapi')
505    def test_socket(self):
506        # use a subprocess to have only one thread
507        code = """if 1:
508        import signal
509        import socket
510        import struct
511        import _testcapi
512
513        signum = signal.SIGINT
514        signals = (signum,)
515
516        def handler(signum, frame):
517            pass
518
519        signal.signal(signum, handler)
520
521        read, write = socket.socketpair()
522        write.setblocking(False)
523        signal.set_wakeup_fd(write.fileno())
524
525        signal.raise_signal(signum)
526
527        data = read.recv(1)
528        if not data:
529            raise Exception("no signum written")
530        raised = struct.unpack('B', data)
531        if raised != signals:
532            raise Exception("%r != %r" % (raised, signals))
533
534        read.close()
535        write.close()
536        """
537
538        assert_python_ok('-c', code)
539
540    @unittest.skipIf(_testcapi is None, 'need _testcapi')
541    def test_send_error(self):
542        # Use a subprocess to have only one thread.
543        if os.name == 'nt':
544            action = 'send'
545        else:
546            action = 'write'
547        code = """if 1:
548        import errno
549        import signal
550        import socket
551        import sys
552        import time
553        import _testcapi
554        from test.support import captured_stderr
555
556        signum = signal.SIGINT
557
558        def handler(signum, frame):
559            pass
560
561        signal.signal(signum, handler)
562
563        read, write = socket.socketpair()
564        read.setblocking(False)
565        write.setblocking(False)
566
567        signal.set_wakeup_fd(write.fileno())
568
569        # Close sockets: send() will fail
570        read.close()
571        write.close()
572
573        with captured_stderr() as err:
574            signal.raise_signal(signum)
575
576        err = err.getvalue()
577        if ('Exception ignored when trying to {action} to the signal wakeup fd'
578            not in err):
579            raise AssertionError(err)
580        """.format(action=action)
581        assert_python_ok('-c', code)
582
583    @unittest.skipIf(_testcapi is None, 'need _testcapi')
584    def test_warn_on_full_buffer(self):
585        # Use a subprocess to have only one thread.
586        if os.name == 'nt':
587            action = 'send'
588        else:
589            action = 'write'
590        code = """if 1:
591        import errno
592        import signal
593        import socket
594        import sys
595        import time
596        import _testcapi
597        from test.support import captured_stderr
598
599        signum = signal.SIGINT
600
601        # This handler will be called, but we intentionally won't read from
602        # the wakeup fd.
603        def handler(signum, frame):
604            pass
605
606        signal.signal(signum, handler)
607
608        read, write = socket.socketpair()
609
610        # Fill the socketpair buffer
611        if sys.platform == 'win32':
612            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
613            # the full socketpair buffer, so use a timeout of 50 ms instead.
614            write.settimeout(0.050)
615        else:
616            write.setblocking(False)
617
618        written = 0
619        if sys.platform == "vxworks":
620            CHUNK_SIZES = (1,)
621        else:
622            # Start with large chunk size to reduce the
623            # number of send needed to fill the buffer.
624            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
625        for chunk_size in CHUNK_SIZES:
626            chunk = b"x" * chunk_size
627            try:
628                while True:
629                    write.send(chunk)
630                    written += chunk_size
631            except (BlockingIOError, TimeoutError):
632                pass
633
634        print(f"%s bytes written into the socketpair" % written, flush=True)
635
636        write.setblocking(False)
637        try:
638            write.send(b"x")
639        except BlockingIOError:
640            # The socketpair buffer seems full
641            pass
642        else:
643            raise AssertionError("%s bytes failed to fill the socketpair "
644                                 "buffer" % written)
645
646        # By default, we get a warning when a signal arrives
647        msg = ('Exception ignored when trying to {action} '
648               'to the signal wakeup fd')
649        signal.set_wakeup_fd(write.fileno())
650
651        with captured_stderr() as err:
652            signal.raise_signal(signum)
653
654        err = err.getvalue()
655        if msg not in err:
656            raise AssertionError("first set_wakeup_fd() test failed, "
657                                 "stderr: %r" % err)
658
659        # And also if warn_on_full_buffer=True
660        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
661
662        with captured_stderr() as err:
663            signal.raise_signal(signum)
664
665        err = err.getvalue()
666        if msg not in err:
667            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
668                                 "test failed, stderr: %r" % err)
669
670        # But not if warn_on_full_buffer=False
671        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
672
673        with captured_stderr() as err:
674            signal.raise_signal(signum)
675
676        err = err.getvalue()
677        if err != "":
678            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
679                                 "test failed, stderr: %r" % err)
680
681        # And then check the default again, to make sure warn_on_full_buffer
682        # settings don't leak across calls.
683        signal.set_wakeup_fd(write.fileno())
684
685        with captured_stderr() as err:
686            signal.raise_signal(signum)
687
688        err = err.getvalue()
689        if msg not in err:
690            raise AssertionError("second set_wakeup_fd() test failed, "
691                                 "stderr: %r" % err)
692
693        """.format(action=action)
694        assert_python_ok('-c', code)
695
696
697@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
698@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
699@support.requires_subprocess()
700@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
701class SiginterruptTest(unittest.TestCase):
702
703    def readpipe_interrupted(self, interrupt, timeout=support.SHORT_TIMEOUT):
704        """Perform a read during which a signal will arrive.  Return True if the
705        read is interrupted by the signal and raises an exception.  Return False
706        if it returns normally.
707        """
708        # use a subprocess to have only one thread, to have a timeout on the
709        # blocking read and to not touch signal handling in this process
710        code = """if 1:
711            import errno
712            import os
713            import signal
714            import sys
715
716            interrupt = %r
717            r, w = os.pipe()
718
719            def handler(signum, frame):
720                1 / 0
721
722            signal.signal(signal.SIGALRM, handler)
723            if interrupt is not None:
724                signal.siginterrupt(signal.SIGALRM, interrupt)
725
726            print("ready")
727            sys.stdout.flush()
728
729            # run the test twice
730            try:
731                for loop in range(2):
732                    # send a SIGALRM in a second (during the read)
733                    signal.alarm(1)
734                    try:
735                        # blocking call: read from a pipe without data
736                        os.read(r, 1)
737                    except ZeroDivisionError:
738                        pass
739                    else:
740                        sys.exit(2)
741                sys.exit(3)
742            finally:
743                os.close(r)
744                os.close(w)
745        """ % (interrupt,)
746        with spawn_python('-c', code) as process:
747            try:
748                # wait until the child process is loaded and has started
749                first_line = process.stdout.readline()
750
751                stdout, stderr = process.communicate(timeout=timeout)
752            except subprocess.TimeoutExpired:
753                process.kill()
754                return False
755            else:
756                stdout = first_line + stdout
757                exitcode = process.wait()
758                if exitcode not in (2, 3):
759                    raise Exception("Child error (exit code %s): %r"
760                                    % (exitcode, stdout))
761                return (exitcode == 3)
762
763    def test_without_siginterrupt(self):
764        # If a signal handler is installed and siginterrupt is not called
765        # at all, when that signal arrives, it interrupts a syscall that's in
766        # progress.
767        interrupted = self.readpipe_interrupted(None)
768        self.assertTrue(interrupted)
769
770    def test_siginterrupt_on(self):
771        # If a signal handler is installed and siginterrupt is called with
772        # a true value for the second argument, when that signal arrives, it
773        # interrupts a syscall that's in progress.
774        interrupted = self.readpipe_interrupted(True)
775        self.assertTrue(interrupted)
776
777    @support.requires_resource('walltime')
778    def test_siginterrupt_off(self):
779        # If a signal handler is installed and siginterrupt is called with
780        # a false value for the second argument, when that signal arrives, it
781        # does not interrupt a syscall that's in progress.
782        interrupted = self.readpipe_interrupted(False, timeout=2)
783        self.assertFalse(interrupted)
784
785
786@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
787@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
788                         "needs signal.getitimer() and signal.setitimer()")
789class ItimerTest(unittest.TestCase):
790    def setUp(self):
791        self.hndl_called = False
792        self.hndl_count = 0
793        self.itimer = None
794        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
795
796    def tearDown(self):
797        signal.signal(signal.SIGALRM, self.old_alarm)
798        if self.itimer is not None: # test_itimer_exc doesn't change this attr
799            # just ensure that itimer is stopped
800            signal.setitimer(self.itimer, 0)
801
802    def sig_alrm(self, *args):
803        self.hndl_called = True
804
805    def sig_vtalrm(self, *args):
806        self.hndl_called = True
807
808        if self.hndl_count > 3:
809            # it shouldn't be here, because it should have been disabled.
810            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
811                "timer.")
812        elif self.hndl_count == 3:
813            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
814            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
815
816        self.hndl_count += 1
817
818    def sig_prof(self, *args):
819        self.hndl_called = True
820        signal.setitimer(signal.ITIMER_PROF, 0)
821
822    def test_itimer_exc(self):
823        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
824        # defines it ?
825        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
826        # Negative times are treated as zero on some platforms.
827        if 0:
828            self.assertRaises(signal.ItimerError,
829                              signal.setitimer, signal.ITIMER_REAL, -1)
830
831    def test_itimer_real(self):
832        self.itimer = signal.ITIMER_REAL
833        signal.setitimer(self.itimer, 1.0)
834        signal.pause()
835        self.assertEqual(self.hndl_called, True)
836
837    # Issue 3864, unknown if this affects earlier versions of freebsd also
838    @unittest.skipIf(sys.platform in ('netbsd5',) or is_apple_mobile,
839        'itimer not reliable (does not mix well with threading) on some BSDs.')
840    def test_itimer_virtual(self):
841        self.itimer = signal.ITIMER_VIRTUAL
842        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
843        signal.setitimer(self.itimer, 0.3, 0.2)
844
845        for _ in support.busy_retry(support.LONG_TIMEOUT):
846            # use up some virtual time by doing real work
847            _ = pow(12345, 67890, 10000019)
848            if signal.getitimer(self.itimer) == (0.0, 0.0):
849                # sig_vtalrm handler stopped this itimer
850                break
851
852        # virtual itimer should be (0.0, 0.0) now
853        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
854        # and the handler should have been called
855        self.assertEqual(self.hndl_called, True)
856
857    def test_itimer_prof(self):
858        self.itimer = signal.ITIMER_PROF
859        signal.signal(signal.SIGPROF, self.sig_prof)
860        signal.setitimer(self.itimer, 0.2, 0.2)
861
862        for _ in support.busy_retry(support.LONG_TIMEOUT):
863            # do some work
864            _ = pow(12345, 67890, 10000019)
865            if signal.getitimer(self.itimer) == (0.0, 0.0):
866                # sig_prof handler stopped this itimer
867                break
868
869        # profiling itimer should be (0.0, 0.0) now
870        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
871        # and the handler should have been called
872        self.assertEqual(self.hndl_called, True)
873
874    def test_setitimer_tiny(self):
875        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
876        # Check that float -> timeval conversion doesn't round
877        # the interval down to zero, which would disable the timer.
878        self.itimer = signal.ITIMER_REAL
879        signal.setitimer(self.itimer, 1e-6)
880        time.sleep(1)
881        self.assertEqual(self.hndl_called, True)
882
883
884class PendingSignalsTests(unittest.TestCase):
885    """
886    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
887    functions.
888    """
889    @unittest.skipUnless(hasattr(signal, 'sigpending'),
890                         'need signal.sigpending()')
891    def test_sigpending_empty(self):
892        self.assertEqual(signal.sigpending(), set())
893
894    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
895                         'need signal.pthread_sigmask()')
896    @unittest.skipUnless(hasattr(signal, 'sigpending'),
897                         'need signal.sigpending()')
898    def test_sigpending(self):
899        code = """if 1:
900            import os
901            import signal
902
903            def handler(signum, frame):
904                1/0
905
906            signum = signal.SIGUSR1
907            signal.signal(signum, handler)
908
909            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
910            os.kill(os.getpid(), signum)
911            pending = signal.sigpending()
912            for sig in pending:
913                assert isinstance(sig, signal.Signals), repr(pending)
914            if pending != {signum}:
915                raise Exception('%s != {%s}' % (pending, signum))
916            try:
917                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
918            except ZeroDivisionError:
919                pass
920            else:
921                raise Exception("ZeroDivisionError not raised")
922        """
923        assert_python_ok('-c', code)
924
925    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
926                         'need signal.pthread_kill()')
927    @threading_helper.requires_working_threading()
928    def test_pthread_kill(self):
929        code = """if 1:
930            import signal
931            import threading
932            import sys
933
934            signum = signal.SIGUSR1
935
936            def handler(signum, frame):
937                1/0
938
939            signal.signal(signum, handler)
940
941            tid = threading.get_ident()
942            try:
943                signal.pthread_kill(tid, signum)
944            except ZeroDivisionError:
945                pass
946            else:
947                raise Exception("ZeroDivisionError not raised")
948        """
949        assert_python_ok('-c', code)
950
951    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
952                         'need signal.pthread_sigmask()')
953    def wait_helper(self, blocked, test):
954        """
955        test: body of the "def test(signum):" function.
956        blocked: number of the blocked signal
957        """
958        code = '''if 1:
959        import signal
960        import sys
961        from signal import Signals
962
963        def handler(signum, frame):
964            1/0
965
966        %s
967
968        blocked = %s
969        signum = signal.SIGALRM
970
971        # child: block and wait the signal
972        try:
973            signal.signal(signum, handler)
974            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
975
976            # Do the tests
977            test(signum)
978
979            # The handler must not be called on unblock
980            try:
981                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
982            except ZeroDivisionError:
983                print("the signal handler has been called",
984                      file=sys.stderr)
985                sys.exit(1)
986        except BaseException as err:
987            print("error: {}".format(err), file=sys.stderr)
988            sys.stderr.flush()
989            sys.exit(1)
990        ''' % (test.strip(), blocked)
991
992        # sig*wait* must be called with the signal blocked: since the current
993        # process might have several threads running, use a subprocess to have
994        # a single thread.
995        assert_python_ok('-c', code)
996
997    @unittest.skipUnless(hasattr(signal, 'sigwait'),
998                         'need signal.sigwait()')
999    def test_sigwait(self):
1000        self.wait_helper(signal.SIGALRM, '''
1001        def test(signum):
1002            signal.alarm(1)
1003            received = signal.sigwait([signum])
1004            assert isinstance(received, signal.Signals), received
1005            if received != signum:
1006                raise Exception('received %s, not %s' % (received, signum))
1007        ''')
1008
1009    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
1010                         'need signal.sigwaitinfo()')
1011    def test_sigwaitinfo(self):
1012        self.wait_helper(signal.SIGALRM, '''
1013        def test(signum):
1014            signal.alarm(1)
1015            info = signal.sigwaitinfo([signum])
1016            if info.si_signo != signum:
1017                raise Exception("info.si_signo != %s" % signum)
1018        ''')
1019
1020    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1021                         'need signal.sigtimedwait()')
1022    def test_sigtimedwait(self):
1023        self.wait_helper(signal.SIGALRM, '''
1024        def test(signum):
1025            signal.alarm(1)
1026            info = signal.sigtimedwait([signum], 10.1000)
1027            if info.si_signo != signum:
1028                raise Exception('info.si_signo != %s' % signum)
1029        ''')
1030
1031    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1032                         'need signal.sigtimedwait()')
1033    def test_sigtimedwait_poll(self):
1034        # check that polling with sigtimedwait works
1035        self.wait_helper(signal.SIGALRM, '''
1036        def test(signum):
1037            import os
1038            os.kill(os.getpid(), signum)
1039            info = signal.sigtimedwait([signum], 0)
1040            if info.si_signo != signum:
1041                raise Exception('info.si_signo != %s' % signum)
1042        ''')
1043
1044    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1045                         'need signal.sigtimedwait()')
1046    def test_sigtimedwait_timeout(self):
1047        self.wait_helper(signal.SIGALRM, '''
1048        def test(signum):
1049            received = signal.sigtimedwait([signum], 1.0)
1050            if received is not None:
1051                raise Exception("received=%r" % (received,))
1052        ''')
1053
1054    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
1055                         'need signal.sigtimedwait()')
1056    def test_sigtimedwait_negative_timeout(self):
1057        signum = signal.SIGALRM
1058        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
1059
1060    @unittest.skipUnless(hasattr(signal, 'sigwait'),
1061                         'need signal.sigwait()')
1062    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1063                         'need signal.pthread_sigmask()')
1064    @threading_helper.requires_working_threading()
1065    def test_sigwait_thread(self):
1066        # Check that calling sigwait() from a thread doesn't suspend the whole
1067        # process. A new interpreter is spawned to avoid problems when mixing
1068        # threads and fork(): only async-safe functions are allowed between
1069        # fork() and exec().
1070        assert_python_ok("-c", """if True:
1071            import os, threading, sys, time, signal
1072
1073            # the default handler terminates the process
1074            signum = signal.SIGUSR1
1075
1076            def kill_later():
1077                # wait until the main thread is waiting in sigwait()
1078                time.sleep(1)
1079                os.kill(os.getpid(), signum)
1080
1081            # the signal must be blocked by all the threads
1082            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1083            killer = threading.Thread(target=kill_later)
1084            killer.start()
1085            received = signal.sigwait([signum])
1086            if received != signum:
1087                print("sigwait() received %s, not %s" % (received, signum),
1088                      file=sys.stderr)
1089                sys.exit(1)
1090            killer.join()
1091            # unblock the signal, which should have been cleared by sigwait()
1092            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1093        """)
1094
1095    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1096                         'need signal.pthread_sigmask()')
1097    def test_pthread_sigmask_arguments(self):
1098        self.assertRaises(TypeError, signal.pthread_sigmask)
1099        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
1100        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
1101        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
1102        with self.assertRaises(ValueError):
1103            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1104        with self.assertRaises(ValueError):
1105            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1106        with self.assertRaises(ValueError):
1107            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1108
1109    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1110                         'need signal.pthread_sigmask()')
1111    def test_pthread_sigmask_valid_signals(self):
1112        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1113        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1114        # Get current blocked set
1115        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1116        self.assertLessEqual(s, signal.valid_signals())
1117
1118    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1119                         'need signal.pthread_sigmask()')
1120    @threading_helper.requires_working_threading()
1121    def test_pthread_sigmask(self):
1122        code = """if 1:
1123        import signal
1124        import os; import threading
1125
1126        def handler(signum, frame):
1127            1/0
1128
1129        def kill(signum):
1130            os.kill(os.getpid(), signum)
1131
1132        def check_mask(mask):
1133            for sig in mask:
1134                assert isinstance(sig, signal.Signals), repr(sig)
1135
1136        def read_sigmask():
1137            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
1138            check_mask(sigmask)
1139            return sigmask
1140
1141        signum = signal.SIGUSR1
1142
1143        # Install our signal handler
1144        old_handler = signal.signal(signum, handler)
1145
1146        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
1147        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1148        check_mask(old_mask)
1149        try:
1150            kill(signum)
1151        except ZeroDivisionError:
1152            pass
1153        else:
1154            raise Exception("ZeroDivisionError not raised")
1155
1156        # Block and then raise SIGUSR1. The signal is blocked: the signal
1157        # handler is not called, and the signal is now pending
1158        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1159        check_mask(mask)
1160        kill(signum)
1161
1162        # Check the new mask
1163        blocked = read_sigmask()
1164        check_mask(blocked)
1165        if signum not in blocked:
1166            raise Exception("%s not in %s" % (signum, blocked))
1167        if old_mask ^ blocked != {signum}:
1168            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
1169
1170        # Unblock SIGUSR1
1171        try:
1172            # unblock the pending signal calls immediately the signal handler
1173            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1174        except ZeroDivisionError:
1175            pass
1176        else:
1177            raise Exception("ZeroDivisionError not raised")
1178        try:
1179            kill(signum)
1180        except ZeroDivisionError:
1181            pass
1182        else:
1183            raise Exception("ZeroDivisionError not raised")
1184
1185        # Check the new mask
1186        unblocked = read_sigmask()
1187        if signum in unblocked:
1188            raise Exception("%s in %s" % (signum, unblocked))
1189        if blocked ^ unblocked != {signum}:
1190            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
1191        if old_mask != unblocked:
1192            raise Exception("%s != %s" % (old_mask, unblocked))
1193        """
1194        assert_python_ok('-c', code)
1195
1196    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
1197                         'need signal.pthread_kill()')
1198    @threading_helper.requires_working_threading()
1199    def test_pthread_kill_main_thread(self):
1200        # Test that a signal can be sent to the main thread with pthread_kill()
1201        # before any other thread has been created (see issue #12392).
1202        code = """if True:
1203            import threading
1204            import signal
1205            import sys
1206
1207            def handler(signum, frame):
1208                sys.exit(3)
1209
1210            signal.signal(signal.SIGUSR1, handler)
1211            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
1212            sys.exit(2)
1213        """
1214
1215        with spawn_python('-c', code) as process:
1216            stdout, stderr = process.communicate()
1217            exitcode = process.wait()
1218            if exitcode != 3:
1219                raise Exception("Child error (exit code %s): %s" %
1220                                (exitcode, stdout))
1221
1222
1223class StressTest(unittest.TestCase):
1224    """
1225    Stress signal delivery, especially when a signal arrives in
1226    the middle of recomputing the signal state or executing
1227    previously tripped signal handlers.
1228    """
1229
1230    def setsig(self, signum, handler):
1231        old_handler = signal.signal(signum, handler)
1232        self.addCleanup(signal.signal, signum, old_handler)
1233
1234    def measure_itimer_resolution(self):
1235        N = 20
1236        times = []
1237
1238        def handler(signum=None, frame=None):
1239            if len(times) < N:
1240                times.append(time.perf_counter())
1241                # 1 µs is the smallest possible timer interval,
1242                # we want to measure what the concrete duration
1243                # will be on this platform
1244                signal.setitimer(signal.ITIMER_REAL, 1e-6)
1245
1246        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
1247        self.setsig(signal.SIGALRM, handler)
1248        handler()
1249        while len(times) < N:
1250            time.sleep(1e-3)
1251
1252        durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
1253        med = statistics.median(durations)
1254        if support.verbose:
1255            print("detected median itimer() resolution: %.6f s." % (med,))
1256        return med
1257
1258    def decide_itimer_count(self):
1259        # Some systems have poor setitimer() resolution (for example
1260        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
1261        # number of sequential timers based on that.
1262        reso = self.measure_itimer_resolution()
1263        if reso <= 1e-4:
1264            return 10000
1265        elif reso <= 1e-2:
1266            return 100
1267        else:
1268            self.skipTest("detected itimer resolution (%.3f s.) too high "
1269                          "(> 10 ms.) on this platform (or system too busy)"
1270                          % (reso,))
1271
1272    @unittest.skipUnless(hasattr(signal, "setitimer"),
1273                         "test needs setitimer()")
1274    def test_stress_delivery_dependent(self):
1275        """
1276        This test uses dependent signal handlers.
1277        """
1278        N = self.decide_itimer_count()
1279        sigs = []
1280
1281        def first_handler(signum, frame):
1282            # 1e-6 is the minimum non-zero value for `setitimer()`.
1283            # Choose a random delay so as to improve chances of
1284            # triggering a race condition.  Ideally the signal is received
1285            # when inside critical signal-handling routines such as
1286            # Py_MakePendingCalls().
1287            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
1288
1289        def second_handler(signum=None, frame=None):
1290            sigs.append(signum)
1291
1292        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
1293        # ascending and descending sequences (SIGUSR1 then SIGALRM,
1294        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
1295        self.setsig(signal.SIGPROF, first_handler)
1296        self.setsig(signal.SIGUSR1, first_handler)
1297        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL
1298
1299        expected_sigs = 0
1300        deadline = time.monotonic() + support.SHORT_TIMEOUT
1301
1302        while expected_sigs < N:
1303            os.kill(os.getpid(), signal.SIGPROF)
1304            expected_sigs += 1
1305            # Wait for handlers to run to avoid signal coalescing
1306            while len(sigs) < expected_sigs and time.monotonic() < deadline:
1307                time.sleep(1e-5)
1308
1309            os.kill(os.getpid(), signal.SIGUSR1)
1310            expected_sigs += 1
1311            while len(sigs) < expected_sigs and time.monotonic() < deadline:
1312                time.sleep(1e-5)
1313
1314        # All ITIMER_REAL signals should have been delivered to the
1315        # Python handler
1316        self.assertEqual(len(sigs), N, "Some signals were lost")
1317
1318    @unittest.skipUnless(hasattr(signal, "setitimer"),
1319                         "test needs setitimer()")
1320    def test_stress_delivery_simultaneous(self):
1321        """
1322        This test uses simultaneous signal handlers.
1323        """
1324        N = self.decide_itimer_count()
1325        sigs = []
1326
1327        def handler(signum, frame):
1328            sigs.append(signum)
1329
1330        # On Android, SIGUSR1 is unreliable when used in close proximity to
1331        # another signal – see Android/testbed/app/src/main/python/main.py.
1332        # So we use a different signal.
1333        self.setsig(signal.SIGUSR2, handler)
1334        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
1335
1336        expected_sigs = 0
1337        while expected_sigs < N:
1338            # Hopefully the SIGALRM will be received somewhere during
1339            # initial processing of SIGUSR2.
1340            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
1341            os.kill(os.getpid(), signal.SIGUSR2)
1342
1343            expected_sigs += 2
1344            # Wait for handlers to run to avoid signal coalescing
1345            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
1346                if len(sigs) >= expected_sigs:
1347                    break
1348
1349        # All ITIMER_REAL signals should have been delivered to the
1350        # Python handler
1351        self.assertEqual(len(sigs), N, "Some signals were lost")
1352
1353    @unittest.skipIf(is_apple, "crashes due to system bug (FB13453490)")
1354    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
1355                         "test needs SIGUSR1")
1356    @threading_helper.requires_working_threading()
1357    def test_stress_modifying_handlers(self):
1358        # bpo-43406: race condition between trip_signal() and signal.signal
1359        signum = signal.SIGUSR1
1360        num_sent_signals = 0
1361        num_received_signals = 0
1362        do_stop = False
1363
1364        def custom_handler(signum, frame):
1365            nonlocal num_received_signals
1366            num_received_signals += 1
1367
1368        def set_interrupts():
1369            nonlocal num_sent_signals
1370            while not do_stop:
1371                signal.raise_signal(signum)
1372                num_sent_signals += 1
1373
1374        def cycle_handlers():
1375            while num_sent_signals < 100 or num_received_signals < 1:
1376                for i in range(20000):
1377                    # Cycle between a Python-defined and a non-Python handler
1378                    for handler in [custom_handler, signal.SIG_IGN]:
1379                        signal.signal(signum, handler)
1380
1381        old_handler = signal.signal(signum, custom_handler)
1382        self.addCleanup(signal.signal, signum, old_handler)
1383
1384        t = threading.Thread(target=set_interrupts)
1385        try:
1386            ignored = False
1387            with support.catch_unraisable_exception() as cm:
1388                t.start()
1389                cycle_handlers()
1390                do_stop = True
1391                t.join()
1392
1393                if cm.unraisable is not None:
1394                    # An unraisable exception may be printed out when
1395                    # a signal is ignored due to the aforementioned
1396                    # race condition, check it.
1397                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
1398                    self.assertIn(
1399                        f"Signal {signum:d} ignored due to race condition",
1400                        str(cm.unraisable.exc_value))
1401                    ignored = True
1402
1403            # bpo-43406: Even if it is unlikely, it's technically possible that
1404            # all signals were ignored because of race conditions.
1405            if not ignored:
1406                # Sanity check that some signals were received, but not all
1407                self.assertGreater(num_received_signals, 0)
1408            self.assertLessEqual(num_received_signals, num_sent_signals)
1409        finally:
1410            do_stop = True
1411            t.join()
1412
1413
1414class RaiseSignalTest(unittest.TestCase):
1415
1416    def test_sigint(self):
1417        with self.assertRaises(KeyboardInterrupt):
1418            signal.raise_signal(signal.SIGINT)
1419
1420    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
1421    def test_invalid_argument(self):
1422        try:
1423            SIGHUP = 1 # not supported on win32
1424            signal.raise_signal(SIGHUP)
1425            self.fail("OSError (Invalid argument) expected")
1426        except OSError as e:
1427            if e.errno == errno.EINVAL:
1428                pass
1429            else:
1430                raise
1431
1432    def test_handler(self):
1433        is_ok = False
1434        def handler(a, b):
1435            nonlocal is_ok
1436            is_ok = True
1437        old_signal = signal.signal(signal.SIGINT, handler)
1438        self.addCleanup(signal.signal, signal.SIGINT, old_signal)
1439
1440        signal.raise_signal(signal.SIGINT)
1441        self.assertTrue(is_ok)
1442
1443    def test__thread_interrupt_main(self):
1444        # See https://github.com/python/cpython/issues/102397
1445        code = """if 1:
1446        import _thread
1447        class Foo():
1448            def __del__(self):
1449                _thread.interrupt_main()
1450
1451        x = Foo()
1452        """
1453
1454        rc, out, err = assert_python_ok('-c', code)
1455        self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
1456
1457
1458
1459class PidfdSignalTest(unittest.TestCase):
1460
1461    @unittest.skipUnless(
1462        hasattr(signal, "pidfd_send_signal"),
1463        "pidfd support not built in",
1464    )
1465    def test_pidfd_send_signal(self):
1466        with self.assertRaises(OSError) as cm:
1467            signal.pidfd_send_signal(0, signal.SIGINT)
1468        if cm.exception.errno == errno.ENOSYS:
1469            self.skipTest("kernel does not support pidfds")
1470        elif cm.exception.errno == errno.EPERM:
1471            self.skipTest("Not enough privileges to use pidfs")
1472        self.assertEqual(cm.exception.errno, errno.EBADF)
1473        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
1474        self.addCleanup(os.close, my_pidfd)
1475        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
1476            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
1477        with self.assertRaises(KeyboardInterrupt):
1478            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
1479
1480def tearDownModule():
1481    support.reap_children()
1482
1483if __name__ == "__main__":
1484    unittest.main()
1485