"""Tests for the faulthandler module.

Most tests run a small script in a child Python process (the script usually
crashes on purpose) and compare what faulthandler wrote against an expected
traceback.  The line numbers passed to the check_*() helpers refer to lines
of the child script after it has been dedented and stripped, so the layout
of the embedded scripts must not be changed casually.
"""

from contextlib import contextmanager
import datetime
import faulthandler
import os
import re
import signal
import subprocess
import sys
import sysconfig
from test import support
from test.support import os_helper
from test.support import script_helper, is_android
import tempfile
import unittest
from textwrap import dedent

try:
    import _testcapi
except ImportError:
    _testcapi = None

# Delay in seconds used by the dump_traceback_later() tests.
TIMEOUT = 0.5
MS_WINDOWS = (os.name == 'nt')
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
UB_SANITIZER = (
    '-fsanitize=undefined' in _cflags or
    '--with-undefined-behavior-sanitizer' in _config_args
)
MEMORY_SANITIZER = (
    '-fsanitize=memory' in _cflags or
    '--with-memory-sanitizer' in _config_args
)


def expected_traceback(lineno1, lineno2, header, min_count=1):
    """
    Build a regex matching a two-frame traceback (func, then <module>).

    lineno1 and lineno2 are the expected line numbers of the "func" and
    "<module>" frames; header is a regex fragment placed before the frames.
    If min_count is greater than 1, the regex matches at least min_count
    consecutive copies of the traceback and is not anchored at the end;
    otherwise it matches exactly one traceback, anchored at both ends.
    """
    regex = header
    regex += '  File "<string>", line %s in func\n' % lineno1
    regex += '  File "<string>", line %s in <module>' % lineno2
    if min_count > 1:
        return '^' + (regex + '\n') * (min_count - 1) + regex
    return '^' + regex + '$'


def skip_segfault_on_android(test):
    """Decorator: skip the test on Android."""
    # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
    return unittest.skipIf(is_android,
                           'raising SIGSEGV on Android is unreliable')(test)


@contextmanager
def temporary_filename():
    """
    Context manager yielding the name of a temporary file.

    The file is removed when the context exits.
    """
    # mkstemp() creates the file atomically, unlike the deprecated mktemp()
    # which only picks a name and leaves a window for another process to
    # claim it.  Only the name is needed, so close the descriptor at once.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    try:
        yield filename
    finally:
        os_helper.unlink(filename)


class FaultHandlerTests(unittest.TestCase):
    """Check the tracebacks written by faulthandler."""

    def get_output(self, code, filename=None, fd=None):
        """
        Run the specified code in Python (in a new child process) and read
        the output from the child's standard streams, from a file (if
        filename is set) or from a file descriptor (if fd is set).

        fd, if given, is inherited by the child process.  When filename or
        fd is used, the child must not have written anything to its
        standard streams.

        Return a (lines, exitcode) tuple where lines is the list of output
        lines, decoded as ASCII with backslashreplace.
        """
        code = dedent(code).strip()
        pass_fds = []
        if fd is not None:
            pass_fds.append(fd)
        with support.SuppressCrashReport():
            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
            with process:
                # NOTE(review): only the first stream is used; this relies on
                # spawn_python() redirecting stderr to stdout -- confirm.
                output, _ = process.communicate()
                exitcode = process.wait()
        output = output.decode('ascii', 'backslashreplace')
        if filename:
            self.assertEqual(output, '')
            with open(filename, "rb") as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        elif fd is not None:
            self.assertEqual(output, '')
            # Rewind: the child wrote through the same open file description.
            os.lseek(fd, 0, os.SEEK_SET)
            with open(fd, "rb", closefd=False) as fp:
                output = fp.read()
            output = output.decode('ascii', 'backslashreplace')
        return output.splitlines(), exitcode

    def check_error(self, code, lineno, fatal_error, *,
                    filename=None, all_threads=True, other_regex=None,
                    fd=None, know_current_thread=True,
                    py_fatal_error=False,
                    garbage_collecting=False,
                    function='<module>'):
        """
        Check that the fault handler for fatal errors is enabled and check the
        traceback from the child process output.

        lineno and function describe the first expected frame; fatal_error
        is a regex for the first output line.  other_regex, if set, is an
        alternative regex that is also accepted.

        Raise an error if the output doesn't match the expected format or if
        the child process exits with code 0.
        """
        if all_threads:
            if know_current_thread:
                header = 'Current thread 0x[0-9a-f]+'
            else:
                header = 'Thread 0x[0-9a-f]+'
        else:
            header = 'Stack'
        regex = [f'^{fatal_error}']
        if py_fatal_error:
            regex.append("Python runtime state: initialized")
        regex.append('')
        regex.append(fr'{header} \(most recent call first\):')
        if garbage_collecting:
            regex.append('  Garbage-collecting')
        regex.append(fr'  File "<string>", line {lineno} in {function}')
        regex = '\n'.join(regex)

        if other_regex:
            regex = f'(?:{regex}|{other_regex})'

        # Enable MULTILINE flag
        regex = f'(?m){regex}'
        output, exitcode = self.get_output(code, filename=filename, fd=fd)
        output = '\n'.join(output)
        self.assertRegex(output, regex)
        self.assertNotEqual(exitcode, 0)

    def check_fatal_error(self, code, line_number, name_regex, func=None, **kw):
        # Expect a "Fatal Python error: [func: ]name" message; other keyword
        # arguments are passed through to check_error().
        if func:
            name_regex = '%s: %s' % (func, name_regex)
        fatal_error = 'Fatal Python error: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    def check_windows_exception(self, code, line_number, name_regex, **kw):
        # Expect a "Windows fatal exception: name" message; other keyword
        # arguments are passed through to check_error().
        fatal_error = 'Windows fatal exception: %s' % name_regex
        self.check_error(code, line_number, fatal_error, **kw)

    @unittest.skipIf(sys.platform.startswith('aix'),
                     "the first page of memory is a mapped read-only on AIX")
    def test_read_null(self):
        if not MS_WINDOWS:
            self.check_fatal_error("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
                '(?:Segmentation fault'
                    '|Bus error'
                    '|Illegal instruction)')
        else:
            self.check_windows_exception("""
                import faulthandler
                faulthandler.enable()
                faulthandler._read_null()
                """,
                3,
                'access violation')

    @skip_segfault_on_android
    def test_sigsegv(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault')

    @skip_segfault_on_android
    def test_gc(self):
        # bpo-44466: Detect if the GC is running
        self.check_fatal_error("""
            import faulthandler
            import gc
            import sys

            faulthandler.enable()

            class RefCycle:
                def __del__(self):
                    faulthandler._sigsegv()

            # create a reference cycle which triggers a fatal
            # error in a destructor
            a = RefCycle()
            b = RefCycle()
            a.b = b
            b.a = a

            # Delete the objects, not the cycle
            a = None
            b = None

            # Break the reference cycle: call __del__()
            gc.collect()

            # Should not reach this line
            print("exit", file=sys.stderr)
            """,
            9,
            'Segmentation fault',
            function='__del__',
            garbage_collecting=True)

    def test_fatal_error_c_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._fatal_error_c_thread()
            """,
            3,
            'in new thread',
            know_current_thread=False,
            func='faulthandler_fatal_error_thread',
            py_fatal_error=True)

    def test_sigabrt(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigabrt()
            """,
            3,
            'Aborted')

    @unittest.skipIf(sys.platform == 'win32',
                     "SIGFPE cannot be caught on Windows")
    def test_sigfpe(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigfpe()
            """,
            3,
            'Floating point exception')

    # The child script only uses signal.raise_signal(), so _testcapi is not
    # required here.
    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
    @skip_segfault_on_android
    def test_sigbus(self):
        self.check_fatal_error("""
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGBUS)
            """,
            5,
            'Bus error')

    # The child script only uses signal.raise_signal(), so _testcapi is not
    # required here.
    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
    @skip_segfault_on_android
    def test_sigill(self):
        self.check_fatal_error("""
            import faulthandler
            import signal

            faulthandler.enable()
            signal.raise_signal(signal.SIGILL)
            """,
            5,
            'Illegal instruction')

    def check_fatal_error_func(self, release_gil):
        # Test that Py_FatalError() dumps a traceback
        with support.SuppressCrashReport():
            self.check_fatal_error(f"""
                import _testcapi
                _testcapi.fatal_error(b'xyz', {release_gil})
                """,
                2,
                'xyz',
                func='test_fatal_error',
                py_fatal_error=True)

    # The child script imports _testcapi: without it the child fails with
    # ImportError instead of a fatal error.
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_fatal_error(self):
        self.check_fatal_error_func(False)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_fatal_error_without_gil(self):
        self.check_fatal_error_func(True)

    @unittest.skipIf(sys.platform.startswith('openbsd'),
                     "Issue #12868: sigaltstack() doesn't work on "
                     "OpenBSD if Python is compiled with pthread")
    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
                     'need faulthandler._stack_overflow()')
    def test_stack_overflow(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._stack_overflow()
            """,
            3,
            '(?:Segmentation fault|Bus error)',
            other_regex='unable to raise a stack overflow')

    @skip_segfault_on_android
    def test_gil_released(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable()
            faulthandler._sigsegv(True)
            """,
            3,
            'Segmentation fault')

    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
                     "sanitizer builds change crashing process output.")
    @skip_segfault_on_android
    def test_enable_file(self):
        with temporary_filename() as filename:
            self.check_fatal_error("""
                import faulthandler
                output = open({filename}, 'wb')
                faulthandler.enable(output)
                faulthandler._sigsegv()
                """.format(filename=repr(filename)),
                4,
                'Segmentation fault',
                filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
                     "sanitizer builds change crashing process output.")
    @skip_segfault_on_android
    def test_enable_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            fd = fp.fileno()
            self.check_fatal_error("""
                import faulthandler
                import sys
                faulthandler.enable(%s)
                faulthandler._sigsegv()
                """ % fd,
                4,
                'Segmentation fault',
                fd=fd)

    @skip_segfault_on_android
    def test_enable_single_thread(self):
        self.check_fatal_error("""
            import faulthandler
            faulthandler.enable(all_threads=False)
            faulthandler._sigsegv()
            """,
            3,
            'Segmentation fault',
            all_threads=False)

    @skip_segfault_on_android
    def test_disable(self):
        # After disable(), the crash must not produce a faulthandler report.
        code = """
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            faulthandler._sigsegv()
            """
        not_expected = 'Fatal Python error'
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        self.assertTrue(not_expected not in stderr,
                        "%r is present in %r" % (not_expected, stderr))
        self.assertNotEqual(exitcode, 0)

    @skip_segfault_on_android
    def test_dump_ext_modules(self):
        code = """
            import faulthandler
            import sys
            # Don't filter stdlib module names
            sys.stdlib_module_names = frozenset()
            faulthandler.enable()
            faulthandler._sigsegv()
            """
        stderr, exitcode = self.get_output(code)
        stderr = '\n'.join(stderr)
        match = re.search(r'^Extension modules:(.*) \(total: [0-9]+\)$',
                          stderr, re.MULTILINE)
        if not match:
            self.fail(f"Cannot find 'Extension modules:' in {stderr!r}")
        modules = set(match.group(1).strip().split(', '))
        for name in ('sys', 'faulthandler'):
            self.assertIn(name, modules)

    def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                # Restore the state found on entry.
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr

    def test_disabled_by_default(self):
        # By default, the module should be disabled
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-E", "-c", code)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args)
        self.assertEqual(output.rstrip(), b"False")

    def test_sys_xoptions(self):
        # Test python -X faulthandler
        code = "import faulthandler; print(faulthandler.is_enabled())"
        # Build a real list: a one-shot filter() iterator is fragile as argv.
        args = [sys.executable]
        if sys.flags.ignore_environment:
            args.append("-E")
        args.extend(("-X", "faulthandler", "-c", code))
        env = os.environ.copy()
        env.pop("PYTHONFAULTHANDLER", None)
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def test_env_var(self):
        # empty env var
        code = "import faulthandler; print(faulthandler.is_enabled())"
        args = (sys.executable, "-c", code)
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = ''
        env['PYTHONDEVMODE'] = ''
        # don't use assert_python_ok() because it always enables faulthandler
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"False")

        # non-empty env var
        env = dict(os.environ)
        env['PYTHONFAULTHANDLER'] = '1'
        env['PYTHONDEVMODE'] = ''
        output = subprocess.check_output(args, env=env)
        self.assertEqual(output.rstrip(), b"True")

    def check_dump_traceback(self, *, filename=None, fd=None):
        """
        Explicitly call dump_traceback() function and check its output.

        The traceback is written to filename if set, else to the inherited
        file descriptor fd if set, else to the default output.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler

            filename = {filename!r}
            fd = {fd}

            def funcB():
                if filename:
                    with open(filename, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=False)
                elif fd is not None:
                    faulthandler.dump_traceback(fd,
                                                all_threads=False)
                else:
                    faulthandler.dump_traceback(all_threads=False)

            def funcA():
                funcB()

            funcA()
            """
        code = code.format(
            filename=filename,
            fd=fd,
        )
        # Line of the dump_traceback() call taken inside funcB().
        if filename:
            lineno = 9
        elif fd is not None:
            lineno = 11
        else:
            lineno = 14
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line %s in funcB' % lineno,
            '  File "<string>", line 17 in funcA',
            '  File "<string>", line 19 in <module>'
        ]
        trace, exitcode = self.get_output(code, filename, fd)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback(self):
        self.check_dump_traceback()

    def test_dump_traceback_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback(fd=fp.fileno())

    def test_truncate(self):
        # A function name longer than maxlen characters is expected to be
        # cut to maxlen characters followed by '...'.
        maxlen = 500
        func_name = 'x' * (maxlen + 50)
        truncated = 'x' * maxlen + '...'
        code = """
            import faulthandler

            def {func_name}():
                faulthandler.dump_traceback(all_threads=False)

            {func_name}()
            """
        code = code.format(
            func_name=func_name,
        )
        expected = [
            'Stack (most recent call first):',
            '  File "<string>", line 4 in %s' % truncated,
            '  File "<string>", line 6 in <module>'
        ]
        trace, exitcode = self.get_output(code)
        self.assertEqual(trace, expected)
        self.assertEqual(exitcode, 0)

    def check_dump_traceback_threads(self, filename):
        """
        Call explicitly dump_traceback(all_threads=True) and check the output.

        The traceback is written to filename if it is set, else to the
        default output.

        Raise an error if the output doesn't match the expected format.
        """
        code = """
            import faulthandler
            from threading import Thread, Event
            import time

            def dump():
                if {filename}:
                    with open({filename}, "wb") as fp:
                        faulthandler.dump_traceback(fp, all_threads=True)
                else:
                    faulthandler.dump_traceback(all_threads=True)

            class Waiter(Thread):
                # avoid blocking if the main thread raises an exception.
                daemon = True

                def __init__(self):
                    Thread.__init__(self)
                    self.running = Event()
                    self.stop = Event()

                def run(self):
                    self.running.set()
                    self.stop.wait()

            waiter = Waiter()
            waiter.start()
            waiter.running.wait()
            dump()
            waiter.stop.set()
            waiter.join()
            """
        code = code.format(filename=repr(filename))
        output, exitcode = self.get_output(code, filename)
        output = '\n'.join(output)
        # Line of the dump_traceback() call taken inside dump().
        if filename:
            lineno = 8
        else:
            lineno = 10
        regex = r"""
            ^Thread 0x[0-9a-f]+ \(most recent call first\):
            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
            ){{1,3}}  File "<string>", line 23 in run
              File ".*threading.py", line [0-9]+ in _bootstrap_inner
              File ".*threading.py", line [0-9]+ in _bootstrap

            Current thread 0x[0-9a-f]+ \(most recent call first\):
              File "<string>", line {lineno} in dump
              File "<string>", line 28 in <module>$
            """
        regex = dedent(regex.format(lineno=lineno)).strip()
        self.assertRegex(output, regex)
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_threads(self):
        self.check_dump_traceback_threads(None)

    def test_dump_traceback_threads_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_threads(filename)

    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
                                   *, filename=None, fd=None):
        """
        Check how many times the traceback is written while the child sleeps.

        For each of the `loops` iterations the child schedules a dump after
        TIMEOUT seconds and then sleeps for 5 x TIMEOUT seconds.  At least
        `loops` tracebacks are expected (twice as many if repeat is true),
        and no output at all if cancel is true.

        Raise an error if the output doesn't match the expected format.
        """
        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
        code = """
            import faulthandler
            import time
            import sys

            timeout = {timeout}
            repeat = {repeat}
            cancel = {cancel}
            loops = {loops}
            filename = {filename!r}
            fd = {fd}

            def func(timeout, repeat, cancel, file, loops):
                for loop in range(loops):
                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
                    if cancel:
                        faulthandler.cancel_dump_traceback_later()
                    time.sleep(timeout * 5)
                    faulthandler.cancel_dump_traceback_later()

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            func(timeout, repeat, cancel, file, loops)
            if filename:
                file.close()
            """
        code = code.format(
            timeout=TIMEOUT,
            repeat=repeat,
            cancel=cancel,
            loops=loops,
            filename=filename,
            fd=fd,
        )
        # NOTE(review): fd is not forwarded to get_output(); when fd is set
        # the child writes to its own stderr descriptor and the traceback is
        # read from the child's standard streams.
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)

        if not cancel:
            count = loops
            if repeat:
                count *= 2
            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
            regex = expected_traceback(17, 26, header, min_count=count)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        self.assertEqual(exitcode, 0)

    def test_dump_traceback_later(self):
        self.check_dump_traceback_later()

    def test_dump_traceback_later_repeat(self):
        self.check_dump_traceback_later(repeat=True)

    def test_dump_traceback_later_cancel(self):
        self.check_dump_traceback_later(cancel=True)

    def test_dump_traceback_later_file(self):
        with temporary_filename() as filename:
            self.check_dump_traceback_later(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_dump_traceback_later_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_dump_traceback_later(fd=fp.fileno())

    def test_dump_traceback_later_twice(self):
        self.check_dump_traceback_later(loops=2)

    # Decorating the helper makes every test that calls it skip when
    # faulthandler.register() is not available.
    @unittest.skipIf(not hasattr(faulthandler, "register"),
                     "need faulthandler.register")
    def check_register(self, filename=False, all_threads=False,
                       unregister=False, chain=False, fd=None):
        """
        Register a handler displaying the traceback on a user signal. Raise the
        signal and check the written traceback.

        If chain is True, check that the previous signal handler is called.

        If unregister is True, the handler is unregistered before the signal
        is raised: no traceback is expected and the child must exit with a
        non-zero code.

        Raise an error if the output doesn't match the expected format.
        """
        signum = signal.SIGUSR1
        code = """
            import faulthandler
            import os
            import signal
            import sys

            all_threads = {all_threads}
            signum = {signum:d}
            unregister = {unregister}
            chain = {chain}
            filename = {filename!r}
            fd = {fd}

            def func(signum):
                os.kill(os.getpid(), signum)

            def handler(signum, frame):
                handler.called = True
            handler.called = False

            if filename:
                file = open(filename, "wb")
            elif fd is not None:
                file = sys.stderr.fileno()
            else:
                file = None
            if chain:
                signal.signal(signum, handler)
            faulthandler.register(signum, file=file,
                                  all_threads=all_threads, chain={chain})
            if unregister:
                faulthandler.unregister(signum)
            func(signum)
            if chain and not handler.called:
                if file is not None:
                    output = file
                else:
                    output = sys.stderr
                print("Error: signal handler not called!", file=output)
                exitcode = 1
            else:
                exitcode = 0
            if filename:
                file.close()
            sys.exit(exitcode)
            """
        code = code.format(
            all_threads=all_threads,
            signum=signum,
            unregister=unregister,
            chain=chain,
            filename=filename,
            fd=fd,
        )
        # NOTE(review): as in check_dump_traceback_later(), fd is not
        # forwarded to get_output(); the child writes to its own stderr.
        trace, exitcode = self.get_output(code, filename)
        trace = '\n'.join(trace)
        if not unregister:
            if all_threads:
                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
            else:
                regex = r'Stack \(most recent call first\):\n'
            regex = expected_traceback(14, 32, regex)
            self.assertRegex(trace, regex)
        else:
            self.assertEqual(trace, '')
        if unregister:
            self.assertNotEqual(exitcode, 0)
        else:
            self.assertEqual(exitcode, 0)

    def test_register(self):
        self.check_register()

    def test_unregister(self):
        self.check_register(unregister=True)

    def test_register_file(self):
        with temporary_filename() as filename:
            self.check_register(filename=filename)

    @unittest.skipIf(sys.platform == "win32",
                     "subprocess doesn't support pass_fds on Windows")
    def test_register_fd(self):
        with tempfile.TemporaryFile('wb+') as fp:
            self.check_register(fd=fp.fileno())

    def test_register_threads(self):
        self.check_register(all_threads=True)

    def test_register_chain(self):
        self.check_register(chain=True)

    @contextmanager
    def check_stderr_none(self):
        # Run the body of the "with" block with sys.stderr set to None and
        # check that it raises RuntimeError("sys.stderr is None").
        stderr = sys.stderr
        try:
            sys.stderr = None
            with self.assertRaises(RuntimeError) as cm:
                yield
            self.assertEqual(str(cm.exception), "sys.stderr is None")
        finally:
            sys.stderr = stderr

    def test_stderr_None(self):
        # Issue #21497: provide a helpful error if sys.stderr is None,
        # instead of just an attribute error: "None has no attribute fileno".
        with self.check_stderr_none():
            faulthandler.enable()
        with self.check_stderr_none():
            faulthandler.dump_traceback()
        with self.check_stderr_none():
            faulthandler.dump_traceback_later(1e-3)
        if hasattr(faulthandler, "register"):
            with self.check_stderr_none():
                faulthandler.register(signal.SIGUSR1)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_exception(self):
        for exc, name in (
            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
        ):
            self.check_windows_exception(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(faulthandler._{exc})
                """,
                3,
                name)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_ignore_exception(self):
        for exc_code in (
            0xE06D7363,   # MSC exception ("Emsc")
            0xE0434352,   # COM Callable Runtime exception ("ECCR")
        ):
            code = f"""
                    import faulthandler
                    faulthandler.enable()
                    faulthandler._raise_exception({exc_code})
                    """
            code = dedent(code)
            output, exitcode = self.get_output(code)
            self.assertEqual(output, [])
            self.assertEqual(exitcode, exc_code)

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_raise_nonfatal_exception(self):
        # These exceptions are not strictly errors. Letting
        # faulthandler display the traceback when they are
        # raised is likely to result in noise. However, they
        # may still terminate the process if there is no
        # handler installed for them (which there typically
        # is, e.g. for debug messages).
        for exc in (
            0x00000000,
            0x34567890,
            0x40000000,
            0x40001000,
            0x70000000,
            0x7FFFFFFF,
        ):
            output, exitcode = self.get_output(f"""
                import faulthandler
                faulthandler.enable()
                faulthandler._raise_exception(0x{exc:x})
                """
            )
            self.assertEqual(output, [])
            # On Windows older than 7 SP1, the actual exception code has
            # bit 29 cleared.
            self.assertIn(exitcode,
                          (exc, exc & ~0x10000000))

    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
    def test_disable_windows_exc_handler(self):
        code = dedent("""
            import faulthandler
            faulthandler.enable()
            faulthandler.disable()
            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
            faulthandler._raise_exception(code)
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0xC0000005)

    def test_cancel_later_without_dump_traceback_later(self):
        # bpo-37933: Calling cancel_dump_traceback_later()
        # without dump_traceback_later() must not segfault.
        code = dedent("""
            import faulthandler
            faulthandler.cancel_dump_traceback_later()
        """)
        output, exitcode = self.get_output(code)
        self.assertEqual(output, [])
        self.assertEqual(exitcode, 0)


if __name__ == "__main__":
    unittest.main()