1import errno 2import os 3import random 4import signal 5import socket 6import statistics 7import subprocess 8import sys 9import time 10import unittest 11from test import support 12from test.support.script_helper import assert_python_ok, spawn_python 13try: 14 import _testcapi 15except ImportError: 16 _testcapi = None 17 18 19class GenericTests(unittest.TestCase): 20 21 def test_enums(self): 22 for name in dir(signal): 23 sig = getattr(signal, name) 24 if name in {'SIG_DFL', 'SIG_IGN'}: 25 self.assertIsInstance(sig, signal.Handlers) 26 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 27 self.assertIsInstance(sig, signal.Sigmasks) 28 elif name.startswith('SIG') and not name.startswith('SIG_'): 29 self.assertIsInstance(sig, signal.Signals) 30 elif name.startswith('CTRL_'): 31 self.assertIsInstance(sig, signal.Signals) 32 self.assertEqual(sys.platform, "win32") 33 34 35@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 36class PosixTests(unittest.TestCase): 37 def trivial_signal_handler(self, *args): 38 pass 39 40 def test_out_of_range_signal_number_raises_error(self): 41 self.assertRaises(ValueError, signal.getsignal, 4242) 42 43 self.assertRaises(ValueError, signal.signal, 4242, 44 self.trivial_signal_handler) 45 46 self.assertRaises(ValueError, signal.strsignal, 4242) 47 48 def test_setting_signal_handler_to_none_raises_error(self): 49 self.assertRaises(TypeError, signal.signal, 50 signal.SIGUSR1, None) 51 52 def test_getsignal(self): 53 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 54 self.assertIsInstance(hup, signal.Handlers) 55 self.assertEqual(signal.getsignal(signal.SIGHUP), 56 self.trivial_signal_handler) 57 signal.signal(signal.SIGHUP, hup) 58 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 59 60 def test_strsignal(self): 61 self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) 62 self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) 63 self.assertIn("Hangup", signal.strsignal(signal.SIGHUP)) 64 65 # Issue 3864, unknown if this affects earlier versions of freebsd also 66 def test_interprocess_signal(self): 67 dirname = os.path.dirname(__file__) 68 script = os.path.join(dirname, 'signalinterproctester.py') 69 assert_python_ok(script) 70 71 def test_valid_signals(self): 72 s = signal.valid_signals() 73 self.assertIsInstance(s, set) 74 self.assertIn(signal.Signals.SIGINT, s) 75 self.assertIn(signal.Signals.SIGALRM, s) 76 self.assertNotIn(0, s) 77 self.assertNotIn(signal.NSIG, s) 78 self.assertLess(len(s), signal.NSIG) 79 80 @unittest.skipUnless(sys.executable, "sys.executable required.") 81 def test_keyboard_interrupt_exit_code(self): 82 """KeyboardInterrupt triggers exit via SIGINT.""" 83 process = subprocess.run( 84 [sys.executable, "-c", 85 "import os, signal, time\n" 86 "os.kill(os.getpid(), signal.SIGINT)\n" 87 "for _ in range(999): time.sleep(0.01)"], 88 stderr=subprocess.PIPE) 89 self.assertIn(b"KeyboardInterrupt", process.stderr) 90 self.assertEqual(process.returncode, -signal.SIGINT) 91 # Caveat: The exit code is insufficient to guarantee we actually died 92 # via a signal. POSIX shells do more than look at the 8 bit value. 93 # Writing an automation friendly test of an interactive shell 94 # to confirm that our process died via a SIGINT proved too complex. 95 96 97@unittest.skipUnless(sys.platform == "win32", "Windows specific") 98class WindowsSignalTests(unittest.TestCase): 99 100 def test_valid_signals(self): 101 s = signal.valid_signals() 102 self.assertIsInstance(s, set) 103 self.assertGreaterEqual(len(s), 6) 104 self.assertIn(signal.Signals.SIGINT, s) 105 self.assertNotIn(0, s) 106 self.assertNotIn(signal.NSIG, s) 107 self.assertLess(len(s), signal.NSIG) 108 109 def test_issue9324(self): 110 # Updated for issue #10003, adding SIGBREAK 111 handler = lambda x, y: None 112 checked = set() 113 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 114 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 115 signal.SIGTERM): 116 # Set and then reset a handler for signals that work on windows. 117 # Issue #18396, only for signals without a C-level handler. 118 if signal.getsignal(sig) is not None: 119 signal.signal(sig, signal.signal(sig, handler)) 120 checked.add(sig) 121 # Issue #18396: Ensure the above loop at least tested *something* 122 self.assertTrue(checked) 123 124 with self.assertRaises(ValueError): 125 signal.signal(-1, handler) 126 127 with self.assertRaises(ValueError): 128 signal.signal(7, handler) 129 130 @unittest.skipUnless(sys.executable, "sys.executable required.") 131 def test_keyboard_interrupt_exit_code(self): 132 """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.""" 133 # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here 134 # as that requires setting up a console control handler in a child 135 # in its own process group. Doable, but quite complicated. (see 136 # @eryksun on https://github.com/python/cpython/pull/11862) 137 process = subprocess.run( 138 [sys.executable, "-c", "raise KeyboardInterrupt"], 139 stderr=subprocess.PIPE) 140 self.assertIn(b"KeyboardInterrupt", process.stderr) 141 STATUS_CONTROL_C_EXIT = 0xC000013A 142 self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT) 143 144 145class WakeupFDTests(unittest.TestCase): 146 147 def test_invalid_call(self): 148 # First parameter is positional-only 149 with self.assertRaises(TypeError): 150 signal.set_wakeup_fd(signum=signal.SIGINT) 151 152 # warn_on_full_buffer is a keyword-only parameter 153 with self.assertRaises(TypeError): 154 signal.set_wakeup_fd(signal.SIGINT, False) 155 156 def test_invalid_fd(self): 157 fd = support.make_bad_fd() 158 self.assertRaises((ValueError, OSError), 159 signal.set_wakeup_fd, fd) 160 161 def test_invalid_socket(self): 162 sock = socket.socket() 163 fd = sock.fileno() 164 sock.close() 165 self.assertRaises((ValueError, OSError), 166 signal.set_wakeup_fd, fd) 167 168 def test_set_wakeup_fd_result(self): 169 r1, w1 = os.pipe() 170 self.addCleanup(os.close, r1) 171 self.addCleanup(os.close, w1) 172 r2, w2 = os.pipe() 173 self.addCleanup(os.close, r2) 174 self.addCleanup(os.close, w2) 175 176 if hasattr(os, 'set_blocking'): 177 os.set_blocking(w1, False) 178 os.set_blocking(w2, False) 179 180 signal.set_wakeup_fd(w1) 181 self.assertEqual(signal.set_wakeup_fd(w2), w1) 182 self.assertEqual(signal.set_wakeup_fd(-1), w2) 183 self.assertEqual(signal.set_wakeup_fd(-1), -1) 184 185 def test_set_wakeup_fd_socket_result(self): 186 sock1 = socket.socket() 187 self.addCleanup(sock1.close) 188 sock1.setblocking(False) 189 fd1 = sock1.fileno() 190 191 sock2 = socket.socket() 192 self.addCleanup(sock2.close) 193 sock2.setblocking(False) 194 fd2 = sock2.fileno() 195 196 signal.set_wakeup_fd(fd1) 197 self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 198 self.assertEqual(signal.set_wakeup_fd(-1), fd2) 199 self.assertEqual(signal.set_wakeup_fd(-1), -1) 200 201 # On Windows, files are always blocking and Windows does not provide a 202 # function to test if a socket is in non-blocking mode. 203 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 204 def test_set_wakeup_fd_blocking(self): 205 rfd, wfd = os.pipe() 206 self.addCleanup(os.close, rfd) 207 self.addCleanup(os.close, wfd) 208 209 # fd must be non-blocking 210 os.set_blocking(wfd, True) 211 with self.assertRaises(ValueError) as cm: 212 signal.set_wakeup_fd(wfd) 213 self.assertEqual(str(cm.exception), 214 "the fd %s must be in non-blocking mode" % wfd) 215 216 # non-blocking is ok 217 os.set_blocking(wfd, False) 218 signal.set_wakeup_fd(wfd) 219 signal.set_wakeup_fd(-1) 220 221 222@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 223class WakeupSignalTests(unittest.TestCase): 224 @unittest.skipIf(_testcapi is None, 'need _testcapi') 225 def check_wakeup(self, test_body, *signals, ordered=True): 226 # use a subprocess to have only one thread 227 code = """if 1: 228 import _testcapi 229 import os 230 import signal 231 import struct 232 233 signals = {!r} 234 235 def handler(signum, frame): 236 pass 237 238 def check_signum(signals): 239 data = os.read(read, len(signals)+1) 240 raised = struct.unpack('%uB' % len(data), data) 241 if not {!r}: 242 raised = set(raised) 243 signals = set(signals) 244 if raised != signals: 245 raise Exception("%r != %r" % (raised, signals)) 246 247 {} 248 249 signal.signal(signal.SIGALRM, handler) 250 read, write = os.pipe() 251 os.set_blocking(write, False) 252 signal.set_wakeup_fd(write) 253 254 test() 255 check_signum(signals) 256 257 os.close(read) 258 os.close(write) 259 """.format(tuple(map(int, signals)), ordered, test_body) 260 261 assert_python_ok('-c', code) 262 263 @unittest.skipIf(_testcapi is None, 'need _testcapi') 264 def test_wakeup_write_error(self): 265 # Issue #16105: write() errors in the C signal handler should not 266 # pass silently. 267 # Use a subprocess to have only one thread. 268 code = """if 1: 269 import _testcapi 270 import errno 271 import os 272 import signal 273 import sys 274 from test.support import captured_stderr 275 276 def handler(signum, frame): 277 1/0 278 279 signal.signal(signal.SIGALRM, handler) 280 r, w = os.pipe() 281 os.set_blocking(r, False) 282 283 # Set wakeup_fd a read-only file descriptor to trigger the error 284 signal.set_wakeup_fd(r) 285 try: 286 with captured_stderr() as err: 287 signal.raise_signal(signal.SIGALRM) 288 except ZeroDivisionError: 289 # An ignored exception should have been printed out on stderr 290 err = err.getvalue() 291 if ('Exception ignored when trying to write to the signal wakeup fd' 292 not in err): 293 raise AssertionError(err) 294 if ('OSError: [Errno %d]' % errno.EBADF) not in err: 295 raise AssertionError(err) 296 else: 297 raise AssertionError("ZeroDivisionError not raised") 298 299 os.close(r) 300 os.close(w) 301 """ 302 r, w = os.pipe() 303 try: 304 os.write(r, b'x') 305 except OSError: 306 pass 307 else: 308 self.skipTest("OS doesn't report write() error on the read end of a pipe") 309 finally: 310 os.close(r) 311 os.close(w) 312 313 assert_python_ok('-c', code) 314 315 def test_wakeup_fd_early(self): 316 self.check_wakeup("""def test(): 317 import select 318 import time 319 320 TIMEOUT_FULL = 10 321 TIMEOUT_HALF = 5 322 323 class InterruptSelect(Exception): 324 pass 325 326 def handler(signum, frame): 327 raise InterruptSelect 328 signal.signal(signal.SIGALRM, handler) 329 330 signal.alarm(1) 331 332 # We attempt to get a signal during the sleep, 333 # before select is called 334 try: 335 select.select([], [], [], TIMEOUT_FULL) 336 except InterruptSelect: 337 pass 338 else: 339 raise Exception("select() was not interrupted") 340 341 before_time = time.monotonic() 342 select.select([read], [], [], TIMEOUT_FULL) 343 after_time = time.monotonic() 344 dt = after_time - before_time 345 if dt >= TIMEOUT_HALF: 346 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 347 """, signal.SIGALRM) 348 349 def test_wakeup_fd_during(self): 350 self.check_wakeup("""def test(): 351 import select 352 import time 353 354 TIMEOUT_FULL = 10 355 TIMEOUT_HALF = 5 356 357 class InterruptSelect(Exception): 358 pass 359 360 def handler(signum, frame): 361 raise InterruptSelect 362 signal.signal(signal.SIGALRM, handler) 363 364 signal.alarm(1) 365 before_time = time.monotonic() 366 # We attempt to get a signal during the select call 367 try: 368 select.select([read], [], [], TIMEOUT_FULL) 369 except InterruptSelect: 370 pass 371 else: 372 raise Exception("select() was not interrupted") 373 after_time = time.monotonic() 374 dt = after_time - before_time 375 if dt >= TIMEOUT_HALF: 376 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 377 """, signal.SIGALRM) 378 379 def test_signum(self): 380 self.check_wakeup("""def test(): 381 signal.signal(signal.SIGUSR1, handler) 382 signal.raise_signal(signal.SIGUSR1) 383 signal.raise_signal(signal.SIGALRM) 384 """, signal.SIGUSR1, signal.SIGALRM) 385 386 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 387 'need signal.pthread_sigmask()') 388 def test_pending(self): 389 self.check_wakeup("""def test(): 390 signum1 = signal.SIGUSR1 391 signum2 = signal.SIGUSR2 392 393 signal.signal(signum1, handler) 394 signal.signal(signum2, handler) 395 396 signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) 397 signal.raise_signal(signum1) 398 signal.raise_signal(signum2) 399 # Unblocking the 2 signals calls the C signal handler twice 400 signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) 401 """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) 402 403 404@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') 405class WakeupSocketSignalTests(unittest.TestCase): 406 407 @unittest.skipIf(_testcapi is None, 'need _testcapi') 408 def test_socket(self): 409 # use a subprocess to have only one thread 410 code = """if 1: 411 import signal 412 import socket 413 import struct 414 import _testcapi 415 416 signum = signal.SIGINT 417 signals = (signum,) 418 419 def handler(signum, frame): 420 pass 421 422 signal.signal(signum, handler) 423 424 read, write = socket.socketpair() 425 write.setblocking(False) 426 signal.set_wakeup_fd(write.fileno()) 427 428 signal.raise_signal(signum) 429 430 data = read.recv(1) 431 if not data: 432 raise Exception("no signum written") 433 raised = struct.unpack('B', data) 434 if raised != signals: 435 raise Exception("%r != %r" % (raised, signals)) 436 437 read.close() 438 write.close() 439 """ 440 441 assert_python_ok('-c', code) 442 443 @unittest.skipIf(_testcapi is None, 'need _testcapi') 444 def test_send_error(self): 445 # Use a subprocess to have only one thread. 446 if os.name == 'nt': 447 action = 'send' 448 else: 449 action = 'write' 450 code = """if 1: 451 import errno 452 import signal 453 import socket 454 import sys 455 import time 456 import _testcapi 457 from test.support import captured_stderr 458 459 signum = signal.SIGINT 460 461 def handler(signum, frame): 462 pass 463 464 signal.signal(signum, handler) 465 466 read, write = socket.socketpair() 467 read.setblocking(False) 468 write.setblocking(False) 469 470 signal.set_wakeup_fd(write.fileno()) 471 472 # Close sockets: send() will fail 473 read.close() 474 write.close() 475 476 with captured_stderr() as err: 477 signal.raise_signal(signum) 478 479 err = err.getvalue() 480 if ('Exception ignored when trying to {action} to the signal wakeup fd' 481 not in err): 482 raise AssertionError(err) 483 """.format(action=action) 484 assert_python_ok('-c', code) 485 486 @unittest.skipIf(_testcapi is None, 'need _testcapi') 487 def test_warn_on_full_buffer(self): 488 # Use a subprocess to have only one thread. 489 if os.name == 'nt': 490 action = 'send' 491 else: 492 action = 'write' 493 code = """if 1: 494 import errno 495 import signal 496 import socket 497 import sys 498 import time 499 import _testcapi 500 from test.support import captured_stderr 501 502 signum = signal.SIGINT 503 504 # This handler will be called, but we intentionally won't read from 505 # the wakeup fd. 506 def handler(signum, frame): 507 pass 508 509 signal.signal(signum, handler) 510 511 read, write = socket.socketpair() 512 513 # Fill the socketpair buffer 514 if sys.platform == 'win32': 515 # bpo-34130: On Windows, sometimes non-blocking send fails to fill 516 # the full socketpair buffer, so use a timeout of 50 ms instead. 517 write.settimeout(0.050) 518 else: 519 write.setblocking(False) 520 521 # Start with large chunk size to reduce the 522 # number of send needed to fill the buffer. 523 written = 0 524 for chunk_size in (2 ** 16, 2 ** 8, 1): 525 chunk = b"x" * chunk_size 526 try: 527 while True: 528 write.send(chunk) 529 written += chunk_size 530 except (BlockingIOError, socket.timeout): 531 pass 532 533 print(f"%s bytes written into the socketpair" % written, flush=True) 534 535 write.setblocking(False) 536 try: 537 write.send(b"x") 538 except BlockingIOError: 539 # The socketpair buffer seems full 540 pass 541 else: 542 raise AssertionError("%s bytes failed to fill the socketpair " 543 "buffer" % written) 544 545 # By default, we get a warning when a signal arrives 546 msg = ('Exception ignored when trying to {action} ' 547 'to the signal wakeup fd') 548 signal.set_wakeup_fd(write.fileno()) 549 550 with captured_stderr() as err: 551 signal.raise_signal(signum) 552 553 err = err.getvalue() 554 if msg not in err: 555 raise AssertionError("first set_wakeup_fd() test failed, " 556 "stderr: %r" % err) 557 558 # And also if warn_on_full_buffer=True 559 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) 560 561 with captured_stderr() as err: 562 signal.raise_signal(signum) 563 564 err = err.getvalue() 565 if msg not in err: 566 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " 567 "test failed, stderr: %r" % err) 568 569 # But not if warn_on_full_buffer=False 570 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) 571 572 with captured_stderr() as err: 573 signal.raise_signal(signum) 574 575 err = err.getvalue() 576 if err != "": 577 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " 578 "test failed, stderr: %r" % err) 579 580 # And then check the default again, to make sure warn_on_full_buffer 581 # settings don't leak across calls. 582 signal.set_wakeup_fd(write.fileno()) 583 584 with captured_stderr() as err: 585 signal.raise_signal(signum) 586 587 err = err.getvalue() 588 if msg not in err: 589 raise AssertionError("second set_wakeup_fd() test failed, " 590 "stderr: %r" % err) 591 592 """.format(action=action) 593 assert_python_ok('-c', code) 594 595 596@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 597class SiginterruptTest(unittest.TestCase): 598 599 def readpipe_interrupted(self, interrupt): 600 """Perform a read during which a signal will arrive. Return True if the 601 read is interrupted by the signal and raises an exception. Return False 602 if it returns normally. 603 """ 604 # use a subprocess to have only one thread, to have a timeout on the 605 # blocking read and to not touch signal handling in this process 606 code = """if 1: 607 import errno 608 import os 609 import signal 610 import sys 611 612 interrupt = %r 613 r, w = os.pipe() 614 615 def handler(signum, frame): 616 1 / 0 617 618 signal.signal(signal.SIGALRM, handler) 619 if interrupt is not None: 620 signal.siginterrupt(signal.SIGALRM, interrupt) 621 622 print("ready") 623 sys.stdout.flush() 624 625 # run the test twice 626 try: 627 for loop in range(2): 628 # send a SIGALRM in a second (during the read) 629 signal.alarm(1) 630 try: 631 # blocking call: read from a pipe without data 632 os.read(r, 1) 633 except ZeroDivisionError: 634 pass 635 else: 636 sys.exit(2) 637 sys.exit(3) 638 finally: 639 os.close(r) 640 os.close(w) 641 """ % (interrupt,) 642 with spawn_python('-c', code) as process: 643 try: 644 # wait until the child process is loaded and has started 645 first_line = process.stdout.readline() 646 647 stdout, stderr = process.communicate(timeout=5.0) 648 except subprocess.TimeoutExpired: 649 process.kill() 650 return False 651 else: 652 stdout = first_line + stdout 653 exitcode = process.wait() 654 if exitcode not in (2, 3): 655 raise Exception("Child error (exit code %s): %r" 656 % (exitcode, stdout)) 657 return (exitcode == 3) 658 659 def test_without_siginterrupt(self): 660 # If a signal handler is installed and siginterrupt is not called 661 # at all, when that signal arrives, it interrupts a syscall that's in 662 # progress. 663 interrupted = self.readpipe_interrupted(None) 664 self.assertTrue(interrupted) 665 666 def test_siginterrupt_on(self): 667 # If a signal handler is installed and siginterrupt is called with 668 # a true value for the second argument, when that signal arrives, it 669 # interrupts a syscall that's in progress. 670 interrupted = self.readpipe_interrupted(True) 671 self.assertTrue(interrupted) 672 673 def test_siginterrupt_off(self): 674 # If a signal handler is installed and siginterrupt is called with 675 # a false value for the second argument, when that signal arrives, it 676 # does not interrupt a syscall that's in progress. 677 interrupted = self.readpipe_interrupted(False) 678 self.assertFalse(interrupted) 679 680 681@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 682class ItimerTest(unittest.TestCase): 683 def setUp(self): 684 self.hndl_called = False 685 self.hndl_count = 0 686 self.itimer = None 687 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 688 689 def tearDown(self): 690 signal.signal(signal.SIGALRM, self.old_alarm) 691 if self.itimer is not None: # test_itimer_exc doesn't change this attr 692 # just ensure that itimer is stopped 693 signal.setitimer(self.itimer, 0) 694 695 def sig_alrm(self, *args): 696 self.hndl_called = True 697 698 def sig_vtalrm(self, *args): 699 self.hndl_called = True 700 701 if self.hndl_count > 3: 702 # it shouldn't be here, because it should have been disabled. 703 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 704 "timer.") 705 elif self.hndl_count == 3: 706 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 707 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 708 709 self.hndl_count += 1 710 711 def sig_prof(self, *args): 712 self.hndl_called = True 713 signal.setitimer(signal.ITIMER_PROF, 0) 714 715 def test_itimer_exc(self): 716 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 717 # defines it ? 718 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 719 # Negative times are treated as zero on some platforms. 720 if 0: 721 self.assertRaises(signal.ItimerError, 722 signal.setitimer, signal.ITIMER_REAL, -1) 723 724 def test_itimer_real(self): 725 self.itimer = signal.ITIMER_REAL 726 signal.setitimer(self.itimer, 1.0) 727 signal.pause() 728 self.assertEqual(self.hndl_called, True) 729 730 # Issue 3864, unknown if this affects earlier versions of freebsd also 731 @unittest.skipIf(sys.platform in ('netbsd5',), 732 'itimer not reliable (does not mix well with threading) on some BSDs.') 733 def test_itimer_virtual(self): 734 self.itimer = signal.ITIMER_VIRTUAL 735 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 736 signal.setitimer(self.itimer, 0.3, 0.2) 737 738 start_time = time.monotonic() 739 while time.monotonic() - start_time < 60.0: 740 # use up some virtual time by doing real work 741 _ = pow(12345, 67890, 10000019) 742 if signal.getitimer(self.itimer) == (0.0, 0.0): 743 break # sig_vtalrm handler stopped this itimer 744 else: # Issue 8424 745 self.skipTest("timeout: likely cause: machine too slow or load too " 746 "high") 747 748 # virtual itimer should be (0.0, 0.0) now 749 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 750 # and the handler should have been called 751 self.assertEqual(self.hndl_called, True) 752 753 def test_itimer_prof(self): 754 self.itimer = signal.ITIMER_PROF 755 signal.signal(signal.SIGPROF, self.sig_prof) 756 signal.setitimer(self.itimer, 0.2, 0.2) 757 758 start_time = time.monotonic() 759 while time.monotonic() - start_time < 60.0: 760 # do some work 761 _ = pow(12345, 67890, 10000019) 762 if signal.getitimer(self.itimer) == (0.0, 0.0): 763 break # sig_prof handler stopped this itimer 764 else: # Issue 8424 765 self.skipTest("timeout: likely cause: machine too slow or load too " 766 "high") 767 768 # profiling itimer should be (0.0, 0.0) now 769 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 770 # and the handler should have been called 771 self.assertEqual(self.hndl_called, True) 772 773 def test_setitimer_tiny(self): 774 # bpo-30807: C setitimer() takes a microsecond-resolution interval. 775 # Check that float -> timeval conversion doesn't round 776 # the interval down to zero, which would disable the timer. 777 self.itimer = signal.ITIMER_REAL 778 signal.setitimer(self.itimer, 1e-6) 779 time.sleep(1) 780 self.assertEqual(self.hndl_called, True) 781 782 783class PendingSignalsTests(unittest.TestCase): 784 """ 785 Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() 786 functions. 787 """ 788 @unittest.skipUnless(hasattr(signal, 'sigpending'), 789 'need signal.sigpending()') 790 def test_sigpending_empty(self): 791 self.assertEqual(signal.sigpending(), set()) 792 793 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 794 'need signal.pthread_sigmask()') 795 @unittest.skipUnless(hasattr(signal, 'sigpending'), 796 'need signal.sigpending()') 797 def test_sigpending(self): 798 code = """if 1: 799 import os 800 import signal 801 802 def handler(signum, frame): 803 1/0 804 805 signum = signal.SIGUSR1 806 signal.signal(signum, handler) 807 808 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 809 os.kill(os.getpid(), signum) 810 pending = signal.sigpending() 811 for sig in pending: 812 assert isinstance(sig, signal.Signals), repr(pending) 813 if pending != {signum}: 814 raise Exception('%s != {%s}' % (pending, signum)) 815 try: 816 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 817 except ZeroDivisionError: 818 pass 819 else: 820 raise Exception("ZeroDivisionError not raised") 821 """ 822 assert_python_ok('-c', code) 823 824 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 825 'need signal.pthread_kill()') 826 def test_pthread_kill(self): 827 code = """if 1: 828 import signal 829 import threading 830 import sys 831 832 signum = signal.SIGUSR1 833 834 def handler(signum, frame): 835 1/0 836 837 signal.signal(signum, handler) 838 839 tid = threading.get_ident() 840 try: 841 signal.pthread_kill(tid, signum) 842 except ZeroDivisionError: 843 pass 844 else: 845 raise Exception("ZeroDivisionError not raised") 846 """ 847 assert_python_ok('-c', code) 848 849 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 850 'need signal.pthread_sigmask()') 851 def wait_helper(self, blocked, test): 852 """ 853 test: body of the "def test(signum):" function. 854 blocked: number of the blocked signal 855 """ 856 code = '''if 1: 857 import signal 858 import sys 859 from signal import Signals 860 861 def handler(signum, frame): 862 1/0 863 864 %s 865 866 blocked = %s 867 signum = signal.SIGALRM 868 869 # child: block and wait the signal 870 try: 871 signal.signal(signum, handler) 872 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 873 874 # Do the tests 875 test(signum) 876 877 # The handler must not be called on unblock 878 try: 879 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 880 except ZeroDivisionError: 881 print("the signal handler has been called", 882 file=sys.stderr) 883 sys.exit(1) 884 except BaseException as err: 885 print("error: {}".format(err), file=sys.stderr) 886 sys.stderr.flush() 887 sys.exit(1) 888 ''' % (test.strip(), blocked) 889 890 # sig*wait* must be called with the signal blocked: since the current 891 # process might have several threads running, use a subprocess to have 892 # a single thread. 893 assert_python_ok('-c', code) 894 895 @unittest.skipUnless(hasattr(signal, 'sigwait'), 896 'need signal.sigwait()') 897 def test_sigwait(self): 898 self.wait_helper(signal.SIGALRM, ''' 899 def test(signum): 900 signal.alarm(1) 901 received = signal.sigwait([signum]) 902 assert isinstance(received, signal.Signals), received 903 if received != signum: 904 raise Exception('received %s, not %s' % (received, signum)) 905 ''') 906 907 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 908 'need signal.sigwaitinfo()') 909 def test_sigwaitinfo(self): 910 self.wait_helper(signal.SIGALRM, ''' 911 def test(signum): 912 signal.alarm(1) 913 info = signal.sigwaitinfo([signum]) 914 if info.si_signo != signum: 915 raise Exception("info.si_signo != %s" % signum) 916 ''') 917 918 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 919 'need signal.sigtimedwait()') 920 def test_sigtimedwait(self): 921 self.wait_helper(signal.SIGALRM, ''' 922 def test(signum): 923 signal.alarm(1) 924 info = signal.sigtimedwait([signum], 10.1000) 925 if info.si_signo != signum: 926 raise Exception('info.si_signo != %s' % signum) 927 ''') 928 929 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 930 'need signal.sigtimedwait()') 931 def test_sigtimedwait_poll(self): 932 # check that polling with sigtimedwait works 933 self.wait_helper(signal.SIGALRM, ''' 934 def test(signum): 935 import os 936 os.kill(os.getpid(), signum) 937 info = signal.sigtimedwait([signum], 0) 938 if info.si_signo != signum: 939 raise Exception('info.si_signo != %s' % signum) 940 ''') 941 942 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 943 'need signal.sigtimedwait()') 944 def test_sigtimedwait_timeout(self): 945 self.wait_helper(signal.SIGALRM, ''' 946 def test(signum): 947 received = signal.sigtimedwait([signum], 1.0) 948 if received is not None: 949 raise Exception("received=%r" % (received,)) 950 ''') 951 952 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 953 'need signal.sigtimedwait()') 954 def test_sigtimedwait_negative_timeout(self): 955 signum = signal.SIGALRM 956 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 957 958 @unittest.skipUnless(hasattr(signal, 'sigwait'), 959 'need signal.sigwait()') 960 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 961 'need signal.pthread_sigmask()') 962 def test_sigwait_thread(self): 963 # Check that calling sigwait() from a thread doesn't suspend the whole 964 # process. A new interpreter is spawned to avoid problems when mixing 965 # threads and fork(): only async-safe functions are allowed between 966 # fork() and exec(). 967 assert_python_ok("-c", """if True: 968 import os, threading, sys, time, signal 969 970 # the default handler terminates the process 971 signum = signal.SIGUSR1 972 973 def kill_later(): 974 # wait until the main thread is waiting in sigwait() 975 time.sleep(1) 976 os.kill(os.getpid(), signum) 977 978 # the signal must be blocked by all the threads 979 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 980 killer = threading.Thread(target=kill_later) 981 killer.start() 982 received = signal.sigwait([signum]) 983 if received != signum: 984 print("sigwait() received %s, not %s" % (received, signum), 985 file=sys.stderr) 986 sys.exit(1) 987 killer.join() 988 # unblock the signal, which should have been cleared by sigwait() 989 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 990 """) 991 992 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 993 'need signal.pthread_sigmask()') 994 def test_pthread_sigmask_arguments(self): 995 self.assertRaises(TypeError, signal.pthread_sigmask) 996 self.assertRaises(TypeError, signal.pthread_sigmask, 1) 997 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 998 self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 999 with self.assertRaises(ValueError): 1000 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG]) 1001 with self.assertRaises(ValueError): 1002 signal.pthread_sigmask(signal.SIG_BLOCK, [0]) 1003 with self.assertRaises(ValueError): 1004 signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000]) 1005 1006 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1007 'need signal.pthread_sigmask()') 1008 def test_pthread_sigmask_valid_signals(self): 1009 s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 1010 self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s) 1011 # Get current blocked set 1012 s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) 1013 self.assertLessEqual(s, signal.valid_signals()) 1014 1015 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1016 'need signal.pthread_sigmask()') 1017 def test_pthread_sigmask(self): 1018 code = """if 1: 1019 import signal 1020 import os; import threading 1021 1022 def handler(signum, frame): 1023 1/0 1024 1025 def kill(signum): 1026 os.kill(os.getpid(), signum) 1027 1028 def check_mask(mask): 1029 for sig in mask: 1030 assert isinstance(sig, signal.Signals), repr(sig) 1031 1032 def read_sigmask(): 1033 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 1034 check_mask(sigmask) 1035 return sigmask 1036 1037 signum = signal.SIGUSR1 1038 1039 # Install our signal handler 1040 old_handler = signal.signal(signum, handler) 1041 1042 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 1043 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1044 check_mask(old_mask) 1045 try: 1046 kill(signum) 1047 except ZeroDivisionError: 1048 pass 1049 else: 1050 raise Exception("ZeroDivisionError not raised") 1051 1052 # Block and then raise SIGUSR1. The signal is blocked: the signal 1053 # handler is not called, and the signal is now pending 1054 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1055 check_mask(mask) 1056 kill(signum) 1057 1058 # Check the new mask 1059 blocked = read_sigmask() 1060 check_mask(blocked) 1061 if signum not in blocked: 1062 raise Exception("%s not in %s" % (signum, blocked)) 1063 if old_mask ^ blocked != {signum}: 1064 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 1065 1066 # Unblock SIGUSR1 1067 try: 1068 # unblock the pending signal calls immediately the signal handler 1069 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1070 except ZeroDivisionError: 1071 pass 1072 else: 1073 raise Exception("ZeroDivisionError not raised") 1074 try: 1075 kill(signum) 1076 except ZeroDivisionError: 1077 pass 1078 else: 1079 raise Exception("ZeroDivisionError not raised") 1080 1081 # Check the new mask 1082 unblocked = read_sigmask() 1083 if signum in unblocked: 1084 raise Exception("%s in %s" % (signum, unblocked)) 1085 if blocked ^ unblocked != {signum}: 1086 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 1087 if old_mask != unblocked: 1088 raise Exception("%s != %s" % (old_mask, unblocked)) 1089 """ 1090 assert_python_ok('-c', code) 1091 1092 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 1093 'need signal.pthread_kill()') 1094 def test_pthread_kill_main_thread(self): 1095 # Test that a signal can be sent to the main thread with pthread_kill() 1096 # before any other thread has been created (see issue #12392). 1097 code = """if True: 1098 import threading 1099 import signal 1100 import sys 1101 1102 def handler(signum, frame): 1103 sys.exit(3) 1104 1105 signal.signal(signal.SIGUSR1, handler) 1106 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 1107 sys.exit(2) 1108 """ 1109 1110 with spawn_python('-c', code) as process: 1111 stdout, stderr = process.communicate() 1112 exitcode = process.wait() 1113 if exitcode != 3: 1114 raise Exception("Child error (exit code %s): %s" % 1115 (exitcode, stdout)) 1116 1117 1118class StressTest(unittest.TestCase): 1119 """ 1120 Stress signal delivery, especially when a signal arrives in 1121 the middle of recomputing the signal state or executing 1122 previously tripped signal handlers. 1123 """ 1124 1125 def setsig(self, signum, handler): 1126 old_handler = signal.signal(signum, handler) 1127 self.addCleanup(signal.signal, signum, old_handler) 1128 1129 def measure_itimer_resolution(self): 1130 N = 20 1131 times = [] 1132 1133 def handler(signum=None, frame=None): 1134 if len(times) < N: 1135 times.append(time.perf_counter()) 1136 # 1 µs is the smallest possible timer interval, 1137 # we want to measure what the concrete duration 1138 # will be on this platform 1139 signal.setitimer(signal.ITIMER_REAL, 1e-6) 1140 1141 self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 1142 self.setsig(signal.SIGALRM, handler) 1143 handler() 1144 while len(times) < N: 1145 time.sleep(1e-3) 1146 1147 durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 1148 med = statistics.median(durations) 1149 if support.verbose: 1150 print("detected median itimer() resolution: %.6f s." % (med,)) 1151 return med 1152 1153 def decide_itimer_count(self): 1154 # Some systems have poor setitimer() resolution (for example 1155 # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 1156 # number of sequential timers based on that. 1157 reso = self.measure_itimer_resolution() 1158 if reso <= 1e-4: 1159 return 10000 1160 elif reso <= 1e-2: 1161 return 100 1162 else: 1163 self.skipTest("detected itimer resolution (%.3f s.) too high " 1164 "(> 10 ms.) on this platform (or system too busy)" 1165 % (reso,)) 1166 1167 @unittest.skipUnless(hasattr(signal, "setitimer"), 1168 "test needs setitimer()") 1169 def test_stress_delivery_dependent(self): 1170 """ 1171 This test uses dependent signal handlers. 1172 """ 1173 N = self.decide_itimer_count() 1174 sigs = [] 1175 1176 def first_handler(signum, frame): 1177 # 1e-6 is the minimum non-zero value for `setitimer()`. 1178 # Choose a random delay so as to improve chances of 1179 # triggering a race condition. Ideally the signal is received 1180 # when inside critical signal-handling routines such as 1181 # Py_MakePendingCalls(). 1182 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1183 1184 def second_handler(signum=None, frame=None): 1185 sigs.append(signum) 1186 1187 # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 1188 # ascending and descending sequences (SIGUSR1 then SIGALRM, 1189 # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 1190 self.setsig(signal.SIGPROF, first_handler) 1191 self.setsig(signal.SIGUSR1, first_handler) 1192 self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 1193 1194 expected_sigs = 0 1195 deadline = time.monotonic() + 15.0 1196 1197 while expected_sigs < N: 1198 os.kill(os.getpid(), signal.SIGPROF) 1199 expected_sigs += 1 1200 # Wait for handlers to run to avoid signal coalescing 1201 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1202 time.sleep(1e-5) 1203 1204 os.kill(os.getpid(), signal.SIGUSR1) 1205 expected_sigs += 1 1206 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1207 time.sleep(1e-5) 1208 1209 # All ITIMER_REAL signals should have been delivered to the 1210 # Python handler 1211 self.assertEqual(len(sigs), N, "Some signals were lost") 1212 1213 @unittest.skipUnless(hasattr(signal, "setitimer"), 1214 "test needs setitimer()") 1215 def test_stress_delivery_simultaneous(self): 1216 """ 1217 This test uses simultaneous signal handlers. 1218 """ 1219 N = self.decide_itimer_count() 1220 sigs = [] 1221 1222 def handler(signum, frame): 1223 sigs.append(signum) 1224 1225 self.setsig(signal.SIGUSR1, handler) 1226 self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 1227 1228 expected_sigs = 0 1229 deadline = time.monotonic() + 15.0 1230 1231 while expected_sigs < N: 1232 # Hopefully the SIGALRM will be received somewhere during 1233 # initial processing of SIGUSR1. 1234 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1235 os.kill(os.getpid(), signal.SIGUSR1) 1236 1237 expected_sigs += 2 1238 # Wait for handlers to run to avoid signal coalescing 1239 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1240 time.sleep(1e-5) 1241 1242 # All ITIMER_REAL signals should have been delivered to the 1243 # Python handler 1244 self.assertEqual(len(sigs), N, "Some signals were lost") 1245 1246class RaiseSignalTest(unittest.TestCase): 1247 1248 def test_sigint(self): 1249 with self.assertRaises(KeyboardInterrupt): 1250 signal.raise_signal(signal.SIGINT) 1251 1252 @unittest.skipIf(sys.platform != "win32", "Windows specific test") 1253 def test_invalid_argument(self): 1254 try: 1255 SIGHUP = 1 # not supported on win32 1256 signal.raise_signal(SIGHUP) 1257 self.fail("OSError (Invalid argument) expected") 1258 except OSError as e: 1259 if e.errno == errno.EINVAL: 1260 pass 1261 else: 1262 raise 1263 1264 def test_handler(self): 1265 is_ok = False 1266 def handler(a, b): 1267 nonlocal is_ok 1268 is_ok = True 1269 old_signal = signal.signal(signal.SIGINT, handler) 1270 self.addCleanup(signal.signal, signal.SIGINT, old_signal) 1271 1272 signal.raise_signal(signal.SIGINT) 1273 self.assertTrue(is_ok) 1274 1275 1276def tearDownModule(): 1277 support.reap_children() 1278 1279if __name__ == "__main__": 1280 unittest.main() 1281