• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2import inspect
3import os
4import random
5import signal
6import socket
7import statistics
8import subprocess
9import sys
10import threading
11import time
12import unittest
13from test import support
14from test.support import os_helper
15from test.support.script_helper import assert_python_ok, spawn_python
16try:
17    import _testcapi
18except ImportError:
19    _testcapi = None
20
21
22class GenericTests(unittest.TestCase):
23
24    def test_enums(self):
25        for name in dir(signal):
26            sig = getattr(signal, name)
27            if name in {'SIG_DFL', 'SIG_IGN'}:
28                self.assertIsInstance(sig, signal.Handlers)
29            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
30                self.assertIsInstance(sig, signal.Sigmasks)
31            elif name.startswith('SIG') and not name.startswith('SIG_'):
32                self.assertIsInstance(sig, signal.Signals)
33            elif name.startswith('CTRL_'):
34                self.assertIsInstance(sig, signal.Signals)
35                self.assertEqual(sys.platform, "win32")
36
37    def test_functions_module_attr(self):
38        # Issue #27718: If __all__ is not defined all non-builtin functions
39        # should have correct __module__ to be displayed by pydoc.
40        for name in dir(signal):
41            value = getattr(signal, name)
42            if inspect.isroutine(value) and not inspect.isbuiltin(value):
43                self.assertEqual(value.__module__, 'signal')
44
45
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    """
    Basic checks of the signal module API that are skipped on Windows.
    """

    def trivial_signal_handler(self, *args):
        # Handler that accepts any arguments and does nothing.
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

        self.assertRaises(ValueError, signal.strsignal, 4242)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        # Put the previous handler back even when one of the assertions
        # below fails, so a failure cannot leak the test handler into the
        # tests that run afterwards.  signal.signal() rejects None, hence
        # the guard.
        if hup is not None:
            self.addCleanup(signal.signal, signal.SIGHUP, hup)
        self.assertIsInstance(hup, signal.Handlers)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

    def test_strsignal(self):
        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))

    def test_interprocess_signal(self):
        # Run signalinterproctester.py, looked up in the directory of this
        # file, in a fresh interpreter and require it to succeed.
        dirname = os.path.dirname(__file__)
        script = os.path.join(dirname, 'signalinterproctester.py')
        assert_python_ok(script)

    def test_valid_signals(self):
        s = signal.valid_signals()
        self.assertIsInstance(s, set)
        self.assertIn(signal.Signals.SIGINT, s)
        self.assertIn(signal.Signals.SIGALRM, s)
        self.assertNotIn(0, s)
        self.assertNotIn(signal.NSIG, s)
        self.assertLess(len(s), signal.NSIG)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers exit via SIGINT."""
        # The child sends SIGINT to itself; the sleep loop only gives the
        # interpreter a place to notice the signal.
        process = subprocess.run(
                [sys.executable, "-c",
                 "import os, signal, time\n"
                 "os.kill(os.getpid(), signal.SIGINT)\n"
                 "for _ in range(999): time.sleep(0.01)"],
                stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, -signal.SIGINT)
        # Caveat: The exit code is insufficient to guarantee we actually died
        # via a signal.  POSIX shells do more than look at the 8 bit value.
        # Writing an automation friendly test of an interactive shell
        # to confirm that our process died via a SIGINT proved too complex.
106
107
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """
    Checks of the signal module API that only run on Windows.
    """

    def test_valid_signals(self):
        valid = signal.valid_signals()
        self.assertIsInstance(valid, set)
        count = len(valid)
        self.assertGreaterEqual(count, 6)
        self.assertIn(signal.Signals.SIGINT, valid)
        self.assertNotIn(0, valid)
        self.assertNotIn(signal.NSIG, valid)
        self.assertLess(len(valid), signal.NSIG)

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def handler(signum, frame):
            return None

        candidates = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                      signal.SIGTERM)
        exercised = []
        for signum in candidates:
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(signum) is None:
                continue
            # Set and then reset a handler for signals that work on windows.
            previous = signal.signal(signum, handler)
            signal.signal(signum, previous)
            exercised.append(signum)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(exercised)

        # Signal numbers that are not accepted must be rejected.
        self.assertRaises(ValueError, signal.signal, -1, handler)
        self.assertRaises(ValueError, signal.signal, 7, handler)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
        # as that requires setting up a console control handler in a child
        # in its own process group.  Doable, but quite complicated.  (see
        # @eryksun on https://github.com/python/cpython/pull/11862)
        STATUS_CONTROL_C_EXIT = 0xC000013A
        command = [sys.executable, "-c", "raise KeyboardInterrupt"]
        process = subprocess.run(command, stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
154
155
156class WakeupFDTests(unittest.TestCase):
157
158    def test_invalid_call(self):
159        # First parameter is positional-only
160        with self.assertRaises(TypeError):
161            signal.set_wakeup_fd(signum=signal.SIGINT)
162
163        # warn_on_full_buffer is a keyword-only parameter
164        with self.assertRaises(TypeError):
165            signal.set_wakeup_fd(signal.SIGINT, False)
166
167    def test_invalid_fd(self):
168        fd = os_helper.make_bad_fd()
169        self.assertRaises((ValueError, OSError),
170                          signal.set_wakeup_fd, fd)
171
172    def test_invalid_socket(self):
173        sock = socket.socket()
174        fd = sock.fileno()
175        sock.close()
176        self.assertRaises((ValueError, OSError),
177                          signal.set_wakeup_fd, fd)
178
179    def test_set_wakeup_fd_result(self):
180        r1, w1 = os.pipe()
181        self.addCleanup(os.close, r1)
182        self.addCleanup(os.close, w1)
183        r2, w2 = os.pipe()
184        self.addCleanup(os.close, r2)
185        self.addCleanup(os.close, w2)
186
187        if hasattr(os, 'set_blocking'):
188            os.set_blocking(w1, False)
189            os.set_blocking(w2, False)
190
191        signal.set_wakeup_fd(w1)
192        self.assertEqual(signal.set_wakeup_fd(w2), w1)
193        self.assertEqual(signal.set_wakeup_fd(-1), w2)
194        self.assertEqual(signal.set_wakeup_fd(-1), -1)
195
196    def test_set_wakeup_fd_socket_result(self):
197        sock1 = socket.socket()
198        self.addCleanup(sock1.close)
199        sock1.setblocking(False)
200        fd1 = sock1.fileno()
201
202        sock2 = socket.socket()
203        self.addCleanup(sock2.close)
204        sock2.setblocking(False)
205        fd2 = sock2.fileno()
206
207        signal.set_wakeup_fd(fd1)
208        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
209        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
210        self.assertEqual(signal.set_wakeup_fd(-1), -1)
211
212    # On Windows, files are always blocking and Windows does not provide a
213    # function to test if a socket is in non-blocking mode.
214    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
215    def test_set_wakeup_fd_blocking(self):
216        rfd, wfd = os.pipe()
217        self.addCleanup(os.close, rfd)
218        self.addCleanup(os.close, wfd)
219
220        # fd must be non-blocking
221        os.set_blocking(wfd, True)
222        with self.assertRaises(ValueError) as cm:
223            signal.set_wakeup_fd(wfd)
224        self.assertEqual(str(cm.exception),
225                         "the fd %s must be in non-blocking mode" % wfd)
226
227        # non-blocking is ok
228        os.set_blocking(wfd, False)
229        signal.set_wakeup_fd(wfd)
230        signal.set_wakeup_fd(-1)
231
232
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    """
    Check what is written to a pipe registered with signal.set_wakeup_fd()
    when signals arrive.  Every scenario runs in a child interpreter.
    """
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        """
        Run test_body in a child interpreter and verify the wakeup pipe.

        test_body: source text that must define a "test()" function; it is
            pasted into the child script.
        signals: signal numbers expected to be read from the wakeup pipe.
        ordered: if true, the bytes read must match *signals* in order;
            otherwise both sides are compared as sets.

        The child script installs a SIGALRM handler, registers the
        non-blocking write end of a pipe as wakeup fd, calls test() and
        then compares the bytes read from the pipe with *signals*.
        """
        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the child script relies on write()
        # failing on the read end of a pipe.  If the write succeeds here,
        # the error cannot be triggered, so the test is skipped.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        # The child arms a 1 second alarm whose handler raises, which must
        # interrupt a first select() that watches nothing.  A second
        # select() on the read end of the wakeup pipe must then return in
        # less than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        # Same setup as test_wakeup_fd_early, but the alarm is expected to
        # fire while select() is already waiting on the wakeup pipe; the
        # call must be interrupted in less than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        # SIGUSR1 then SIGALRM are raised in the child; with the default
        # ordered=True the wakeup pipe must report them in that order.
        self.check_wakeup("""def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        # Both signals are raised while blocked and then unblocked together;
        # ordered=False makes the comparison ignore the delivery order.
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
413
414
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):
    """
    Check signal.set_wakeup_fd() when the wakeup fd is one end of a
    socket pair.  Every scenario runs in a child interpreter.
    """

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # The child registers the non-blocking write end of a socket pair as
        # wakeup fd, raises SIGINT and checks that the single byte received
        # on the other end unpacks to that signal number.
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # The child closes both sockets after registering the wakeup fd, so
        # notifying the fd fails when the signal is raised; the resulting
        # "Exception ignored" message must show up on stderr.
        # Use a subprocess to have only one thread.
        # The expected message names the failing operation, which differs
        # between Windows ('send') and other platforms ('write').
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        # The child fills the socket pair buffer, then raises SIGINT four
        # times: with the default setting, with warn_on_full_buffer=True,
        # with warn_on_full_buffer=False, and with the default again.  A
        # warning on stderr is required in every case except False.
        # Use a subprocess to have only one thread.
        # NOTE(review): the template goes through str.format(), so any
        # literal brace added to the child script must be doubled.  The
        # print() call in the script carries an "f" prefix but formats with
        # "%"; the prefix has no effect there.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)
609
610
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
class SiginterruptTest(unittest.TestCase):
    """
    Check how signal.siginterrupt() affects a blocking read() that is hit
    by SIGALRM.
    """

    def readpipe_interrupted(self, interrupt):
        """Run a child that blocks in read() while a SIGALRM is delivered.

        interrupt is None to leave siginterrupt() uncalled, otherwise the
        flag handed to siginterrupt() for SIGALRM.  The result is True when
        both reads of the child were interrupted by the handler's exception
        and False when a read returned normally or the child had to be
        killed after the timeout.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        template = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """
        child_code = template % (interrupt,)
        with spawn_python('-c', child_code) as child:
            try:
                # wait until the child process is loaded and has started
                ready_line = child.stdout.readline()

                output, _ = child.communicate(timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # The read never came back: treat it as not interrupted.
                child.kill()
                return False

            status = child.wait()
            if status == 3:
                return True
            if status == 2:
                return False
            raise Exception("Child error (exit code %s): %r"
                            % (status, ready_line + output))

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        self.assertFalse(self.readpipe_interrupted(False))
695
696
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                     "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """
    Test signal.setitimer() and signal.getitimer() with the ITIMER_REAL,
    ITIMER_VIRTUAL and ITIMER_PROF timers.
    """

    def setUp(self):
        self.hndl_called = False  # set by every handler defined below
        self.hndl_count = 0       # number of completed sig_vtalrm() calls
        self.itimer = None        # timer armed by the running test, if any
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer *before* restoring the previous SIGALRM handler.
        # In the opposite order, a timer expiring between the two calls
        # would be delivered to the restored handler instead of sig_alrm().
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        # Install handler for signum and restore the previous handler once
        # the test is over.  Cleanups run after tearDown(), i.e. after the
        # timer has been disarmed.  signal.signal() rejects None, so a
        # previous handler of None is left alone.
        previous = signal.signal(signum, handler)
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        # One call is enough: stop the profiling timer right away.
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms, so a
        # negative interval is deliberately not checked for ItimerError.

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        # Sleep until a signal (expected: SIGALRM from the timer) arrives.
        signal.pause()
        self.assertTrue(self.hndl_called)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertTrue(self.hndl_called)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertTrue(self.hndl_called)
799
800
801class PendingSignalsTests(unittest.TestCase):
802    """
803    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
804    functions.
805    """
806    @unittest.skipUnless(hasattr(signal, 'sigpending'),
807                         'need signal.sigpending()')
808    def test_sigpending_empty(self):
809        self.assertEqual(signal.sigpending(), set())
810
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        # The child blocks SIGUSR1 and sends it to itself.  sigpending()
        # must then report exactly that signal, as signal.Signals members,
        # and unblocking it must run the handler, which raises
        # ZeroDivisionError.  The script runs in a fresh interpreter.
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)
841
842    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
843                         'need signal.pthread_kill()')
844    def test_pthread_kill(self):
845        code = """if 1:
846            import signal
847            import threading
848            import sys
849
850            signum = signal.SIGUSR1
851
852            def handler(signum, frame):
853                1/0
854
855            signal.signal(signum, handler)
856
857            tid = threading.get_ident()
858            try:
859                signal.pthread_kill(tid, signum)
860            except ZeroDivisionError:
861                pass
862            else:
863                raise Exception("ZeroDivisionError not raised")
864        """
865        assert_python_ok('-c', code)
866
867    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
868                         'need signal.pthread_sigmask()')
869    def wait_helper(self, blocked, test):
870        """
871        test: body of the "def test(signum):" function.
872        blocked: number of the blocked signal
873        """
874        code = '''if 1:
875        import signal
876        import sys
877        from signal import Signals
878
879        def handler(signum, frame):
880            1/0
881
882        %s
883
884        blocked = %s
885        signum = signal.SIGALRM
886
887        # child: block and wait the signal
888        try:
889            signal.signal(signum, handler)
890            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
891
892            # Do the tests
893            test(signum)
894
895            # The handler must not be called on unblock
896            try:
897                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
898            except ZeroDivisionError:
899                print("the signal handler has been called",
900                      file=sys.stderr)
901                sys.exit(1)
902        except BaseException as err:
903            print("error: {}".format(err), file=sys.stderr)
904            sys.stderr.flush()
905            sys.exit(1)
906        ''' % (test.strip(), blocked)
907
908        # sig*wait* must be called with the signal blocked: since the current
909        # process might have several threads running, use a subprocess to have
910        # a single thread.
911        assert_python_ok('-c', code)
912
913    @unittest.skipUnless(hasattr(signal, 'sigwait'),
914                         'need signal.sigwait()')
915    def test_sigwait(self):
916        self.wait_helper(signal.SIGALRM, '''
917        def test(signum):
918            signal.alarm(1)
919            received = signal.sigwait([signum])
920            assert isinstance(received, signal.Signals), received
921            if received != signum:
922                raise Exception('received %s, not %s' % (received, signum))
923        ''')
924
925    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
926                         'need signal.sigwaitinfo()')
927    def test_sigwaitinfo(self):
928        self.wait_helper(signal.SIGALRM, '''
929        def test(signum):
930            signal.alarm(1)
931            info = signal.sigwaitinfo([signum])
932            if info.si_signo != signum:
933                raise Exception("info.si_signo != %s" % signum)
934        ''')
935
936    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
937                         'need signal.sigtimedwait()')
938    def test_sigtimedwait(self):
939        self.wait_helper(signal.SIGALRM, '''
940        def test(signum):
941            signal.alarm(1)
942            info = signal.sigtimedwait([signum], 10.1000)
943            if info.si_signo != signum:
944                raise Exception('info.si_signo != %s' % signum)
945        ''')
946
947    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
948                         'need signal.sigtimedwait()')
949    def test_sigtimedwait_poll(self):
950        # check that polling with sigtimedwait works
951        self.wait_helper(signal.SIGALRM, '''
952        def test(signum):
953            import os
954            os.kill(os.getpid(), signum)
955            info = signal.sigtimedwait([signum], 0)
956            if info.si_signo != signum:
957                raise Exception('info.si_signo != %s' % signum)
958        ''')
959
960    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
961                         'need signal.sigtimedwait()')
962    def test_sigtimedwait_timeout(self):
963        self.wait_helper(signal.SIGALRM, '''
964        def test(signum):
965            received = signal.sigtimedwait([signum], 1.0)
966            if received is not None:
967                raise Exception("received=%r" % (received,))
968        ''')
969
970    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
971                         'need signal.sigtimedwait()')
972    def test_sigtimedwait_negative_timeout(self):
973        signum = signal.SIGALRM
974        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
975
976    @unittest.skipUnless(hasattr(signal, 'sigwait'),
977                         'need signal.sigwait()')
978    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
979                         'need signal.pthread_sigmask()')
980    def test_sigwait_thread(self):
981        # Check that calling sigwait() from a thread doesn't suspend the whole
982        # process. A new interpreter is spawned to avoid problems when mixing
983        # threads and fork(): only async-safe functions are allowed between
984        # fork() and exec().
985        assert_python_ok("-c", """if True:
986            import os, threading, sys, time, signal
987
988            # the default handler terminates the process
989            signum = signal.SIGUSR1
990
991            def kill_later():
992                # wait until the main thread is waiting in sigwait()
993                time.sleep(1)
994                os.kill(os.getpid(), signum)
995
996            # the signal must be blocked by all the threads
997            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
998            killer = threading.Thread(target=kill_later)
999            killer.start()
1000            received = signal.sigwait([signum])
1001            if received != signum:
1002                print("sigwait() received %s, not %s" % (received, signum),
1003                      file=sys.stderr)
1004                sys.exit(1)
1005            killer.join()
1006            # unblock the signal, which should have been cleared by sigwait()
1007            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1008        """)
1009
1010    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1011                         'need signal.pthread_sigmask()')
1012    def test_pthread_sigmask_arguments(self):
1013        self.assertRaises(TypeError, signal.pthread_sigmask)
1014        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
1015        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
1016        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
1017        with self.assertRaises(ValueError):
1018            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1019        with self.assertRaises(ValueError):
1020            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1021        with self.assertRaises(ValueError):
1022            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1023
1024    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1025                         'need signal.pthread_sigmask()')
1026    def test_pthread_sigmask_valid_signals(self):
1027        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1028        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1029        # Get current blocked set
1030        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1031        self.assertLessEqual(s, signal.valid_signals())
1032
1033    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1034                         'need signal.pthread_sigmask()')
1035    def test_pthread_sigmask(self):
1036        code = """if 1:
1037        import signal
1038        import os; import threading
1039
1040        def handler(signum, frame):
1041            1/0
1042
1043        def kill(signum):
1044            os.kill(os.getpid(), signum)
1045
1046        def check_mask(mask):
1047            for sig in mask:
1048                assert isinstance(sig, signal.Signals), repr(sig)
1049
1050        def read_sigmask():
1051            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
1052            check_mask(sigmask)
1053            return sigmask
1054
1055        signum = signal.SIGUSR1
1056
1057        # Install our signal handler
1058        old_handler = signal.signal(signum, handler)
1059
1060        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
1061        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1062        check_mask(old_mask)
1063        try:
1064            kill(signum)
1065        except ZeroDivisionError:
1066            pass
1067        else:
1068            raise Exception("ZeroDivisionError not raised")
1069
1070        # Block and then raise SIGUSR1. The signal is blocked: the signal
1071        # handler is not called, and the signal is now pending
1072        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1073        check_mask(mask)
1074        kill(signum)
1075
1076        # Check the new mask
1077        blocked = read_sigmask()
1078        check_mask(blocked)
1079        if signum not in blocked:
1080            raise Exception("%s not in %s" % (signum, blocked))
1081        if old_mask ^ blocked != {signum}:
1082            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
1083
1084        # Unblock SIGUSR1
1085        try:
1086            # unblock the pending signal calls immediately the signal handler
1087            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1088        except ZeroDivisionError:
1089            pass
1090        else:
1091            raise Exception("ZeroDivisionError not raised")
1092        try:
1093            kill(signum)
1094        except ZeroDivisionError:
1095            pass
1096        else:
1097            raise Exception("ZeroDivisionError not raised")
1098
1099        # Check the new mask
1100        unblocked = read_sigmask()
1101        if signum in unblocked:
1102            raise Exception("%s in %s" % (signum, unblocked))
1103        if blocked ^ unblocked != {signum}:
1104            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
1105        if old_mask != unblocked:
1106            raise Exception("%s != %s" % (old_mask, unblocked))
1107        """
1108        assert_python_ok('-c', code)
1109
1110    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
1111                         'need signal.pthread_kill()')
1112    def test_pthread_kill_main_thread(self):
1113        # Test that a signal can be sent to the main thread with pthread_kill()
1114        # before any other thread has been created (see issue #12392).
1115        code = """if True:
1116            import threading
1117            import signal
1118            import sys
1119
1120            def handler(signum, frame):
1121                sys.exit(3)
1122
1123            signal.signal(signal.SIGUSR1, handler)
1124            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
1125            sys.exit(2)
1126        """
1127
1128        with spawn_python('-c', code) as process:
1129            stdout, stderr = process.communicate()
1130            exitcode = process.wait()
1131            if exitcode != 3:
1132                raise Exception("Child error (exit code %s): %s" %
1133                                (exitcode, stdout))
1134
1135
class StressTest(unittest.TestCase):
    """
    Put signal delivery under stress, in particular when a signal shows
    up while the signal state is being recomputed or while previously
    tripped signal handlers are being executed.
    """

    def setsig(self, signum, handler):
        # Install *handler* for *signum* and register a cleanup that puts
        # the previous handler back.
        previous = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, previous)

    def measure_itimer_resolution(self):
        # Return the median delay, in seconds, measured between consecutive
        # expirations of an ITIMER_REAL timer armed with a 1 microsecond
        # interval.
        wanted = 20
        stamps = []

        def handler(signum=None, frame=None):
            if len(stamps) >= wanted:
                return
            stamps.append(time.perf_counter())
            # One microsecond is the smallest interval that can be asked
            # for; the point is to observe how long the platform really
            # takes to deliver it.
            signal.setitimer(signal.ITIMER_REAL, 1e-6)

        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
        self.setsig(signal.SIGALRM, handler)
        # Take the first timestamp and arm the timer by hand; every SIGALRM
        # then re-arms it until enough samples were collected.
        handler()
        while len(stamps) < wanted:
            time.sleep(1e-3)

        gaps = [later - earlier
                for earlier, later in zip(stamps, stamps[1:])]
        med = statistics.median(gaps)
        if support.verbose:
            print("detected median itimer() resolution: %.6f s." % (med,))
        return med

    def decide_itimer_count(self):
        # setitimer() resolution is poor on some systems (around 20 ms was
        # measured on FreeBSD 9, for example), so the number of sequential
        # timers is picked from the measured resolution.  The calling test
        # is skipped when the resolution is worse than 10 ms.
        reso = self.measure_itimer_resolution()
        if reso <= 1e-4:
            return 10000
        if reso <= 1e-2:
            return 100
        self.skipTest("detected itimer resolution (%.3f s.) too high "
                      "(> 10 ms.) on this platform (or system too busy)"
                      % (reso,))

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.
        """
        N = self.decide_itimer_count()
        delivered = []

        def first_handler(signum, frame):
            # setitimer() accepts nothing smaller than 1e-6 as a non-zero
            # value.  The delay is randomized to raise the odds of hitting
            # a race condition; ideally the signal lands while critical
            # signal-handling routines such as Py_MakePendingCalls() run.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            delivered.append(signum)

        # On Linux, SIGPROF > SIGALRM > SIGUSR1.  Triggering both an
        # ascending sequence (SIGUSR1 then SIGALRM) and a descending one
        # (SIGPROF then SIGALRM) maximizes the chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        deadline = time.monotonic() + support.SHORT_TIMEOUT
        expected_sigs = 0

        while expected_sigs < N:
            for trigger in (signal.SIGPROF, signal.SIGUSR1):
                os.kill(os.getpid(), trigger)
                expected_sigs += 1
                # Give the handlers time to run before sending the next
                # signal, so that signals are not coalesced.
                while len(delivered) < expected_sigs:
                    if time.monotonic() >= deadline:
                        break
                    time.sleep(1e-5)

        # Every ITIMER_REAL signal must have reached the Python handler.
        self.assertEqual(len(delivered), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.
        """
        N = self.decide_itimer_count()
        delivered = []

        def handler(signum, frame):
            delivered.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        deadline = time.monotonic() + support.SHORT_TIMEOUT
        expected_sigs = 0

        while expected_sigs < N:
            # With some luck the SIGALRM arrives somewhere during the
            # initial processing of SIGUSR1.
            delay = 1e-6 + random.random() * 1e-5
            signal.setitimer(signal.ITIMER_REAL, delay)
            os.kill(os.getpid(), signal.SIGUSR1)

            expected_sigs += 2
            # Give the handlers time to run before the next round, so
            # that signals are not coalesced.
            while len(delivered) < expected_sigs:
                if time.monotonic() >= deadline:
                    break
                time.sleep(1e-5)

        # Every ITIMER_REAL signal must have reached the Python handler.
        self.assertEqual(len(delivered), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                         "test needs SIGUSR1")
    def test_stress_modifying_handlers(self):
        # bpo-43406: race condition between trip_signal() and signal.signal
        signum = signal.SIGUSR1
        sent = 0
        received = 0
        stop_requested = False

        def custom_handler(signum, frame):
            nonlocal received
            received += 1

        def set_interrupts():
            # Runs in the helper thread: keep raising the signal until
            # asked to stop.
            nonlocal sent
            while not stop_requested:
                signal.raise_signal(signum)
                sent += 1

        def cycle_handlers():
            # Runs in the calling thread: keep swapping handlers until
            # at least 100 signals were sent.
            while sent < 100:
                for _ in range(20000):
                    # Alternate between a Python-defined handler and a
                    # non-Python one.
                    signal.signal(signum, custom_handler)
                    signal.signal(signum, signal.SIG_IGN)

        # Install the Python handler; the previous one is restored by a
        # cleanup.
        self.setsig(signum, custom_handler)

        sender = threading.Thread(target=set_interrupts)
        try:
            with support.catch_unraisable_exception() as cm:
                sender.start()
                cycle_handlers()
                stop_requested = True
                sender.join()

                ignored = cm.unraisable is not None
                if ignored:
                    # An unraisable exception may be reported when a signal
                    # is ignored because of the race condition mentioned
                    # above; make sure it is that one.
                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
                    self.assertIn(
                        f"Signal {signum:d} ignored due to race condition",
                        str(cm.unraisable.exc_value))

            # bpo-43406: unlikely as it is, every single signal may have
            # been ignored because of race conditions.
            if not ignored:
                # Sanity check: some signals were received, but not all
                self.assertGreater(received, 0)
            self.assertLess(received, sent)
        finally:
            # Always stop the sender thread, even when an assertion failed.
            stop_requested = True
            sender.join()
1321
1322
class RaiseSignalTest(unittest.TestCase):
    """Tests for signal.raise_signal()."""

    def test_sigint(self):
        # Raising SIGINT in this process is expected to surface as a
        # KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.raise_signal(signal.SIGINT)

    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
    def test_invalid_argument(self):
        # Raising a signal number that win32 does not support must fail
        # with OSError(EINVAL).  A different errno is reported as a test
        # failure showing both the expected and the actual value.
        SIGHUP = 1 # not supported on win32
        with self.assertRaises(OSError,
                               msg="OSError (Invalid argument) expected") as cm:
            signal.raise_signal(SIGHUP)
        self.assertEqual(cm.exception.errno, errno.EINVAL)

    def test_handler(self):
        # A Python handler installed for SIGINT must have run by the time
        # raise_signal() returns.
        is_ok = False
        def handler(a, b):
            nonlocal is_ok
            is_ok = True
        old_signal = signal.signal(signal.SIGINT, handler)
        # Restore the previous SIGINT handler once the test is over.
        self.addCleanup(signal.signal, signal.SIGINT, old_signal)

        signal.raise_signal(signal.SIGINT)
        self.assertTrue(is_ok)
1351
1352
class PidfdSignalTest(unittest.TestCase):
    """Tests for signal.pidfd_send_signal()."""

    @unittest.skipUnless(
        hasattr(signal, "pidfd_send_signal"),
        "pidfd support not built in",
    )
    def test_pidfd_send_signal(self):
        # Probe with file descriptor 0: the call is expected to fail with
        # EBADF.  ENOSYS and EPERM mean that pidfds cannot be used in this
        # environment, in which case the test is skipped.
        with self.assertRaises(OSError) as cm:
            signal.pidfd_send_signal(0, signal.SIGINT)
        if cm.exception.errno == errno.ENOSYS:
            self.skipTest("kernel does not support pidfds")
        elif cm.exception.errno == errno.EPERM:
            self.skipTest("Not enough privileges to use pidfds")
        self.assertEqual(cm.exception.errno, errno.EBADF)
        # Use a descriptor on this process' own /proc directory as pidfd;
        # it is closed again by a cleanup.
        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
        self.addCleanup(os.close, my_pidfd)
        # Anything but None is rejected for the siginfo argument.
        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
        # Sending SIGINT to ourselves surfaces as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
1373
def tearDownModule():
    """Module-level unittest fixture, run after the tests of this module.

    Calls support.reap_children(); several tests in this module spawn
    child interpreters.
    """
    support.reap_children()
1376
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1379