• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2import os
3import random
4import signal
5import socket
6import statistics
7import subprocess
8import sys
9import time
10import unittest
11from test import support
12from test.support.script_helper import assert_python_ok, spawn_python
13try:
14    import _testcapi
15except ImportError:
16    _testcapi = None
17
18
19class GenericTests(unittest.TestCase):
20
21    def test_enums(self):
22        for name in dir(signal):
23            sig = getattr(signal, name)
24            if name in {'SIG_DFL', 'SIG_IGN'}:
25                self.assertIsInstance(sig, signal.Handlers)
26            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
27                self.assertIsInstance(sig, signal.Sigmasks)
28            elif name.startswith('SIG') and not name.startswith('SIG_'):
29                self.assertIsInstance(sig, signal.Signals)
30            elif name.startswith('CTRL_'):
31                self.assertIsInstance(sig, signal.Signals)
32                self.assertEqual(sys.platform, "win32")
33
34
35@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
36class PosixTests(unittest.TestCase):
37    def trivial_signal_handler(self, *args):
38        pass
39
40    def test_out_of_range_signal_number_raises_error(self):
41        self.assertRaises(ValueError, signal.getsignal, 4242)
42
43        self.assertRaises(ValueError, signal.signal, 4242,
44                          self.trivial_signal_handler)
45
46        self.assertRaises(ValueError, signal.strsignal, 4242)
47
48    def test_setting_signal_handler_to_none_raises_error(self):
49        self.assertRaises(TypeError, signal.signal,
50                          signal.SIGUSR1, None)
51
52    def test_getsignal(self):
53        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
54        self.assertIsInstance(hup, signal.Handlers)
55        self.assertEqual(signal.getsignal(signal.SIGHUP),
56                         self.trivial_signal_handler)
57        signal.signal(signal.SIGHUP, hup)
58        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
59
60    def test_strsignal(self):
61        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
62        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
63        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))
64
65    # Issue 3864, unknown if this affects earlier versions of freebsd also
66    def test_interprocess_signal(self):
67        dirname = os.path.dirname(__file__)
68        script = os.path.join(dirname, 'signalinterproctester.py')
69        assert_python_ok(script)
70
71    def test_valid_signals(self):
72        s = signal.valid_signals()
73        self.assertIsInstance(s, set)
74        self.assertIn(signal.Signals.SIGINT, s)
75        self.assertIn(signal.Signals.SIGALRM, s)
76        self.assertNotIn(0, s)
77        self.assertNotIn(signal.NSIG, s)
78        self.assertLess(len(s), signal.NSIG)
79
80    @unittest.skipUnless(sys.executable, "sys.executable required.")
81    def test_keyboard_interrupt_exit_code(self):
82        """KeyboardInterrupt triggers exit via SIGINT."""
83        process = subprocess.run(
84                [sys.executable, "-c",
85                 "import os, signal, time\n"
86                 "os.kill(os.getpid(), signal.SIGINT)\n"
87                 "for _ in range(999): time.sleep(0.01)"],
88                stderr=subprocess.PIPE)
89        self.assertIn(b"KeyboardInterrupt", process.stderr)
90        self.assertEqual(process.returncode, -signal.SIGINT)
91        # Caveat: The exit code is insufficient to guarantee we actually died
92        # via a signal.  POSIX shells do more than look at the 8 bit value.
93        # Writing an automation friendly test of an interactive shell
94        # to confirm that our process died via a SIGINT proved too complex.
95
96
97@unittest.skipUnless(sys.platform == "win32", "Windows specific")
98class WindowsSignalTests(unittest.TestCase):
99
100    def test_valid_signals(self):
101        s = signal.valid_signals()
102        self.assertIsInstance(s, set)
103        self.assertGreaterEqual(len(s), 6)
104        self.assertIn(signal.Signals.SIGINT, s)
105        self.assertNotIn(0, s)
106        self.assertNotIn(signal.NSIG, s)
107        self.assertLess(len(s), signal.NSIG)
108
109    def test_issue9324(self):
110        # Updated for issue #10003, adding SIGBREAK
111        handler = lambda x, y: None
112        checked = set()
113        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
114                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
115                    signal.SIGTERM):
116            # Set and then reset a handler for signals that work on windows.
117            # Issue #18396, only for signals without a C-level handler.
118            if signal.getsignal(sig) is not None:
119                signal.signal(sig, signal.signal(sig, handler))
120                checked.add(sig)
121        # Issue #18396: Ensure the above loop at least tested *something*
122        self.assertTrue(checked)
123
124        with self.assertRaises(ValueError):
125            signal.signal(-1, handler)
126
127        with self.assertRaises(ValueError):
128            signal.signal(7, handler)
129
130    @unittest.skipUnless(sys.executable, "sys.executable required.")
131    def test_keyboard_interrupt_exit_code(self):
132        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
133        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
134        # as that requires setting up a console control handler in a child
135        # in its own process group.  Doable, but quite complicated.  (see
136        # @eryksun on https://github.com/python/cpython/pull/11862)
137        process = subprocess.run(
138                [sys.executable, "-c", "raise KeyboardInterrupt"],
139                stderr=subprocess.PIPE)
140        self.assertIn(b"KeyboardInterrupt", process.stderr)
141        STATUS_CONTROL_C_EXIT = 0xC000013A
142        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)
143
144
145class WakeupFDTests(unittest.TestCase):
146
147    def test_invalid_call(self):
148        # First parameter is positional-only
149        with self.assertRaises(TypeError):
150            signal.set_wakeup_fd(signum=signal.SIGINT)
151
152        # warn_on_full_buffer is a keyword-only parameter
153        with self.assertRaises(TypeError):
154            signal.set_wakeup_fd(signal.SIGINT, False)
155
156    def test_invalid_fd(self):
157        fd = support.make_bad_fd()
158        self.assertRaises((ValueError, OSError),
159                          signal.set_wakeup_fd, fd)
160
161    def test_invalid_socket(self):
162        sock = socket.socket()
163        fd = sock.fileno()
164        sock.close()
165        self.assertRaises((ValueError, OSError),
166                          signal.set_wakeup_fd, fd)
167
168    def test_set_wakeup_fd_result(self):
169        r1, w1 = os.pipe()
170        self.addCleanup(os.close, r1)
171        self.addCleanup(os.close, w1)
172        r2, w2 = os.pipe()
173        self.addCleanup(os.close, r2)
174        self.addCleanup(os.close, w2)
175
176        if hasattr(os, 'set_blocking'):
177            os.set_blocking(w1, False)
178            os.set_blocking(w2, False)
179
180        signal.set_wakeup_fd(w1)
181        self.assertEqual(signal.set_wakeup_fd(w2), w1)
182        self.assertEqual(signal.set_wakeup_fd(-1), w2)
183        self.assertEqual(signal.set_wakeup_fd(-1), -1)
184
185    def test_set_wakeup_fd_socket_result(self):
186        sock1 = socket.socket()
187        self.addCleanup(sock1.close)
188        sock1.setblocking(False)
189        fd1 = sock1.fileno()
190
191        sock2 = socket.socket()
192        self.addCleanup(sock2.close)
193        sock2.setblocking(False)
194        fd2 = sock2.fileno()
195
196        signal.set_wakeup_fd(fd1)
197        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
198        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
199        self.assertEqual(signal.set_wakeup_fd(-1), -1)
200
201    # On Windows, files are always blocking and Windows does not provide a
202    # function to test if a socket is in non-blocking mode.
203    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
204    def test_set_wakeup_fd_blocking(self):
205        rfd, wfd = os.pipe()
206        self.addCleanup(os.close, rfd)
207        self.addCleanup(os.close, wfd)
208
209        # fd must be non-blocking
210        os.set_blocking(wfd, True)
211        with self.assertRaises(ValueError) as cm:
212            signal.set_wakeup_fd(wfd)
213        self.assertEqual(str(cm.exception),
214                         "the fd %s must be in non-blocking mode" % wfd)
215
216        # non-blocking is ok
217        os.set_blocking(wfd, False)
218        signal.set_wakeup_fd(wfd)
219        signal.set_wakeup_fd(-1)
220
221
222@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
223class WakeupSignalTests(unittest.TestCase):
224    @unittest.skipIf(_testcapi is None, 'need _testcapi')
225    def check_wakeup(self, test_body, *signals, ordered=True):
226        # use a subprocess to have only one thread
227        code = """if 1:
228        import _testcapi
229        import os
230        import signal
231        import struct
232
233        signals = {!r}
234
235        def handler(signum, frame):
236            pass
237
238        def check_signum(signals):
239            data = os.read(read, len(signals)+1)
240            raised = struct.unpack('%uB' % len(data), data)
241            if not {!r}:
242                raised = set(raised)
243                signals = set(signals)
244            if raised != signals:
245                raise Exception("%r != %r" % (raised, signals))
246
247        {}
248
249        signal.signal(signal.SIGALRM, handler)
250        read, write = os.pipe()
251        os.set_blocking(write, False)
252        signal.set_wakeup_fd(write)
253
254        test()
255        check_signum(signals)
256
257        os.close(read)
258        os.close(write)
259        """.format(tuple(map(int, signals)), ordered, test_body)
260
261        assert_python_ok('-c', code)
262
263    @unittest.skipIf(_testcapi is None, 'need _testcapi')
264    def test_wakeup_write_error(self):
265        # Issue #16105: write() errors in the C signal handler should not
266        # pass silently.
267        # Use a subprocess to have only one thread.
268        code = """if 1:
269        import _testcapi
270        import errno
271        import os
272        import signal
273        import sys
274        from test.support import captured_stderr
275
276        def handler(signum, frame):
277            1/0
278
279        signal.signal(signal.SIGALRM, handler)
280        r, w = os.pipe()
281        os.set_blocking(r, False)
282
283        # Set wakeup_fd a read-only file descriptor to trigger the error
284        signal.set_wakeup_fd(r)
285        try:
286            with captured_stderr() as err:
287                signal.raise_signal(signal.SIGALRM)
288        except ZeroDivisionError:
289            # An ignored exception should have been printed out on stderr
290            err = err.getvalue()
291            if ('Exception ignored when trying to write to the signal wakeup fd'
292                not in err):
293                raise AssertionError(err)
294            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
295                raise AssertionError(err)
296        else:
297            raise AssertionError("ZeroDivisionError not raised")
298
299        os.close(r)
300        os.close(w)
301        """
302        r, w = os.pipe()
303        try:
304            os.write(r, b'x')
305        except OSError:
306            pass
307        else:
308            self.skipTest("OS doesn't report write() error on the read end of a pipe")
309        finally:
310            os.close(r)
311            os.close(w)
312
313        assert_python_ok('-c', code)
314
315    def test_wakeup_fd_early(self):
316        self.check_wakeup("""def test():
317            import select
318            import time
319
320            TIMEOUT_FULL = 10
321            TIMEOUT_HALF = 5
322
323            class InterruptSelect(Exception):
324                pass
325
326            def handler(signum, frame):
327                raise InterruptSelect
328            signal.signal(signal.SIGALRM, handler)
329
330            signal.alarm(1)
331
332            # We attempt to get a signal during the sleep,
333            # before select is called
334            try:
335                select.select([], [], [], TIMEOUT_FULL)
336            except InterruptSelect:
337                pass
338            else:
339                raise Exception("select() was not interrupted")
340
341            before_time = time.monotonic()
342            select.select([read], [], [], TIMEOUT_FULL)
343            after_time = time.monotonic()
344            dt = after_time - before_time
345            if dt >= TIMEOUT_HALF:
346                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
347        """, signal.SIGALRM)
348
349    def test_wakeup_fd_during(self):
350        self.check_wakeup("""def test():
351            import select
352            import time
353
354            TIMEOUT_FULL = 10
355            TIMEOUT_HALF = 5
356
357            class InterruptSelect(Exception):
358                pass
359
360            def handler(signum, frame):
361                raise InterruptSelect
362            signal.signal(signal.SIGALRM, handler)
363
364            signal.alarm(1)
365            before_time = time.monotonic()
366            # We attempt to get a signal during the select call
367            try:
368                select.select([read], [], [], TIMEOUT_FULL)
369            except InterruptSelect:
370                pass
371            else:
372                raise Exception("select() was not interrupted")
373            after_time = time.monotonic()
374            dt = after_time - before_time
375            if dt >= TIMEOUT_HALF:
376                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
377        """, signal.SIGALRM)
378
379    def test_signum(self):
380        self.check_wakeup("""def test():
381            signal.signal(signal.SIGUSR1, handler)
382            signal.raise_signal(signal.SIGUSR1)
383            signal.raise_signal(signal.SIGALRM)
384        """, signal.SIGUSR1, signal.SIGALRM)
385
386    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
387                         'need signal.pthread_sigmask()')
388    def test_pending(self):
389        self.check_wakeup("""def test():
390            signum1 = signal.SIGUSR1
391            signum2 = signal.SIGUSR2
392
393            signal.signal(signum1, handler)
394            signal.signal(signum2, handler)
395
396            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
397            signal.raise_signal(signum1)
398            signal.raise_signal(signum2)
399            # Unblocking the 2 signals calls the C signal handler twice
400            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
401        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
402
403
404@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
405class WakeupSocketSignalTests(unittest.TestCase):
406
407    @unittest.skipIf(_testcapi is None, 'need _testcapi')
408    def test_socket(self):
409        # use a subprocess to have only one thread
410        code = """if 1:
411        import signal
412        import socket
413        import struct
414        import _testcapi
415
416        signum = signal.SIGINT
417        signals = (signum,)
418
419        def handler(signum, frame):
420            pass
421
422        signal.signal(signum, handler)
423
424        read, write = socket.socketpair()
425        write.setblocking(False)
426        signal.set_wakeup_fd(write.fileno())
427
428        signal.raise_signal(signum)
429
430        data = read.recv(1)
431        if not data:
432            raise Exception("no signum written")
433        raised = struct.unpack('B', data)
434        if raised != signals:
435            raise Exception("%r != %r" % (raised, signals))
436
437        read.close()
438        write.close()
439        """
440
441        assert_python_ok('-c', code)
442
443    @unittest.skipIf(_testcapi is None, 'need _testcapi')
444    def test_send_error(self):
445        # Use a subprocess to have only one thread.
446        if os.name == 'nt':
447            action = 'send'
448        else:
449            action = 'write'
450        code = """if 1:
451        import errno
452        import signal
453        import socket
454        import sys
455        import time
456        import _testcapi
457        from test.support import captured_stderr
458
459        signum = signal.SIGINT
460
461        def handler(signum, frame):
462            pass
463
464        signal.signal(signum, handler)
465
466        read, write = socket.socketpair()
467        read.setblocking(False)
468        write.setblocking(False)
469
470        signal.set_wakeup_fd(write.fileno())
471
472        # Close sockets: send() will fail
473        read.close()
474        write.close()
475
476        with captured_stderr() as err:
477            signal.raise_signal(signum)
478
479        err = err.getvalue()
480        if ('Exception ignored when trying to {action} to the signal wakeup fd'
481            not in err):
482            raise AssertionError(err)
483        """.format(action=action)
484        assert_python_ok('-c', code)
485
486    @unittest.skipIf(_testcapi is None, 'need _testcapi')
487    def test_warn_on_full_buffer(self):
488        # Use a subprocess to have only one thread.
489        if os.name == 'nt':
490            action = 'send'
491        else:
492            action = 'write'
493        code = """if 1:
494        import errno
495        import signal
496        import socket
497        import sys
498        import time
499        import _testcapi
500        from test.support import captured_stderr
501
502        signum = signal.SIGINT
503
504        # This handler will be called, but we intentionally won't read from
505        # the wakeup fd.
506        def handler(signum, frame):
507            pass
508
509        signal.signal(signum, handler)
510
511        read, write = socket.socketpair()
512
513        # Fill the socketpair buffer
514        if sys.platform == 'win32':
515            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
516            # the full socketpair buffer, so use a timeout of 50 ms instead.
517            write.settimeout(0.050)
518        else:
519            write.setblocking(False)
520
521        # Start with large chunk size to reduce the
522        # number of send needed to fill the buffer.
523        written = 0
524        for chunk_size in (2 ** 16, 2 ** 8, 1):
525            chunk = b"x" * chunk_size
526            try:
527                while True:
528                    write.send(chunk)
529                    written += chunk_size
530            except (BlockingIOError, socket.timeout):
531                pass
532
533        print(f"%s bytes written into the socketpair" % written, flush=True)
534
535        write.setblocking(False)
536        try:
537            write.send(b"x")
538        except BlockingIOError:
539            # The socketpair buffer seems full
540            pass
541        else:
542            raise AssertionError("%s bytes failed to fill the socketpair "
543                                 "buffer" % written)
544
545        # By default, we get a warning when a signal arrives
546        msg = ('Exception ignored when trying to {action} '
547               'to the signal wakeup fd')
548        signal.set_wakeup_fd(write.fileno())
549
550        with captured_stderr() as err:
551            signal.raise_signal(signum)
552
553        err = err.getvalue()
554        if msg not in err:
555            raise AssertionError("first set_wakeup_fd() test failed, "
556                                 "stderr: %r" % err)
557
558        # And also if warn_on_full_buffer=True
559        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
560
561        with captured_stderr() as err:
562            signal.raise_signal(signum)
563
564        err = err.getvalue()
565        if msg not in err:
566            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
567                                 "test failed, stderr: %r" % err)
568
569        # But not if warn_on_full_buffer=False
570        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
571
572        with captured_stderr() as err:
573            signal.raise_signal(signum)
574
575        err = err.getvalue()
576        if err != "":
577            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
578                                 "test failed, stderr: %r" % err)
579
580        # And then check the default again, to make sure warn_on_full_buffer
581        # settings don't leak across calls.
582        signal.set_wakeup_fd(write.fileno())
583
584        with captured_stderr() as err:
585            signal.raise_signal(signum)
586
587        err = err.getvalue()
588        if msg not in err:
589            raise AssertionError("second set_wakeup_fd() test failed, "
590                                 "stderr: %r" % err)
591
592        """.format(action=action)
593        assert_python_ok('-c', code)
594
595
596@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
597class SiginterruptTest(unittest.TestCase):
598
599    def readpipe_interrupted(self, interrupt):
600        """Perform a read during which a signal will arrive.  Return True if the
601        read is interrupted by the signal and raises an exception.  Return False
602        if it returns normally.
603        """
604        # use a subprocess to have only one thread, to have a timeout on the
605        # blocking read and to not touch signal handling in this process
606        code = """if 1:
607            import errno
608            import os
609            import signal
610            import sys
611
612            interrupt = %r
613            r, w = os.pipe()
614
615            def handler(signum, frame):
616                1 / 0
617
618            signal.signal(signal.SIGALRM, handler)
619            if interrupt is not None:
620                signal.siginterrupt(signal.SIGALRM, interrupt)
621
622            print("ready")
623            sys.stdout.flush()
624
625            # run the test twice
626            try:
627                for loop in range(2):
628                    # send a SIGALRM in a second (during the read)
629                    signal.alarm(1)
630                    try:
631                        # blocking call: read from a pipe without data
632                        os.read(r, 1)
633                    except ZeroDivisionError:
634                        pass
635                    else:
636                        sys.exit(2)
637                sys.exit(3)
638            finally:
639                os.close(r)
640                os.close(w)
641        """ % (interrupt,)
642        with spawn_python('-c', code) as process:
643            try:
644                # wait until the child process is loaded and has started
645                first_line = process.stdout.readline()
646
647                stdout, stderr = process.communicate(timeout=5.0)
648            except subprocess.TimeoutExpired:
649                process.kill()
650                return False
651            else:
652                stdout = first_line + stdout
653                exitcode = process.wait()
654                if exitcode not in (2, 3):
655                    raise Exception("Child error (exit code %s): %r"
656                                    % (exitcode, stdout))
657                return (exitcode == 3)
658
659    def test_without_siginterrupt(self):
660        # If a signal handler is installed and siginterrupt is not called
661        # at all, when that signal arrives, it interrupts a syscall that's in
662        # progress.
663        interrupted = self.readpipe_interrupted(None)
664        self.assertTrue(interrupted)
665
666    def test_siginterrupt_on(self):
667        # If a signal handler is installed and siginterrupt is called with
668        # a true value for the second argument, when that signal arrives, it
669        # interrupts a syscall that's in progress.
670        interrupted = self.readpipe_interrupted(True)
671        self.assertTrue(interrupted)
672
673    def test_siginterrupt_off(self):
674        # If a signal handler is installed and siginterrupt is called with
675        # a false value for the second argument, when that signal arrives, it
676        # does not interrupt a syscall that's in progress.
677        interrupted = self.readpipe_interrupted(False)
678        self.assertFalse(interrupted)
679
680
681@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
682class ItimerTest(unittest.TestCase):
683    def setUp(self):
684        self.hndl_called = False
685        self.hndl_count = 0
686        self.itimer = None
687        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
688
689    def tearDown(self):
690        signal.signal(signal.SIGALRM, self.old_alarm)
691        if self.itimer is not None: # test_itimer_exc doesn't change this attr
692            # just ensure that itimer is stopped
693            signal.setitimer(self.itimer, 0)
694
695    def sig_alrm(self, *args):
696        self.hndl_called = True
697
698    def sig_vtalrm(self, *args):
699        self.hndl_called = True
700
701        if self.hndl_count > 3:
702            # it shouldn't be here, because it should have been disabled.
703            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
704                "timer.")
705        elif self.hndl_count == 3:
706            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
707            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
708
709        self.hndl_count += 1
710
711    def sig_prof(self, *args):
712        self.hndl_called = True
713        signal.setitimer(signal.ITIMER_PROF, 0)
714
715    def test_itimer_exc(self):
716        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
717        # defines it ?
718        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
719        # Negative times are treated as zero on some platforms.
720        if 0:
721            self.assertRaises(signal.ItimerError,
722                              signal.setitimer, signal.ITIMER_REAL, -1)
723
724    def test_itimer_real(self):
725        self.itimer = signal.ITIMER_REAL
726        signal.setitimer(self.itimer, 1.0)
727        signal.pause()
728        self.assertEqual(self.hndl_called, True)
729
730    # Issue 3864, unknown if this affects earlier versions of freebsd also
731    @unittest.skipIf(sys.platform in ('netbsd5',),
732        'itimer not reliable (does not mix well with threading) on some BSDs.')
733    def test_itimer_virtual(self):
734        self.itimer = signal.ITIMER_VIRTUAL
735        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
736        signal.setitimer(self.itimer, 0.3, 0.2)
737
738        start_time = time.monotonic()
739        while time.monotonic() - start_time < 60.0:
740            # use up some virtual time by doing real work
741            _ = pow(12345, 67890, 10000019)
742            if signal.getitimer(self.itimer) == (0.0, 0.0):
743                break # sig_vtalrm handler stopped this itimer
744        else: # Issue 8424
745            self.skipTest("timeout: likely cause: machine too slow or load too "
746                          "high")
747
748        # virtual itimer should be (0.0, 0.0) now
749        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
750        # and the handler should have been called
751        self.assertEqual(self.hndl_called, True)
752
753    def test_itimer_prof(self):
754        self.itimer = signal.ITIMER_PROF
755        signal.signal(signal.SIGPROF, self.sig_prof)
756        signal.setitimer(self.itimer, 0.2, 0.2)
757
758        start_time = time.monotonic()
759        while time.monotonic() - start_time < 60.0:
760            # do some work
761            _ = pow(12345, 67890, 10000019)
762            if signal.getitimer(self.itimer) == (0.0, 0.0):
763                break # sig_prof handler stopped this itimer
764        else: # Issue 8424
765            self.skipTest("timeout: likely cause: machine too slow or load too "
766                          "high")
767
768        # profiling itimer should be (0.0, 0.0) now
769        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
770        # and the handler should have been called
771        self.assertEqual(self.hndl_called, True)
772
773    def test_setitimer_tiny(self):
774        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
775        # Check that float -> timeval conversion doesn't round
776        # the interval down to zero, which would disable the timer.
777        self.itimer = signal.ITIMER_REAL
778        signal.setitimer(self.itimer, 1e-6)
779        time.sleep(1)
780        self.assertEqual(self.hndl_called, True)
781
782
783class PendingSignalsTests(unittest.TestCase):
784    """
785    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
786    functions.
787    """
788    @unittest.skipUnless(hasattr(signal, 'sigpending'),
789                         'need signal.sigpending()')
790    def test_sigpending_empty(self):
791        self.assertEqual(signal.sigpending(), set())
792
793    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
794                         'need signal.pthread_sigmask()')
795    @unittest.skipUnless(hasattr(signal, 'sigpending'),
796                         'need signal.sigpending()')
797    def test_sigpending(self):
798        code = """if 1:
799            import os
800            import signal
801
802            def handler(signum, frame):
803                1/0
804
805            signum = signal.SIGUSR1
806            signal.signal(signum, handler)
807
808            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
809            os.kill(os.getpid(), signum)
810            pending = signal.sigpending()
811            for sig in pending:
812                assert isinstance(sig, signal.Signals), repr(pending)
813            if pending != {signum}:
814                raise Exception('%s != {%s}' % (pending, signum))
815            try:
816                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
817            except ZeroDivisionError:
818                pass
819            else:
820                raise Exception("ZeroDivisionError not raised")
821        """
822        assert_python_ok('-c', code)
823
824    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
825                         'need signal.pthread_kill()')
826    def test_pthread_kill(self):
827        code = """if 1:
828            import signal
829            import threading
830            import sys
831
832            signum = signal.SIGUSR1
833
834            def handler(signum, frame):
835                1/0
836
837            signal.signal(signum, handler)
838
839            tid = threading.get_ident()
840            try:
841                signal.pthread_kill(tid, signum)
842            except ZeroDivisionError:
843                pass
844            else:
845                raise Exception("ZeroDivisionError not raised")
846        """
847        assert_python_ok('-c', code)
848
849    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
850                         'need signal.pthread_sigmask()')
851    def wait_helper(self, blocked, test):
852        """
853        test: body of the "def test(signum):" function.
854        blocked: number of the blocked signal
855        """
856        code = '''if 1:
857        import signal
858        import sys
859        from signal import Signals
860
861        def handler(signum, frame):
862            1/0
863
864        %s
865
866        blocked = %s
867        signum = signal.SIGALRM
868
869        # child: block and wait the signal
870        try:
871            signal.signal(signum, handler)
872            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
873
874            # Do the tests
875            test(signum)
876
877            # The handler must not be called on unblock
878            try:
879                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
880            except ZeroDivisionError:
881                print("the signal handler has been called",
882                      file=sys.stderr)
883                sys.exit(1)
884        except BaseException as err:
885            print("error: {}".format(err), file=sys.stderr)
886            sys.stderr.flush()
887            sys.exit(1)
888        ''' % (test.strip(), blocked)
889
890        # sig*wait* must be called with the signal blocked: since the current
891        # process might have several threads running, use a subprocess to have
892        # a single thread.
893        assert_python_ok('-c', code)
894
895    @unittest.skipUnless(hasattr(signal, 'sigwait'),
896                         'need signal.sigwait()')
897    def test_sigwait(self):
898        self.wait_helper(signal.SIGALRM, '''
899        def test(signum):
900            signal.alarm(1)
901            received = signal.sigwait([signum])
902            assert isinstance(received, signal.Signals), received
903            if received != signum:
904                raise Exception('received %s, not %s' % (received, signum))
905        ''')
906
907    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
908                         'need signal.sigwaitinfo()')
909    def test_sigwaitinfo(self):
910        self.wait_helper(signal.SIGALRM, '''
911        def test(signum):
912            signal.alarm(1)
913            info = signal.sigwaitinfo([signum])
914            if info.si_signo != signum:
915                raise Exception("info.si_signo != %s" % signum)
916        ''')
917
918    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
919                         'need signal.sigtimedwait()')
920    def test_sigtimedwait(self):
921        self.wait_helper(signal.SIGALRM, '''
922        def test(signum):
923            signal.alarm(1)
924            info = signal.sigtimedwait([signum], 10.1000)
925            if info.si_signo != signum:
926                raise Exception('info.si_signo != %s' % signum)
927        ''')
928
929    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
930                         'need signal.sigtimedwait()')
931    def test_sigtimedwait_poll(self):
932        # check that polling with sigtimedwait works
933        self.wait_helper(signal.SIGALRM, '''
934        def test(signum):
935            import os
936            os.kill(os.getpid(), signum)
937            info = signal.sigtimedwait([signum], 0)
938            if info.si_signo != signum:
939                raise Exception('info.si_signo != %s' % signum)
940        ''')
941
942    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
943                         'need signal.sigtimedwait()')
944    def test_sigtimedwait_timeout(self):
945        self.wait_helper(signal.SIGALRM, '''
946        def test(signum):
947            received = signal.sigtimedwait([signum], 1.0)
948            if received is not None:
949                raise Exception("received=%r" % (received,))
950        ''')
951
952    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
953                         'need signal.sigtimedwait()')
954    def test_sigtimedwait_negative_timeout(self):
955        signum = signal.SIGALRM
956        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
957
958    @unittest.skipUnless(hasattr(signal, 'sigwait'),
959                         'need signal.sigwait()')
960    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
961                         'need signal.pthread_sigmask()')
962    def test_sigwait_thread(self):
963        # Check that calling sigwait() from a thread doesn't suspend the whole
964        # process. A new interpreter is spawned to avoid problems when mixing
965        # threads and fork(): only async-safe functions are allowed between
966        # fork() and exec().
967        assert_python_ok("-c", """if True:
968            import os, threading, sys, time, signal
969
970            # the default handler terminates the process
971            signum = signal.SIGUSR1
972
973            def kill_later():
974                # wait until the main thread is waiting in sigwait()
975                time.sleep(1)
976                os.kill(os.getpid(), signum)
977
978            # the signal must be blocked by all the threads
979            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
980            killer = threading.Thread(target=kill_later)
981            killer.start()
982            received = signal.sigwait([signum])
983            if received != signum:
984                print("sigwait() received %s, not %s" % (received, signum),
985                      file=sys.stderr)
986                sys.exit(1)
987            killer.join()
988            # unblock the signal, which should have been cleared by sigwait()
989            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
990        """)
991
992    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
993                         'need signal.pthread_sigmask()')
994    def test_pthread_sigmask_arguments(self):
995        self.assertRaises(TypeError, signal.pthread_sigmask)
996        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
997        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
998        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
999        with self.assertRaises(ValueError):
1000            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
1001        with self.assertRaises(ValueError):
1002            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
1003        with self.assertRaises(ValueError):
1004            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
1005
1006    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1007                         'need signal.pthread_sigmask()')
1008    def test_pthread_sigmask_valid_signals(self):
1009        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
1010        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
1011        # Get current blocked set
1012        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
1013        self.assertLessEqual(s, signal.valid_signals())
1014
1015    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
1016                         'need signal.pthread_sigmask()')
1017    def test_pthread_sigmask(self):
1018        code = """if 1:
1019        import signal
1020        import os; import threading
1021
1022        def handler(signum, frame):
1023            1/0
1024
1025        def kill(signum):
1026            os.kill(os.getpid(), signum)
1027
1028        def check_mask(mask):
1029            for sig in mask:
1030                assert isinstance(sig, signal.Signals), repr(sig)
1031
1032        def read_sigmask():
1033            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
1034            check_mask(sigmask)
1035            return sigmask
1036
1037        signum = signal.SIGUSR1
1038
1039        # Install our signal handler
1040        old_handler = signal.signal(signum, handler)
1041
1042        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
1043        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1044        check_mask(old_mask)
1045        try:
1046            kill(signum)
1047        except ZeroDivisionError:
1048            pass
1049        else:
1050            raise Exception("ZeroDivisionError not raised")
1051
1052        # Block and then raise SIGUSR1. The signal is blocked: the signal
1053        # handler is not called, and the signal is now pending
1054        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
1055        check_mask(mask)
1056        kill(signum)
1057
1058        # Check the new mask
1059        blocked = read_sigmask()
1060        check_mask(blocked)
1061        if signum not in blocked:
1062            raise Exception("%s not in %s" % (signum, blocked))
1063        if old_mask ^ blocked != {signum}:
1064            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
1065
1066        # Unblock SIGUSR1
1067        try:
1068            # unblock the pending signal calls immediately the signal handler
1069            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
1070        except ZeroDivisionError:
1071            pass
1072        else:
1073            raise Exception("ZeroDivisionError not raised")
1074        try:
1075            kill(signum)
1076        except ZeroDivisionError:
1077            pass
1078        else:
1079            raise Exception("ZeroDivisionError not raised")
1080
1081        # Check the new mask
1082        unblocked = read_sigmask()
1083        if signum in unblocked:
1084            raise Exception("%s in %s" % (signum, unblocked))
1085        if blocked ^ unblocked != {signum}:
1086            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
1087        if old_mask != unblocked:
1088            raise Exception("%s != %s" % (old_mask, unblocked))
1089        """
1090        assert_python_ok('-c', code)
1091
1092    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
1093                         'need signal.pthread_kill()')
1094    def test_pthread_kill_main_thread(self):
1095        # Test that a signal can be sent to the main thread with pthread_kill()
1096        # before any other thread has been created (see issue #12392).
1097        code = """if True:
1098            import threading
1099            import signal
1100            import sys
1101
1102            def handler(signum, frame):
1103                sys.exit(3)
1104
1105            signal.signal(signal.SIGUSR1, handler)
1106            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
1107            sys.exit(2)
1108        """
1109
1110        with spawn_python('-c', code) as process:
1111            stdout, stderr = process.communicate()
1112            exitcode = process.wait()
1113            if exitcode != 3:
1114                raise Exception("Child error (exit code %s): %s" %
1115                                (exitcode, stdout))
1116
1117
1118class StressTest(unittest.TestCase):
1119    """
1120    Stress signal delivery, especially when a signal arrives in
1121    the middle of recomputing the signal state or executing
1122    previously tripped signal handlers.
1123    """
1124
1125    def setsig(self, signum, handler):
1126        old_handler = signal.signal(signum, handler)
1127        self.addCleanup(signal.signal, signum, old_handler)
1128
1129    def measure_itimer_resolution(self):
1130        N = 20
1131        times = []
1132
1133        def handler(signum=None, frame=None):
1134            if len(times) < N:
1135                times.append(time.perf_counter())
1136                # 1 µs is the smallest possible timer interval,
1137                # we want to measure what the concrete duration
1138                # will be on this platform
1139                signal.setitimer(signal.ITIMER_REAL, 1e-6)
1140
1141        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
1142        self.setsig(signal.SIGALRM, handler)
1143        handler()
1144        while len(times) < N:
1145            time.sleep(1e-3)
1146
1147        durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
1148        med = statistics.median(durations)
1149        if support.verbose:
1150            print("detected median itimer() resolution: %.6f s." % (med,))
1151        return med
1152
1153    def decide_itimer_count(self):
1154        # Some systems have poor setitimer() resolution (for example
1155        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
1156        # number of sequential timers based on that.
1157        reso = self.measure_itimer_resolution()
1158        if reso <= 1e-4:
1159            return 10000
1160        elif reso <= 1e-2:
1161            return 100
1162        else:
1163            self.skipTest("detected itimer resolution (%.3f s.) too high "
1164                          "(> 10 ms.) on this platform (or system too busy)"
1165                          % (reso,))
1166
1167    @unittest.skipUnless(hasattr(signal, "setitimer"),
1168                         "test needs setitimer()")
1169    def test_stress_delivery_dependent(self):
1170        """
1171        This test uses dependent signal handlers.
1172        """
1173        N = self.decide_itimer_count()
1174        sigs = []
1175
1176        def first_handler(signum, frame):
1177            # 1e-6 is the minimum non-zero value for `setitimer()`.
1178            # Choose a random delay so as to improve chances of
1179            # triggering a race condition.  Ideally the signal is received
1180            # when inside critical signal-handling routines such as
1181            # Py_MakePendingCalls().
1182            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
1183
1184        def second_handler(signum=None, frame=None):
1185            sigs.append(signum)
1186
1187        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
1188        # ascending and descending sequences (SIGUSR1 then SIGALRM,
1189        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
1190        self.setsig(signal.SIGPROF, first_handler)
1191        self.setsig(signal.SIGUSR1, first_handler)
1192        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL
1193
1194        expected_sigs = 0
1195        deadline = time.monotonic() + 15.0
1196
1197        while expected_sigs < N:
1198            os.kill(os.getpid(), signal.SIGPROF)
1199            expected_sigs += 1
1200            # Wait for handlers to run to avoid signal coalescing
1201            while len(sigs) < expected_sigs and time.monotonic() < deadline:
1202                time.sleep(1e-5)
1203
1204            os.kill(os.getpid(), signal.SIGUSR1)
1205            expected_sigs += 1
1206            while len(sigs) < expected_sigs and time.monotonic() < deadline:
1207                time.sleep(1e-5)
1208
1209        # All ITIMER_REAL signals should have been delivered to the
1210        # Python handler
1211        self.assertEqual(len(sigs), N, "Some signals were lost")
1212
1213    @unittest.skipUnless(hasattr(signal, "setitimer"),
1214                         "test needs setitimer()")
1215    def test_stress_delivery_simultaneous(self):
1216        """
1217        This test uses simultaneous signal handlers.
1218        """
1219        N = self.decide_itimer_count()
1220        sigs = []
1221
1222        def handler(signum, frame):
1223            sigs.append(signum)
1224
1225        self.setsig(signal.SIGUSR1, handler)
1226        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL
1227
1228        expected_sigs = 0
1229        deadline = time.monotonic() + 15.0
1230
1231        while expected_sigs < N:
1232            # Hopefully the SIGALRM will be received somewhere during
1233            # initial processing of SIGUSR1.
1234            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
1235            os.kill(os.getpid(), signal.SIGUSR1)
1236
1237            expected_sigs += 2
1238            # Wait for handlers to run to avoid signal coalescing
1239            while len(sigs) < expected_sigs and time.monotonic() < deadline:
1240                time.sleep(1e-5)
1241
1242        # All ITIMER_REAL signals should have been delivered to the
1243        # Python handler
1244        self.assertEqual(len(sigs), N, "Some signals were lost")
1245
1246class RaiseSignalTest(unittest.TestCase):
1247
1248    def test_sigint(self):
1249        with self.assertRaises(KeyboardInterrupt):
1250            signal.raise_signal(signal.SIGINT)
1251
1252    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
1253    def test_invalid_argument(self):
1254        try:
1255            SIGHUP = 1 # not supported on win32
1256            signal.raise_signal(SIGHUP)
1257            self.fail("OSError (Invalid argument) expected")
1258        except OSError as e:
1259            if e.errno == errno.EINVAL:
1260                pass
1261            else:
1262                raise
1263
1264    def test_handler(self):
1265        is_ok = False
1266        def handler(a, b):
1267            nonlocal is_ok
1268            is_ok = True
1269        old_signal = signal.signal(signal.SIGINT, handler)
1270        self.addCleanup(signal.signal, signal.SIGINT, old_signal)
1271
1272        signal.raise_signal(signal.SIGINT)
1273        self.assertTrue(is_ok)
1274
1275
1276def tearDownModule():
1277    support.reap_children()
1278
1279if __name__ == "__main__":
1280    unittest.main()
1281