• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from contextlib import closing
4import enum
5import gc
6import pickle
7import select
8import signal
9import socket
10import struct
11import subprocess
12import traceback
13import sys, os, time, errno
14from test.support.script_helper import assert_python_ok, spawn_python
15try:
16    import threading
17except ImportError:
18    threading = None
19try:
20    import _testcapi
21except ImportError:
22    _testcapi = None
23
24
25class GenericTests(unittest.TestCase):
26
27    @unittest.skipIf(threading is None, "test needs threading module")
28    def test_enums(self):
29        for name in dir(signal):
30            sig = getattr(signal, name)
31            if name in {'SIG_DFL', 'SIG_IGN'}:
32                self.assertIsInstance(sig, signal.Handlers)
33            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
34                self.assertIsInstance(sig, signal.Sigmasks)
35            elif name.startswith('SIG') and not name.startswith('SIG_'):
36                self.assertIsInstance(sig, signal.Signals)
37            elif name.startswith('CTRL_'):
38                self.assertIsInstance(sig, signal.Signals)
39                self.assertEqual(sys.platform, "win32")
40
41
42@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
43class PosixTests(unittest.TestCase):
44    def trivial_signal_handler(self, *args):
45        pass
46
47    def test_out_of_range_signal_number_raises_error(self):
48        self.assertRaises(ValueError, signal.getsignal, 4242)
49
50        self.assertRaises(ValueError, signal.signal, 4242,
51                          self.trivial_signal_handler)
52
53    def test_setting_signal_handler_to_none_raises_error(self):
54        self.assertRaises(TypeError, signal.signal,
55                          signal.SIGUSR1, None)
56
57    def test_getsignal(self):
58        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
59        self.assertIsInstance(hup, signal.Handlers)
60        self.assertEqual(signal.getsignal(signal.SIGHUP),
61                         self.trivial_signal_handler)
62        signal.signal(signal.SIGHUP, hup)
63        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
64
65    # Issue 3864, unknown if this affects earlier versions of freebsd also
66    @unittest.skipIf(sys.platform=='freebsd6',
67        'inter process signals not reliable (do not mix well with threading) '
68        'on freebsd6')
69    def test_interprocess_signal(self):
70        dirname = os.path.dirname(__file__)
71        script = os.path.join(dirname, 'signalinterproctester.py')
72        assert_python_ok(script)
73
74
75@unittest.skipUnless(sys.platform == "win32", "Windows specific")
76class WindowsSignalTests(unittest.TestCase):
77    def test_issue9324(self):
78        # Updated for issue #10003, adding SIGBREAK
79        handler = lambda x, y: None
80        checked = set()
81        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
82                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
83                    signal.SIGTERM):
84            # Set and then reset a handler for signals that work on windows.
85            # Issue #18396, only for signals without a C-level handler.
86            if signal.getsignal(sig) is not None:
87                signal.signal(sig, signal.signal(sig, handler))
88                checked.add(sig)
89        # Issue #18396: Ensure the above loop at least tested *something*
90        self.assertTrue(checked)
91
92        with self.assertRaises(ValueError):
93            signal.signal(-1, handler)
94
95        with self.assertRaises(ValueError):
96            signal.signal(7, handler)
97
98
99class WakeupFDTests(unittest.TestCase):
100
101    def test_invalid_fd(self):
102        fd = support.make_bad_fd()
103        self.assertRaises((ValueError, OSError),
104                          signal.set_wakeup_fd, fd)
105
106    def test_invalid_socket(self):
107        sock = socket.socket()
108        fd = sock.fileno()
109        sock.close()
110        self.assertRaises((ValueError, OSError),
111                          signal.set_wakeup_fd, fd)
112
113    def test_set_wakeup_fd_result(self):
114        r1, w1 = os.pipe()
115        self.addCleanup(os.close, r1)
116        self.addCleanup(os.close, w1)
117        r2, w2 = os.pipe()
118        self.addCleanup(os.close, r2)
119        self.addCleanup(os.close, w2)
120
121        if hasattr(os, 'set_blocking'):
122            os.set_blocking(w1, False)
123            os.set_blocking(w2, False)
124
125        signal.set_wakeup_fd(w1)
126        self.assertEqual(signal.set_wakeup_fd(w2), w1)
127        self.assertEqual(signal.set_wakeup_fd(-1), w2)
128        self.assertEqual(signal.set_wakeup_fd(-1), -1)
129
130    def test_set_wakeup_fd_socket_result(self):
131        sock1 = socket.socket()
132        self.addCleanup(sock1.close)
133        sock1.setblocking(False)
134        fd1 = sock1.fileno()
135
136        sock2 = socket.socket()
137        self.addCleanup(sock2.close)
138        sock2.setblocking(False)
139        fd2 = sock2.fileno()
140
141        signal.set_wakeup_fd(fd1)
142        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
143        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
144        self.assertEqual(signal.set_wakeup_fd(-1), -1)
145
146    # On Windows, files are always blocking and Windows does not provide a
147    # function to test if a socket is in non-blocking mode.
148    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
149    def test_set_wakeup_fd_blocking(self):
150        rfd, wfd = os.pipe()
151        self.addCleanup(os.close, rfd)
152        self.addCleanup(os.close, wfd)
153
154        # fd must be non-blocking
155        os.set_blocking(wfd, True)
156        with self.assertRaises(ValueError) as cm:
157            signal.set_wakeup_fd(wfd)
158        self.assertEqual(str(cm.exception),
159                         "the fd %s must be in non-blocking mode" % wfd)
160
161        # non-blocking is ok
162        os.set_blocking(wfd, False)
163        signal.set_wakeup_fd(wfd)
164        signal.set_wakeup_fd(-1)
165
166
167@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
168class WakeupSignalTests(unittest.TestCase):
169    @unittest.skipIf(_testcapi is None, 'need _testcapi')
170    def check_wakeup(self, test_body, *signals, ordered=True):
171        # use a subprocess to have only one thread
172        code = """if 1:
173        import _testcapi
174        import os
175        import signal
176        import struct
177
178        signals = {!r}
179
180        def handler(signum, frame):
181            pass
182
183        def check_signum(signals):
184            data = os.read(read, len(signals)+1)
185            raised = struct.unpack('%uB' % len(data), data)
186            if not {!r}:
187                raised = set(raised)
188                signals = set(signals)
189            if raised != signals:
190                raise Exception("%r != %r" % (raised, signals))
191
192        {}
193
194        signal.signal(signal.SIGALRM, handler)
195        read, write = os.pipe()
196        os.set_blocking(write, False)
197        signal.set_wakeup_fd(write)
198
199        test()
200        check_signum(signals)
201
202        os.close(read)
203        os.close(write)
204        """.format(tuple(map(int, signals)), ordered, test_body)
205
206        assert_python_ok('-c', code)
207
208    @unittest.skipIf(_testcapi is None, 'need _testcapi')
209    def test_wakeup_write_error(self):
210        # Issue #16105: write() errors in the C signal handler should not
211        # pass silently.
212        # Use a subprocess to have only one thread.
213        code = """if 1:
214        import _testcapi
215        import errno
216        import os
217        import signal
218        import sys
219        from test.support import captured_stderr
220
221        def handler(signum, frame):
222            1/0
223
224        signal.signal(signal.SIGALRM, handler)
225        r, w = os.pipe()
226        os.set_blocking(r, False)
227
228        # Set wakeup_fd a read-only file descriptor to trigger the error
229        signal.set_wakeup_fd(r)
230        try:
231            with captured_stderr() as err:
232                _testcapi.raise_signal(signal.SIGALRM)
233        except ZeroDivisionError:
234            # An ignored exception should have been printed out on stderr
235            err = err.getvalue()
236            if ('Exception ignored when trying to write to the signal wakeup fd'
237                not in err):
238                raise AssertionError(err)
239            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
240                raise AssertionError(err)
241        else:
242            raise AssertionError("ZeroDivisionError not raised")
243
244        os.close(r)
245        os.close(w)
246        """
247        r, w = os.pipe()
248        try:
249            os.write(r, b'x')
250        except OSError:
251            pass
252        else:
253            self.skipTest("OS doesn't report write() error on the read end of a pipe")
254        finally:
255            os.close(r)
256            os.close(w)
257
258        assert_python_ok('-c', code)
259
260    def test_wakeup_fd_early(self):
261        self.check_wakeup("""def test():
262            import select
263            import time
264
265            TIMEOUT_FULL = 10
266            TIMEOUT_HALF = 5
267
268            class InterruptSelect(Exception):
269                pass
270
271            def handler(signum, frame):
272                raise InterruptSelect
273            signal.signal(signal.SIGALRM, handler)
274
275            signal.alarm(1)
276
277            # We attempt to get a signal during the sleep,
278            # before select is called
279            try:
280                select.select([], [], [], TIMEOUT_FULL)
281            except InterruptSelect:
282                pass
283            else:
284                raise Exception("select() was not interrupted")
285
286            before_time = time.monotonic()
287            select.select([read], [], [], TIMEOUT_FULL)
288            after_time = time.monotonic()
289            dt = after_time - before_time
290            if dt >= TIMEOUT_HALF:
291                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
292        """, signal.SIGALRM)
293
294    def test_wakeup_fd_during(self):
295        self.check_wakeup("""def test():
296            import select
297            import time
298
299            TIMEOUT_FULL = 10
300            TIMEOUT_HALF = 5
301
302            class InterruptSelect(Exception):
303                pass
304
305            def handler(signum, frame):
306                raise InterruptSelect
307            signal.signal(signal.SIGALRM, handler)
308
309            signal.alarm(1)
310            before_time = time.monotonic()
311            # We attempt to get a signal during the select call
312            try:
313                select.select([read], [], [], TIMEOUT_FULL)
314            except InterruptSelect:
315                pass
316            else:
317                raise Exception("select() was not interrupted")
318            after_time = time.monotonic()
319            dt = after_time - before_time
320            if dt >= TIMEOUT_HALF:
321                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
322        """, signal.SIGALRM)
323
324    def test_signum(self):
325        self.check_wakeup("""def test():
326            import _testcapi
327            signal.signal(signal.SIGUSR1, handler)
328            _testcapi.raise_signal(signal.SIGUSR1)
329            _testcapi.raise_signal(signal.SIGALRM)
330        """, signal.SIGUSR1, signal.SIGALRM)
331
332    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
333                         'need signal.pthread_sigmask()')
334    def test_pending(self):
335        self.check_wakeup("""def test():
336            signum1 = signal.SIGUSR1
337            signum2 = signal.SIGUSR2
338
339            signal.signal(signum1, handler)
340            signal.signal(signum2, handler)
341
342            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
343            _testcapi.raise_signal(signum1)
344            _testcapi.raise_signal(signum2)
345            # Unblocking the 2 signals calls the C signal handler twice
346            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
347        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
348
349
350@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
351class WakeupSocketSignalTests(unittest.TestCase):
352
353    @unittest.skipIf(_testcapi is None, 'need _testcapi')
354    def test_socket(self):
355        # use a subprocess to have only one thread
356        code = """if 1:
357        import signal
358        import socket
359        import struct
360        import _testcapi
361
362        signum = signal.SIGINT
363        signals = (signum,)
364
365        def handler(signum, frame):
366            pass
367
368        signal.signal(signum, handler)
369
370        read, write = socket.socketpair()
371        read.setblocking(False)
372        write.setblocking(False)
373        signal.set_wakeup_fd(write.fileno())
374
375        _testcapi.raise_signal(signum)
376
377        data = read.recv(1)
378        if not data:
379            raise Exception("no signum written")
380        raised = struct.unpack('B', data)
381        if raised != signals:
382            raise Exception("%r != %r" % (raised, signals))
383
384        read.close()
385        write.close()
386        """
387
388        assert_python_ok('-c', code)
389
390    @unittest.skipIf(_testcapi is None, 'need _testcapi')
391    def test_send_error(self):
392        # Use a subprocess to have only one thread.
393        if os.name == 'nt':
394            action = 'send'
395        else:
396            action = 'write'
397        code = """if 1:
398        import errno
399        import signal
400        import socket
401        import sys
402        import time
403        import _testcapi
404        from test.support import captured_stderr
405
406        signum = signal.SIGINT
407
408        def handler(signum, frame):
409            pass
410
411        signal.signal(signum, handler)
412
413        read, write = socket.socketpair()
414        read.setblocking(False)
415        write.setblocking(False)
416
417        signal.set_wakeup_fd(write.fileno())
418
419        # Close sockets: send() will fail
420        read.close()
421        write.close()
422
423        with captured_stderr() as err:
424            _testcapi.raise_signal(signum)
425
426        err = err.getvalue()
427        if ('Exception ignored when trying to {action} to the signal wakeup fd'
428            not in err):
429            raise AssertionError(err)
430        """.format(action=action)
431        assert_python_ok('-c', code)
432
433
434@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
435class SiginterruptTest(unittest.TestCase):
436
437    def readpipe_interrupted(self, interrupt):
438        """Perform a read during which a signal will arrive.  Return True if the
439        read is interrupted by the signal and raises an exception.  Return False
440        if it returns normally.
441        """
442        # use a subprocess to have only one thread, to have a timeout on the
443        # blocking read and to not touch signal handling in this process
444        code = """if 1:
445            import errno
446            import os
447            import signal
448            import sys
449
450            interrupt = %r
451            r, w = os.pipe()
452
453            def handler(signum, frame):
454                1 / 0
455
456            signal.signal(signal.SIGALRM, handler)
457            if interrupt is not None:
458                signal.siginterrupt(signal.SIGALRM, interrupt)
459
460            print("ready")
461            sys.stdout.flush()
462
463            # run the test twice
464            try:
465                for loop in range(2):
466                    # send a SIGALRM in a second (during the read)
467                    signal.alarm(1)
468                    try:
469                        # blocking call: read from a pipe without data
470                        os.read(r, 1)
471                    except ZeroDivisionError:
472                        pass
473                    else:
474                        sys.exit(2)
475                sys.exit(3)
476            finally:
477                os.close(r)
478                os.close(w)
479        """ % (interrupt,)
480        with spawn_python('-c', code) as process:
481            try:
482                # wait until the child process is loaded and has started
483                first_line = process.stdout.readline()
484
485                stdout, stderr = process.communicate(timeout=5.0)
486            except subprocess.TimeoutExpired:
487                process.kill()
488                return False
489            else:
490                stdout = first_line + stdout
491                exitcode = process.wait()
492                if exitcode not in (2, 3):
493                    raise Exception("Child error (exit code %s): %r"
494                                    % (exitcode, stdout))
495                return (exitcode == 3)
496
497    def test_without_siginterrupt(self):
498        # If a signal handler is installed and siginterrupt is not called
499        # at all, when that signal arrives, it interrupts a syscall that's in
500        # progress.
501        interrupted = self.readpipe_interrupted(None)
502        self.assertTrue(interrupted)
503
504    def test_siginterrupt_on(self):
505        # If a signal handler is installed and siginterrupt is called with
506        # a true value for the second argument, when that signal arrives, it
507        # interrupts a syscall that's in progress.
508        interrupted = self.readpipe_interrupted(True)
509        self.assertTrue(interrupted)
510
511    def test_siginterrupt_off(self):
512        # If a signal handler is installed and siginterrupt is called with
513        # a false value for the second argument, when that signal arrives, it
514        # does not interrupt a syscall that's in progress.
515        interrupted = self.readpipe_interrupted(False)
516        self.assertFalse(interrupted)
517
518
519@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
520class ItimerTest(unittest.TestCase):
521    def setUp(self):
522        self.hndl_called = False
523        self.hndl_count = 0
524        self.itimer = None
525        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
526
527    def tearDown(self):
528        signal.signal(signal.SIGALRM, self.old_alarm)
529        if self.itimer is not None: # test_itimer_exc doesn't change this attr
530            # just ensure that itimer is stopped
531            signal.setitimer(self.itimer, 0)
532
533    def sig_alrm(self, *args):
534        self.hndl_called = True
535
536    def sig_vtalrm(self, *args):
537        self.hndl_called = True
538
539        if self.hndl_count > 3:
540            # it shouldn't be here, because it should have been disabled.
541            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
542                "timer.")
543        elif self.hndl_count == 3:
544            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
545            signal.setitimer(signal.ITIMER_VIRTUAL, 0)
546
547        self.hndl_count += 1
548
549    def sig_prof(self, *args):
550        self.hndl_called = True
551        signal.setitimer(signal.ITIMER_PROF, 0)
552
553    def test_itimer_exc(self):
554        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
555        # defines it ?
556        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
557        # Negative times are treated as zero on some platforms.
558        if 0:
559            self.assertRaises(signal.ItimerError,
560                              signal.setitimer, signal.ITIMER_REAL, -1)
561
562    def test_itimer_real(self):
563        self.itimer = signal.ITIMER_REAL
564        signal.setitimer(self.itimer, 1.0)
565        signal.pause()
566        self.assertEqual(self.hndl_called, True)
567
568    # Issue 3864, unknown if this affects earlier versions of freebsd also
569    @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
570        'itimer not reliable (does not mix well with threading) on some BSDs.')
571    def test_itimer_virtual(self):
572        self.itimer = signal.ITIMER_VIRTUAL
573        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
574        signal.setitimer(self.itimer, 0.3, 0.2)
575
576        start_time = time.monotonic()
577        while time.monotonic() - start_time < 60.0:
578            # use up some virtual time by doing real work
579            _ = pow(12345, 67890, 10000019)
580            if signal.getitimer(self.itimer) == (0.0, 0.0):
581                break # sig_vtalrm handler stopped this itimer
582        else: # Issue 8424
583            self.skipTest("timeout: likely cause: machine too slow or load too "
584                          "high")
585
586        # virtual itimer should be (0.0, 0.0) now
587        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
588        # and the handler should have been called
589        self.assertEqual(self.hndl_called, True)
590
591    # Issue 3864, unknown if this affects earlier versions of freebsd also
592    @unittest.skipIf(sys.platform=='freebsd6',
593        'itimer not reliable (does not mix well with threading) on freebsd6')
594    def test_itimer_prof(self):
595        self.itimer = signal.ITIMER_PROF
596        signal.signal(signal.SIGPROF, self.sig_prof)
597        signal.setitimer(self.itimer, 0.2, 0.2)
598
599        start_time = time.monotonic()
600        while time.monotonic() - start_time < 60.0:
601            # do some work
602            _ = pow(12345, 67890, 10000019)
603            if signal.getitimer(self.itimer) == (0.0, 0.0):
604                break # sig_prof handler stopped this itimer
605        else: # Issue 8424
606            self.skipTest("timeout: likely cause: machine too slow or load too "
607                          "high")
608
609        # profiling itimer should be (0.0, 0.0) now
610        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
611        # and the handler should have been called
612        self.assertEqual(self.hndl_called, True)
613
614
615class PendingSignalsTests(unittest.TestCase):
616    """
617    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
618    functions.
619    """
620    @unittest.skipUnless(hasattr(signal, 'sigpending'),
621                         'need signal.sigpending()')
622    def test_sigpending_empty(self):
623        self.assertEqual(signal.sigpending(), set())
624
625    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
626                         'need signal.pthread_sigmask()')
627    @unittest.skipUnless(hasattr(signal, 'sigpending'),
628                         'need signal.sigpending()')
629    def test_sigpending(self):
630        code = """if 1:
631            import os
632            import signal
633
634            def handler(signum, frame):
635                1/0
636
637            signum = signal.SIGUSR1
638            signal.signal(signum, handler)
639
640            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
641            os.kill(os.getpid(), signum)
642            pending = signal.sigpending()
643            for sig in pending:
644                assert isinstance(sig, signal.Signals), repr(pending)
645            if pending != {signum}:
646                raise Exception('%s != {%s}' % (pending, signum))
647            try:
648                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
649            except ZeroDivisionError:
650                pass
651            else:
652                raise Exception("ZeroDivisionError not raised")
653        """
654        assert_python_ok('-c', code)
655
656    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
657                         'need signal.pthread_kill()')
658    def test_pthread_kill(self):
659        code = """if 1:
660            import signal
661            import threading
662            import sys
663
664            signum = signal.SIGUSR1
665
666            def handler(signum, frame):
667                1/0
668
669            signal.signal(signum, handler)
670
671            if sys.platform == 'freebsd6':
672                # Issue #12392 and #12469: send a signal to the main thread
673                # doesn't work before the creation of the first thread on
674                # FreeBSD 6
675                def noop():
676                    pass
677                thread = threading.Thread(target=noop)
678                thread.start()
679                thread.join()
680
681            tid = threading.get_ident()
682            try:
683                signal.pthread_kill(tid, signum)
684            except ZeroDivisionError:
685                pass
686            else:
687                raise Exception("ZeroDivisionError not raised")
688        """
689        assert_python_ok('-c', code)
690
691    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
692                         'need signal.pthread_sigmask()')
693    def wait_helper(self, blocked, test):
694        """
695        test: body of the "def test(signum):" function.
696        blocked: number of the blocked signal
697        """
698        code = '''if 1:
699        import signal
700        import sys
701        from signal import Signals
702
703        def handler(signum, frame):
704            1/0
705
706        %s
707
708        blocked = %s
709        signum = signal.SIGALRM
710
711        # child: block and wait the signal
712        try:
713            signal.signal(signum, handler)
714            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
715
716            # Do the tests
717            test(signum)
718
719            # The handler must not be called on unblock
720            try:
721                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
722            except ZeroDivisionError:
723                print("the signal handler has been called",
724                      file=sys.stderr)
725                sys.exit(1)
726        except BaseException as err:
727            print("error: {}".format(err), file=sys.stderr)
728            sys.stderr.flush()
729            sys.exit(1)
730        ''' % (test.strip(), blocked)
731
732        # sig*wait* must be called with the signal blocked: since the current
733        # process might have several threads running, use a subprocess to have
734        # a single thread.
735        assert_python_ok('-c', code)
736
737    @unittest.skipUnless(hasattr(signal, 'sigwait'),
738                         'need signal.sigwait()')
739    def test_sigwait(self):
740        self.wait_helper(signal.SIGALRM, '''
741        def test(signum):
742            signal.alarm(1)
743            received = signal.sigwait([signum])
744            assert isinstance(received, signal.Signals), received
745            if received != signum:
746                raise Exception('received %s, not %s' % (received, signum))
747        ''')
748
749    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
750                         'need signal.sigwaitinfo()')
751    def test_sigwaitinfo(self):
752        self.wait_helper(signal.SIGALRM, '''
753        def test(signum):
754            signal.alarm(1)
755            info = signal.sigwaitinfo([signum])
756            if info.si_signo != signum:
757                raise Exception("info.si_signo != %s" % signum)
758        ''')
759
760    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
761                         'need signal.sigtimedwait()')
762    def test_sigtimedwait(self):
763        self.wait_helper(signal.SIGALRM, '''
764        def test(signum):
765            signal.alarm(1)
766            info = signal.sigtimedwait([signum], 10.1000)
767            if info.si_signo != signum:
768                raise Exception('info.si_signo != %s' % signum)
769        ''')
770
771    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
772                         'need signal.sigtimedwait()')
773    def test_sigtimedwait_poll(self):
774        # check that polling with sigtimedwait works
775        self.wait_helper(signal.SIGALRM, '''
776        def test(signum):
777            import os
778            os.kill(os.getpid(), signum)
779            info = signal.sigtimedwait([signum], 0)
780            if info.si_signo != signum:
781                raise Exception('info.si_signo != %s' % signum)
782        ''')
783
784    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
785                         'need signal.sigtimedwait()')
786    def test_sigtimedwait_timeout(self):
787        self.wait_helper(signal.SIGALRM, '''
788        def test(signum):
789            received = signal.sigtimedwait([signum], 1.0)
790            if received is not None:
791                raise Exception("received=%r" % (received,))
792        ''')
793
794    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
795                         'need signal.sigtimedwait()')
796    def test_sigtimedwait_negative_timeout(self):
797        signum = signal.SIGALRM
798        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
799
800    @unittest.skipUnless(hasattr(signal, 'sigwait'),
801                         'need signal.sigwait()')
802    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
803                         'need signal.pthread_sigmask()')
804    @unittest.skipIf(threading is None, "test needs threading module")
805    def test_sigwait_thread(self):
806        # Check that calling sigwait() from a thread doesn't suspend the whole
807        # process. A new interpreter is spawned to avoid problems when mixing
808        # threads and fork(): only async-safe functions are allowed between
809        # fork() and exec().
810        assert_python_ok("-c", """if True:
811            import os, threading, sys, time, signal
812
813            # the default handler terminates the process
814            signum = signal.SIGUSR1
815
816            def kill_later():
817                # wait until the main thread is waiting in sigwait()
818                time.sleep(1)
819                os.kill(os.getpid(), signum)
820
821            # the signal must be blocked by all the threads
822            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
823            killer = threading.Thread(target=kill_later)
824            killer.start()
825            received = signal.sigwait([signum])
826            if received != signum:
827                print("sigwait() received %s, not %s" % (received, signum),
828                      file=sys.stderr)
829                sys.exit(1)
830            killer.join()
831            # unblock the signal, which should have been cleared by sigwait()
832            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
833        """)
834
835    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
836                         'need signal.pthread_sigmask()')
837    def test_pthread_sigmask_arguments(self):
838        self.assertRaises(TypeError, signal.pthread_sigmask)
839        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
840        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
841        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
842
843    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
844                         'need signal.pthread_sigmask()')
845    def test_pthread_sigmask(self):
846        code = """if 1:
847        import signal
848        import os; import threading
849
850        def handler(signum, frame):
851            1/0
852
853        def kill(signum):
854            os.kill(os.getpid(), signum)
855
856        def check_mask(mask):
857            for sig in mask:
858                assert isinstance(sig, signal.Signals), repr(sig)
859
860        def read_sigmask():
861            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
862            check_mask(sigmask)
863            return sigmask
864
865        signum = signal.SIGUSR1
866
867        # Install our signal handler
868        old_handler = signal.signal(signum, handler)
869
870        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
871        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
872        check_mask(old_mask)
873        try:
874            kill(signum)
875        except ZeroDivisionError:
876            pass
877        else:
878            raise Exception("ZeroDivisionError not raised")
879
880        # Block and then raise SIGUSR1. The signal is blocked: the signal
881        # handler is not called, and the signal is now pending
882        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
883        check_mask(mask)
884        kill(signum)
885
886        # Check the new mask
887        blocked = read_sigmask()
888        check_mask(blocked)
889        if signum not in blocked:
890            raise Exception("%s not in %s" % (signum, blocked))
891        if old_mask ^ blocked != {signum}:
892            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
893
894        # Unblock SIGUSR1
895        try:
896            # unblock the pending signal calls immediately the signal handler
897            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
898        except ZeroDivisionError:
899            pass
900        else:
901            raise Exception("ZeroDivisionError not raised")
902        try:
903            kill(signum)
904        except ZeroDivisionError:
905            pass
906        else:
907            raise Exception("ZeroDivisionError not raised")
908
909        # Check the new mask
910        unblocked = read_sigmask()
911        if signum in unblocked:
912            raise Exception("%s in %s" % (signum, unblocked))
913        if blocked ^ unblocked != {signum}:
914            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
915        if old_mask != unblocked:
916            raise Exception("%s != %s" % (old_mask, unblocked))
917        """
918        assert_python_ok('-c', code)
919
920    @unittest.skipIf(sys.platform == 'freebsd6',
921        "issue #12392: send a signal to the main thread doesn't work "
922        "before the creation of the first thread on FreeBSD 6")
923    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
924                         'need signal.pthread_kill()')
925    def test_pthread_kill_main_thread(self):
926        # Test that a signal can be sent to the main thread with pthread_kill()
927        # before any other thread has been created (see issue #12392).
928        code = """if True:
929            import threading
930            import signal
931            import sys
932
933            def handler(signum, frame):
934                sys.exit(3)
935
936            signal.signal(signal.SIGUSR1, handler)
937            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
938            sys.exit(2)
939        """
940
941        with spawn_python('-c', code) as process:
942            stdout, stderr = process.communicate()
943            exitcode = process.wait()
944            if exitcode != 3:
945                raise Exception("Child error (exit code %s): %s" %
946                                (exitcode, stdout))
947
948
949def tearDownModule():
950    support.reap_children()
951
952if __name__ == "__main__":
953    unittest.main()
954