• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from contextlib import contextmanager
2import datetime
3import faulthandler
4import os
5import signal
6import subprocess
7import sys
8import sysconfig
9from test import support
10from test.support import script_helper, is_android
11import tempfile
12import threading
13import unittest
14from textwrap import dedent
15
16try:
17    import _testcapi
18except ImportError:
19    _testcapi = None
20
# Base timeout, in seconds, used by the dump_traceback_later() checks.
TIMEOUT = 0.5
MS_WINDOWS = os.name == 'nt'
# Build configuration strings used to detect sanitizer builds; each falls
# back to '' when the configuration variable is unset.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
UB_SANITIZER = any((
    '-fsanitize=undefined' in _cflags,
    '--with-undefined-behavior-sanitizer' in _config_args,
))
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))
33
34
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """
    Build the regex matching a two-frame traceback (func, then <module>).

    header is prepended to the frames. If min_count is greater than 1, the
    pattern is repeated min_count times, separated by newlines and anchored
    only at the start; otherwise a single copy anchored at both ends is
    returned.
    """
    frames = (
        '  File "<string>", line %s in func\n' % lineno1,
        '  File "<string>", line %s in <module>' % lineno2,
    )
    block = header + ''.join(frames)
    if min_count <= 1:
        return '^' + block + '$'
    # No trailing '$': more than min_count copies are accepted.
    return '^' + '\n'.join([block] * min_count)
43
def skip_segfault_on_android(test):
    """Decorator skipping *test* when is_android is true."""
    # Issue #32138: Raising SIGSEGV on Android may not cause a crash.
    reason = 'raising SIGSEGV on Android is unreliable'
    decorator = unittest.skipIf(is_android, reason)
    return decorator(test)
48
@contextmanager
def temporary_filename():
    """
    Context manager yielding a temporary file name.

    Only a name is generated (tempfile.mktemp()); the path is removed with
    support.unlink() when the block exits.
    """
    path = tempfile.mktemp()
    try:
        yield path
    finally:
        support.unlink(path)
56
57class FaultHandlerTests(unittest.TestCase):
58    def get_output(self, code, filename=None, fd=None):
59        """
60        Run the specified code in Python (in a new child process) and read the
61        output from the standard error or from a file (if filename is set).
62        Return the output lines as a list.
63
64        Strip the reference count from the standard error for Python debug
65        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
66        thread XXX".
67        """
68        code = dedent(code).strip()
69        pass_fds = []
70        if fd is not None:
71            pass_fds.append(fd)
72        with support.SuppressCrashReport():
73            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
74            with process:
75                stdout, stderr = process.communicate()
76                exitcode = process.wait()
77        output = support.strip_python_stderr(stdout)
78        output = output.decode('ascii', 'backslashreplace')
79        if filename:
80            self.assertEqual(output, '')
81            with open(filename, "rb") as fp:
82                output = fp.read()
83            output = output.decode('ascii', 'backslashreplace')
84        elif fd is not None:
85            self.assertEqual(output, '')
86            os.lseek(fd, os.SEEK_SET, 0)
87            with open(fd, "rb", closefd=False) as fp:
88                output = fp.read()
89            output = output.decode('ascii', 'backslashreplace')
90        return output.splitlines(), exitcode
91
92    def check_error(self, code, line_number, fatal_error, *,
93                    filename=None, all_threads=True, other_regex=None,
94                    fd=None, know_current_thread=True):
95        """
96        Check that the fault handler for fatal errors is enabled and check the
97        traceback from the child process output.
98
99        Raise an error if the output doesn't match the expected format.
100        """
101        if all_threads:
102            if know_current_thread:
103                header = 'Current thread 0x[0-9a-f]+'
104            else:
105                header = 'Thread 0x[0-9a-f]+'
106        else:
107            header = 'Stack'
108        regex = r"""
109            (?m)^{fatal_error}
110
111            {header} \(most recent call first\):
112              File "<string>", line {lineno} in <module>
113            """
114        regex = dedent(regex.format(
115            lineno=line_number,
116            fatal_error=fatal_error,
117            header=header)).strip()
118        if other_regex:
119            regex += '|' + other_regex
120        output, exitcode = self.get_output(code, filename=filename, fd=fd)
121        output = '\n'.join(output)
122        self.assertRegex(output, regex)
123        self.assertNotEqual(exitcode, 0)
124
125    def check_fatal_error(self, code, line_number, name_regex, **kw):
126        fatal_error = 'Fatal Python error: %s' % name_regex
127        self.check_error(code, line_number, fatal_error, **kw)
128
129    def check_windows_exception(self, code, line_number, name_regex, **kw):
130        fatal_error = 'Windows fatal exception: %s' % name_regex
131        self.check_error(code, line_number, fatal_error, **kw)
132
133    @unittest.skipIf(sys.platform.startswith('aix'),
134                     "the first page of memory is a mapped read-only on AIX")
135    def test_read_null(self):
136        if not MS_WINDOWS:
137            self.check_fatal_error("""
138                import faulthandler
139                faulthandler.enable()
140                faulthandler._read_null()
141                """,
142                3,
143                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
144                '(?:Segmentation fault'
145                    '|Bus error'
146                    '|Illegal instruction)')
147        else:
148            self.check_windows_exception("""
149                import faulthandler
150                faulthandler.enable()
151                faulthandler._read_null()
152                """,
153                3,
154                'access violation')
155
156    @skip_segfault_on_android
157    def test_sigsegv(self):
158        self.check_fatal_error("""
159            import faulthandler
160            faulthandler.enable()
161            faulthandler._sigsegv()
162            """,
163            3,
164            'Segmentation fault')
165
166    def test_fatal_error_c_thread(self):
167        self.check_fatal_error("""
168            import faulthandler
169            faulthandler.enable()
170            faulthandler._fatal_error_c_thread()
171            """,
172            3,
173            'in new thread',
174            know_current_thread=False)
175
176    def test_sigabrt(self):
177        self.check_fatal_error("""
178            import faulthandler
179            faulthandler.enable()
180            faulthandler._sigabrt()
181            """,
182            3,
183            'Aborted')
184
185    @unittest.skipIf(sys.platform == 'win32',
186                     "SIGFPE cannot be caught on Windows")
187    def test_sigfpe(self):
188        self.check_fatal_error("""
189            import faulthandler
190            faulthandler.enable()
191            faulthandler._sigfpe()
192            """,
193            3,
194            'Floating point exception')
195
196    @unittest.skipIf(_testcapi is None, 'need _testcapi')
197    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
198    @skip_segfault_on_android
199    def test_sigbus(self):
200        self.check_fatal_error("""
201            import _testcapi
202            import faulthandler
203            import signal
204
205            faulthandler.enable()
206            _testcapi.raise_signal(signal.SIGBUS)
207            """,
208            6,
209            'Bus error')
210
211    @unittest.skipIf(_testcapi is None, 'need _testcapi')
212    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
213    @skip_segfault_on_android
214    def test_sigill(self):
215        self.check_fatal_error("""
216            import _testcapi
217            import faulthandler
218            import signal
219
220            faulthandler.enable()
221            _testcapi.raise_signal(signal.SIGILL)
222            """,
223            6,
224            'Illegal instruction')
225
226    def test_fatal_error(self):
227        self.check_fatal_error("""
228            import faulthandler
229            faulthandler._fatal_error(b'xyz')
230            """,
231            2,
232            'xyz')
233
234    def test_fatal_error_without_gil(self):
235        self.check_fatal_error("""
236            import faulthandler
237            faulthandler._fatal_error(b'xyz', True)
238            """,
239            2,
240            'xyz')
241
242    @unittest.skipIf(sys.platform.startswith('openbsd'),
243                     "Issue #12868: sigaltstack() doesn't work on "
244                     "OpenBSD if Python is compiled with pthread")
245    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
246                     'need faulthandler._stack_overflow()')
247    def test_stack_overflow(self):
248        self.check_fatal_error("""
249            import faulthandler
250            faulthandler.enable()
251            faulthandler._stack_overflow()
252            """,
253            3,
254            '(?:Segmentation fault|Bus error)',
255            other_regex='unable to raise a stack overflow')
256
257    @skip_segfault_on_android
258    def test_gil_released(self):
259        self.check_fatal_error("""
260            import faulthandler
261            faulthandler.enable()
262            faulthandler._sigsegv(True)
263            """,
264            3,
265            'Segmentation fault')
266
267    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
268                     "sanitizer builds change crashing process output.")
269    @skip_segfault_on_android
270    def test_enable_file(self):
271        with temporary_filename() as filename:
272            self.check_fatal_error("""
273                import faulthandler
274                output = open({filename}, 'wb')
275                faulthandler.enable(output)
276                faulthandler._sigsegv()
277                """.format(filename=repr(filename)),
278                4,
279                'Segmentation fault',
280                filename=filename)
281
282    @unittest.skipIf(sys.platform == "win32",
283                     "subprocess doesn't support pass_fds on Windows")
284    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
285                     "sanitizer builds change crashing process output.")
286    @skip_segfault_on_android
287    def test_enable_fd(self):
288        with tempfile.TemporaryFile('wb+') as fp:
289            fd = fp.fileno()
290            self.check_fatal_error("""
291                import faulthandler
292                import sys
293                faulthandler.enable(%s)
294                faulthandler._sigsegv()
295                """ % fd,
296                4,
297                'Segmentation fault',
298                fd=fd)
299
300    @skip_segfault_on_android
301    def test_enable_single_thread(self):
302        self.check_fatal_error("""
303            import faulthandler
304            faulthandler.enable(all_threads=False)
305            faulthandler._sigsegv()
306            """,
307            3,
308            'Segmentation fault',
309            all_threads=False)
310
311    @skip_segfault_on_android
312    def test_disable(self):
313        code = """
314            import faulthandler
315            faulthandler.enable()
316            faulthandler.disable()
317            faulthandler._sigsegv()
318            """
319        not_expected = 'Fatal Python error'
320        stderr, exitcode = self.get_output(code)
321        stderr = '\n'.join(stderr)
322        self.assertTrue(not_expected not in stderr,
323                     "%r is present in %r" % (not_expected, stderr))
324        self.assertNotEqual(exitcode, 0)
325
326    def test_is_enabled(self):
327        orig_stderr = sys.stderr
328        try:
329            # regrtest may replace sys.stderr by io.StringIO object, but
330            # faulthandler.enable() requires that sys.stderr has a fileno()
331            # method
332            sys.stderr = sys.__stderr__
333
334            was_enabled = faulthandler.is_enabled()
335            try:
336                faulthandler.enable()
337                self.assertTrue(faulthandler.is_enabled())
338                faulthandler.disable()
339                self.assertFalse(faulthandler.is_enabled())
340            finally:
341                if was_enabled:
342                    faulthandler.enable()
343                else:
344                    faulthandler.disable()
345        finally:
346            sys.stderr = orig_stderr
347
348    def test_disabled_by_default(self):
349        # By default, the module should be disabled
350        code = "import faulthandler; print(faulthandler.is_enabled())"
351        args = (sys.executable, "-E", "-c", code)
352        # don't use assert_python_ok() because it always enables faulthandler
353        output = subprocess.check_output(args)
354        self.assertEqual(output.rstrip(), b"False")
355
356    def test_sys_xoptions(self):
357        # Test python -X faulthandler
358        code = "import faulthandler; print(faulthandler.is_enabled())"
359        args = filter(None, (sys.executable,
360                             "-E" if sys.flags.ignore_environment else "",
361                             "-X", "faulthandler", "-c", code))
362        env = os.environ.copy()
363        env.pop("PYTHONFAULTHANDLER", None)
364        # don't use assert_python_ok() because it always enables faulthandler
365        output = subprocess.check_output(args, env=env)
366        self.assertEqual(output.rstrip(), b"True")
367
368    def test_env_var(self):
369        # empty env var
370        code = "import faulthandler; print(faulthandler.is_enabled())"
371        args = (sys.executable, "-c", code)
372        env = dict(os.environ)
373        env['PYTHONFAULTHANDLER'] = ''
374        env['PYTHONDEVMODE'] = ''
375        # don't use assert_python_ok() because it always enables faulthandler
376        output = subprocess.check_output(args, env=env)
377        self.assertEqual(output.rstrip(), b"False")
378
379        # non-empty env var
380        env = dict(os.environ)
381        env['PYTHONFAULTHANDLER'] = '1'
382        env['PYTHONDEVMODE'] = ''
383        output = subprocess.check_output(args, env=env)
384        self.assertEqual(output.rstrip(), b"True")
385
386    def check_dump_traceback(self, *, filename=None, fd=None):
387        """
388        Explicitly call dump_traceback() function and check its output.
389        Raise an error if the output doesn't match the expected format.
390        """
391        code = """
392            import faulthandler
393
394            filename = {filename!r}
395            fd = {fd}
396
397            def funcB():
398                if filename:
399                    with open(filename, "wb") as fp:
400                        faulthandler.dump_traceback(fp, all_threads=False)
401                elif fd is not None:
402                    faulthandler.dump_traceback(fd,
403                                                all_threads=False)
404                else:
405                    faulthandler.dump_traceback(all_threads=False)
406
407            def funcA():
408                funcB()
409
410            funcA()
411            """
412        code = code.format(
413            filename=filename,
414            fd=fd,
415        )
416        if filename:
417            lineno = 9
418        elif fd is not None:
419            lineno = 12
420        else:
421            lineno = 14
422        expected = [
423            'Stack (most recent call first):',
424            '  File "<string>", line %s in funcB' % lineno,
425            '  File "<string>", line 17 in funcA',
426            '  File "<string>", line 19 in <module>'
427        ]
428        trace, exitcode = self.get_output(code, filename, fd)
429        self.assertEqual(trace, expected)
430        self.assertEqual(exitcode, 0)
431
432    def test_dump_traceback(self):
433        self.check_dump_traceback()
434
435    def test_dump_traceback_file(self):
436        with temporary_filename() as filename:
437            self.check_dump_traceback(filename=filename)
438
439    @unittest.skipIf(sys.platform == "win32",
440                     "subprocess doesn't support pass_fds on Windows")
441    def test_dump_traceback_fd(self):
442        with tempfile.TemporaryFile('wb+') as fp:
443            self.check_dump_traceback(fd=fp.fileno())
444
445    def test_truncate(self):
446        maxlen = 500
447        func_name = 'x' * (maxlen + 50)
448        truncated = 'x' * maxlen + '...'
449        code = """
450            import faulthandler
451
452            def {func_name}():
453                faulthandler.dump_traceback(all_threads=False)
454
455            {func_name}()
456            """
457        code = code.format(
458            func_name=func_name,
459        )
460        expected = [
461            'Stack (most recent call first):',
462            '  File "<string>", line 4 in %s' % truncated,
463            '  File "<string>", line 6 in <module>'
464        ]
465        trace, exitcode = self.get_output(code)
466        self.assertEqual(trace, expected)
467        self.assertEqual(exitcode, 0)
468
469    def check_dump_traceback_threads(self, filename):
470        """
471        Call explicitly dump_traceback(all_threads=True) and check the output.
472        Raise an error if the output doesn't match the expected format.
473        """
474        code = """
475            import faulthandler
476            from threading import Thread, Event
477            import time
478
479            def dump():
480                if {filename}:
481                    with open({filename}, "wb") as fp:
482                        faulthandler.dump_traceback(fp, all_threads=True)
483                else:
484                    faulthandler.dump_traceback(all_threads=True)
485
486            class Waiter(Thread):
487                # avoid blocking if the main thread raises an exception.
488                daemon = True
489
490                def __init__(self):
491                    Thread.__init__(self)
492                    self.running = Event()
493                    self.stop = Event()
494
495                def run(self):
496                    self.running.set()
497                    self.stop.wait()
498
499            waiter = Waiter()
500            waiter.start()
501            waiter.running.wait()
502            dump()
503            waiter.stop.set()
504            waiter.join()
505            """
506        code = code.format(filename=repr(filename))
507        output, exitcode = self.get_output(code, filename)
508        output = '\n'.join(output)
509        if filename:
510            lineno = 8
511        else:
512            lineno = 10
513        regex = r"""
514            ^Thread 0x[0-9a-f]+ \(most recent call first\):
515            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
516            ){{1,3}}  File "<string>", line 23 in run
517              File ".*threading.py", line [0-9]+ in _bootstrap_inner
518              File ".*threading.py", line [0-9]+ in _bootstrap
519
520            Current thread 0x[0-9a-f]+ \(most recent call first\):
521              File "<string>", line {lineno} in dump
522              File "<string>", line 28 in <module>$
523            """
524        regex = dedent(regex.format(lineno=lineno)).strip()
525        self.assertRegex(output, regex)
526        self.assertEqual(exitcode, 0)
527
528    def test_dump_traceback_threads(self):
529        self.check_dump_traceback_threads(None)
530
531    def test_dump_traceback_threads_file(self):
532        with temporary_filename() as filename:
533            self.check_dump_traceback_threads(filename)
534
535    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
536                     'need faulthandler.dump_traceback_later()')
537    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
538                                   *, filename=None, fd=None):
539        """
540        Check how many times the traceback is written in timeout x 2.5 seconds,
541        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
542        on repeat and cancel options.
543
544        Raise an error if the output doesn't match the expect format.
545        """
546        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
547        code = """
548            import faulthandler
549            import time
550            import sys
551
552            timeout = {timeout}
553            repeat = {repeat}
554            cancel = {cancel}
555            loops = {loops}
556            filename = {filename!r}
557            fd = {fd}
558
559            def func(timeout, repeat, cancel, file, loops):
560                for loop in range(loops):
561                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
562                    if cancel:
563                        faulthandler.cancel_dump_traceback_later()
564                    time.sleep(timeout * 5)
565                    faulthandler.cancel_dump_traceback_later()
566
567            if filename:
568                file = open(filename, "wb")
569            elif fd is not None:
570                file = sys.stderr.fileno()
571            else:
572                file = None
573            func(timeout, repeat, cancel, file, loops)
574            if filename:
575                file.close()
576            """
577        code = code.format(
578            timeout=TIMEOUT,
579            repeat=repeat,
580            cancel=cancel,
581            loops=loops,
582            filename=filename,
583            fd=fd,
584        )
585        trace, exitcode = self.get_output(code, filename)
586        trace = '\n'.join(trace)
587
588        if not cancel:
589            count = loops
590            if repeat:
591                count *= 2
592            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
593            regex = expected_traceback(17, 26, header, min_count=count)
594            self.assertRegex(trace, regex)
595        else:
596            self.assertEqual(trace, '')
597        self.assertEqual(exitcode, 0)
598
599    def test_dump_traceback_later(self):
600        self.check_dump_traceback_later()
601
602    def test_dump_traceback_later_repeat(self):
603        self.check_dump_traceback_later(repeat=True)
604
605    def test_dump_traceback_later_cancel(self):
606        self.check_dump_traceback_later(cancel=True)
607
608    def test_dump_traceback_later_file(self):
609        with temporary_filename() as filename:
610            self.check_dump_traceback_later(filename=filename)
611
612    @unittest.skipIf(sys.platform == "win32",
613                     "subprocess doesn't support pass_fds on Windows")
614    def test_dump_traceback_later_fd(self):
615        with tempfile.TemporaryFile('wb+') as fp:
616            self.check_dump_traceback_later(fd=fp.fileno())
617
618    def test_dump_traceback_later_twice(self):
619        self.check_dump_traceback_later(loops=2)
620
621    @unittest.skipIf(not hasattr(faulthandler, "register"),
622                     "need faulthandler.register")
623    def check_register(self, filename=False, all_threads=False,
624                       unregister=False, chain=False, fd=None):
625        """
626        Register a handler displaying the traceback on a user signal. Raise the
627        signal and check the written traceback.
628
629        If chain is True, check that the previous signal handler is called.
630
631        Raise an error if the output doesn't match the expected format.
632        """
633        signum = signal.SIGUSR1
634        code = """
635            import faulthandler
636            import os
637            import signal
638            import sys
639
640            all_threads = {all_threads}
641            signum = {signum}
642            unregister = {unregister}
643            chain = {chain}
644            filename = {filename!r}
645            fd = {fd}
646
647            def func(signum):
648                os.kill(os.getpid(), signum)
649
650            def handler(signum, frame):
651                handler.called = True
652            handler.called = False
653
654            if filename:
655                file = open(filename, "wb")
656            elif fd is not None:
657                file = sys.stderr.fileno()
658            else:
659                file = None
660            if chain:
661                signal.signal(signum, handler)
662            faulthandler.register(signum, file=file,
663                                  all_threads=all_threads, chain={chain})
664            if unregister:
665                faulthandler.unregister(signum)
666            func(signum)
667            if chain and not handler.called:
668                if file is not None:
669                    output = file
670                else:
671                    output = sys.stderr
672                print("Error: signal handler not called!", file=output)
673                exitcode = 1
674            else:
675                exitcode = 0
676            if filename:
677                file.close()
678            sys.exit(exitcode)
679            """
680        code = code.format(
681            all_threads=all_threads,
682            signum=signum,
683            unregister=unregister,
684            chain=chain,
685            filename=filename,
686            fd=fd,
687        )
688        trace, exitcode = self.get_output(code, filename)
689        trace = '\n'.join(trace)
690        if not unregister:
691            if all_threads:
692                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
693            else:
694                regex = r'Stack \(most recent call first\):\n'
695            regex = expected_traceback(14, 32, regex)
696            self.assertRegex(trace, regex)
697        else:
698            self.assertEqual(trace, '')
699        if unregister:
700            self.assertNotEqual(exitcode, 0)
701        else:
702            self.assertEqual(exitcode, 0)
703
704    def test_register(self):
705        self.check_register()
706
707    def test_unregister(self):
708        self.check_register(unregister=True)
709
710    def test_register_file(self):
711        with temporary_filename() as filename:
712            self.check_register(filename=filename)
713
714    @unittest.skipIf(sys.platform == "win32",
715                     "subprocess doesn't support pass_fds on Windows")
716    def test_register_fd(self):
717        with tempfile.TemporaryFile('wb+') as fp:
718            self.check_register(fd=fp.fileno())
719
720    def test_register_threads(self):
721        self.check_register(all_threads=True)
722
723    def test_register_chain(self):
724        self.check_register(chain=True)
725
726    @contextmanager
727    def check_stderr_none(self):
728        stderr = sys.stderr
729        try:
730            sys.stderr = None
731            with self.assertRaises(RuntimeError) as cm:
732                yield
733            self.assertEqual(str(cm.exception), "sys.stderr is None")
734        finally:
735            sys.stderr = stderr
736
737    def test_stderr_None(self):
738        # Issue #21497: provide a helpful error if sys.stderr is None,
739        # instead of just an attribute error: "None has no attribute fileno".
740        with self.check_stderr_none():
741            faulthandler.enable()
742        with self.check_stderr_none():
743            faulthandler.dump_traceback()
744        if hasattr(faulthandler, 'dump_traceback_later'):
745            with self.check_stderr_none():
746                faulthandler.dump_traceback_later(1e-3)
747        if hasattr(faulthandler, "register"):
748            with self.check_stderr_none():
749                faulthandler.register(signal.SIGUSR1)
750
751    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
752    def test_raise_exception(self):
753        for exc, name in (
754            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
755            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
756            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
757        ):
758            self.check_windows_exception(f"""
759                import faulthandler
760                faulthandler.enable()
761                faulthandler._raise_exception(faulthandler._{exc})
762                """,
763                3,
764                name)
765
766    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
767    def test_ignore_exception(self):
768        for exc_code in (
769            0xE06D7363,   # MSC exception ("Emsc")
770            0xE0434352,   # COM Callable Runtime exception ("ECCR")
771        ):
772            code = f"""
773                    import faulthandler
774                    faulthandler.enable()
775                    faulthandler._raise_exception({exc_code})
776                    """
777            code = dedent(code)
778            output, exitcode = self.get_output(code)
779            self.assertEqual(output, [])
780            self.assertEqual(exitcode, exc_code)
781
782    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
783    def test_raise_nonfatal_exception(self):
784        # These exceptions are not strictly errors. Letting
785        # faulthandler display the traceback when they are
786        # raised is likely to result in noise. However, they
787        # may still terminate the process if there is no
788        # handler installed for them (which there typically
789        # is, e.g. for debug messages).
790        for exc in (
791            0x00000000,
792            0x34567890,
793            0x40000000,
794            0x40001000,
795            0x70000000,
796            0x7FFFFFFF,
797        ):
798            output, exitcode = self.get_output(f"""
799                import faulthandler
800                faulthandler.enable()
801                faulthandler._raise_exception(0x{exc:x})
802                """
803            )
804            self.assertEqual(output, [])
805            # On Windows older than 7 SP1, the actual exception code has
806            # bit 29 cleared.
807            self.assertIn(exitcode,
808                          (exc, exc & ~0x10000000))
809
810    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
811    def test_disable_windows_exc_handler(self):
812        code = dedent("""
813            import faulthandler
814            faulthandler.enable()
815            faulthandler.disable()
816            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
817            faulthandler._raise_exception(code)
818        """)
819        output, exitcode = self.get_output(code)
820        self.assertEqual(output, [])
821        self.assertEqual(exitcode, 0xC0000005)
822
823
824if __name__ == "__main__":
825    unittest.main()
826