"""Run tracer scripts against sample Python programs and check their output.

Each test case runs an external tracer (``dtrace`` or ``stap``) with a tracer
script and a Python file taken from the ``dtracedata`` test-data directory,
normalizes what the tracer printed, and compares it with a checked-in
``.expected`` file.
"""

import dis
import os.path
import re
import subprocess
import sys
import types
import unittest

from test.support import findfile


def abspath(filename):
    """Return the absolute path of *filename* in the ``dtracedata`` subdir."""
    return os.path.abspath(findfile(filename, subdir="dtracedata"))


def normalize_trace_output(output):
    """Normalize DTrace output for comparison.

    DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
    are concatenated. So if the operating system moves our thread around, the
    straight result can be "non-causal". So we add timestamps to the probe
    firing, sort by that field, then strip it from the output.

    Each non-empty row that does not start with ``#`` must have the form
    ``<integer timestamp>\\t<text>``.  The rows are sorted by timestamp and
    the second tab-separated field of each row is joined with newlines.

    Raises:
        AssertionError: if a row has no tab-separated second field or its
            first field is not an integer.
    """
    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
    output = re.sub(r"\[[0-9]+ refs\]", "", output)
    try:
        result = [
            row.split("\t")
            for row in output.splitlines()
            if row and not row.startswith('#')
        ]
        result.sort(key=lambda row: int(row[0]))
        result = [row[1] for row in result]
        return "\n".join(result)
    except (IndexError, ValueError) as exc:
        # Chain the parsing error so the failing row can be identified.
        raise AssertionError(
            "tracer produced unparsable output:\n{}".format(output)
        ) from exc


class TraceBackend:
    """Common driver for an external tracing tool.

    Subclasses set ``EXTENSION`` (the suffix of the tracer script files) and
    ``COMMAND`` (the argv prefix that is followed by the script path).
    """

    EXTENSION = None
    COMMAND = None
    # Not referenced anywhere in this module; kept for compatibility.
    COMMAND_ARGS = []

    def run_case(self, name, optimize_python=None):
        """Trace the test case *name*.

        Runs ``<name><EXTENSION>`` against ``<name>.py`` and reads
        ``<name><EXTENSION>.expected``.

        Returns:
            A ``(expected_output, actual_output)`` tuple, in that order.
        """
        actual_output = normalize_trace_output(self.trace_python(
            script_file=abspath(name + self.EXTENSION),
            python_file=abspath(name + ".py"),
            optimize_python=optimize_python))

        expected_path = abspath(name + self.EXTENSION + ".expected")
        with open(expected_path, encoding="utf-8") as f:
            expected_output = f.read().rstrip()

        return (expected_output, actual_output)

    def generate_trace_command(self, script_file, subcommand=None):
        """Return the argv list that runs *script_file* with the tracer.

        When *subcommand* is given it is passed to the tracer through ``-c``.
        """
        # Concatenation builds a new list, so the class-level COMMAND is
        # never mutated by the "+=" below.
        command = self.COMMAND + [script_file]
        if subcommand:
            command += ["-c", subcommand]
        return command

    def trace(self, script_file, subcommand=None):
        """Run the tracer and return its combined stdout/stderr as text."""
        command = self.generate_trace_command(script_file, subcommand)
        completed = subprocess.run(command,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        return completed.stdout

    def trace_python(self, script_file, python_file, optimize_python=None):
        """Trace ``sys.executable`` running *python_file*.

        *optimize_python* is the number of ``-O`` flags passed to the
        interpreter; ``None`` or ``0`` passes none.
        """
        python_flags = []
        if optimize_python:
            python_flags.extend(["-O"] * optimize_python)
        # The tracer receives the child command as one space-joined string.
        subcommand = " ".join([sys.executable] + python_flags + [python_file])
        return self.trace(script_file, subcommand)

    def assert_usable(self):
        """Skip the calling test unless the tracer works on this machine.

        Runs the ``assert_usable<EXTENSION>`` script and requires its
        stripped output to be exactly ``probe: success``.

        Raises:
            unittest.SkipTest: if the tracer cannot be launched or prints
                anything else.
        """
        try:
            output = self.trace(abspath("assert_usable" + self.EXTENSION))
            output = output.strip()
        except (FileNotFoundError, NotADirectoryError, PermissionError) as exc:
            # The tracer could not be started; report the OS error text.
            output = str(exc)
        if output != "probe: success":
            raise unittest.SkipTest(
                "{}(1) failed: {}".format(self.COMMAND[0], output)
            )


class DTraceBackend(TraceBackend):
    """Backend that runs ``.d`` scripts with ``dtrace -q -s``."""

    EXTENSION = ".d"
    COMMAND = ["dtrace", "-q", "-s"]


class SystemTapBackend(TraceBackend):
    """Backend that runs ``.stp`` scripts with ``stap -g``."""

    EXTENSION = ".stp"
    COMMAND = ["stap", "-g"]


class TraceTests:
    """Mixin holding the test methods shared by every backend/optimization.

    Concrete classes combine it with ``unittest.TestCase`` and set
    ``backend`` and ``optimize_python``.
    """

    # unittest.TestCase options
    maxDiff = None

    # TraceTests options
    backend = None
    optimize_python = 0

    @classmethod
    def setUpClass(cls):
        """Skip the whole class when the backend's tracer is unusable."""
        cls.backend.assert_usable()

    def run_case(self, name):
        """Run test case *name* and assert the tracer output is as expected."""
        # The backend returns (expected, actual) in that order.
        expected_output, actual_output = self.backend.run_case(
            name, optimize_python=self.optimize_python)
        self.assertEqual(actual_output, expected_output)

    def test_function_entry_return(self):
        self.run_case("call_stack")

    def test_verify_call_opcodes(self):
        """Ensure our call stack test hits all function call opcodes"""

        opcodes = {"CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"}

        with open(abspath("call_stack.py"), encoding="utf-8") as f:
            code_string = f.read()

        def get_function_instructions(funcname):
            # Recompile with appropriate optimization setting
            code = compile(source=code_string,
                           filename="<string>",
                           mode="exec",
                           optimize=self.optimize_python)

            for c in code.co_consts:
                if isinstance(c, types.CodeType) and c.co_name == funcname:
                    return dis.get_instructions(c)
            return []

        # Every opcode seen in start() is removed; leftovers were never hit.
        for instruction in get_function_instructions('start'):
            opcodes.discard(instruction.opname)

        self.assertEqual(set(), opcodes)

    def test_gc(self):
        self.run_case("gc")

    def test_line(self):
        self.run_case("line")


class DTraceNormalTests(TraceTests, unittest.TestCase):
    backend = DTraceBackend()
    optimize_python = 0


class DTraceOptimizedTests(TraceTests, unittest.TestCase):
    backend = DTraceBackend()
    optimize_python = 2


class SystemTapNormalTests(TraceTests, unittest.TestCase):
    backend = SystemTapBackend()
    optimize_python = 0


class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
    backend = SystemTapBackend()
    optimize_python = 2


if __name__ == '__main__':
    unittest.main()