"""Regression tests for the faulthandler module.

Most tests run a short script in a child interpreter, so that deliberately
crashing it cannot take down the test runner, and then compare what
faulthandler wrote against an expected traceback.
"""

from contextlib import contextmanager
import datetime
import faulthandler
import os
import signal
import subprocess
import sys
import sysconfig
from test import support
from test.support import script_helper, is_android
import tempfile
import threading
import unittest
from textwrap import dedent

try:
    import _testcapi
except ImportError:
    _testcapi = None

# Delay (in seconds) passed to faulthandler.dump_traceback_later().
TIMEOUT = 0.5
MS_WINDOWS = (os.name == 'nt')
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
UB_SANITIZER = (
    '-fsanitize=undefined' in _cflags or
    '--with-undefined-behavior-sanitizer' in _config_args
)
MEMORY_SANITIZER = (
    '-fsanitize=memory' in _cflags or
    '--with-memory-sanitizer' in _config_args
)


def expected_traceback(lineno1, lineno2, header, min_count=1):
    """
    Build a regex matching a two-frame traceback: "func" called from
    "<module>", at lines lineno1 and lineno2 of "<string>".

    header is a regex for the text written before the frames. If min_count
    is greater than 1, the pattern matches min_count consecutive copies and
    is only anchored at the start; otherwise it matches exactly one copy
    and is anchored at both ends.
    """
    regex = header
    # faulthandler indents each frame with two spaces.
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
    if min_count > 1:
        return '^' + (regex + '\n') * (min_count - 1) + regex
    else:
        return '^' + regex + '$'

def skip_segfault_on_android(test):
    """Decorator: skip the test when is_android is true."""
    # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
    return unittest.skipIf(is_android,
                           'raising SIGSEGV on Android is unreliable')(test)

@contextmanager
def temporary_filename():
    """Yield a temporary file name and unlink that path on exit."""
    # tempfile.mktemp() only returns a name: the file itself is created by
    # the code under test.
    filename = tempfile.mktemp()
    try:
        yield filename
    finally:
        support.unlink(filename)

class FaultHandlerTests(unittest.TestCase):
    def get_output(self, code, filename=None, fd=None):
        """
        Run the specified code in Python (in a new child process) and read
        the output from the child (stderr is expected to be merged into the
        captured stream by spawn_python), from a file (if filename is set)
        or from a file descriptor (if fd is set; it is passed to the child
        through pass_fds).

        The captured output goes through support.strip_python_stderr()
        (which, per the original note here, strips the reference count
        printed by Python debug builds). When filename or fd is used, the
        captured output must be empty.

        Return a (lines, exitcode) tuple, where lines is the output split
        into a list of lines.
        """
        code = dedent(code).strip()
        pass_fds = []
        if fd is not None:
            pass_fds.append(fd)
        with support.SuppressCrashReport():
            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
            with process:
                stdout, stderr = process.communicate()
                exitcode = process.wait()
        output = support.strip_python_stderr(stdout)
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        elif fd is not None:
            self.assertEqual(output, '')
            # Rewind to the start of the file before reading what the child
            # wrote: os.lseek() takes (fd, pos, how).
            os.lseek(fd, 0, os.SEEK_SET)
            with open(fd, "rb", closefd=False) as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        return output.splitlines(), exitcode

    def check_error(self, code, line_number, fatal_error, *,
                    filename=None, all_threads=True, other_regex=None,
                    fd=None, know_current_thread=True):
        """
        Check that the fault handler for fatal errors is enabled and check
        the traceback from the child process output.

        fatal_error is a regex for the first line of the report and
        line_number is the expected line of the "<module>" frame. If
        other_regex is set, it is accepted as an alternative to the whole
        expected pattern. The child must exit with a non-zero exit code.

        Raise an error if the output doesn't match the expected format.
        """
        if all_threads:
            if know_current_thread:
                header = 'Current thread 0x[0-9a-f]+'
            else:
                header = 'Thread 0x[0-9a-f]+'
        else:
            header = 'Stack'
        regex = r"""
            (?m)^{fatal_error}

            {header} \(most recent call first\):
              File "<string>", line {lineno} in <module>
            """
        regex = dedent(regex.format(
            lineno=line_number,
            fatal_error=fatal_error,
            header=header)).strip()
        if other_regex:
            regex += '|' + other_regex
        output, exitcode = self.get_output(code, filename=filename, fd=fd)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        self.assertNotEqual(exitcode, 0)

    def check_fatal_error(self, code, line_number, name_regex, **kw):
        """Same as check_error(), for a "Fatal Python error: ..." report."""
        fatal_error = 'Fatal Python error: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    def check_windows_exception(self, code, line_number, name_regex, **kw):
        """Same as check_error(), for a "Windows fatal exception: ..." report."""
        fatal_error = 'Windows fatal exception: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
    def test_read_null(self):
        if not MS_WINDOWS:
            self.check_fatal_error("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
                '(?:Segmentation fault'
                    '|Bus error'
                    '|Illegal instruction)')
        else:
            self.check_windows_exception("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                'access violation')

    @skip_segfault_on_android
    def test_sigsegv(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault')

    def test_fatal_error_c_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            """,
            3,
            'in new thread',
            know_current_thread=False)

    def test_sigabrt(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            """,
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            """,
            3,
            'Floating point exception')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
    @skip_segfault_on_android
    def test_sigbus(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGBUS)
            """,
            6,
            'Bus error')

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
    @skip_segfault_on_android
    def test_sigill(self):
        self.check_fatal_error("""
            import _testcapi
            import faulthandler
            import signal

            faulthandler.enable()
            _testcapi.raise_signal(signal.SIGILL)
            """,
            6,
            'Illegal instruction')

    def test_fatal_error(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz')
            """,
            2,
            'xyz')

    def test_fatal_error_without_gil(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler._fatal_error(b'xyz', True)
            """,
            2,
            'xyz')

    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            """,
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    @skip_segfault_on_android
    def test_gil_released(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            """,
            3,
            'Segmentation fault')

    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
                     "sanitizer builds change crashing process output.")
    @skip_segfault_on_android
    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                """.format(filename=repr(filename)),
                4,
                'Segmentation fault',
                filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
                     "sanitizer builds change crashing process output.")
    @skip_segfault_on_android
    def test_enable_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            fd = fp.fileno()
            self.check_fatal_error("""
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                """ % fd,
                4,
                'Segmentation fault',
                fd=fd)

    @skip_segfault_on_android
    def test_enable_single_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault',
            all_threads=False)

    @skip_segfault_on_android
    def test_disable(self):
        code = """
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            """
        not_expected = 'Fatal Python error'
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        # The child must still fail, but without a faulthandler report.
        self.assertNotIn(not_expected, stderr)
        self.assertNotEqual(exitcode, 0)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state faulthandler had before the test.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-E", "-c", code)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args)
        self.assertEqual(output.rstrip(), b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        # filter(None, ...) drops the empty string used when -E is not needed.
        args = filter(None, (sys.executable,
                             "-E" if sys.flags.ignore_environment else "",
                             "-X", "faulthandler", "-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def test_env_var(self):
        # empty env var
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-c", code)
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = ''
        env['PYTHONDEVMODE'] = ''
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

        # non-empty env var
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = '1'
        env['PYTHONDEVMODE'] = ''
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def check_dump_traceback(self, *, filename=None, fd=None):
        """
        Explicitly call dump_traceback() function and check its output.

        The traceback is written to filename if set, else to the file
        descriptor fd if set, else to the default output of
        dump_traceback(). The child must exit with exit code 0.

        Raise an error if the output doesn't match the expected format.
        """
        # The blank lines of the script matter: the expected line numbers
        # below are positions in this script.
        code = """
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            """
        code = code.format(
            filename=filename,
            fd=fd,
        )
        if filename:
            lineno = 9
        elif fd is not None:
            lineno = 12
        else:
            lineno = 14
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 17 in funcA',
            '  File "<string>", line 19 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename, fd)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback()

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback(fd=fp.fileno())

    def test_truncate(self):
        # A function name longer than maxlen characters is expected to be
        # cut to maxlen characters followed by '...'.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            """
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def check_dump_traceback_threads(self, filename):
        """
        Explicitly call dump_traceback(all_threads=True) while a second
        thread is waiting on an event, and check the output.

        The traceback is written to filename if it is set. The child must
        exit with exit code 0.

        Raise an error if the output doesn't match the expected format.
        """
        # The blank lines of the script matter: the expected line numbers
        # below are positions in this script.
        code = """
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            """
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        if filename:
            lineno = 8
        else:
            lineno = 10
        # Between one and three threading.py frames are accepted above the
        # run() frame of the waiting thread.
        regex = r"""
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            """
        regex = dedent(regex.format(lineno=lineno)).strip()
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
                     'need faulthandler.dump_traceback_later()')
    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
                                   *, filename=None, fd=None):
        """
        Call dump_traceback_later() in a child process and check what was
        written.

        For each of the loops iterations, the child arms the timer with
        TIMEOUT, cancels it at once if cancel is true, sleeps for
        TIMEOUT x 5 seconds and then cancels the timer. Without cancel, at
        least loops tracebacks are expected (twice as many if repeat is
        true); with cancel, the output must be empty. The child must exit
        with exit code 0.

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        # NOTE(review): when fd is set, the child writes to its own
        # sys.stderr.fileno() rather than to fd, and fd is not forwarded to
        # get_output(); fd only selects the "file is an integer" code path.
        # The blank lines of the script matter: the expected line numbers
        # below are positions in this script.
        code = """
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            """
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
            regex = expected_traceback(17, 26, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_later(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_later_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback_later(fd=fp.fileno())

    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(loops=2)

    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False, fd=None):
        """
        Register a handler displaying the traceback on a user signal. Raise
        the signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        If unregister is True, the handler is unregistered before the
        signal is raised: the output must be empty and the exit code must
        be non-zero. Otherwise the exit code must be 0.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        # NOTE(review): when fd is set, the child writes to its own
        # sys.stderr.fileno() rather than to fd, and fd is not forwarded to
        # get_output(); fd only selects the "file is an integer" code path.
        # The blank lines of the script matter: the expected line numbers
        # below are positions in this script.
        code = """
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            """
        code = code.format(
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
            filename=filename,
            fd=fd,
        )
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if not unregister:
            if all_threads:
                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
            else:
                regex = r'Stack \(most recent call first\):\n'
            regex = expected_traceback(14, 32, regex)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_register_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_register(fd=fp.fileno())

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)

    @contextmanager
    def check_stderr_none(self):
        """
        Context manager: set sys.stderr to None and check that the body
        raises RuntimeError("sys.stderr is None"). sys.stderr is restored
        on exit.
        """
        stderr = sys.stderr
        try:
            sys.stderr = None
            with self.assertRaises(RuntimeError) as cm:
                yield
            self.assertEqual(str(cm.exception), "sys.stderr is None")
        finally:
            sys.stderr = stderr

    def test_stderr_None(self):
        # Issue #21497: provide a helpful error if sys.stderr is None,
        # instead of just an attribute error: "None has no attribute fileno".
        with self.check_stderr_none():
            faulthandler.enable()
        with self.check_stderr_none():
            faulthandler.dump_traceback()
        if hasattr(faulthandler, 'dump_traceback_later'):
            with self.check_stderr_none():
                faulthandler.dump_traceback_later(1e-3)
        if hasattr(faulthandler, "register"):
            with self.check_stderr_none():
                faulthandler.register(signal.SIGUSR1)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_exception(self):
        for exc, name in (
            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
        ):
            self.check_windows_exception(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._{exc})
                """,
                3,
                name)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_ignore_exception(self):
        for exc_code in (
            0xE06D7363,   # MSC exception ("Emsc")
            0xE0434352,   # COM Callable Runtime exception ("ECCR")
        ):
            code = f"""
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception({exc_code})
                    """
            code = dedent(code)
            output, exitcode = self.get_output(code)
            self.assertEqual(output, [])
            self.assertEqual(exitcode, exc_code)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_nonfatal_exception(self):
        # These exceptions are not strictly errors. Letting
        # faulthandler display the traceback when they are
        # raised is likely to result in noise. However, they
        # may still terminate the process if there is no
        # handler installed for them (which there typically
        # is, e.g. for debug messages).
        for exc in (
            0x00000000,
            0x34567890,
            0x40000000,
            0x40001000,
            0x70000000,
            0x7FFFFFFF,
        ):
            output, exitcode = self.get_output(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0x{exc:x})
                """
            )
            self.assertEqual(output, [])
            # On Windows older than 7 SP1, the actual exception code has
            # bit 29 cleared.
            self.assertIn(exitcode,
                          (exc, exc & ~0x10000000))

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_disable_windows_exc_handler(self):
        code = dedent("""
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0xC0000005)


if __name__ == "__main__":
    unittest.main()