1import enum 2import errno 3import functools 4import inspect 5import os 6import random 7import signal 8import socket 9import statistics 10import subprocess 11import sys 12import threading 13import time 14import unittest 15from test import support 16from test.support import ( 17 is_apple, is_apple_mobile, os_helper, threading_helper 18) 19from test.support.script_helper import assert_python_ok, spawn_python 20try: 21 import _testcapi 22except ImportError: 23 _testcapi = None 24 25 26class GenericTests(unittest.TestCase): 27 28 def test_enums(self): 29 for name in dir(signal): 30 sig = getattr(signal, name) 31 if name in {'SIG_DFL', 'SIG_IGN'}: 32 self.assertIsInstance(sig, signal.Handlers) 33 elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 34 self.assertIsInstance(sig, signal.Sigmasks) 35 elif name.startswith('SIG') and not name.startswith('SIG_'): 36 self.assertIsInstance(sig, signal.Signals) 37 elif name.startswith('CTRL_'): 38 self.assertIsInstance(sig, signal.Signals) 39 self.assertEqual(sys.platform, "win32") 40 41 CheckedSignals = enum._old_convert_( 42 enum.IntEnum, 'Signals', 'signal', 43 lambda name: 44 name.isupper() 45 and (name.startswith('SIG') and not name.startswith('SIG_')) 46 or name.startswith('CTRL_'), 47 source=signal, 48 ) 49 enum._test_simple_enum(CheckedSignals, signal.Signals) 50 51 CheckedHandlers = enum._old_convert_( 52 enum.IntEnum, 'Handlers', 'signal', 53 lambda name: name in ('SIG_DFL', 'SIG_IGN'), 54 source=signal, 55 ) 56 enum._test_simple_enum(CheckedHandlers, signal.Handlers) 57 58 Sigmasks = getattr(signal, 'Sigmasks', None) 59 if Sigmasks is not None: 60 CheckedSigmasks = enum._old_convert_( 61 enum.IntEnum, 'Sigmasks', 'signal', 62 lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'), 63 source=signal, 64 ) 65 enum._test_simple_enum(CheckedSigmasks, Sigmasks) 66 67 def test_functions_module_attr(self): 68 # Issue #27718: If __all__ is not defined all non-builtin functions 69 # should have correct __module__ to be displayed by pydoc. 70 for name in dir(signal): 71 value = getattr(signal, name) 72 if inspect.isroutine(value) and not inspect.isbuiltin(value): 73 self.assertEqual(value.__module__, 'signal') 74 75 76@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 77class PosixTests(unittest.TestCase): 78 def trivial_signal_handler(self, *args): 79 pass 80 81 def create_handler_with_partial(self, argument): 82 return functools.partial(self.trivial_signal_handler, argument) 83 84 def test_out_of_range_signal_number_raises_error(self): 85 self.assertRaises(ValueError, signal.getsignal, 4242) 86 87 self.assertRaises(ValueError, signal.signal, 4242, 88 self.trivial_signal_handler) 89 90 self.assertRaises(ValueError, signal.strsignal, 4242) 91 92 def test_setting_signal_handler_to_none_raises_error(self): 93 self.assertRaises(TypeError, signal.signal, 94 signal.SIGUSR1, None) 95 96 def test_getsignal(self): 97 hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 98 self.assertIsInstance(hup, signal.Handlers) 99 self.assertEqual(signal.getsignal(signal.SIGHUP), 100 self.trivial_signal_handler) 101 signal.signal(signal.SIGHUP, hup) 102 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 103 104 def test_no_repr_is_called_on_signal_handler(self): 105 # See https://github.com/python/cpython/issues/112559. 106 107 class MyArgument: 108 def __init__(self): 109 self.repr_count = 0 110 111 def __repr__(self): 112 self.repr_count += 1 113 return super().__repr__() 114 115 argument = MyArgument() 116 self.assertEqual(0, argument.repr_count) 117 118 handler = self.create_handler_with_partial(argument) 119 hup = signal.signal(signal.SIGHUP, handler) 120 self.assertIsInstance(hup, signal.Handlers) 121 self.assertEqual(signal.getsignal(signal.SIGHUP), handler) 122 signal.signal(signal.SIGHUP, hup) 123 self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 124 self.assertEqual(0, argument.repr_count) 125 126 @unittest.skipIf(sys.platform.startswith("netbsd"), 127 "gh-124083: strsignal is not supported on NetBSD") 128 def test_strsignal(self): 129 self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) 130 self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) 131 self.assertIn("Hangup", signal.strsignal(signal.SIGHUP)) 132 133 # Issue 3864, unknown if this affects earlier versions of freebsd also 134 def test_interprocess_signal(self): 135 dirname = os.path.dirname(__file__) 136 script = os.path.join(dirname, 'signalinterproctester.py') 137 assert_python_ok(script) 138 139 @unittest.skipUnless( 140 hasattr(signal, "valid_signals"), 141 "requires signal.valid_signals" 142 ) 143 def test_valid_signals(self): 144 s = signal.valid_signals() 145 self.assertIsInstance(s, set) 146 self.assertIn(signal.Signals.SIGINT, s) 147 self.assertIn(signal.Signals.SIGALRM, s) 148 self.assertNotIn(0, s) 149 self.assertNotIn(signal.NSIG, s) 150 self.assertLess(len(s), signal.NSIG) 151 152 # gh-91145: Make sure that all SIGxxx constants exposed by the Python 153 # signal module have a number in the [0; signal.NSIG-1] range. 154 for name in dir(signal): 155 if not name.startswith("SIG"): 156 continue 157 if name in {"SIG_IGN", "SIG_DFL"}: 158 # SIG_IGN and SIG_DFL are pointers 159 continue 160 with self.subTest(name=name): 161 signum = getattr(signal, name) 162 self.assertGreaterEqual(signum, 0) 163 self.assertLess(signum, signal.NSIG) 164 165 @unittest.skipUnless(sys.executable, "sys.executable required.") 166 @support.requires_subprocess() 167 def test_keyboard_interrupt_exit_code(self): 168 """KeyboardInterrupt triggers exit via SIGINT.""" 169 process = subprocess.run( 170 [sys.executable, "-c", 171 "import os, signal, time\n" 172 "os.kill(os.getpid(), signal.SIGINT)\n" 173 "for _ in range(999): time.sleep(0.01)"], 174 stderr=subprocess.PIPE) 175 self.assertIn(b"KeyboardInterrupt", process.stderr) 176 self.assertEqual(process.returncode, -signal.SIGINT) 177 # Caveat: The exit code is insufficient to guarantee we actually died 178 # via a signal. POSIX shells do more than look at the 8 bit value. 179 # Writing an automation friendly test of an interactive shell 180 # to confirm that our process died via a SIGINT proved too complex. 181 182 183@unittest.skipUnless(sys.platform == "win32", "Windows specific") 184class WindowsSignalTests(unittest.TestCase): 185 186 def test_valid_signals(self): 187 s = signal.valid_signals() 188 self.assertIsInstance(s, set) 189 self.assertGreaterEqual(len(s), 6) 190 self.assertIn(signal.Signals.SIGINT, s) 191 self.assertNotIn(0, s) 192 self.assertNotIn(signal.NSIG, s) 193 self.assertLess(len(s), signal.NSIG) 194 195 def test_issue9324(self): 196 # Updated for issue #10003, adding SIGBREAK 197 handler = lambda x, y: None 198 checked = set() 199 for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 200 signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 201 signal.SIGTERM): 202 # Set and then reset a handler for signals that work on windows. 203 # Issue #18396, only for signals without a C-level handler. 204 if signal.getsignal(sig) is not None: 205 signal.signal(sig, signal.signal(sig, handler)) 206 checked.add(sig) 207 # Issue #18396: Ensure the above loop at least tested *something* 208 self.assertTrue(checked) 209 210 with self.assertRaises(ValueError): 211 signal.signal(-1, handler) 212 213 with self.assertRaises(ValueError): 214 signal.signal(7, handler) 215 216 @unittest.skipUnless(sys.executable, "sys.executable required.") 217 @support.requires_subprocess() 218 def test_keyboard_interrupt_exit_code(self): 219 """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.""" 220 # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here 221 # as that requires setting up a console control handler in a child 222 # in its own process group. Doable, but quite complicated. (see 223 # @eryksun on https://github.com/python/cpython/pull/11862) 224 process = subprocess.run( 225 [sys.executable, "-c", "raise KeyboardInterrupt"], 226 stderr=subprocess.PIPE) 227 self.assertIn(b"KeyboardInterrupt", process.stderr) 228 STATUS_CONTROL_C_EXIT = 0xC000013A 229 self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT) 230 231 232class WakeupFDTests(unittest.TestCase): 233 234 def test_invalid_call(self): 235 # First parameter is positional-only 236 with self.assertRaises(TypeError): 237 signal.set_wakeup_fd(signum=signal.SIGINT) 238 239 # warn_on_full_buffer is a keyword-only parameter 240 with self.assertRaises(TypeError): 241 signal.set_wakeup_fd(signal.SIGINT, False) 242 243 def test_invalid_fd(self): 244 fd = os_helper.make_bad_fd() 245 self.assertRaises((ValueError, OSError), 246 signal.set_wakeup_fd, fd) 247 248 @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 249 def test_invalid_socket(self): 250 sock = socket.socket() 251 fd = sock.fileno() 252 sock.close() 253 self.assertRaises((ValueError, OSError), 254 signal.set_wakeup_fd, fd) 255 256 # Emscripten does not support fstat on pipes yet. 257 # https://github.com/emscripten-core/emscripten/issues/16414 258 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 259 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 260 def test_set_wakeup_fd_result(self): 261 r1, w1 = os.pipe() 262 self.addCleanup(os.close, r1) 263 self.addCleanup(os.close, w1) 264 r2, w2 = os.pipe() 265 self.addCleanup(os.close, r2) 266 self.addCleanup(os.close, w2) 267 268 if hasattr(os, 'set_blocking'): 269 os.set_blocking(w1, False) 270 os.set_blocking(w2, False) 271 272 signal.set_wakeup_fd(w1) 273 self.assertEqual(signal.set_wakeup_fd(w2), w1) 274 self.assertEqual(signal.set_wakeup_fd(-1), w2) 275 self.assertEqual(signal.set_wakeup_fd(-1), -1) 276 277 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 278 @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 279 def test_set_wakeup_fd_socket_result(self): 280 sock1 = socket.socket() 281 self.addCleanup(sock1.close) 282 sock1.setblocking(False) 283 fd1 = sock1.fileno() 284 285 sock2 = socket.socket() 286 self.addCleanup(sock2.close) 287 sock2.setblocking(False) 288 fd2 = sock2.fileno() 289 290 signal.set_wakeup_fd(fd1) 291 self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 292 self.assertEqual(signal.set_wakeup_fd(-1), fd2) 293 self.assertEqual(signal.set_wakeup_fd(-1), -1) 294 295 # On Windows, files are always blocking and Windows does not provide a 296 # function to test if a socket is in non-blocking mode. 297 @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 298 @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 299 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 300 def test_set_wakeup_fd_blocking(self): 301 rfd, wfd = os.pipe() 302 self.addCleanup(os.close, rfd) 303 self.addCleanup(os.close, wfd) 304 305 # fd must be non-blocking 306 os.set_blocking(wfd, True) 307 with self.assertRaises(ValueError) as cm: 308 signal.set_wakeup_fd(wfd) 309 self.assertEqual(str(cm.exception), 310 "the fd %s must be in non-blocking mode" % wfd) 311 312 # non-blocking is ok 313 os.set_blocking(wfd, False) 314 signal.set_wakeup_fd(wfd) 315 signal.set_wakeup_fd(-1) 316 317 318@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 319class WakeupSignalTests(unittest.TestCase): 320 @unittest.skipIf(_testcapi is None, 'need _testcapi') 321 def check_wakeup(self, test_body, *signals, ordered=True): 322 # use a subprocess to have only one thread 323 code = """if 1: 324 import _testcapi 325 import os 326 import signal 327 import struct 328 329 signals = {!r} 330 331 def handler(signum, frame): 332 pass 333 334 def check_signum(signals): 335 data = os.read(read, len(signals)+1) 336 raised = struct.unpack('%uB' % len(data), data) 337 if not {!r}: 338 raised = set(raised) 339 signals = set(signals) 340 if raised != signals: 341 raise Exception("%r != %r" % (raised, signals)) 342 343 {} 344 345 signal.signal(signal.SIGALRM, handler) 346 read, write = os.pipe() 347 os.set_blocking(write, False) 348 signal.set_wakeup_fd(write) 349 350 test() 351 check_signum(signals) 352 353 os.close(read) 354 os.close(write) 355 """.format(tuple(map(int, signals)), ordered, test_body) 356 357 assert_python_ok('-c', code) 358 359 @unittest.skipIf(_testcapi is None, 'need _testcapi') 360 @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 361 def test_wakeup_write_error(self): 362 # Issue #16105: write() errors in the C signal handler should not 363 # pass silently. 364 # Use a subprocess to have only one thread. 365 code = """if 1: 366 import _testcapi 367 import errno 368 import os 369 import signal 370 import sys 371 from test.support import captured_stderr 372 373 def handler(signum, frame): 374 1/0 375 376 signal.signal(signal.SIGALRM, handler) 377 r, w = os.pipe() 378 os.set_blocking(r, False) 379 380 # Set wakeup_fd a read-only file descriptor to trigger the error 381 signal.set_wakeup_fd(r) 382 try: 383 with captured_stderr() as err: 384 signal.raise_signal(signal.SIGALRM) 385 except ZeroDivisionError: 386 # An ignored exception should have been printed out on stderr 387 err = err.getvalue() 388 if ('Exception ignored when trying to write to the signal wakeup fd' 389 not in err): 390 raise AssertionError(err) 391 if ('OSError: [Errno %d]' % errno.EBADF) not in err: 392 raise AssertionError(err) 393 else: 394 raise AssertionError("ZeroDivisionError not raised") 395 396 os.close(r) 397 os.close(w) 398 """ 399 r, w = os.pipe() 400 try: 401 os.write(r, b'x') 402 except OSError: 403 pass 404 else: 405 self.skipTest("OS doesn't report write() error on the read end of a pipe") 406 finally: 407 os.close(r) 408 os.close(w) 409 410 assert_python_ok('-c', code) 411 412 def test_wakeup_fd_early(self): 413 self.check_wakeup("""def test(): 414 import select 415 import time 416 417 TIMEOUT_FULL = 10 418 TIMEOUT_HALF = 5 419 420 class InterruptSelect(Exception): 421 pass 422 423 def handler(signum, frame): 424 raise InterruptSelect 425 signal.signal(signal.SIGALRM, handler) 426 427 signal.alarm(1) 428 429 # We attempt to get a signal during the sleep, 430 # before select is called 431 try: 432 select.select([], [], [], TIMEOUT_FULL) 433 except InterruptSelect: 434 pass 435 else: 436 raise Exception("select() was not interrupted") 437 438 before_time = time.monotonic() 439 select.select([read], [], [], TIMEOUT_FULL) 440 after_time = time.monotonic() 441 dt = after_time - before_time 442 if dt >= TIMEOUT_HALF: 443 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 444 """, signal.SIGALRM) 445 446 def test_wakeup_fd_during(self): 447 self.check_wakeup("""def test(): 448 import select 449 import time 450 451 TIMEOUT_FULL = 10 452 TIMEOUT_HALF = 5 453 454 class InterruptSelect(Exception): 455 pass 456 457 def handler(signum, frame): 458 raise InterruptSelect 459 signal.signal(signal.SIGALRM, handler) 460 461 signal.alarm(1) 462 before_time = time.monotonic() 463 # We attempt to get a signal during the select call 464 try: 465 select.select([read], [], [], TIMEOUT_FULL) 466 except InterruptSelect: 467 pass 468 else: 469 raise Exception("select() was not interrupted") 470 after_time = time.monotonic() 471 dt = after_time - before_time 472 if dt >= TIMEOUT_HALF: 473 raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 474 """, signal.SIGALRM) 475 476 def test_signum(self): 477 self.check_wakeup("""def test(): 478 signal.signal(signal.SIGUSR1, handler) 479 signal.raise_signal(signal.SIGUSR1) 480 signal.raise_signal(signal.SIGALRM) 481 """, signal.SIGUSR1, signal.SIGALRM) 482 483 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 484 'need signal.pthread_sigmask()') 485 def test_pending(self): 486 self.check_wakeup("""def test(): 487 signum1 = signal.SIGUSR1 488 signum2 = signal.SIGUSR2 489 490 signal.signal(signum1, handler) 491 signal.signal(signum2, handler) 492 493 signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) 494 signal.raise_signal(signum1) 495 signal.raise_signal(signum2) 496 # Unblocking the 2 signals calls the C signal handler twice 497 signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) 498 """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) 499 500 501@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') 502class WakeupSocketSignalTests(unittest.TestCase): 503 504 @unittest.skipIf(_testcapi is None, 'need _testcapi') 505 def test_socket(self): 506 # use a subprocess to have only one thread 507 code = """if 1: 508 import signal 509 import socket 510 import struct 511 import _testcapi 512 513 signum = signal.SIGINT 514 signals = (signum,) 515 516 def handler(signum, frame): 517 pass 518 519 signal.signal(signum, handler) 520 521 read, write = socket.socketpair() 522 write.setblocking(False) 523 signal.set_wakeup_fd(write.fileno()) 524 525 signal.raise_signal(signum) 526 527 data = read.recv(1) 528 if not data: 529 raise Exception("no signum written") 530 raised = struct.unpack('B', data) 531 if raised != signals: 532 raise Exception("%r != %r" % (raised, signals)) 533 534 read.close() 535 write.close() 536 """ 537 538 assert_python_ok('-c', code) 539 540 @unittest.skipIf(_testcapi is None, 'need _testcapi') 541 def test_send_error(self): 542 # Use a subprocess to have only one thread. 543 if os.name == 'nt': 544 action = 'send' 545 else: 546 action = 'write' 547 code = """if 1: 548 import errno 549 import signal 550 import socket 551 import sys 552 import time 553 import _testcapi 554 from test.support import captured_stderr 555 556 signum = signal.SIGINT 557 558 def handler(signum, frame): 559 pass 560 561 signal.signal(signum, handler) 562 563 read, write = socket.socketpair() 564 read.setblocking(False) 565 write.setblocking(False) 566 567 signal.set_wakeup_fd(write.fileno()) 568 569 # Close sockets: send() will fail 570 read.close() 571 write.close() 572 573 with captured_stderr() as err: 574 signal.raise_signal(signum) 575 576 err = err.getvalue() 577 if ('Exception ignored when trying to {action} to the signal wakeup fd' 578 not in err): 579 raise AssertionError(err) 580 """.format(action=action) 581 assert_python_ok('-c', code) 582 583 @unittest.skipIf(_testcapi is None, 'need _testcapi') 584 def test_warn_on_full_buffer(self): 585 # Use a subprocess to have only one thread. 586 if os.name == 'nt': 587 action = 'send' 588 else: 589 action = 'write' 590 code = """if 1: 591 import errno 592 import signal 593 import socket 594 import sys 595 import time 596 import _testcapi 597 from test.support import captured_stderr 598 599 signum = signal.SIGINT 600 601 # This handler will be called, but we intentionally won't read from 602 # the wakeup fd. 603 def handler(signum, frame): 604 pass 605 606 signal.signal(signum, handler) 607 608 read, write = socket.socketpair() 609 610 # Fill the socketpair buffer 611 if sys.platform == 'win32': 612 # bpo-34130: On Windows, sometimes non-blocking send fails to fill 613 # the full socketpair buffer, so use a timeout of 50 ms instead. 614 write.settimeout(0.050) 615 else: 616 write.setblocking(False) 617 618 written = 0 619 if sys.platform == "vxworks": 620 CHUNK_SIZES = (1,) 621 else: 622 # Start with large chunk size to reduce the 623 # number of send needed to fill the buffer. 624 CHUNK_SIZES = (2 ** 16, 2 ** 8, 1) 625 for chunk_size in CHUNK_SIZES: 626 chunk = b"x" * chunk_size 627 try: 628 while True: 629 write.send(chunk) 630 written += chunk_size 631 except (BlockingIOError, TimeoutError): 632 pass 633 634 print(f"%s bytes written into the socketpair" % written, flush=True) 635 636 write.setblocking(False) 637 try: 638 write.send(b"x") 639 except BlockingIOError: 640 # The socketpair buffer seems full 641 pass 642 else: 643 raise AssertionError("%s bytes failed to fill the socketpair " 644 "buffer" % written) 645 646 # By default, we get a warning when a signal arrives 647 msg = ('Exception ignored when trying to {action} ' 648 'to the signal wakeup fd') 649 signal.set_wakeup_fd(write.fileno()) 650 651 with captured_stderr() as err: 652 signal.raise_signal(signum) 653 654 err = err.getvalue() 655 if msg not in err: 656 raise AssertionError("first set_wakeup_fd() test failed, " 657 "stderr: %r" % err) 658 659 # And also if warn_on_full_buffer=True 660 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) 661 662 with captured_stderr() as err: 663 signal.raise_signal(signum) 664 665 err = err.getvalue() 666 if msg not in err: 667 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " 668 "test failed, stderr: %r" % err) 669 670 # But not if warn_on_full_buffer=False 671 signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) 672 673 with captured_stderr() as err: 674 signal.raise_signal(signum) 675 676 err = err.getvalue() 677 if err != "": 678 raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " 679 "test failed, stderr: %r" % err) 680 681 # And then check the default again, to make sure warn_on_full_buffer 682 # settings don't leak across calls. 683 signal.set_wakeup_fd(write.fileno()) 684 685 with captured_stderr() as err: 686 signal.raise_signal(signum) 687 688 err = err.getvalue() 689 if msg not in err: 690 raise AssertionError("second set_wakeup_fd() test failed, " 691 "stderr: %r" % err) 692 693 """.format(action=action) 694 assert_python_ok('-c', code) 695 696 697@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 698@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()") 699@support.requires_subprocess() 700@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 701class SiginterruptTest(unittest.TestCase): 702 703 def readpipe_interrupted(self, interrupt, timeout=support.SHORT_TIMEOUT): 704 """Perform a read during which a signal will arrive. Return True if the 705 read is interrupted by the signal and raises an exception. Return False 706 if it returns normally. 707 """ 708 # use a subprocess to have only one thread, to have a timeout on the 709 # blocking read and to not touch signal handling in this process 710 code = """if 1: 711 import errno 712 import os 713 import signal 714 import sys 715 716 interrupt = %r 717 r, w = os.pipe() 718 719 def handler(signum, frame): 720 1 / 0 721 722 signal.signal(signal.SIGALRM, handler) 723 if interrupt is not None: 724 signal.siginterrupt(signal.SIGALRM, interrupt) 725 726 print("ready") 727 sys.stdout.flush() 728 729 # run the test twice 730 try: 731 for loop in range(2): 732 # send a SIGALRM in a second (during the read) 733 signal.alarm(1) 734 try: 735 # blocking call: read from a pipe without data 736 os.read(r, 1) 737 except ZeroDivisionError: 738 pass 739 else: 740 sys.exit(2) 741 sys.exit(3) 742 finally: 743 os.close(r) 744 os.close(w) 745 """ % (interrupt,) 746 with spawn_python('-c', code) as process: 747 try: 748 # wait until the child process is loaded and has started 749 first_line = process.stdout.readline() 750 751 stdout, stderr = process.communicate(timeout=timeout) 752 except subprocess.TimeoutExpired: 753 process.kill() 754 return False 755 else: 756 stdout = first_line + stdout 757 exitcode = process.wait() 758 if exitcode not in (2, 3): 759 raise Exception("Child error (exit code %s): %r" 760 % (exitcode, stdout)) 761 return (exitcode == 3) 762 763 def test_without_siginterrupt(self): 764 # If a signal handler is installed and siginterrupt is not called 765 # at all, when that signal arrives, it interrupts a syscall that's in 766 # progress. 767 interrupted = self.readpipe_interrupted(None) 768 self.assertTrue(interrupted) 769 770 def test_siginterrupt_on(self): 771 # If a signal handler is installed and siginterrupt is called with 772 # a true value for the second argument, when that signal arrives, it 773 # interrupts a syscall that's in progress. 774 interrupted = self.readpipe_interrupted(True) 775 self.assertTrue(interrupted) 776 777 @support.requires_resource('walltime') 778 def test_siginterrupt_off(self): 779 # If a signal handler is installed and siginterrupt is called with 780 # a false value for the second argument, when that signal arrives, it 781 # does not interrupt a syscall that's in progress. 782 interrupted = self.readpipe_interrupted(False, timeout=2) 783 self.assertFalse(interrupted) 784 785 786@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 787@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'), 788 "needs signal.getitimer() and signal.setitimer()") 789class ItimerTest(unittest.TestCase): 790 def setUp(self): 791 self.hndl_called = False 792 self.hndl_count = 0 793 self.itimer = None 794 self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 795 796 def tearDown(self): 797 signal.signal(signal.SIGALRM, self.old_alarm) 798 if self.itimer is not None: # test_itimer_exc doesn't change this attr 799 # just ensure that itimer is stopped 800 signal.setitimer(self.itimer, 0) 801 802 def sig_alrm(self, *args): 803 self.hndl_called = True 804 805 def sig_vtalrm(self, *args): 806 self.hndl_called = True 807 808 if self.hndl_count > 3: 809 # it shouldn't be here, because it should have been disabled. 810 raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 811 "timer.") 812 elif self.hndl_count == 3: 813 # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 814 signal.setitimer(signal.ITIMER_VIRTUAL, 0) 815 816 self.hndl_count += 1 817 818 def sig_prof(self, *args): 819 self.hndl_called = True 820 signal.setitimer(signal.ITIMER_PROF, 0) 821 822 def test_itimer_exc(self): 823 # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 824 # defines it ? 825 self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 826 # Negative times are treated as zero on some platforms. 827 if 0: 828 self.assertRaises(signal.ItimerError, 829 signal.setitimer, signal.ITIMER_REAL, -1) 830 831 def test_itimer_real(self): 832 self.itimer = signal.ITIMER_REAL 833 signal.setitimer(self.itimer, 1.0) 834 signal.pause() 835 self.assertEqual(self.hndl_called, True) 836 837 # Issue 3864, unknown if this affects earlier versions of freebsd also 838 @unittest.skipIf(sys.platform in ('netbsd5',) or is_apple_mobile, 839 'itimer not reliable (does not mix well with threading) on some BSDs.') 840 def test_itimer_virtual(self): 841 self.itimer = signal.ITIMER_VIRTUAL 842 signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 843 signal.setitimer(self.itimer, 0.3, 0.2) 844 845 for _ in support.busy_retry(support.LONG_TIMEOUT): 846 # use up some virtual time by doing real work 847 _ = pow(12345, 67890, 10000019) 848 if signal.getitimer(self.itimer) == (0.0, 0.0): 849 # sig_vtalrm handler stopped this itimer 850 break 851 852 # virtual itimer should be (0.0, 0.0) now 853 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 854 # and the handler should have been called 855 self.assertEqual(self.hndl_called, True) 856 857 def test_itimer_prof(self): 858 self.itimer = signal.ITIMER_PROF 859 signal.signal(signal.SIGPROF, self.sig_prof) 860 signal.setitimer(self.itimer, 0.2, 0.2) 861 862 for _ in support.busy_retry(support.LONG_TIMEOUT): 863 # do some work 864 _ = pow(12345, 67890, 10000019) 865 if signal.getitimer(self.itimer) == (0.0, 0.0): 866 # sig_prof handler stopped this itimer 867 break 868 869 # profiling itimer should be (0.0, 0.0) now 870 self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 871 # and the handler should have been called 872 self.assertEqual(self.hndl_called, True) 873 874 def test_setitimer_tiny(self): 875 # bpo-30807: C setitimer() takes a microsecond-resolution interval. 876 # Check that float -> timeval conversion doesn't round 877 # the interval down to zero, which would disable the timer. 878 self.itimer = signal.ITIMER_REAL 879 signal.setitimer(self.itimer, 1e-6) 880 time.sleep(1) 881 self.assertEqual(self.hndl_called, True) 882 883 884class PendingSignalsTests(unittest.TestCase): 885 """ 886 Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() 887 functions. 888 """ 889 @unittest.skipUnless(hasattr(signal, 'sigpending'), 890 'need signal.sigpending()') 891 def test_sigpending_empty(self): 892 self.assertEqual(signal.sigpending(), set()) 893 894 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 895 'need signal.pthread_sigmask()') 896 @unittest.skipUnless(hasattr(signal, 'sigpending'), 897 'need signal.sigpending()') 898 def test_sigpending(self): 899 code = """if 1: 900 import os 901 import signal 902 903 def handler(signum, frame): 904 1/0 905 906 signum = signal.SIGUSR1 907 signal.signal(signum, handler) 908 909 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 910 os.kill(os.getpid(), signum) 911 pending = signal.sigpending() 912 for sig in pending: 913 assert isinstance(sig, signal.Signals), repr(pending) 914 if pending != {signum}: 915 raise Exception('%s != {%s}' % (pending, signum)) 916 try: 917 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 918 except ZeroDivisionError: 919 pass 920 else: 921 raise Exception("ZeroDivisionError not raised") 922 """ 923 assert_python_ok('-c', code) 924 925 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 926 'need signal.pthread_kill()') 927 @threading_helper.requires_working_threading() 928 def test_pthread_kill(self): 929 code = """if 1: 930 import signal 931 import threading 932 import sys 933 934 signum = signal.SIGUSR1 935 936 def handler(signum, frame): 937 1/0 938 939 signal.signal(signum, handler) 940 941 tid = threading.get_ident() 942 try: 943 signal.pthread_kill(tid, signum) 944 except ZeroDivisionError: 945 pass 946 else: 947 raise Exception("ZeroDivisionError not raised") 948 """ 949 assert_python_ok('-c', code) 950 951 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 952 'need signal.pthread_sigmask()') 953 def wait_helper(self, blocked, test): 954 """ 955 test: body of the "def test(signum):" function. 956 blocked: number of the blocked signal 957 """ 958 code = '''if 1: 959 import signal 960 import sys 961 from signal import Signals 962 963 def handler(signum, frame): 964 1/0 965 966 %s 967 968 blocked = %s 969 signum = signal.SIGALRM 970 971 # child: block and wait the signal 972 try: 973 signal.signal(signum, handler) 974 signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 975 976 # Do the tests 977 test(signum) 978 979 # The handler must not be called on unblock 980 try: 981 signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 982 except ZeroDivisionError: 983 print("the signal handler has been called", 984 file=sys.stderr) 985 sys.exit(1) 986 except BaseException as err: 987 print("error: {}".format(err), file=sys.stderr) 988 sys.stderr.flush() 989 sys.exit(1) 990 ''' % (test.strip(), blocked) 991 992 # sig*wait* must be called with the signal blocked: since the current 993 # process might have several threads running, use a subprocess to have 994 # a single thread. 995 assert_python_ok('-c', code) 996 997 @unittest.skipUnless(hasattr(signal, 'sigwait'), 998 'need signal.sigwait()') 999 def test_sigwait(self): 1000 self.wait_helper(signal.SIGALRM, ''' 1001 def test(signum): 1002 signal.alarm(1) 1003 received = signal.sigwait([signum]) 1004 assert isinstance(received, signal.Signals), received 1005 if received != signum: 1006 raise Exception('received %s, not %s' % (received, signum)) 1007 ''') 1008 1009 @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 1010 'need signal.sigwaitinfo()') 1011 def test_sigwaitinfo(self): 1012 self.wait_helper(signal.SIGALRM, ''' 1013 def test(signum): 1014 signal.alarm(1) 1015 info = signal.sigwaitinfo([signum]) 1016 if info.si_signo != signum: 1017 raise Exception("info.si_signo != %s" % signum) 1018 ''') 1019 1020 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1021 'need signal.sigtimedwait()') 1022 def test_sigtimedwait(self): 1023 self.wait_helper(signal.SIGALRM, ''' 1024 def test(signum): 1025 signal.alarm(1) 1026 info = signal.sigtimedwait([signum], 10.1000) 1027 if info.si_signo != signum: 1028 raise Exception('info.si_signo != %s' % signum) 1029 ''') 1030 1031 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1032 'need signal.sigtimedwait()') 1033 def test_sigtimedwait_poll(self): 1034 # check that polling with sigtimedwait works 1035 self.wait_helper(signal.SIGALRM, ''' 1036 def test(signum): 1037 import os 1038 os.kill(os.getpid(), signum) 1039 info = signal.sigtimedwait([signum], 0) 1040 if info.si_signo != signum: 1041 raise Exception('info.si_signo != %s' % signum) 1042 ''') 1043 1044 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1045 'need signal.sigtimedwait()') 1046 def test_sigtimedwait_timeout(self): 1047 self.wait_helper(signal.SIGALRM, ''' 1048 def test(signum): 1049 received = signal.sigtimedwait([signum], 1.0) 1050 if received is not None: 1051 raise Exception("received=%r" % (received,)) 1052 ''') 1053 1054 @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 1055 'need signal.sigtimedwait()') 1056 def test_sigtimedwait_negative_timeout(self): 1057 signum = signal.SIGALRM 1058 self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 1059 1060 @unittest.skipUnless(hasattr(signal, 'sigwait'), 1061 'need signal.sigwait()') 1062 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1063 'need signal.pthread_sigmask()') 1064 @threading_helper.requires_working_threading() 1065 def test_sigwait_thread(self): 1066 # Check that calling sigwait() from a thread doesn't suspend the whole 1067 # process. A new interpreter is spawned to avoid problems when mixing 1068 # threads and fork(): only async-safe functions are allowed between 1069 # fork() and exec(). 1070 assert_python_ok("-c", """if True: 1071 import os, threading, sys, time, signal 1072 1073 # the default handler terminates the process 1074 signum = signal.SIGUSR1 1075 1076 def kill_later(): 1077 # wait until the main thread is waiting in sigwait() 1078 time.sleep(1) 1079 os.kill(os.getpid(), signum) 1080 1081 # the signal must be blocked by all the threads 1082 signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1083 killer = threading.Thread(target=kill_later) 1084 killer.start() 1085 received = signal.sigwait([signum]) 1086 if received != signum: 1087 print("sigwait() received %s, not %s" % (received, signum), 1088 file=sys.stderr) 1089 sys.exit(1) 1090 killer.join() 1091 # unblock the signal, which should have been cleared by sigwait() 1092 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1093 """) 1094 1095 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1096 'need signal.pthread_sigmask()') 1097 def test_pthread_sigmask_arguments(self): 1098 self.assertRaises(TypeError, signal.pthread_sigmask) 1099 self.assertRaises(TypeError, signal.pthread_sigmask, 1) 1100 self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 1101 self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 1102 with self.assertRaises(ValueError): 1103 signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG]) 1104 with self.assertRaises(ValueError): 1105 signal.pthread_sigmask(signal.SIG_BLOCK, [0]) 1106 with self.assertRaises(ValueError): 1107 signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000]) 1108 1109 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1110 'need signal.pthread_sigmask()') 1111 def test_pthread_sigmask_valid_signals(self): 1112 s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 1113 self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s) 1114 # Get current blocked set 1115 s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) 1116 self.assertLessEqual(s, signal.valid_signals()) 1117 1118 @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 1119 'need signal.pthread_sigmask()') 1120 @threading_helper.requires_working_threading() 1121 def test_pthread_sigmask(self): 1122 code = """if 1: 1123 import signal 1124 import os; import threading 1125 1126 def handler(signum, frame): 1127 1/0 1128 1129 def kill(signum): 1130 os.kill(os.getpid(), signum) 1131 1132 def check_mask(mask): 1133 for sig in mask: 1134 assert isinstance(sig, signal.Signals), repr(sig) 1135 1136 def read_sigmask(): 1137 sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 1138 check_mask(sigmask) 1139 return sigmask 1140 1141 signum = signal.SIGUSR1 1142 1143 # Install our signal handler 1144 old_handler = signal.signal(signum, handler) 1145 1146 # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 1147 old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1148 check_mask(old_mask) 1149 try: 1150 kill(signum) 1151 except ZeroDivisionError: 1152 pass 1153 else: 1154 raise Exception("ZeroDivisionError not raised") 1155 1156 # Block and then raise SIGUSR1. The signal is blocked: the signal 1157 # handler is not called, and the signal is now pending 1158 mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 1159 check_mask(mask) 1160 kill(signum) 1161 1162 # Check the new mask 1163 blocked = read_sigmask() 1164 check_mask(blocked) 1165 if signum not in blocked: 1166 raise Exception("%s not in %s" % (signum, blocked)) 1167 if old_mask ^ blocked != {signum}: 1168 raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 1169 1170 # Unblock SIGUSR1 1171 try: 1172 # unblock the pending signal calls immediately the signal handler 1173 signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 1174 except ZeroDivisionError: 1175 pass 1176 else: 1177 raise Exception("ZeroDivisionError not raised") 1178 try: 1179 kill(signum) 1180 except ZeroDivisionError: 1181 pass 1182 else: 1183 raise Exception("ZeroDivisionError not raised") 1184 1185 # Check the new mask 1186 unblocked = read_sigmask() 1187 if signum in unblocked: 1188 raise Exception("%s in %s" % (signum, unblocked)) 1189 if blocked ^ unblocked != {signum}: 1190 raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 1191 if old_mask != unblocked: 1192 raise Exception("%s != %s" % (old_mask, unblocked)) 1193 """ 1194 assert_python_ok('-c', code) 1195 1196 @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 1197 'need signal.pthread_kill()') 1198 @threading_helper.requires_working_threading() 1199 def test_pthread_kill_main_thread(self): 1200 # Test that a signal can be sent to the main thread with pthread_kill() 1201 # before any other thread has been created (see issue #12392). 1202 code = """if True: 1203 import threading 1204 import signal 1205 import sys 1206 1207 def handler(signum, frame): 1208 sys.exit(3) 1209 1210 signal.signal(signal.SIGUSR1, handler) 1211 signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 1212 sys.exit(2) 1213 """ 1214 1215 with spawn_python('-c', code) as process: 1216 stdout, stderr = process.communicate() 1217 exitcode = process.wait() 1218 if exitcode != 3: 1219 raise Exception("Child error (exit code %s): %s" % 1220 (exitcode, stdout)) 1221 1222 1223class StressTest(unittest.TestCase): 1224 """ 1225 Stress signal delivery, especially when a signal arrives in 1226 the middle of recomputing the signal state or executing 1227 previously tripped signal handlers. 1228 """ 1229 1230 def setsig(self, signum, handler): 1231 old_handler = signal.signal(signum, handler) 1232 self.addCleanup(signal.signal, signum, old_handler) 1233 1234 def measure_itimer_resolution(self): 1235 N = 20 1236 times = [] 1237 1238 def handler(signum=None, frame=None): 1239 if len(times) < N: 1240 times.append(time.perf_counter()) 1241 # 1 µs is the smallest possible timer interval, 1242 # we want to measure what the concrete duration 1243 # will be on this platform 1244 signal.setitimer(signal.ITIMER_REAL, 1e-6) 1245 1246 self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 1247 self.setsig(signal.SIGALRM, handler) 1248 handler() 1249 while len(times) < N: 1250 time.sleep(1e-3) 1251 1252 durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 1253 med = statistics.median(durations) 1254 if support.verbose: 1255 print("detected median itimer() resolution: %.6f s." % (med,)) 1256 return med 1257 1258 def decide_itimer_count(self): 1259 # Some systems have poor setitimer() resolution (for example 1260 # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 1261 # number of sequential timers based on that. 1262 reso = self.measure_itimer_resolution() 1263 if reso <= 1e-4: 1264 return 10000 1265 elif reso <= 1e-2: 1266 return 100 1267 else: 1268 self.skipTest("detected itimer resolution (%.3f s.) too high " 1269 "(> 10 ms.) on this platform (or system too busy)" 1270 % (reso,)) 1271 1272 @unittest.skipUnless(hasattr(signal, "setitimer"), 1273 "test needs setitimer()") 1274 def test_stress_delivery_dependent(self): 1275 """ 1276 This test uses dependent signal handlers. 1277 """ 1278 N = self.decide_itimer_count() 1279 sigs = [] 1280 1281 def first_handler(signum, frame): 1282 # 1e-6 is the minimum non-zero value for `setitimer()`. 1283 # Choose a random delay so as to improve chances of 1284 # triggering a race condition. Ideally the signal is received 1285 # when inside critical signal-handling routines such as 1286 # Py_MakePendingCalls(). 1287 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1288 1289 def second_handler(signum=None, frame=None): 1290 sigs.append(signum) 1291 1292 # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 1293 # ascending and descending sequences (SIGUSR1 then SIGALRM, 1294 # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 1295 self.setsig(signal.SIGPROF, first_handler) 1296 self.setsig(signal.SIGUSR1, first_handler) 1297 self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 1298 1299 expected_sigs = 0 1300 deadline = time.monotonic() + support.SHORT_TIMEOUT 1301 1302 while expected_sigs < N: 1303 os.kill(os.getpid(), signal.SIGPROF) 1304 expected_sigs += 1 1305 # Wait for handlers to run to avoid signal coalescing 1306 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1307 time.sleep(1e-5) 1308 1309 os.kill(os.getpid(), signal.SIGUSR1) 1310 expected_sigs += 1 1311 while len(sigs) < expected_sigs and time.monotonic() < deadline: 1312 time.sleep(1e-5) 1313 1314 # All ITIMER_REAL signals should have been delivered to the 1315 # Python handler 1316 self.assertEqual(len(sigs), N, "Some signals were lost") 1317 1318 @unittest.skipUnless(hasattr(signal, "setitimer"), 1319 "test needs setitimer()") 1320 def test_stress_delivery_simultaneous(self): 1321 """ 1322 This test uses simultaneous signal handlers. 1323 """ 1324 N = self.decide_itimer_count() 1325 sigs = [] 1326 1327 def handler(signum, frame): 1328 sigs.append(signum) 1329 1330 # On Android, SIGUSR1 is unreliable when used in close proximity to 1331 # another signal – see Android/testbed/app/src/main/python/main.py. 1332 # So we use a different signal. 1333 self.setsig(signal.SIGUSR2, handler) 1334 self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 1335 1336 expected_sigs = 0 1337 while expected_sigs < N: 1338 # Hopefully the SIGALRM will be received somewhere during 1339 # initial processing of SIGUSR2. 1340 signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 1341 os.kill(os.getpid(), signal.SIGUSR2) 1342 1343 expected_sigs += 2 1344 # Wait for handlers to run to avoid signal coalescing 1345 for _ in support.sleeping_retry(support.SHORT_TIMEOUT): 1346 if len(sigs) >= expected_sigs: 1347 break 1348 1349 # All ITIMER_REAL signals should have been delivered to the 1350 # Python handler 1351 self.assertEqual(len(sigs), N, "Some signals were lost") 1352 1353 @unittest.skipIf(is_apple, "crashes due to system bug (FB13453490)") 1354 @unittest.skipUnless(hasattr(signal, "SIGUSR1"), 1355 "test needs SIGUSR1") 1356 @threading_helper.requires_working_threading() 1357 def test_stress_modifying_handlers(self): 1358 # bpo-43406: race condition between trip_signal() and signal.signal 1359 signum = signal.SIGUSR1 1360 num_sent_signals = 0 1361 num_received_signals = 0 1362 do_stop = False 1363 1364 def custom_handler(signum, frame): 1365 nonlocal num_received_signals 1366 num_received_signals += 1 1367 1368 def set_interrupts(): 1369 nonlocal num_sent_signals 1370 while not do_stop: 1371 signal.raise_signal(signum) 1372 num_sent_signals += 1 1373 1374 def cycle_handlers(): 1375 while num_sent_signals < 100 or num_received_signals < 1: 1376 for i in range(20000): 1377 # Cycle between a Python-defined and a non-Python handler 1378 for handler in [custom_handler, signal.SIG_IGN]: 1379 signal.signal(signum, handler) 1380 1381 old_handler = signal.signal(signum, custom_handler) 1382 self.addCleanup(signal.signal, signum, old_handler) 1383 1384 t = threading.Thread(target=set_interrupts) 1385 try: 1386 ignored = False 1387 with support.catch_unraisable_exception() as cm: 1388 t.start() 1389 cycle_handlers() 1390 do_stop = True 1391 t.join() 1392 1393 if cm.unraisable is not None: 1394 # An unraisable exception may be printed out when 1395 # a signal is ignored due to the aforementioned 1396 # race condition, check it. 1397 self.assertIsInstance(cm.unraisable.exc_value, OSError) 1398 self.assertIn( 1399 f"Signal {signum:d} ignored due to race condition", 1400 str(cm.unraisable.exc_value)) 1401 ignored = True 1402 1403 # bpo-43406: Even if it is unlikely, it's technically possible that 1404 # all signals were ignored because of race conditions. 1405 if not ignored: 1406 # Sanity check that some signals were received, but not all 1407 self.assertGreater(num_received_signals, 0) 1408 self.assertLessEqual(num_received_signals, num_sent_signals) 1409 finally: 1410 do_stop = True 1411 t.join() 1412 1413 1414class RaiseSignalTest(unittest.TestCase): 1415 1416 def test_sigint(self): 1417 with self.assertRaises(KeyboardInterrupt): 1418 signal.raise_signal(signal.SIGINT) 1419 1420 @unittest.skipIf(sys.platform != "win32", "Windows specific test") 1421 def test_invalid_argument(self): 1422 try: 1423 SIGHUP = 1 # not supported on win32 1424 signal.raise_signal(SIGHUP) 1425 self.fail("OSError (Invalid argument) expected") 1426 except OSError as e: 1427 if e.errno == errno.EINVAL: 1428 pass 1429 else: 1430 raise 1431 1432 def test_handler(self): 1433 is_ok = False 1434 def handler(a, b): 1435 nonlocal is_ok 1436 is_ok = True 1437 old_signal = signal.signal(signal.SIGINT, handler) 1438 self.addCleanup(signal.signal, signal.SIGINT, old_signal) 1439 1440 signal.raise_signal(signal.SIGINT) 1441 self.assertTrue(is_ok) 1442 1443 def test__thread_interrupt_main(self): 1444 # See https://github.com/python/cpython/issues/102397 1445 code = """if 1: 1446 import _thread 1447 class Foo(): 1448 def __del__(self): 1449 _thread.interrupt_main() 1450 1451 x = Foo() 1452 """ 1453 1454 rc, out, err = assert_python_ok('-c', code) 1455 self.assertIn(b'OSError: Signal 2 ignored due to race condition', err) 1456 1457 1458 1459class PidfdSignalTest(unittest.TestCase): 1460 1461 @unittest.skipUnless( 1462 hasattr(signal, "pidfd_send_signal"), 1463 "pidfd support not built in", 1464 ) 1465 def test_pidfd_send_signal(self): 1466 with self.assertRaises(OSError) as cm: 1467 signal.pidfd_send_signal(0, signal.SIGINT) 1468 if cm.exception.errno == errno.ENOSYS: 1469 self.skipTest("kernel does not support pidfds") 1470 elif cm.exception.errno == errno.EPERM: 1471 self.skipTest("Not enough privileges to use pidfs") 1472 self.assertEqual(cm.exception.errno, errno.EBADF) 1473 my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY) 1474 self.addCleanup(os.close, my_pidfd) 1475 with self.assertRaisesRegex(TypeError, "^siginfo must be None$"): 1476 signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0) 1477 with self.assertRaises(KeyboardInterrupt): 1478 signal.pidfd_send_signal(my_pidfd, signal.SIGINT) 1479 1480def tearDownModule(): 1481 support.reap_children() 1482 1483if __name__ == "__main__": 1484 unittest.main() 1485