1import unittest 2from test import support 3from contextlib import closing 4import enum 5import gc 6import pickle 7import select 8import signal 9import socket 10import struct 11import subprocess 12import traceback 13import sys, os, time, errno 14from test.support.script_helper import assert_python_ok, spawn_python 15try: 16 import threading 17except ImportError: 18 threading = None 19try: 20 import _testcapi 21except ImportError: 22 _testcapi = None 23 24 25class GenericTests(unittest.TestCase): 26 27 @unittest.skipIf(threading is None, "test needs threading module") 28 def test_enums(self): 29 for name in dir(signal): 30 sig = getattr(signal, name) 31 if name in {'SIG_DFL', 'SIG_IGN'}: 32 self.assertIsInstance(sig, signal.Handlers) 33 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 34 self.assertIsInstance(sig, signal.Sigmasks) 35 elif name.startswith('SIG') and not name.startswith('SIG_'): 36 self.assertIsInstance(sig, signal.Signals) 37 elif name.startswith('CTRL_'): 38 self.assertIsInstance(sig, signal.Signals) 39 self.assertEqual(sys.platform, "win32") 40 41 42@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 43class PosixTests(unittest.TestCase): 44 def trivial_signal_handler(self, *args): 45 pass 46 47 def test_out_of_range_signal_number_raises_error(self): 48 self.assertRaises(ValueError, signal.getsignal, 4242) 49 50 self.assertRaises(ValueError, signal.signal, 4242, 51 self.trivial_signal_handler) 52 53 def test_setting_signal_handler_to_none_raises_error(self): 54 self.assertRaises(TypeError, signal.signal, 55 signal.SIGUSR1, None) 56 57 def test_getsignal(self): 58 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 59 self.assertIsInstance(hup, signal.Handlers) 60 self.assertEqual(signal.getsignal(signal.SIGHUP), 61 self.trivial_signal_handler) 62 signal.signal(signal.SIGHUP, hup) 63 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 64 65 # Issue 3864, unknown if this affects earlier versions of freebsd also 66 @unittest.skipIf(sys.platform=='freebsd6', 67 'inter process signals not reliable (do not mix well with threading) ' 68 'on freebsd6') 69 def test_interprocess_signal(self): 70 dirname = os.path.dirname(__file__) 71 script = os.path.join(dirname, 'signalinterproctester.py') 72 assert_python_ok(script) 73 74 75@unittest.skipUnless(sys.platform == "win32", "Windows specific") 76class WindowsSignalTests(unittest.TestCase): 77 def test_issue9324(self): 78 # Updated for issue #10003, adding SIGBREAK 79 handler = lambda x, y: None 80 checked = set() 81 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 82 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 83 signal.SIGTERM): 84 # Set and then reset a handler for signals that work on windows. 85 # Issue #18396, only for signals without a C-level handler. 86 if signal.getsignal(sig) is not None: 87 signal.signal(sig, signal.signal(sig, handler)) 88 checked.add(sig) 89 # Issue #18396: Ensure the above loop at least tested *something* 90 self.assertTrue(checked) 91 92 with self.assertRaises(ValueError): 93 signal.signal(-1, handler) 94 95 with self.assertRaises(ValueError): 96 signal.signal(7, handler) 97 98 99class WakeupFDTests(unittest.TestCase): 100 101 def test_invalid_fd(self): 102 fd = support.make_bad_fd() 103 self.assertRaises((ValueError, OSError), 104 signal.set_wakeup_fd, fd) 105 106 def test_invalid_socket(self): 107 sock = socket.socket() 108 fd = sock.fileno() 109 sock.close() 110 self.assertRaises((ValueError, OSError), 111 signal.set_wakeup_fd, fd) 112 113 def test_set_wakeup_fd_result(self): 114 r1, w1 = os.pipe() 115 self.addCleanup(os.close, r1) 116 self.addCleanup(os.close, w1) 117 r2, w2 = os.pipe() 118 self.addCleanup(os.close, r2) 119 self.addCleanup(os.close, w2) 120 121 if hasattr(os, 'set_blocking'): 122 os.set_blocking(w1, False) 123 os.set_blocking(w2, False) 124 125 signal.set_wakeup_fd(w1) 126 self.assertEqual(signal.set_wakeup_fd(w2), w1) 127 self.assertEqual(signal.set_wakeup_fd(-1), w2) 128 self.assertEqual(signal.set_wakeup_fd(-1), -1) 129 130 def test_set_wakeup_fd_socket_result(self): 131 sock1 = socket.socket() 132 self.addCleanup(sock1.close) 133 sock1.setblocking(False) 134 fd1 = sock1.fileno() 135 136 sock2 = socket.socket() 137 self.addCleanup(sock2.close) 138 sock2.setblocking(False) 139 fd2 = sock2.fileno() 140 141 signal.set_wakeup_fd(fd1) 142 self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 143 self.assertEqual(signal.set_wakeup_fd(-1), fd2) 144 self.assertEqual(signal.set_wakeup_fd(-1), -1) 145 146 # On Windows, files are always blocking and Windows does not provide a 147 # function to test if a socket is in non-blocking mode. 148 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 149 def test_set_wakeup_fd_blocking(self): 150 rfd, wfd = os.pipe() 151 self.addCleanup(os.close, rfd) 152 self.addCleanup(os.close, wfd) 153 154 # fd must be non-blocking 155 os.set_blocking(wfd, True) 156 with self.assertRaises(ValueError) as cm: 157 signal.set_wakeup_fd(wfd) 158 self.assertEqual(str(cm.exception), 159 "the fd %s must be in non-blocking mode" % wfd) 160 161 # non-blocking is ok 162 os.set_blocking(wfd, False) 163 signal.set_wakeup_fd(wfd) 164 signal.set_wakeup_fd(-1) 165 166 167@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 168class WakeupSignalTests(unittest.TestCase): 169 @unittest.skipIf(_testcapi is None, 'need _testcapi') 170 def check_wakeup(self, test_body, *signals, ordered=True): 171 # use a subprocess to have only one thread 172 code = """if 1: 173 import _testcapi 174 import os 175 import signal 176 import struct 177 178 signals = {!r} 179 180 def handler(signum, frame): 181 pass 182 183 def check_signum(signals): 184 data = os.read(read, len(signals)+1) 185 raised = struct.unpack('%uB' % len(data), data) 186 if not {!r}: 187 raised = set(raised) 188 signals = set(signals) 189 if raised != signals: 190 raise Exception("%r != %r" % (raised, signals)) 191 192 {} 193 194 signal.signal(signal.SIGALRM, handler) 195 read, write = os.pipe() 196 os.set_blocking(write, False) 197 signal.set_wakeup_fd(write) 198 199 test() 200 check_signum(signals) 201 202 os.close(read) 203 os.close(write) 204 """.format(tuple(map(int, signals)), ordered, test_body) 205 206 assert_python_ok('-c', code) 207 208 @unittest.skipIf(_testcapi is None, 'need _testcapi') 209 def test_wakeup_write_error(self): 210 # Issue #16105: write() errors in the C signal handler should not 211 # pass silently. 212 # Use a subprocess to have only one thread. 213 code = """if 1: 214 import _testcapi 215 import errno 216 import os 217 import signal 218 import sys 219 from test.support import captured_stderr 220 221 def handler(signum, frame): 222 1/0 223 224 signal.signal(signal.SIGALRM, handler) 225 r, w = os.pipe() 226 os.set_blocking(r, False) 227 228 # Set wakeup_fd a read-only file descriptor to trigger the error 229 signal.set_wakeup_fd(r) 230 try: 231 with captured_stderr() as err: 232 _testcapi.raise_signal(signal.SIGALRM) 233 except ZeroDivisionError: 234 # An ignored exception should have been printed out on stderr 235 err = err.getvalue() 236 if ('Exception ignored when trying to write to the signal wakeup fd' 237 not in err): 238 raise AssertionError(err) 239 if ('OSError: [Errno %d]' % errno.EBADF) not in err: 240 raise AssertionError(err) 241 else: 242 raise AssertionError("ZeroDivisionError not raised") 243 244 os.close(r) 245 os.close(w) 246 """ 247 r, w = os.pipe() 248 try: 249 os.write(r, b'x') 250 except OSError: 251 pass 252 else: 253 self.skipTest("OS doesn't report write() error on the read end of a pipe") 254 finally: 255 os.close(r) 256 os.close(w) 257 258 assert_python_ok('-c', code) 259 260 def test_wakeup_fd_early(self): 261 self.check_wakeup("""def test(): 262 import select 263 import time 264 265 TIMEOUT_FULL = 10 266 TIMEOUT_HALF = 5 267 268 class InterruptSelect(Exception): 269 pass 270 271 def handler(signum, frame): 272 raise InterruptSelect 273 signal.signal(signal.SIGALRM, handler) 274 275 signal.alarm(1) 276 277 # We attempt to get a signal during the sleep, 278 # before select is called 279 try: 280 select.select([], [], [], TIMEOUT_FULL) 281 except InterruptSelect: 282 pass 283 else: 284 raise Exception("select() was not interrupted") 285 286 before_time = time.monotonic() 287 select.select([read], [], [], TIMEOUT_FULL) 288 after_time = time.monotonic() 289 dt = after_time - before_time 290 if dt >= TIMEOUT_HALF: 291 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 292 """, signal.SIGALRM) 293 294 def test_wakeup_fd_during(self): 295 self.check_wakeup("""def test(): 296 import select 297 import time 298 299 TIMEOUT_FULL = 10 300 TIMEOUT_HALF = 5 301 302 class InterruptSelect(Exception): 303 pass 304 305 def handler(signum, frame): 306 raise InterruptSelect 307 signal.signal(signal.SIGALRM, handler) 308 309 signal.alarm(1) 310 before_time = time.monotonic() 311 # We attempt to get a signal during the select call 312 try: 313 select.select([read], [], [], TIMEOUT_FULL) 314 except InterruptSelect: 315 pass 316 else: 317 raise Exception("select() was not interrupted") 318 after_time = time.monotonic() 319 dt = after_time - before_time 320 if dt >= TIMEOUT_HALF: 321 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 322 """, signal.SIGALRM) 323 324 def test_signum(self): 325 self.check_wakeup("""def test(): 326 import _testcapi 327 signal.signal(signal.SIGUSR1, handler) 328 _testcapi.raise_signal(signal.SIGUSR1) 329 _testcapi.raise_signal(signal.SIGALRM) 330 """, signal.SIGUSR1, signal.SIGALRM) 331 332 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 333 'need signal.pthread_sigmask()') 334 def test_pending(self): 335 self.check_wakeup("""def test(): 336 signum1 = signal.SIGUSR1 337 signum2 = signal.SIGUSR2 338 339 signal.signal(signum1, handler) 340 signal.signal(signum2, handler) 341 342 signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) 343 _testcapi.raise_signal(signum1) 344 _testcapi.raise_signal(signum2) 345 # Unblocking the 2 signals calls the C signal handler twice 346 signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) 347 """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) 348 349 350@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') 351class WakeupSocketSignalTests(unittest.TestCase): 352 353 @unittest.skipIf(_testcapi is None, 'need _testcapi') 354 def test_socket(self): 355 # use a subprocess to have only one thread 356 code = """if 1: 357 import signal 358 import socket 359 import struct 360 import _testcapi 361 362 signum = signal.SIGINT 363 signals = (signum,) 364 365 def handler(signum, frame): 366 pass 367 368 signal.signal(signum, handler) 369 370 read, write = socket.socketpair() 371 read.setblocking(False) 372 write.setblocking(False) 373 signal.set_wakeup_fd(write.fileno()) 374 375 _testcapi.raise_signal(signum) 376 377 data = read.recv(1) 378 if not data: 379 raise Exception("no signum written") 380 raised = struct.unpack('B', data) 381 if raised != signals: 382 raise Exception("%r != %r" % (raised, signals)) 383 384 read.close() 385 write.close() 386 """ 387 388 assert_python_ok('-c', code) 389 390 @unittest.skipIf(_testcapi is None, 'need _testcapi') 391 def test_send_error(self): 392 # Use a subprocess to have only one thread. 393 if os.name == 'nt': 394 action = 'send' 395 else: 396 action = 'write' 397 code = """if 1: 398 import errno 399 import signal 400 import socket 401 import sys 402 import time 403 import _testcapi 404 from test.support import captured_stderr 405 406 signum = signal.SIGINT 407 408 def handler(signum, frame): 409 pass 410 411 signal.signal(signum, handler) 412 413 read, write = socket.socketpair() 414 read.setblocking(False) 415 write.setblocking(False) 416 417 signal.set_wakeup_fd(write.fileno()) 418 419 # Close sockets: send() will fail 420 read.close() 421 write.close() 422 423 with captured_stderr() as err: 424 _testcapi.raise_signal(signum) 425 426 err = err.getvalue() 427 if ('Exception ignored when trying to {action} to the signal wakeup fd' 428 not in err): 429 raise AssertionError(err) 430 """.format(action=action) 431 assert_python_ok('-c', code) 432 433 434@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 435class SiginterruptTest(unittest.TestCase): 436 437 def readpipe_interrupted(self, interrupt): 438 """Perform a read during which a signal will arrive. Return True if the 439 read is interrupted by the signal and raises an exception. Return False 440 if it returns normally. 441 """ 442 # use a subprocess to have only one thread, to have a timeout on the 443 # blocking read and to not touch signal handling in this process 444 code = """if 1: 445 import errno 446 import os 447 import signal 448 import sys 449 450 interrupt = %r 451 r, w = os.pipe() 452 453 def handler(signum, frame): 454 1 / 0 455 456 signal.signal(signal.SIGALRM, handler) 457 if interrupt is not None: 458 signal.siginterrupt(signal.SIGALRM, interrupt) 459 460 print("ready") 461 sys.stdout.flush() 462 463 # run the test twice 464 try: 465 for loop in range(2): 466 # send a SIGALRM in a second (during the read) 467 signal.alarm(1) 468 try: 469 # blocking call: read from a pipe without data 470 os.read(r, 1) 471 except ZeroDivisionError: 472 pass 473 else: 474 sys.exit(2) 475 sys.exit(3) 476 finally: 477 os.close(r) 478 os.close(w) 479 """ % (interrupt,) 480 with spawn_python('-c', code) as process: 481 try: 482 # wait until the child process is loaded and has started 483 first_line = process.stdout.readline() 484 485 stdout, stderr = process.communicate(timeout=5.0) 486 except subprocess.TimeoutExpired: 487 process.kill() 488 return False 489 else: 490 stdout = first_line + stdout 491 exitcode = process.wait() 492 if exitcode not in (2, 3): 493 raise Exception("Child error (exit code %s): %r" 494 % (exitcode, stdout)) 495 return (exitcode == 3) 496 497 def test_without_siginterrupt(self): 498 # If a signal handler is installed and siginterrupt is not called 499 # at all, when that signal arrives, it interrupts a syscall that's in 500 # progress. 501 interrupted = self.readpipe_interrupted(None) 502 self.assertTrue(interrupted) 503 504 def test_siginterrupt_on(self): 505 # If a signal handler is installed and siginterrupt is called with 506 # a true value for the second argument, when that signal arrives, it 507 # interrupts a syscall that's in progress. 508 interrupted = self.readpipe_interrupted(True) 509 self.assertTrue(interrupted) 510 511 def test_siginterrupt_off(self): 512 # If a signal handler is installed and siginterrupt is called with 513 # a false value for the second argument, when that signal arrives, it 514 # does not interrupt a syscall that's in progress. 515 interrupted = self.readpipe_interrupted(False) 516 self.assertFalse(interrupted) 517 518 519@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 520class ItimerTest(unittest.TestCase): 521 def setUp(self): 522 self.hndl_called = False 523 self.hndl_count = 0 524 self.itimer = None 525 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 526 527 def tearDown(self): 528 signal.signal(signal.SIGALRM, self.old_alarm) 529 if self.itimer is not None: # test_itimer_exc doesn't change this attr 530 # just ensure that itimer is stopped 531 signal.setitimer(self.itimer, 0) 532 533 def sig_alrm(self, *args): 534 self.hndl_called = True 535 536 def sig_vtalrm(self, *args): 537 self.hndl_called = True 538 539 if self.hndl_count > 3: 540 # it shouldn't be here, because it should have been disabled. 541 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 542 "timer.") 543 elif self.hndl_count == 3: 544 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 545 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 546 547 self.hndl_count += 1 548 549 def sig_prof(self, *args): 550 self.hndl_called = True 551 signal.setitimer(signal.ITIMER_PROF, 0) 552 553 def test_itimer_exc(self): 554 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 555 # defines it ? 556 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 557 # Negative times are treated as zero on some platforms. 558 if 0: 559 self.assertRaises(signal.ItimerError, 560 signal.setitimer, signal.ITIMER_REAL, -1) 561 562 def test_itimer_real(self): 563 self.itimer = signal.ITIMER_REAL 564 signal.setitimer(self.itimer, 1.0) 565 signal.pause() 566 self.assertEqual(self.hndl_called, True) 567 568 # Issue 3864, unknown if this affects earlier versions of freebsd also 569 @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'), 570 'itimer not reliable (does not mix well with threading) on some BSDs.') 571 def test_itimer_virtual(self): 572 self.itimer = signal.ITIMER_VIRTUAL 573 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 574 signal.setitimer(self.itimer, 0.3, 0.2) 575 576 start_time = time.monotonic() 577 while time.monotonic() - start_time < 60.0: 578 # use up some virtual time by doing real work 579 _ = pow(12345, 67890, 10000019) 580 if signal.getitimer(self.itimer) == (0.0, 0.0): 581 break # sig_vtalrm handler stopped this itimer 582 else: # Issue 8424 583 self.skipTest("timeout: likely cause: machine too slow or load too " 584 "high") 585 586 # virtual itimer should be (0.0, 0.0) now 587 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 588 # and the handler should have been called 589 self.assertEqual(self.hndl_called, True) 590 591 # Issue 3864, unknown if this affects earlier versions of freebsd also 592 @unittest.skipIf(sys.platform=='freebsd6', 593 'itimer not reliable (does not mix well with threading) on freebsd6') 594 def test_itimer_prof(self): 595 self.itimer = signal.ITIMER_PROF 596 signal.signal(signal.SIGPROF, self.sig_prof) 597 signal.setitimer(self.itimer, 0.2, 0.2) 598 599 start_time = time.monotonic() 600 while time.monotonic() - start_time < 60.0: 601 # do some work 602 _ = pow(12345, 67890, 10000019) 603 if signal.getitimer(self.itimer) == (0.0, 0.0): 604 break # sig_prof handler stopped this itimer 605 else: # Issue 8424 606 self.skipTest("timeout: likely cause: machine too slow or load too " 607 "high") 608 609 # profiling itimer should be (0.0, 0.0) now 610 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 611 # and the handler should have been called 612 self.assertEqual(self.hndl_called, True) 613 614 615class PendingSignalsTests(unittest.TestCase): 616 """ 617 Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() 618 functions. 619 """ 620 @unittest.skipUnless(hasattr(signal, 'sigpending'), 621 'need signal.sigpending()') 622 def test_sigpending_empty(self): 623 self.assertEqual(signal.sigpending(), set()) 624 625 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 626 'need signal.pthread_sigmask()') 627 @unittest.skipUnless(hasattr(signal, 'sigpending'), 628 'need signal.sigpending()') 629 def test_sigpending(self): 630 code = """if 1: 631 import os 632 import signal 633 634 def handler(signum, frame): 635 1/0 636 637 signum = signal.SIGUSR1 638 signal.signal(signum, handler) 639 640 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 641 os.kill(os.getpid(), signum) 642 pending = signal.sigpending() 643 for sig in pending: 644 assert isinstance(sig, signal.Signals), repr(pending) 645 if pending != {signum}: 646 raise Exception('%s != {%s}' % (pending, signum)) 647 try: 648 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 649 except ZeroDivisionError: 650 pass 651 else: 652 raise Exception("ZeroDivisionError not raised") 653 """ 654 assert_python_ok('-c', code) 655 656 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 657 'need signal.pthread_kill()') 658 def test_pthread_kill(self): 659 code = """if 1: 660 import signal 661 import threading 662 import sys 663 664 signum = signal.SIGUSR1 665 666 def handler(signum, frame): 667 1/0 668 669 signal.signal(signum, handler) 670 671 if sys.platform == 'freebsd6': 672 # Issue #12392 and #12469: send a signal to the main thread 673 # doesn't work before the creation of the first thread on 674 # FreeBSD 6 675 def noop(): 676 pass 677 thread = threading.Thread(target=noop) 678 thread.start() 679 thread.join() 680 681 tid = threading.get_ident() 682 try: 683 signal.pthread_kill(tid, signum) 684 except ZeroDivisionError: 685 pass 686 else: 687 raise Exception("ZeroDivisionError not raised") 688 """ 689 assert_python_ok('-c', code) 690 691 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 692 'need signal.pthread_sigmask()') 693 def wait_helper(self, blocked, test): 694 """ 695 test: body of the "def test(signum):" function. 696 blocked: number of the blocked signal 697 """ 698 code = '''if 1: 699 import signal 700 import sys 701 from signal import Signals 702 703 def handler(signum, frame): 704 1/0 705 706 %s 707 708 blocked = %s 709 signum = signal.SIGALRM 710 711 # child: block and wait the signal 712 try: 713 signal.signal(signum, handler) 714 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 715 716 # Do the tests 717 test(signum) 718 719 # The handler must not be called on unblock 720 try: 721 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 722 except ZeroDivisionError: 723 print("the signal handler has been called", 724 file=sys.stderr) 725 sys.exit(1) 726 except BaseException as err: 727 print("error: {}".format(err), file=sys.stderr) 728 sys.stderr.flush() 729 sys.exit(1) 730 ''' % (test.strip(), blocked) 731 732 # sig*wait* must be called with the signal blocked: since the current 733 # process might have several threads running, use a subprocess to have 734 # a single thread. 735 assert_python_ok('-c', code) 736 737 @unittest.skipUnless(hasattr(signal, 'sigwait'), 738 'need signal.sigwait()') 739 def test_sigwait(self): 740 self.wait_helper(signal.SIGALRM, ''' 741 def test(signum): 742 signal.alarm(1) 743 received = signal.sigwait([signum]) 744 assert isinstance(received, signal.Signals), received 745 if received != signum: 746 raise Exception('received %s, not %s' % (received, signum)) 747 ''') 748 749 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 750 'need signal.sigwaitinfo()') 751 def test_sigwaitinfo(self): 752 self.wait_helper(signal.SIGALRM, ''' 753 def test(signum): 754 signal.alarm(1) 755 info = signal.sigwaitinfo([signum]) 756 if info.si_signo != signum: 757 raise Exception("info.si_signo != %s" % signum) 758 ''') 759 760 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 761 'need signal.sigtimedwait()') 762 def test_sigtimedwait(self): 763 self.wait_helper(signal.SIGALRM, ''' 764 def test(signum): 765 signal.alarm(1) 766 info = signal.sigtimedwait([signum], 10.1000) 767 if info.si_signo != signum: 768 raise Exception('info.si_signo != %s' % signum) 769 ''') 770 771 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 772 'need signal.sigtimedwait()') 773 def test_sigtimedwait_poll(self): 774 # check that polling with sigtimedwait works 775 self.wait_helper(signal.SIGALRM, ''' 776 def test(signum): 777 import os 778 os.kill(os.getpid(), signum) 779 info = signal.sigtimedwait([signum], 0) 780 if info.si_signo != signum: 781 raise Exception('info.si_signo != %s' % signum) 782 ''') 783 784 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 785 'need signal.sigtimedwait()') 786 def test_sigtimedwait_timeout(self): 787 self.wait_helper(signal.SIGALRM, ''' 788 def test(signum): 789 received = signal.sigtimedwait([signum], 1.0) 790 if received is not None: 791 raise Exception("received=%r" % (received,)) 792 ''') 793 794 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 795 'need signal.sigtimedwait()') 796 def test_sigtimedwait_negative_timeout(self): 797 signum = signal.SIGALRM 798 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 799 800 @unittest.skipUnless(hasattr(signal, 'sigwait'), 801 'need signal.sigwait()') 802 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 803 'need signal.pthread_sigmask()') 804 @unittest.skipIf(threading is None, "test needs threading module") 805 def test_sigwait_thread(self): 806 # Check that calling sigwait() from a thread doesn't suspend the whole 807 # process. A new interpreter is spawned to avoid problems when mixing 808 # threads and fork(): only async-safe functions are allowed between 809 # fork() and exec(). 810 assert_python_ok("-c", """if True: 811 import os, threading, sys, time, signal 812 813 # the default handler terminates the process 814 signum = signal.SIGUSR1 815 816 def kill_later(): 817 # wait until the main thread is waiting in sigwait() 818 time.sleep(1) 819 os.kill(os.getpid(), signum) 820 821 # the signal must be blocked by all the threads 822 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 823 killer = threading.Thread(target=kill_later) 824 killer.start() 825 received = signal.sigwait([signum]) 826 if received != signum: 827 print("sigwait() received %s, not %s" % (received, signum), 828 file=sys.stderr) 829 sys.exit(1) 830 killer.join() 831 # unblock the signal, which should have been cleared by sigwait() 832 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 833 """) 834 835 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 836 'need signal.pthread_sigmask()') 837 def test_pthread_sigmask_arguments(self): 838 self.assertRaises(TypeError, signal.pthread_sigmask) 839 self.assertRaises(TypeError, signal.pthread_sigmask, 1) 840 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 841 self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 842 843 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 844 'need signal.pthread_sigmask()') 845 def test_pthread_sigmask(self): 846 code = """if 1: 847 import signal 848 import os; import threading 849 850 def handler(signum, frame): 851 1/0 852 853 def kill(signum): 854 os.kill(os.getpid(), signum) 855 856 def check_mask(mask): 857 for sig in mask: 858 assert isinstance(sig, signal.Signals), repr(sig) 859 860 def read_sigmask(): 861 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 862 check_mask(sigmask) 863 return sigmask 864 865 signum = signal.SIGUSR1 866 867 # Install our signal handler 868 old_handler = signal.signal(signum, handler) 869 870 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 871 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 872 check_mask(old_mask) 873 try: 874 kill(signum) 875 except ZeroDivisionError: 876 pass 877 else: 878 raise Exception("ZeroDivisionError not raised") 879 880 # Block and then raise SIGUSR1. The signal is blocked: the signal 881 # handler is not called, and the signal is now pending 882 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 883 check_mask(mask) 884 kill(signum) 885 886 # Check the new mask 887 blocked = read_sigmask() 888 check_mask(blocked) 889 if signum not in blocked: 890 raise Exception("%s not in %s" % (signum, blocked)) 891 if old_mask ^ blocked != {signum}: 892 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 893 894 # Unblock SIGUSR1 895 try: 896 # unblock the pending signal calls immediately the signal handler 897 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 898 except ZeroDivisionError: 899 pass 900 else: 901 raise Exception("ZeroDivisionError not raised") 902 try: 903 kill(signum) 904 except ZeroDivisionError: 905 pass 906 else: 907 raise Exception("ZeroDivisionError not raised") 908 909 # Check the new mask 910 unblocked = read_sigmask() 911 if signum in unblocked: 912 raise Exception("%s in %s" % (signum, unblocked)) 913 if blocked ^ unblocked != {signum}: 914 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 915 if old_mask != unblocked: 916 raise Exception("%s != %s" % (old_mask, unblocked)) 917 """ 918 assert_python_ok('-c', code) 919 920 @unittest.skipIf(sys.platform == 'freebsd6', 921 "issue #12392: send a signal to the main thread doesn't work " 922 "before the creation of the first thread on FreeBSD 6") 923 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 924 'need signal.pthread_kill()') 925 def test_pthread_kill_main_thread(self): 926 # Test that a signal can be sent to the main thread with pthread_kill() 927 # before any other thread has been created (see issue #12392). 928 code = """if True: 929 import threading 930 import signal 931 import sys 932 933 def handler(signum, frame): 934 sys.exit(3) 935 936 signal.signal(signal.SIGUSR1, handler) 937 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 938 sys.exit(2) 939 """ 940 941 with spawn_python('-c', code) as process: 942 stdout, stderr = process.communicate() 943 exitcode = process.wait() 944 if exitcode != 3: 945 raise Exception("Child error (exit code %s): %s" % 946 (exitcode, stdout)) 947 948 949def tearDownModule(): 950 support.reap_children() 951 952if __name__ == "__main__": 953 unittest.main() 954