import errno
import inspect
import os
import random
import signal
import socket
import statistics
import subprocess
import sys
import threading
import time
import unittest
from test import support
from test.support import os_helper
from test.support.script_helper import assert_python_ok, spawn_python
# _testcapi may be unavailable; the tests decorated with
# skipIf(_testcapi is None, ...) below are skipped in that case.
try:
    import _testcapi
except ImportError:
    _testcapi = None


# Checks on the names exported by the signal module; this class carries no
# platform skip decorator.
class GenericTests(unittest.TestCase):

    # Every SIG*/CTRL_* attribute must be an instance of the matching enum:
    # SIG_DFL/SIG_IGN -> Handlers, SIG_BLOCK/SIG_UNBLOCK/SIG_SETMASK ->
    # Sigmasks, any other SIG* name and CTRL_* -> Signals.  A CTRL_* name is
    # only accepted when running on win32.
    def test_enums(self):
        for name in dir(signal):
            sig = getattr(signal, name)
            if name in {'SIG_DFL', 'SIG_IGN'}:
                self.assertIsInstance(sig, signal.Handlers)
            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
                self.assertIsInstance(sig, signal.Sigmasks)
            elif name.startswith('SIG') and not name.startswith('SIG_'):
                self.assertIsInstance(sig, signal.Signals)
            elif name.startswith('CTRL_'):
                self.assertIsInstance(sig, signal.Signals)
                self.assertEqual(sys.platform, "win32")

    def test_functions_module_attr(self):
        # Issue #27718: If __all__ is not defined all non-builtin functions
        # should have correct __module__ to be displayed by pydoc.
        for name in dir(signal):
            value = getattr(signal, name)
            if inspect.isroutine(value) and not inspect.isbuiltin(value):
                self.assertEqual(value.__module__, 'signal')


# Basic signal-module behaviour; the whole class is skipped on win32.
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    # No-op handler for tests that only need some callable to install.
    def trivial_signal_handler(self, *args):
        pass

    # 4242 stands in for an out-of-range signal number: getsignal(),
    # signal() and strsignal() must each raise ValueError for it.
    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

        self.assertRaises(ValueError, signal.strsignal, 4242)

    # None is not an acceptable handler: signal() must raise TypeError.
    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    # signal() returns the previous handler (expected to be a Handlers
    # member here); getsignal() must reflect both the install and the
    # restore.
    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        self.assertIsInstance(hup, signal.Handlers)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

    # strsignal() must return a description containing the expected word.
    def test_strsignal(self):
        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    # The script signalinterproctester.py is looked up in the directory of
    # this file and run through assert_python_ok().
    def test_interprocess_signal(self):
        dirname = os.path.dirname(__file__)
        script = os.path.join(dirname, 'signalinterproctester.py')
        assert_python_ok(script)

    # valid_signals() must be a set containing SIGINT and SIGALRM, must not
    # contain 0 or NSIG, and must have fewer than NSIG members.
    def test_valid_signals(self):
        s = signal.valid_signals()
        self.assertIsInstance(s, set)
        self.assertIn(signal.Signals.SIGINT, s)
        self.assertIn(signal.Signals.SIGALRM, s)
        self.assertNotIn(0, s)
        self.assertNotIn(signal.NSIG, s)
        self.assertLess(len(s), signal.NSIG)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers exit via SIGINT."""
        # The child sends SIGINT to itself and then sleeps in short steps so
        # the interrupt has time to be processed.
        process = subprocess.run(
                [sys.executable, "-c",
                 "import os, signal, time\n"
                 "os.kill(os.getpid(), signal.SIGINT)\n"
                 "for _ in range(999): time.sleep(0.01)"],
                stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, -signal.SIGINT)
        # Caveat: The exit code is insufficient to guarantee we actually died
        # via a signal.  POSIX shells do more than look at the 8 bit value.
        # Writing an automation friendly test of an interactive shell
        # to confirm that our process died via a SIGINT proved too complex.


# Windows-only counterpart of the basic checks; skipped everywhere else.
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):

    # Same shape as the POSIX check, but requires at least 6 valid signals
    # and does not require SIGALRM.
    def test_valid_signals(self):
        s = signal.valid_signals()
        self.assertIsInstance(s, set)
        self.assertGreaterEqual(len(s), 6)
        self.assertIn(signal.Signals.SIGINT, s)
        self.assertNotIn(0, s)
        self.assertNotIn(signal.NSIG, s)
        self.assertLess(len(s), signal.NSIG)

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        checked = set()
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows.
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(sig) is not None:
                # The inner call installs `handler` and returns the previous
                # handler, which the outer call immediately reinstalls.
                signal.signal(sig, signal.signal(sig, handler))
                checked.add(sig)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(checked)

        # -1 and 7 must both be rejected with ValueError.
        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
        # as that requires setting up a console control handler in a child
        # in its own process group.  Doable, but quite complicated.  (see
        # @eryksun on https://github.com/python/cpython/pull/11862)
        process = subprocess.run(
                [sys.executable, "-c", "raise KeyboardInterrupt"],
                stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        STATUS_CONTROL_C_EXIT = 0xC000013A
        self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT)


# Argument validation and return values of signal.set_wakeup_fd().
class WakeupFDTests(unittest.TestCase):

    def test_invalid_call(self):
        # First parameter is positional-only
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signum=signal.SIGINT)

        # warn_on_full_buffer is a keyword-only parameter
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signal.SIGINT, False)

    # A bad file descriptor (from os_helper.make_bad_fd()) must be rejected
    # with ValueError or OSError.
    def test_invalid_fd(self):
        fd = os_helper.make_bad_fd()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    # Same check with the fileno of a socket that has already been closed.
    def test_invalid_socket(self):
        sock = socket.socket()
        fd = sock.fileno()
        sock.close()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    # set_wakeup_fd() must return the previously installed fd, and -1 once
    # nothing is installed any more.
    def test_set_wakeup_fd_result(self):
        r1, w1 = os.pipe()
        self.addCleanup(os.close, r1)
        self.addCleanup(os.close, w1)
        r2, w2 = os.pipe()
        self.addCleanup(os.close, r2)
        self.addCleanup(os.close, w2)

        # The write ends are made non-blocking where os.set_blocking exists
        # (test_set_wakeup_fd_blocking shows a blocking fd being rejected).
        if hasattr(os, 'set_blocking'):
            os.set_blocking(w1, False)
            os.set_blocking(w2, False)

        signal.set_wakeup_fd(w1)
        self.assertEqual(signal.set_wakeup_fd(w2), w1)
        self.assertEqual(signal.set_wakeup_fd(-1), w2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # Same return-value contract, using the filenos of non-blocking sockets.
    def test_set_wakeup_fd_socket_result(self):
        sock1 = socket.socket()
        self.addCleanup(sock1.close)
        sock1.setblocking(False)
        fd1 = sock1.fileno()

        sock2 = socket.socket()
        self.addCleanup(sock2.close)
        sock2.setblocking(False)
        fd2 = sock2.fileno()

        signal.set_wakeup_fd(fd1)
        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # On Windows, files are always blocking and Windows does not provide a
    # function to test if a socket is in non-blocking mode.
    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    def test_set_wakeup_fd_blocking(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)

        # fd must be non-blocking
        os.set_blocking(wfd, True)
        with self.assertRaises(ValueError) as cm:
            signal.set_wakeup_fd(wfd)
        self.assertEqual(str(cm.exception),
                         "the fd %s must be in non-blocking mode" % wfd)

        # non-blocking is ok
        os.set_blocking(wfd, False)
        signal.set_wakeup_fd(wfd)
        signal.set_wakeup_fd(-1)


# Checks that signal numbers are written to a pipe installed as wakeup fd.
# Every scenario runs in a child interpreter.
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    # Helper, not a test.  test_body is the source of a "def test():"
    # function; it is spliced into a child script that installs a
    # non-blocking pipe as wakeup fd, calls test(), then reads the pipe and
    # compares the bytes with *signals* -- as a tuple when ordered is true,
    # as sets otherwise.
    # NOTE: because of the skipIf decorator, calling this helper without
    # _testcapi is expected to skip the calling test.
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Probe in this process first: the child script relies on write()
        # failing on the read end of a pipe, so skip when the OS accepts it.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    # The alarm interrupts a first select() that watches nothing; a second
    # select() on the wakeup pipe's read end must then return in less than
    # TIMEOUT_HALF seconds.
    def test_wakeup_fd_early(self):
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    # Here the alarm is expected while select() is already watching the
    # wakeup pipe; the call must be interrupted in less than TIMEOUT_HALF
    # seconds.
    def test_wakeup_fd_during(self):
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    # Two signals raised one after the other must be read back from the
    # wakeup fd in the same order (ordered defaults to True).
    def test_signum(self):
        self.check_wakeup("""def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    # Two blocked, pending signals are released together; the comparison is
    # made on sets (ordered=False), so their relative order is not checked.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """, signal.SIGUSR1, signal.SIGUSR2, ordered=False)


# Wakeup-fd behaviour when the fd is one end of a socket.socketpair().
@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
class WakeupSocketSignalTests(unittest.TestCase):

    # The raised signal number must be readable, as a single byte, from the
    # other end of the socket pair.
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_send_error(self):
        # Use a subprocess to have only one thread.
        # The expected stderr message names the failing operation: 'send'
        # when os.name is 'nt', 'write' otherwise.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        """.format(action=action)
        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_warn_on_full_buffer(self):
        # Use a subprocess to have only one thread.
        # As in test_send_error, the expected message depends on os.name.
        # NOTE: the script below goes through str.format(), so "{action}" is
        # the only brace pair it may contain.
        if os.name == 'nt':
            action = 'send'
        else:
            action = 'write'
        code = """if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        """.format(action=action)
        assert_python_ok('-c', code)


# Effect of signal.siginterrupt() on a blocking os.read() in a child process.
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
class SiginterruptTest(unittest.TestCase):

    def readpipe_interrupted(self, interrupt):
        """Perform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        """
        # use a subprocess to have only one thread, to have a timeout on the
        # blocking read and to not touch signal handling in this process
        #
        # interrupt is passed to siginterrupt() in the child; None means
        # siginterrupt() is not called at all.
        # Child exit codes: 2 = an os.read() returned without the handler's
        # ZeroDivisionError; 3 = both reads raised ZeroDivisionError.
        code = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', code) as process:
            try:
                # wait until the child process is loaded and has started
                first_line = process.stdout.readline()

                stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # No exit within SHORT_TIMEOUT: the child is killed and the
                # read is reported as not interrupted.
                process.kill()
                return False
            else:
                stdout = first_line + stdout
                exitcode = process.wait()
                if exitcode not in (2, 3):
                    raise Exception("Child error (exit code %s): %r"
                                    % (exitcode, stdout))
                return (exitcode == 3)

    def test_without_siginterrupt(self):
        # If a signal handler is installed and siginterrupt is not called
        # at all, when that signal arrives, it interrupts a syscall that's in
        # progress.
        interrupted = self.readpipe_interrupted(None)
        self.assertTrue(interrupted)

    def test_siginterrupt_on(self):
        # If a signal handler is installed and siginterrupt is called with
        # a true value for the second argument, when that signal arrives, it
        # interrupts a syscall that's in progress.
        interrupted = self.readpipe_interrupted(True)
        self.assertTrue(interrupted)

    def test_siginterrupt_off(self):
        # If a signal handler is installed and siginterrupt is called with
        # a false value for the second argument, when that signal arrives, it
        # does not interrupt a syscall that's in progress.
        interrupted = self.readpipe_interrupted(False)
        self.assertFalse(interrupted)


# Tests for setitimer()/getitimer().  These run in the test process itself,
# so setUp/tearDown save and restore the SIGALRM handler.
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                     "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    # Reset the handler bookkeeping and install sig_alrm for SIGALRM,
    # keeping the previous handler so tearDown can put it back.
    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    # Restore the saved SIGALRM handler, then disarm the timer the test
    # selected (if any).
    def tearDown(self):
        signal.signal(signal.SIGALRM, self.old_alarm)
        if self.itimer is not None:  # test_itimer_exc doesn't change this attr
            # just ensure that itimer is stopped
            signal.setitimer(self.itimer, 0)

    # SIGALRM handler: only records that it ran.
    def sig_alrm(self, *args):
        self.hndl_called = True

    # SIGVTALRM handler: counts its calls and disarms ITIMER_VIRTUAL on the
    # call where hndl_count == 3; any call after hndl_count has passed 3 is
    # an error.
    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    # SIGPROF handler: records the call and disarms ITIMER_PROF right away.
    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        # The check below is therefore disabled on purpose ("if 0").
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    # Arm ITIMER_REAL for 1.0 s, block in pause() until a signal arrives,
    # then require that sig_alrm has run.
    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        # Busy-loop for at most 60 s; the while/else branch (skip) is taken
        # only if the loop ends without the break below.
        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break  # sig_vtalrm handler stopped this itimer
        else:  # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    # Same structure as test_itimer_virtual, for ITIMER_PROF / SIGPROF.
    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        signal.signal(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break  # sig_prof handler stopped this itimer
        else:  # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertEqual(self.hndl_called, True)


class PendingSignalsTests(unittest.TestCase):
    """
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    """
    # sigpending() is expected to return an empty set here.
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending_empty(self):
        self.assertEqual(signal.sigpending(), set())

    # In a child: block SIGUSR1, send it to the process itself and check
    # that sigpending() reports exactly {SIGUSR1} as Signals members;
    # unblocking must then raise the handler's ZeroDivisionError.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @unittest.skipUnless(hasattr(signal, 'sigpending'),
                         'need signal.sigpending()')
    def test_sigpending(self):
        code = """if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    # In a child: pthread_kill() aimed at the calling thread's own ident
    # must raise the handler's ZeroDivisionError.
    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
                         'need signal.pthread_kill()')
    def test_pthread_kill(self):
        code = """if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        """
        assert_python_ok('-c', code)

    # Helper, not a test; the skipUnless decorator applies to every test
    # that calls it.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def wait_helper(self, blocked, test):
        """
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        """
        # test is stripped and substituted for the first %s of the script
        # below, blocked for the second one.
        code = '''if 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        %s

        blocked = %s
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        ''' % (test.strip(), blocked)

        # sig*wait* must be called with the signal blocked: since the current
        # process might have several threads running, use a subprocess to have
        # a single thread.
        assert_python_ok('-c', code)

    # sigwait() must return the awaited signal as a Signals member.
    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    def test_sigwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ''')

    # sigwaitinfo() must report the awaited signal in info.si_signo.
    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
                         'need signal.sigwaitinfo()')
    def test_sigwaitinfo(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        ''')

    # sigtimedwait() with a timeout much longer than the 1 s alarm must
    # report the signal in info.si_signo.
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_poll(self):
        # check that polling with sigtimedwait works
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        ''')

    # No signal is sent here: sigtimedwait() must return None once the 1.0 s
    # timeout expires.
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_timeout(self):
        self.wait_helper(signal.SIGALRM, '''
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        ''')

    # A negative timeout must be rejected with ValueError.
    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
                         'need signal.sigtimedwait()')
    def test_sigtimedwait_negative_timeout(self):
        signum = signal.SIGALRM
        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)

    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)

    # Wrong argument counts -> TypeError; 1700 as 'how' -> OSError; NSIG, 0
    # and 1<<1000 as signal numbers -> ValueError.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_arguments(self):
        self.assertRaises(TypeError, signal.pthread_sigmask)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
        with self.assertRaises(ValueError):
            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])

    # The whole valid_signals() set must be accepted by pthread_sigmask().
    # The mask returned by the first call is restored on cleanup.
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pthread_sigmask_valid_signals(self):
        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
        # Get current blocked set
        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
        self.assertLessEqual(s, signal.valid_signals())

@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1034 'need signal.pthread_sigmask()') 1035 def test_pthread_sigmask(self): 1036 code = """if 1: 1037 import signal 1038 import os; import threading 1039 1040 def handler(signum, frame): 1041 1/0 1042 1043 def kill(signum): 1044 os.kill(os.getpid(), signum) 1045 1046 def check_mask(mask): 1047 for sig in mask: 1048 assert isinstance(sig, signal.Signals), repr(sig) 1049 1050 def read_sigmask(): 1051 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 1052 check_mask(sigmask) 1053 return sigmask 1054 1055 signum = signal.SIGUSR1 1056 1057 # Install our signal handler 1058 old_handler = signal.signal(signum, handler) 1059 1060 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 1061 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1062 check_mask(old_mask) 1063 try: 1064 kill(signum) 1065 except ZeroDivisionError: 1066 pass 1067 else: 1068 raise Exception("ZeroDivisionError not raised") 1069 1070 # Block and then raise SIGUSR1. 
The signal is blocked: the signal 1071 # handler is not called, and the signal is now pending 1072 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1073 check_mask(mask) 1074 kill(signum) 1075 1076 # Check the new mask 1077 blocked = read_sigmask() 1078 check_mask(blocked) 1079 if signum not in blocked: 1080 raise Exception("%s not in %s" % (signum, blocked)) 1081 if old_mask ^ blocked != {signum}: 1082 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 1083 1084 # Unblock SIGUSR1 1085 try: 1086 # unblock the pending signal calls immediately the signal handler 1087 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1088 except ZeroDivisionError: 1089 pass 1090 else: 1091 raise Exception("ZeroDivisionError not raised") 1092 try: 1093 kill(signum) 1094 except ZeroDivisionError: 1095 pass 1096 else: 1097 raise Exception("ZeroDivisionError not raised") 1098 1099 # Check the new mask 1100 unblocked = read_sigmask() 1101 if signum in unblocked: 1102 raise Exception("%s in %s" % (signum, unblocked)) 1103 if blocked ^ unblocked != {signum}: 1104 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 1105 if old_mask != unblocked: 1106 raise Exception("%s != %s" % (old_mask, unblocked)) 1107 """ 1108 assert_python_ok('-c', code) 1109 1110 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 1111 'need signal.pthread_kill()') 1112 def test_pthread_kill_main_thread(self): 1113 # Test that a signal can be sent to the main thread with pthread_kill() 1114 # before any other thread has been created (see issue #12392). 
1115 code = """if True: 1116 import threading 1117 import signal 1118 import sys 1119 1120 def handler(signum, frame): 1121 sys.exit(3) 1122 1123 signal.signal(signal.SIGUSR1, handler) 1124 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 1125 sys.exit(2) 1126 """ 1127 1128 with spawn_python('-c', code) as process: 1129 stdout, stderr = process.communicate() 1130 exitcode = process.wait() 1131 if exitcode != 3: 1132 raise Exception("Child error (exit code %s): %s" % 1133 (exitcode, stdout)) 1134 1135 1136class StressTest(unittest.TestCase): 1137 """ 1138 Stress signal delivery, especially when a signal arrives in 1139 the middle of recomputing the signal state or executing 1140 previously tripped signal handlers. 1141 """ 1142 1143 def setsig(self, signum, handler): 1144 old_handler = signal.signal(signum, handler) 1145 self.addCleanup(signal.signal, signum, old_handler) 1146 1147 def measure_itimer_resolution(self): 1148 N = 20 1149 times = [] 1150 1151 def handler(signum=None, frame=None): 1152 if len(times) < N: 1153 times.append(time.perf_counter()) 1154 # 1 µs is the smallest possible timer interval, 1155 # we want to measure what the concrete duration 1156 # will be on this platform 1157 signal.setitimer(signal.ITIMER_REAL, 1e-6) 1158 1159 self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 1160 self.setsig(signal.SIGALRM, handler) 1161 handler() 1162 while len(times) < N: 1163 time.sleep(1e-3) 1164 1165 durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 1166 med = statistics.median(durations) 1167 if support.verbose: 1168 print("detected median itimer() resolution: %.6f s." % (med,)) 1169 return med 1170 1171 def decide_itimer_count(self): 1172 # Some systems have poor setitimer() resolution (for example 1173 # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 1174 # number of sequential timers based on that. 
1175 reso = self.measure_itimer_resolution() 1176 if reso <= 1e-4: 1177 return 10000 1178 elif reso <= 1e-2: 1179 return 100 1180 else: 1181 self.skipTest("detected itimer resolution (%.3f s.) too high " 1182 "(> 10 ms.) on this platform (or system too busy)" 1183 % (reso,)) 1184 1185 @unittest.skipUnless(hasattr(signal, "setitimer"), 1186 "test needs setitimer()") 1187 def test_stress_delivery_dependent(self): 1188 """ 1189 This test uses dependent signal handlers. 1190 """ 1191 N = self.decide_itimer_count() 1192 sigs = [] 1193 1194 def first_handler(signum, frame): 1195 # 1e-6 is the minimum non-zero value for `setitimer()`. 1196 # Choose a random delay so as to improve chances of 1197 # triggering a race condition. Ideally the signal is received 1198 # when inside critical signal-handling routines such as 1199 # Py_MakePendingCalls(). 1200 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1201 1202 def second_handler(signum=None, frame=None): 1203 sigs.append(signum) 1204 1205 # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 1206 # ascending and descending sequences (SIGUSR1 then SIGALRM, 1207 # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 
1208 self.setsig(signal.SIGPROF, first_handler) 1209 self.setsig(signal.SIGUSR1, first_handler) 1210 self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 1211 1212 expected_sigs = 0 1213 deadline = time.monotonic() + support.SHORT_TIMEOUT 1214 1215 while expected_sigs < N: 1216 os.kill(os.getpid(), signal.SIGPROF) 1217 expected_sigs += 1 1218 # Wait for handlers to run to avoid signal coalescing 1219 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1220 time.sleep(1e-5) 1221 1222 os.kill(os.getpid(), signal.SIGUSR1) 1223 expected_sigs += 1 1224 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1225 time.sleep(1e-5) 1226 1227 # All ITIMER_REAL signals should have been delivered to the 1228 # Python handler 1229 self.assertEqual(len(sigs), N, "Some signals were lost") 1230 1231 @unittest.skipUnless(hasattr(signal, "setitimer"), 1232 "test needs setitimer()") 1233 def test_stress_delivery_simultaneous(self): 1234 """ 1235 This test uses simultaneous signal handlers. 1236 """ 1237 N = self.decide_itimer_count() 1238 sigs = [] 1239 1240 def handler(signum, frame): 1241 sigs.append(signum) 1242 1243 self.setsig(signal.SIGUSR1, handler) 1244 self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 1245 1246 expected_sigs = 0 1247 deadline = time.monotonic() + support.SHORT_TIMEOUT 1248 1249 while expected_sigs < N: 1250 # Hopefully the SIGALRM will be received somewhere during 1251 # initial processing of SIGUSR1. 
1252 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1253 os.kill(os.getpid(), signal.SIGUSR1) 1254 1255 expected_sigs += 2 1256 # Wait for handlers to run to avoid signal coalescing 1257 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1258 time.sleep(1e-5) 1259 1260 # All ITIMER_REAL signals should have been delivered to the 1261 # Python handler 1262 self.assertEqual(len(sigs), N, "Some signals were lost") 1263 1264 @unittest.skipUnless(hasattr(signal, "SIGUSR1"), 1265 "test needs SIGUSR1") 1266 def test_stress_modifying_handlers(self): 1267 # bpo-43406: race condition between trip_signal() and signal.signal 1268 signum = signal.SIGUSR1 1269 num_sent_signals = 0 1270 num_received_signals = 0 1271 do_stop = False 1272 1273 def custom_handler(signum, frame): 1274 nonlocal num_received_signals 1275 num_received_signals += 1 1276 1277 def set_interrupts(): 1278 nonlocal num_sent_signals 1279 while not do_stop: 1280 signal.raise_signal(signum) 1281 num_sent_signals += 1 1282 1283 def cycle_handlers(): 1284 while num_sent_signals < 100: 1285 for i in range(20000): 1286 # Cycle between a Python-defined and a non-Python handler 1287 for handler in [custom_handler, signal.SIG_IGN]: 1288 signal.signal(signum, handler) 1289 1290 old_handler = signal.signal(signum, custom_handler) 1291 self.addCleanup(signal.signal, signum, old_handler) 1292 1293 t = threading.Thread(target=set_interrupts) 1294 try: 1295 ignored = False 1296 with support.catch_unraisable_exception() as cm: 1297 t.start() 1298 cycle_handlers() 1299 do_stop = True 1300 t.join() 1301 1302 if cm.unraisable is not None: 1303 # An unraisable exception may be printed out when 1304 # a signal is ignored due to the aforementioned 1305 # race condition, check it. 
1306 self.assertIsInstance(cm.unraisable.exc_value, OSError) 1307 self.assertIn( 1308 f"Signal {signum:d} ignored due to race condition", 1309 str(cm.unraisable.exc_value)) 1310 ignored = True 1311 1312 # bpo-43406: Even if it is unlikely, it's technically possible that 1313 # all signals were ignored because of race conditions. 1314 if not ignored: 1315 # Sanity check that some signals were received, but not all 1316 self.assertGreater(num_received_signals, 0) 1317 self.assertLess(num_received_signals, num_sent_signals) 1318 finally: 1319 do_stop = True 1320 t.join() 1321 1322 1323class RaiseSignalTest(unittest.TestCase): 1324 1325 def test_sigint(self): 1326 with self.assertRaises(KeyboardInterrupt): 1327 signal.raise_signal(signal.SIGINT) 1328 1329 @unittest.skipIf(sys.platform != "win32", "Windows specific test") 1330 def test_invalid_argument(self): 1331 try: 1332 SIGHUP = 1 # not supported on win32 1333 signal.raise_signal(SIGHUP) 1334 self.fail("OSError (Invalid argument) expected") 1335 except OSError as e: 1336 if e.errno == errno.EINVAL: 1337 pass 1338 else: 1339 raise 1340 1341 def test_handler(self): 1342 is_ok = False 1343 def handler(a, b): 1344 nonlocal is_ok 1345 is_ok = True 1346 old_signal = signal.signal(signal.SIGINT, handler) 1347 self.addCleanup(signal.signal, signal.SIGINT, old_signal) 1348 1349 signal.raise_signal(signal.SIGINT) 1350 self.assertTrue(is_ok) 1351 1352 1353class PidfdSignalTest(unittest.TestCase): 1354 1355 @unittest.skipUnless( 1356 hasattr(signal, "pidfd_send_signal"), 1357 "pidfd support not built in", 1358 ) 1359 def test_pidfd_send_signal(self): 1360 with self.assertRaises(OSError) as cm: 1361 signal.pidfd_send_signal(0, signal.SIGINT) 1362 if cm.exception.errno == errno.ENOSYS: 1363 self.skipTest("kernel does not support pidfds") 1364 elif cm.exception.errno == errno.EPERM: 1365 self.skipTest("Not enough privileges to use pidfs") 1366 self.assertEqual(cm.exception.errno, errno.EBADF) 1367 my_pidfd = 
os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY) 1368 self.addCleanup(os.close, my_pidfd) 1369 with self.assertRaisesRegex(TypeError, "^siginfo must be None$"): 1370 signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0) 1371 with self.assertRaises(KeyboardInterrupt): 1372 signal.pidfd_send_signal(my_pidfd, signal.SIGINT) 1373 1374def tearDownModule(): 1375 support.reap_children() 1376 1377if __name__ == "__main__": 1378 unittest.main() 1379