• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2import os
3import textwrap
4import importlib
5import sys
6from test.support import os_helper, SHORT_TIMEOUT
7from test.support.script_helper import make_script
8
9import subprocess
10
11PROCESS_VM_READV_SUPPORTED = False
12
13try:
14    from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
15    from _testexternalinspection import get_stack_trace
16except ImportError:
17    raise unittest.SkipTest("Test only runs when _testexternalinspection is available")
18
19def _make_test_script(script_dir, script_basename, source):
20    to_return = make_script(script_dir, script_basename, source)
21    importlib.invalidate_caches()
22    return to_return
23
24class TestGetStackTrace(unittest.TestCase):
25
26    @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
27    @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
28    def test_remote_stack_trace(self):
29        # Spawn a process with some realistic Python code
30        script = textwrap.dedent("""\
31            import time, sys, os
32            def bar():
33                for x in range(100):
34                    if x == 50:
35                        baz()
36            def baz():
37                foo()
38
39            def foo():
40                fifo = sys.argv[1]
41                with open(sys.argv[1], "w") as fifo:
42                    fifo.write("ready")
43                time.sleep(1000)
44
45            bar()
46            """)
47        stack_trace = None
48        with os_helper.temp_dir() as work_dir:
49            script_dir = os.path.join(work_dir, "script_pkg")
50            os.mkdir(script_dir)
51            fifo = f"{work_dir}/the_fifo"
52            os.mkfifo(fifo)
53            script_name = _make_test_script(script_dir, 'script', script)
54            try:
55                p = subprocess.Popen([sys.executable, script_name,  str(fifo)])
56                with open(fifo, "r") as fifo_file:
57                    response = fifo_file.read()
58                self.assertEqual(response, "ready")
59                stack_trace = get_stack_trace(p.pid)
60            except PermissionError:
61                self.skipTest("Insufficient permissions to read the stack trace")
62            finally:
63                os.remove(fifo)
64                p.kill()
65                p.terminate()
66                p.wait(timeout=SHORT_TIMEOUT)
67
68
69            expected_stack_trace = [
70                'foo',
71                'baz',
72                'bar',
73                '<module>'
74            ]
75            self.assertEqual(stack_trace, expected_stack_trace)
76
77    @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
78    @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
79    def test_self_trace(self):
80        stack_trace = get_stack_trace(os.getpid())
81        self.assertEqual(stack_trace[0], "test_self_trace")
82
83if __name__ == "__main__":
84    unittest.main()
85