1import unittest 2import os 3import textwrap 4import importlib 5import sys 6from test.support import os_helper, SHORT_TIMEOUT 7from test.support.script_helper import make_script 8 9import subprocess 10 11PROCESS_VM_READV_SUPPORTED = False 12 13try: 14 from _testexternalinspection import PROCESS_VM_READV_SUPPORTED 15 from _testexternalinspection import get_stack_trace 16except ImportError: 17 raise unittest.SkipTest("Test only runs when _testexternalinspection is available") 18 19def _make_test_script(script_dir, script_basename, source): 20 to_return = make_script(script_dir, script_basename, source) 21 importlib.invalidate_caches() 22 return to_return 23 24class TestGetStackTrace(unittest.TestCase): 25 26 @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS") 27 @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support") 28 def test_remote_stack_trace(self): 29 # Spawn a process with some realistic Python code 30 script = textwrap.dedent("""\ 31 import time, sys, os 32 def bar(): 33 for x in range(100): 34 if x == 50: 35 baz() 36 def baz(): 37 foo() 38 39 def foo(): 40 fifo = sys.argv[1] 41 with open(sys.argv[1], "w") as fifo: 42 fifo.write("ready") 43 time.sleep(1000) 44 45 bar() 46 """) 47 stack_trace = None 48 with os_helper.temp_dir() as work_dir: 49 script_dir = os.path.join(work_dir, "script_pkg") 50 os.mkdir(script_dir) 51 fifo = f"{work_dir}/the_fifo" 52 os.mkfifo(fifo) 53 script_name = _make_test_script(script_dir, 'script', script) 54 try: 55 p = subprocess.Popen([sys.executable, script_name, str(fifo)]) 56 with open(fifo, "r") as fifo_file: 57 response = fifo_file.read() 58 self.assertEqual(response, "ready") 59 stack_trace = get_stack_trace(p.pid) 60 except PermissionError: 61 self.skipTest("Insufficient permissions to read the stack trace") 62 finally: 63 os.remove(fifo) 64 p.kill() 65 p.terminate() 66 p.wait(timeout=SHORT_TIMEOUT) 67 68 69 expected_stack_trace = [ 70 'foo', 71 'baz', 72 'bar', 73 '<module>' 74 ] 75 self.assertEqual(stack_trace, expected_stack_trace) 76 77 @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS") 78 @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support") 79 def test_self_trace(self): 80 stack_trace = get_stack_trace(os.getpid()) 81 self.assertEqual(stack_trace[0], "test_self_trace") 82 83if __name__ == "__main__": 84 unittest.main() 85