"""Tests for the profiling hook installed with sys.setprofile()."""

import gc
import pprint
import sys
import unittest


class TestGetProfile(unittest.TestCase):
    """Check that sys.getprofile() reports what sys.setprofile() installed."""

    def setUp(self):
        sys.setprofile(None)

    def tearDown(self):
        sys.setprofile(None)

    def test_empty(self):
        self.assertIsNone(sys.getprofile())

    def test_setget(self):
        def fn(*args):
            pass

        sys.setprofile(fn)
        self.assertIs(sys.getprofile(), fn)


class HookWatcher:
    """Profile callback that logs (frame number, event, ident) tuples.

    Frames are numbered in the order in which they are first seen.
    """

    def __init__(self):
        self.frames = []
        self.events = []

    def callback(self, frame, event, arg):
        """Record Python-level events; every other event kind is ignored."""
        if event in ("call", "return", "exception"):
            self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Add an event to the log.

        When *frame* is omitted the caller's frame is used, which lets
        profiled code insert its own markers into the log.
        """
        # NOTE: this method may run while the profile hook is active (when
        # profiled code calls it directly).  Any extra Python-level call made
        # here would show up in the event log, so keep the body minimal.
        if frame is None:
            frame = sys._getframe(1)

        try:
            frameno = self.frames.index(frame)
        except ValueError:
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Return the logged events without those caused by add_event().

        Events whose ident is that of add_event() or ident() are produced
        when profiled code calls add_event() itself, and are filtered out.
        The list of seen frames is discarded, so the watcher cannot log
        further events afterwards.
        """
        disallowed = [ident(self.add_event.__func__), ident(ident)]
        self.frames = None

        return [item for item in self.events if item[2] not in disallowed]


class ProfileSimulator(HookWatcher):
    """Watcher that dispatches on the event name, as a profiler would.

    It keeps a stack of active frames and fails the owning test case if an
    'exception' event is ever delivered.
    """

    def __init__(self, testcase):
        self.testcase = testcase
        self.stack = []
        super().__init__()

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace()
        self.dispatch[event](self, frame)

    def trace_call(self, frame):
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        self.testcase.fail(
            "the profiler should never receive exception events")

    def trace_pass(self, frame):
        pass

    # Maps event names to the plain functions defined above; callback()
    # passes ``self`` explicitly when invoking them.
    dispatch = {
        'call': trace_call,
        'exception': trace_exception,
        'return': trace_return,
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
        }


class TestCaseBase(unittest.TestCase):
    """Shared helper; subclasses provide new_watcher()."""

    def check_events(self, callable, expected):
        """Run *callable* under a fresh watcher and compare the event log."""
        events = capture_events(callable, self.new_watcher())
        if events != expected:
            self.fail("Expected events:\n%s\nReceived events:\n%s"
                      % (pprint.pformat(expected), pprint.pformat(events)))


class ProfileHookTestCase(TestCaseBase):
    def new_watcher(self):
        return HookWatcher()

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_nested_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_nested_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              # This isn't what I expected:
                              # (0, 'exception', protect_ident),
                              # I expected this again:
                              (1, 'return', f_ident),
                              ])

    def test_exception_in_except_clause(self):
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try: f(p)
                except: pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (3, 'call', f_ident),
                              (3, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_exception_propagation(self):
        def f(p):
            1/0
        def g(p):
            try: f(p)
            finally: p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'falling through', g_ident),
                              (1, 'return', g_ident),
                              ])

    def test_raise_twice(self):
        def f(p):
            try: 1/0
            except: 1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise_reraise(self):
        def f(p):
            try: 1/0
            except: raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; returns end-of-iteration without
                              # actually raising an exception
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_stop_iteration(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more to exhaust the generator:
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])


class ProfileSimulatorTestCase(TestCaseBase):
    def new_watcher(self):
        return ProfileSimulator(self)

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_basic_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try: 1/0
            except: pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    # bpo-34125: profiling method_descriptor with **kwargs
    def test_unbound_method(self):
        kwargs = {}
        def f(p):
            dict.get({}, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_no_args(self):
        def f(p):
            dict.get()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_invalid_args(self):
        def f(p):
            dict.get(print, 42)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    def test_unbound_method_no_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(**kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    def test_unbound_method_invalid_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(print, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])


def ident(function):
    """Return ``(co_firstlineno, co_name)`` for a function or a frame.

    Objects with an ``f_code`` attribute (frames) are identified through it;
    anything else is expected to expose ``__code__``.
    """
    # NOTE: called from HookWatcher.add_event(), possibly while the profile
    # hook is active, so this must not make further Python-level calls.
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
        code = function.__code__
    return code.co_firstlineno, code.co_name


def protect(f, p):
    """Call ``f(p)`` and swallow whatever it raises.

    Swallowing everything is deliberate: the tests profile callables that
    raise on purpose and only look at the events that were logged.
    """
    try:
        f(p)
    except BaseException:
        pass


protect_ident = ident(protect)


def capture_events(callable, p=None):
    """Run ``protect(callable, p)`` under the profile hook of watcher *p*.

    A fresh HookWatcher is used when *p* is None.  Returns the watcher's
    events with the first and last entries removed; in a normal run those
    are the call and return of protect() itself.
    """
    if p is None:
        p = HookWatcher()
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        try:
            protect(callable, p)
        finally:
            # Always remove the hook, even if an exception (for instance a
            # KeyboardInterrupt) surfaces in this frame, so that it cannot
            # leak into code that runs afterwards.
            sys.setprofile(None)
    finally:
        if old_gc:
            gc.enable()
    return p.get_events()[1:-1]


def show_events(callable):
    """Pretty-print the events captured while running *callable*."""
    pprint.pprint(capture_events(callable))


if __name__ == "__main__":
    unittest.main()