1import gc 2import pprint 3import sys 4import unittest 5 6 7class TestGetProfile(unittest.TestCase): 8 def setUp(self): 9 sys.setprofile(None) 10 11 def tearDown(self): 12 sys.setprofile(None) 13 14 def test_empty(self): 15 self.assertIsNone(sys.getprofile()) 16 17 def test_setget(self): 18 def fn(*args): 19 pass 20 21 sys.setprofile(fn) 22 self.assertIs(sys.getprofile(), fn) 23 24class HookWatcher: 25 def __init__(self): 26 self.frames = [] 27 self.events = [] 28 29 def callback(self, frame, event, arg): 30 if (event == "call" 31 or event == "return" 32 or event == "exception"): 33 self.add_event(event, frame, arg) 34 35 def add_event(self, event, frame=None, arg=None): 36 """Add an event to the log.""" 37 if frame is None: 38 frame = sys._getframe(1) 39 40 try: 41 frameno = self.frames.index(frame) 42 except ValueError: 43 frameno = len(self.frames) 44 self.frames.append(frame) 45 46 self.events.append((frameno, event, ident(frame), arg)) 47 48 def get_events(self): 49 """Remove calls to add_event().""" 50 disallowed = [ident(self.add_event.__func__), ident(ident)] 51 self.frames = None 52 53 return [item for item in self.events if item[2] not in disallowed] 54 55 56class ProfileSimulator(HookWatcher): 57 def __init__(self, testcase): 58 self.testcase = testcase 59 self.stack = [] 60 HookWatcher.__init__(self) 61 62 def callback(self, frame, event, arg): 63 # Callback registered with sys.setprofile()/sys.settrace() 64 self.dispatch[event](self, frame) 65 66 def trace_call(self, frame): 67 self.add_event('call', frame) 68 self.stack.append(frame) 69 70 def trace_return(self, frame): 71 self.add_event('return', frame) 72 self.stack.pop() 73 74 def trace_exception(self, frame): 75 self.testcase.fail( 76 "the profiler should never receive exception events") 77 78 def trace_pass(self, frame): 79 pass 80 81 dispatch = { 82 'call': trace_call, 83 'exception': trace_exception, 84 'return': trace_return, 85 'c_call': trace_pass, 86 'c_return': trace_pass, 87 'c_exception': trace_pass, 88 } 89 90 91class TestCaseBase(unittest.TestCase): 92 def check_events(self, callable, expected, check_args=False): 93 events = capture_events(callable, self.new_watcher()) 94 if check_args: 95 if events != expected: 96 self.fail("Expected events:\n%s\nReceived events:\n%s" 97 % (pprint.pformat(expected), pprint.pformat(events))) 98 else: 99 if [(frameno, event, ident) for frameno, event, ident, arg in events] != expected: 100 self.fail("Expected events:\n%s\nReceived events:\n%s" 101 % (pprint.pformat(expected), pprint.pformat(events))) 102 103 104class ProfileHookTestCase(TestCaseBase): 105 def new_watcher(self): 106 return HookWatcher() 107 108 def test_simple(self): 109 def f(p): 110 pass 111 f_ident = ident(f) 112 self.check_events(f, [(1, 'call', f_ident), 113 (1, 'return', f_ident), 114 ]) 115 116 def test_exception(self): 117 def f(p): 118 1/0 119 f_ident = ident(f) 120 self.check_events(f, [(1, 'call', f_ident), 121 (1, 'return', f_ident), 122 ]) 123 124 def test_caught_exception(self): 125 def f(p): 126 try: 1/0 127 except: pass 128 f_ident = ident(f) 129 self.check_events(f, [(1, 'call', f_ident), 130 (1, 'return', f_ident), 131 ]) 132 133 def test_caught_nested_exception(self): 134 def f(p): 135 try: 1/0 136 except: pass 137 f_ident = ident(f) 138 self.check_events(f, [(1, 'call', f_ident), 139 (1, 'return', f_ident), 140 ]) 141 142 def test_nested_exception(self): 143 def f(p): 144 1/0 145 f_ident = ident(f) 146 self.check_events(f, [(1, 'call', f_ident), 147 # This isn't what I expected: 148 # (0, 'exception', protect_ident), 149 # I expected this again: 150 (1, 'return', f_ident), 151 ]) 152 153 def test_exception_in_except_clause(self): 154 def f(p): 155 1/0 156 def g(p): 157 try: 158 f(p) 159 except: 160 try: f(p) 161 except: pass 162 f_ident = ident(f) 163 g_ident = ident(g) 164 self.check_events(g, [(1, 'call', g_ident), 165 (2, 'call', f_ident), 166 (2, 'return', f_ident), 167 (3, 'call', f_ident), 168 (3, 'return', f_ident), 169 (1, 'return', g_ident), 170 ]) 171 172 def test_exception_propagation(self): 173 def f(p): 174 1/0 175 def g(p): 176 try: f(p) 177 finally: p.add_event("falling through") 178 f_ident = ident(f) 179 g_ident = ident(g) 180 self.check_events(g, [(1, 'call', g_ident), 181 (2, 'call', f_ident), 182 (2, 'return', f_ident), 183 (1, 'falling through', g_ident), 184 (1, 'return', g_ident), 185 ]) 186 187 def test_raise_twice(self): 188 def f(p): 189 try: 1/0 190 except: 1/0 191 f_ident = ident(f) 192 self.check_events(f, [(1, 'call', f_ident), 193 (1, 'return', f_ident), 194 ]) 195 196 def test_raise_reraise(self): 197 def f(p): 198 try: 1/0 199 except: raise 200 f_ident = ident(f) 201 self.check_events(f, [(1, 'call', f_ident), 202 (1, 'return', f_ident), 203 ]) 204 205 def test_raise(self): 206 def f(p): 207 raise Exception() 208 f_ident = ident(f) 209 self.check_events(f, [(1, 'call', f_ident), 210 (1, 'return', f_ident), 211 ]) 212 213 def test_distant_exception(self): 214 def f(): 215 1/0 216 def g(): 217 f() 218 def h(): 219 g() 220 def i(): 221 h() 222 def j(p): 223 i() 224 f_ident = ident(f) 225 g_ident = ident(g) 226 h_ident = ident(h) 227 i_ident = ident(i) 228 j_ident = ident(j) 229 self.check_events(j, [(1, 'call', j_ident), 230 (2, 'call', i_ident), 231 (3, 'call', h_ident), 232 (4, 'call', g_ident), 233 (5, 'call', f_ident), 234 (5, 'return', f_ident), 235 (4, 'return', g_ident), 236 (3, 'return', h_ident), 237 (2, 'return', i_ident), 238 (1, 'return', j_ident), 239 ]) 240 241 def test_generator(self): 242 def f(): 243 for i in range(2): 244 yield i 245 def g(p): 246 for i in f(): 247 pass 248 f_ident = ident(f) 249 g_ident = ident(g) 250 self.check_events(g, [(1, 'call', g_ident), 251 # call the iterator twice to generate values 252 (2, 'call', f_ident), 253 (2, 'return', f_ident), 254 (2, 'call', f_ident), 255 (2, 'return', f_ident), 256 # once more; returns end-of-iteration with 257 # actually raising an exception 258 (2, 'call', f_ident), 259 (2, 'return', f_ident), 260 (1, 'return', g_ident), 261 ]) 262 263 def test_unfinished_generator(self): 264 def f(): 265 for i in range(2): 266 yield i 267 def g(p): 268 next(f()) 269 270 f_ident = ident(f) 271 g_ident = ident(g) 272 self.check_events(g, [(1, 'call', g_ident, None), 273 (2, 'call', f_ident, None), 274 (2, 'return', f_ident, 0), 275 (1, 'return', g_ident, None), 276 ], check_args=True) 277 278 def test_stop_iteration(self): 279 def f(): 280 for i in range(2): 281 yield i 282 def g(p): 283 for i in f(): 284 pass 285 f_ident = ident(f) 286 g_ident = ident(g) 287 self.check_events(g, [(1, 'call', g_ident), 288 # call the iterator twice to generate values 289 (2, 'call', f_ident), 290 (2, 'return', f_ident), 291 (2, 'call', f_ident), 292 (2, 'return', f_ident), 293 # once more to hit the raise: 294 (2, 'call', f_ident), 295 (2, 'return', f_ident), 296 (1, 'return', g_ident), 297 ]) 298 299 300class ProfileSimulatorTestCase(TestCaseBase): 301 def new_watcher(self): 302 return ProfileSimulator(self) 303 304 def test_simple(self): 305 def f(p): 306 pass 307 f_ident = ident(f) 308 self.check_events(f, [(1, 'call', f_ident), 309 (1, 'return', f_ident), 310 ]) 311 312 def test_basic_exception(self): 313 def f(p): 314 1/0 315 f_ident = ident(f) 316 self.check_events(f, [(1, 'call', f_ident), 317 (1, 'return', f_ident), 318 ]) 319 320 def test_caught_exception(self): 321 def f(p): 322 try: 1/0 323 except: pass 324 f_ident = ident(f) 325 self.check_events(f, [(1, 'call', f_ident), 326 (1, 'return', f_ident), 327 ]) 328 329 def test_distant_exception(self): 330 def f(): 331 1/0 332 def g(): 333 f() 334 def h(): 335 g() 336 def i(): 337 h() 338 def j(p): 339 i() 340 f_ident = ident(f) 341 g_ident = ident(g) 342 h_ident = ident(h) 343 i_ident = ident(i) 344 j_ident = ident(j) 345 self.check_events(j, [(1, 'call', j_ident), 346 (2, 'call', i_ident), 347 (3, 'call', h_ident), 348 (4, 'call', g_ident), 349 (5, 'call', f_ident), 350 (5, 'return', f_ident), 351 (4, 'return', g_ident), 352 (3, 'return', h_ident), 353 (2, 'return', i_ident), 354 (1, 'return', j_ident), 355 ]) 356 357 # bpo-34125: profiling method_descriptor with **kwargs 358 def test_unbound_method(self): 359 kwargs = {} 360 def f(p): 361 dict.get({}, 42, **kwargs) 362 f_ident = ident(f) 363 self.check_events(f, [(1, 'call', f_ident), 364 (1, 'return', f_ident)]) 365 366 # Test an invalid call (bpo-34126) 367 def test_unbound_method_no_args(self): 368 def f(p): 369 dict.get() 370 f_ident = ident(f) 371 self.check_events(f, [(1, 'call', f_ident), 372 (1, 'return', f_ident)]) 373 374 # Test an invalid call (bpo-34126) 375 def test_unbound_method_invalid_args(self): 376 def f(p): 377 dict.get(print, 42) 378 f_ident = ident(f) 379 self.check_events(f, [(1, 'call', f_ident), 380 (1, 'return', f_ident)]) 381 382 # Test an invalid call (bpo-34125) 383 def test_unbound_method_no_keyword_args(self): 384 kwargs = {} 385 def f(p): 386 dict.get(**kwargs) 387 f_ident = ident(f) 388 self.check_events(f, [(1, 'call', f_ident), 389 (1, 'return', f_ident)]) 390 391 # Test an invalid call (bpo-34125) 392 def test_unbound_method_invalid_keyword_args(self): 393 kwargs = {} 394 def f(p): 395 dict.get(print, 42, **kwargs) 396 f_ident = ident(f) 397 self.check_events(f, [(1, 'call', f_ident), 398 (1, 'return', f_ident)]) 399 400 401def ident(function): 402 if hasattr(function, "f_code"): 403 code = function.f_code 404 else: 405 code = function.__code__ 406 return code.co_firstlineno, code.co_name 407 408 409def protect(f, p): 410 try: f(p) 411 except: pass 412 413protect_ident = ident(protect) 414 415 416def capture_events(callable, p=None): 417 if p is None: 418 p = HookWatcher() 419 # Disable the garbage collector. This prevents __del__s from showing up in 420 # traces. 421 old_gc = gc.isenabled() 422 gc.disable() 423 try: 424 sys.setprofile(p.callback) 425 protect(callable, p) 426 sys.setprofile(None) 427 finally: 428 if old_gc: 429 gc.enable() 430 return p.get_events()[1:-1] 431 432 433def show_events(callable): 434 import pprint 435 pprint.pprint(capture_events(callable)) 436 437 438class TestEdgeCases(unittest.TestCase): 439 440 def setUp(self): 441 self.addCleanup(sys.setprofile, sys.getprofile()) 442 sys.setprofile(None) 443 444 def test_reentrancy(self): 445 def foo(*args): 446 ... 447 448 def bar(*args): 449 ... 450 451 class A: 452 def __call__(self, *args): 453 pass 454 455 def __del__(self): 456 sys.setprofile(bar) 457 458 sys.setprofile(A()) 459 sys.setprofile(foo) 460 self.assertEqual(sys.getprofile(), bar) 461 462 def test_same_object(self): 463 def foo(*args): 464 ... 465 466 sys.setprofile(foo) 467 del foo 468 sys.setprofile(sys.getprofile()) 469 470 def test_profile_after_trace_opcodes(self): 471 def f(): 472 ... 473 474 sys._getframe().f_trace_opcodes = True 475 prev_trace = sys.gettrace() 476 sys.settrace(lambda *args: None) 477 f() 478 sys.settrace(prev_trace) 479 sys.setprofile(lambda *args: None) 480 f() 481 482 def test_method_with_c_function(self): 483 # gh-122029 484 # When we have a PyMethodObject whose im_func is a C function, we 485 # should record both the call and the return. f = classmethod(repr) 486 # is just a way to create a PyMethodObject with a C function. 487 class A: 488 f = classmethod(repr) 489 events = [] 490 sys.setprofile(lambda frame, event, args: events.append(event)) 491 A().f() 492 sys.setprofile(None) 493 # The last c_call is the call to sys.setprofile 494 self.assertEqual(events, ['c_call', 'c_return', 'c_call']) 495 496 497if __name__ == "__main__": 498 unittest.main() 499