• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import pprint
3import sys
4import unittest
5
6
7class TestGetProfile(unittest.TestCase):
8    def setUp(self):
9        sys.setprofile(None)
10
11    def tearDown(self):
12        sys.setprofile(None)
13
14    def test_empty(self):
15        self.assertIsNone(sys.getprofile())
16
17    def test_setget(self):
18        def fn(*args):
19            pass
20
21        sys.setprofile(fn)
22        self.assertIs(sys.getprofile(), fn)
23
24class HookWatcher:
25    def __init__(self):
26        self.frames = []
27        self.events = []
28
29    def callback(self, frame, event, arg):
30        if (event == "call"
31            or event == "return"
32            or event == "exception"):
33            self.add_event(event, frame, arg)
34
35    def add_event(self, event, frame=None, arg=None):
36        """Add an event to the log."""
37        if frame is None:
38            frame = sys._getframe(1)
39
40        try:
41            frameno = self.frames.index(frame)
42        except ValueError:
43            frameno = len(self.frames)
44            self.frames.append(frame)
45
46        self.events.append((frameno, event, ident(frame), arg))
47
48    def get_events(self):
49        """Remove calls to add_event()."""
50        disallowed = [ident(self.add_event.__func__), ident(ident)]
51        self.frames = None
52
53        return [item for item in self.events if item[2] not in disallowed]
54
55
56class ProfileSimulator(HookWatcher):
57    def __init__(self, testcase):
58        self.testcase = testcase
59        self.stack = []
60        HookWatcher.__init__(self)
61
62    def callback(self, frame, event, arg):
63        # Callback registered with sys.setprofile()/sys.settrace()
64        self.dispatch[event](self, frame)
65
66    def trace_call(self, frame):
67        self.add_event('call', frame)
68        self.stack.append(frame)
69
70    def trace_return(self, frame):
71        self.add_event('return', frame)
72        self.stack.pop()
73
74    def trace_exception(self, frame):
75        self.testcase.fail(
76            "the profiler should never receive exception events")
77
78    def trace_pass(self, frame):
79        pass
80
81    dispatch = {
82        'call': trace_call,
83        'exception': trace_exception,
84        'return': trace_return,
85        'c_call': trace_pass,
86        'c_return': trace_pass,
87        'c_exception': trace_pass,
88        }
89
90
91class TestCaseBase(unittest.TestCase):
92    def check_events(self, callable, expected, check_args=False):
93        events = capture_events(callable, self.new_watcher())
94        if check_args:
95            if events != expected:
96                self.fail("Expected events:\n%s\nReceived events:\n%s"
97                          % (pprint.pformat(expected), pprint.pformat(events)))
98        else:
99            if [(frameno, event, ident) for frameno, event, ident, arg in events] != expected:
100                self.fail("Expected events:\n%s\nReceived events:\n%s"
101                          % (pprint.pformat(expected), pprint.pformat(events)))
102
103
104class ProfileHookTestCase(TestCaseBase):
105    def new_watcher(self):
106        return HookWatcher()
107
108    def test_simple(self):
109        def f(p):
110            pass
111        f_ident = ident(f)
112        self.check_events(f, [(1, 'call', f_ident),
113                              (1, 'return', f_ident),
114                              ])
115
116    def test_exception(self):
117        def f(p):
118            1/0
119        f_ident = ident(f)
120        self.check_events(f, [(1, 'call', f_ident),
121                              (1, 'return', f_ident),
122                              ])
123
124    def test_caught_exception(self):
125        def f(p):
126            try: 1/0
127            except: pass
128        f_ident = ident(f)
129        self.check_events(f, [(1, 'call', f_ident),
130                              (1, 'return', f_ident),
131                              ])
132
133    def test_caught_nested_exception(self):
134        def f(p):
135            try: 1/0
136            except: pass
137        f_ident = ident(f)
138        self.check_events(f, [(1, 'call', f_ident),
139                              (1, 'return', f_ident),
140                              ])
141
142    def test_nested_exception(self):
143        def f(p):
144            1/0
145        f_ident = ident(f)
146        self.check_events(f, [(1, 'call', f_ident),
147                              # This isn't what I expected:
148                              # (0, 'exception', protect_ident),
149                              # I expected this again:
150                              (1, 'return', f_ident),
151                              ])
152
153    def test_exception_in_except_clause(self):
154        def f(p):
155            1/0
156        def g(p):
157            try:
158                f(p)
159            except:
160                try: f(p)
161                except: pass
162        f_ident = ident(f)
163        g_ident = ident(g)
164        self.check_events(g, [(1, 'call', g_ident),
165                              (2, 'call', f_ident),
166                              (2, 'return', f_ident),
167                              (3, 'call', f_ident),
168                              (3, 'return', f_ident),
169                              (1, 'return', g_ident),
170                              ])
171
172    def test_exception_propagation(self):
173        def f(p):
174            1/0
175        def g(p):
176            try: f(p)
177            finally: p.add_event("falling through")
178        f_ident = ident(f)
179        g_ident = ident(g)
180        self.check_events(g, [(1, 'call', g_ident),
181                              (2, 'call', f_ident),
182                              (2, 'return', f_ident),
183                              (1, 'falling through', g_ident),
184                              (1, 'return', g_ident),
185                              ])
186
187    def test_raise_twice(self):
188        def f(p):
189            try: 1/0
190            except: 1/0
191        f_ident = ident(f)
192        self.check_events(f, [(1, 'call', f_ident),
193                              (1, 'return', f_ident),
194                              ])
195
196    def test_raise_reraise(self):
197        def f(p):
198            try: 1/0
199            except: raise
200        f_ident = ident(f)
201        self.check_events(f, [(1, 'call', f_ident),
202                              (1, 'return', f_ident),
203                              ])
204
205    def test_raise(self):
206        def f(p):
207            raise Exception()
208        f_ident = ident(f)
209        self.check_events(f, [(1, 'call', f_ident),
210                              (1, 'return', f_ident),
211                              ])
212
213    def test_distant_exception(self):
214        def f():
215            1/0
216        def g():
217            f()
218        def h():
219            g()
220        def i():
221            h()
222        def j(p):
223            i()
224        f_ident = ident(f)
225        g_ident = ident(g)
226        h_ident = ident(h)
227        i_ident = ident(i)
228        j_ident = ident(j)
229        self.check_events(j, [(1, 'call', j_ident),
230                              (2, 'call', i_ident),
231                              (3, 'call', h_ident),
232                              (4, 'call', g_ident),
233                              (5, 'call', f_ident),
234                              (5, 'return', f_ident),
235                              (4, 'return', g_ident),
236                              (3, 'return', h_ident),
237                              (2, 'return', i_ident),
238                              (1, 'return', j_ident),
239                              ])
240
241    def test_generator(self):
242        def f():
243            for i in range(2):
244                yield i
245        def g(p):
246            for i in f():
247                pass
248        f_ident = ident(f)
249        g_ident = ident(g)
250        self.check_events(g, [(1, 'call', g_ident),
251                              # call the iterator twice to generate values
252                              (2, 'call', f_ident),
253                              (2, 'return', f_ident),
254                              (2, 'call', f_ident),
255                              (2, 'return', f_ident),
256                              # once more; returns end-of-iteration with
257                              # actually raising an exception
258                              (2, 'call', f_ident),
259                              (2, 'return', f_ident),
260                              (1, 'return', g_ident),
261                              ])
262
263    def test_unfinished_generator(self):
264        def f():
265            for i in range(2):
266                yield i
267        def g(p):
268            next(f())
269
270        f_ident = ident(f)
271        g_ident = ident(g)
272        self.check_events(g, [(1, 'call', g_ident, None),
273                              (2, 'call', f_ident, None),
274                              (2, 'return', f_ident, 0),
275                              (1, 'return', g_ident, None),
276                              ], check_args=True)
277
278    def test_stop_iteration(self):
279        def f():
280            for i in range(2):
281                yield i
282        def g(p):
283            for i in f():
284                pass
285        f_ident = ident(f)
286        g_ident = ident(g)
287        self.check_events(g, [(1, 'call', g_ident),
288                              # call the iterator twice to generate values
289                              (2, 'call', f_ident),
290                              (2, 'return', f_ident),
291                              (2, 'call', f_ident),
292                              (2, 'return', f_ident),
293                              # once more to hit the raise:
294                              (2, 'call', f_ident),
295                              (2, 'return', f_ident),
296                              (1, 'return', g_ident),
297                              ])
298
299
300class ProfileSimulatorTestCase(TestCaseBase):
301    def new_watcher(self):
302        return ProfileSimulator(self)
303
304    def test_simple(self):
305        def f(p):
306            pass
307        f_ident = ident(f)
308        self.check_events(f, [(1, 'call', f_ident),
309                              (1, 'return', f_ident),
310                              ])
311
312    def test_basic_exception(self):
313        def f(p):
314            1/0
315        f_ident = ident(f)
316        self.check_events(f, [(1, 'call', f_ident),
317                              (1, 'return', f_ident),
318                              ])
319
320    def test_caught_exception(self):
321        def f(p):
322            try: 1/0
323            except: pass
324        f_ident = ident(f)
325        self.check_events(f, [(1, 'call', f_ident),
326                              (1, 'return', f_ident),
327                              ])
328
329    def test_distant_exception(self):
330        def f():
331            1/0
332        def g():
333            f()
334        def h():
335            g()
336        def i():
337            h()
338        def j(p):
339            i()
340        f_ident = ident(f)
341        g_ident = ident(g)
342        h_ident = ident(h)
343        i_ident = ident(i)
344        j_ident = ident(j)
345        self.check_events(j, [(1, 'call', j_ident),
346                              (2, 'call', i_ident),
347                              (3, 'call', h_ident),
348                              (4, 'call', g_ident),
349                              (5, 'call', f_ident),
350                              (5, 'return', f_ident),
351                              (4, 'return', g_ident),
352                              (3, 'return', h_ident),
353                              (2, 'return', i_ident),
354                              (1, 'return', j_ident),
355                              ])
356
357    # bpo-34125: profiling method_descriptor with **kwargs
358    def test_unbound_method(self):
359        kwargs = {}
360        def f(p):
361            dict.get({}, 42, **kwargs)
362        f_ident = ident(f)
363        self.check_events(f, [(1, 'call', f_ident),
364                              (1, 'return', f_ident)])
365
366    # Test an invalid call (bpo-34126)
367    def test_unbound_method_no_args(self):
368        def f(p):
369            dict.get()
370        f_ident = ident(f)
371        self.check_events(f, [(1, 'call', f_ident),
372                              (1, 'return', f_ident)])
373
374    # Test an invalid call (bpo-34126)
375    def test_unbound_method_invalid_args(self):
376        def f(p):
377            dict.get(print, 42)
378        f_ident = ident(f)
379        self.check_events(f, [(1, 'call', f_ident),
380                              (1, 'return', f_ident)])
381
382    # Test an invalid call (bpo-34125)
383    def test_unbound_method_no_keyword_args(self):
384        kwargs = {}
385        def f(p):
386            dict.get(**kwargs)
387        f_ident = ident(f)
388        self.check_events(f, [(1, 'call', f_ident),
389                              (1, 'return', f_ident)])
390
391    # Test an invalid call (bpo-34125)
392    def test_unbound_method_invalid_keyword_args(self):
393        kwargs = {}
394        def f(p):
395            dict.get(print, 42, **kwargs)
396        f_ident = ident(f)
397        self.check_events(f, [(1, 'call', f_ident),
398                              (1, 'return', f_ident)])
399
400
401def ident(function):
402    if hasattr(function, "f_code"):
403        code = function.f_code
404    else:
405        code = function.__code__
406    return code.co_firstlineno, code.co_name
407
408
409def protect(f, p):
410    try: f(p)
411    except: pass
412
413protect_ident = ident(protect)
414
415
416def capture_events(callable, p=None):
417    if p is None:
418        p = HookWatcher()
419    # Disable the garbage collector. This prevents __del__s from showing up in
420    # traces.
421    old_gc = gc.isenabled()
422    gc.disable()
423    try:
424        sys.setprofile(p.callback)
425        protect(callable, p)
426        sys.setprofile(None)
427    finally:
428        if old_gc:
429            gc.enable()
430    return p.get_events()[1:-1]
431
432
433def show_events(callable):
434    import pprint
435    pprint.pprint(capture_events(callable))
436
437
438class TestEdgeCases(unittest.TestCase):
439
440    def setUp(self):
441        self.addCleanup(sys.setprofile, sys.getprofile())
442        sys.setprofile(None)
443
444    def test_reentrancy(self):
445        def foo(*args):
446            ...
447
448        def bar(*args):
449            ...
450
451        class A:
452            def __call__(self, *args):
453                pass
454
455            def __del__(self):
456                sys.setprofile(bar)
457
458        sys.setprofile(A())
459        sys.setprofile(foo)
460        self.assertEqual(sys.getprofile(), bar)
461
462    def test_same_object(self):
463        def foo(*args):
464            ...
465
466        sys.setprofile(foo)
467        del foo
468        sys.setprofile(sys.getprofile())
469
470    def test_profile_after_trace_opcodes(self):
471        def f():
472            ...
473
474        sys._getframe().f_trace_opcodes = True
475        prev_trace = sys.gettrace()
476        sys.settrace(lambda *args: None)
477        f()
478        sys.settrace(prev_trace)
479        sys.setprofile(lambda *args: None)
480        f()
481
482    def test_method_with_c_function(self):
483        # gh-122029
484        # When we have a PyMethodObject whose im_func is a C function, we
485        # should record both the call and the return. f = classmethod(repr)
486        # is just a way to create a PyMethodObject with a C function.
487        class A:
488            f = classmethod(repr)
489        events = []
490        sys.setprofile(lambda frame, event, args: events.append(event))
491        A().f()
492        sys.setprofile(None)
493        # The last c_call is the call to sys.setprofile
494        self.assertEqual(events, ['c_call', 'c_return', 'c_call'])
495
496
497if __name__ == "__main__":
498    unittest.main()
499