"""Tests for sys.setprofile()/sys.getprofile() and the events a hook receives.

Each test runs a small function under a profile hook and compares the
recorded ``(frame number, event, (first line, name))`` tuples against the
sequence the interpreter is expected to deliver.
"""

import gc
import pprint
import sys
import unittest

try:
    from test import test_support
except ImportError:
    try:
        # Python 3 renamed the helper module to ``test.support``.
        from test import support as test_support
    except ImportError:
        # Some distributions strip the ``test`` package entirely; only
        # test_main() needs it, so keep the rest of the module importable.
        test_support = None


class TestGetProfile(unittest.TestCase):
    """sys.getprofile() must return whatever sys.setprofile() installed."""

    def setUp(self):
        sys.setprofile(None)

    def tearDown(self):
        sys.setprofile(None)

    def test_empty(self):
        self.assertIsNone(sys.getprofile())

    def test_setget(self):
        def fn(*args):
            pass

        sys.setprofile(fn)
        self.assertIs(sys.getprofile(), fn)


class HookWatcher:
    """Profile hook that logs every Python-level event it is given.

    ``frames`` holds each frame seen so far; a frame's index in that list is
    the "frame number" stored in the log.  ``events`` is the log itself, a
    list of ``(frameno, event, ident(frame))`` tuples.
    """

    def __init__(self):
        self.frames = []
        self.events = []

    def callback(self, frame, event, arg):
        """Hook for sys.setprofile(); records call/return/exception only."""
        if (event == "call"
            or event == "return"
            or event == "exception"):
            self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Add an event to the log.

        When *frame* is omitted the caller's frame is used, which lets the
        functions under test drop their own markers into the log.
        """
        if frame is None:
            frame = sys._getframe(1)

        try:
            frameno = self.frames.index(frame)
        except ValueError:
            # First time this frame is seen: give it the next number.
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Return the log with calls to add_event() and ident() removed.

        Those two show up when a profiled function calls add_event() itself.
        The frame list is dropped afterwards, so the watcher must not be
        handed any further events once this has been called.
        """
        # __func__ is the portable spelling of im_func (Python 2.6+ and 3.x).
        disallowed = [ident(self.add_event.__func__), ident(ident)]
        self.frames = None

        return [item for item in self.events if item[2] not in disallowed]


class ProfileSimulator(HookWatcher):
    """Hook that mimics a profiler by tracking its own call stack.

    Fails the owning test case if an 'exception' event is ever delivered.
    """

    def __init__(self, testcase):
        self.testcase = testcase
        self.stack = []
        HookWatcher.__init__(self)

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace()
        self.dispatch[event](self, frame)

    def trace_call(self, frame):
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        self.testcase.fail(
            "the profiler should never receive exception events")

    def trace_pass(self, frame):
        pass

    # Maps event name -> plain function; callback() passes self explicitly.
    dispatch = {
        'call': trace_call,
        'exception': trace_exception,
        'return': trace_return,
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
        }


class TestCaseBase(unittest.TestCase):
    """Shared helper; subclasses supply new_watcher()."""

    def check_events(self, callable, expected):
        """Run *callable* under a fresh watcher and compare the event log."""
        events = capture_events(callable, self.new_watcher())
        if events != expected:
            self.fail("Expected events:\n%s\nReceived events:\n%s"
                      % (pprint.pformat(expected), pprint.pformat(events)))


class ProfileHookTestCase(TestCaseBase):
    """Event sequences seen by a plain HookWatcher."""

    def new_watcher(self):
        return HookWatcher()

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_exception(self):
        def f(p):
            1./0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try: 1./0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_nested_exception(self):
        def f(p):
            try: 1./0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_nested_exception(self):
        def f(p):
            1./0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              # This isn't what I expected:
                              # (0, 'exception', protect_ident),
                              # I expected this again:
                              (1, 'return', f_ident),
                              ])

    def test_exception_in_except_clause(self):
        def f(p):
            1./0
        def g(p):
            try:
                f(p)
            except:
                try: f(p)
                except: pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (3, 'call', f_ident),
                              (3, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_exception_propagation(self):
        def f(p):
            1./0
        def g(p):
            try: f(p)
            finally: p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'falling through', g_ident),
                              (1, 'return', g_ident),
                              ])

    def test_raise_twice(self):
        def f(p):
            try: 1./0
            except: 1./0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise_reraise(self):
        def f(p):
            try: 1./0
            except: raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1./0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; returns end-of-iteration without
                              # actually raising an exception
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
            raise StopIteration
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more to hit the raise:
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])


class ProfileSimulatorTestCase(TestCaseBase):
    """Event sequences seen by the stack-tracking ProfileSimulator."""

    def new_watcher(self):
        return ProfileSimulator(self)

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_basic_exception(self):
        def f(p):
            1./0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try: 1./0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1./0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])


def ident(function):
    """Return ``(co_firstlineno, co_name)`` for a frame or a function.

    Frames expose their code object as ``f_code``; functions expose it as
    ``__code__`` (the portable spelling of ``func_code``, Python 2.6+ and 3.x).
    """
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
        code = function.__code__
    return code.co_firstlineno, code.co_name


def protect(f, p):
    """Call ``f(p)`` and discard any ordinary exception it raises.

    KeyboardInterrupt/SystemExit are deliberately allowed to propagate so a
    test run can still be interrupted.
    """
    try: f(p)
    except Exception: pass

protect_ident = ident(protect)


def capture_events(callable, p=None):
    """Run *callable* under a profile hook and return the recorded events.

    *p* is the watcher whose ``callback`` is installed (a fresh HookWatcher
    by default); it is also the argument handed to *callable*.  The first
    and last recorded events belong to protect() itself and are sliced off.
    """
    if p is None:
        p = HookWatcher()
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        protect(callable, p)
    finally:
        # Always remove the hook, even if something escapes protect(), so a
        # failure here cannot leave later code running under the profiler.
        sys.setprofile(None)
        if old_gc:
            gc.enable()
    return p.get_events()[1:-1]


def show_events(callable):
    """Pretty-print the events captured while running *callable*."""
    pprint.pprint(capture_events(callable))


def test_main():
    """Run every test case in this module.

    Uses the regression-test helper's run_unittest() when it is available
    and falls back to a plain unittest runner otherwise.
    """
    cases = (TestGetProfile, ProfileHookTestCase, ProfileSimulatorTestCase)
    run_unittest = getattr(test_support, "run_unittest", None)
    if run_unittest is not None:
        run_unittest(*cases)
        return

    loader = unittest.TestLoader()
    suite = unittest.TestSuite(
        [loader.loadTestsFromTestCase(case) for case in cases])
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    if not result.wasSuccessful():
        raise AssertionError("sys.setprofile tests failed; see output above")


if __name__ == "__main__":
    test_main()