• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
import gc
import pprint
import sys
import unittest
5
6
class TestGetProfile(unittest.TestCase):
    # Checks that sys.getprofile() reports what sys.setprofile() installed.

    def setUp(self):
        # Every test starts with no profile function installed.
        sys.setprofile(None)

    def tearDown(self):
        # Uninstall whatever hook the test may have left behind.
        sys.setprofile(None)

    def test_empty(self):
        # With nothing installed, getprofile() returns None.
        self.assertIsNone(sys.getprofile())

    def test_setget(self):
        # getprofile() hands back the very object given to setprofile().
        def fn(*args):
            pass

        sys.setprofile(fn)
        self.assertIs(sys.getprofile(), fn)
23
class HookWatcher:
    """Log the 'call', 'return' and 'exception' events of a profile hook.

    Every logged entry is a ``(frameno, event, ident(frame))`` tuple, where
    ``frameno`` is the position of the frame in ``self.frames``, i.e. the
    order in which this watcher first saw it.
    """

    def __init__(self):
        self.frames = []  # frames, in order of first appearance
        self.events = []  # (frameno, event, ident) tuples, in arrival order

    def callback(self, frame, event, arg):
        """Profile function with the signature sys.setprofile() expects.

        Only 'call', 'return' and 'exception' events are logged; any other
        event name is ignored.  *arg* is not used.
        """
        if event in ("call", "return", "exception"):
            self.add_event(event, frame)

    def add_event(self, event, frame=None):
        """Add an event to the log.

        When *frame* is omitted, the frame of the immediate caller is used,
        so profiled code can log its own markers.
        """
        if frame is None:
            frame = sys._getframe(1)

        try:
            frameno = self.frames.index(frame)
        except ValueError:
            # First time this frame is seen: give it the next number.
            frameno = len(self.frames)
            self.frames.append(frame)

        self.events.append((frameno, event, ident(frame)))

    def get_events(self):
        """Return the logged events, without those from add_event()/ident().

        Entries whose identity matches add_event() or ident() themselves are
        filtered out.  The frame list is dropped as a side effect, so
        add_event() must not be called afterwards.
        """
        disallowed = [ident(self.add_event.__func__), ident(ident)]
        self.frames = None

        return [item for item in self.events if item[2] not in disallowed]
54
55
class ProfileSimulator(HookWatcher):
    """HookWatcher that dispatches on the event name and tracks a frame stack.

    'call' and 'return' events are logged and mirrored on ``self.stack``;
    the ``c_*`` events are ignored; an 'exception' event fails the owning
    test case.
    """

    def __init__(self, testcase):
        self.testcase = testcase  # test case used to report failures
        self.stack = []           # frames that were called and not yet returned
        super().__init__()

    def callback(self, frame, event, arg):
        # Callback registered with sys.setprofile()/sys.settrace()
        # An event name missing from the dispatch table raises KeyError.
        self.dispatch[event](self, frame)

    def trace_call(self, frame):
        """Log a 'call' event and push *frame* on the stack."""
        self.add_event('call', frame)
        self.stack.append(frame)

    def trace_return(self, frame):
        """Log a 'return' event and pop the top of the stack."""
        self.add_event('return', frame)
        self.stack.pop()

    def trace_exception(self, frame):
        """Fail the test: exception events are not expected here."""
        self.testcase.fail(
            "the profiler should never receive exception events")

    def trace_pass(self, frame):
        """Ignore the event."""
        pass

    # Maps event names to the plain functions defined above; they are stored
    # unbound, which is why callback() passes ``self`` explicitly.
    dispatch = {
        'call': trace_call,
        'exception': trace_exception,
        'return': trace_return,
        'c_call': trace_pass,
        'c_return': trace_pass,
        'c_exception': trace_pass,
        }
89
90
class TestCaseBase(unittest.TestCase):
    # Shared helper for the event-sequence tests.  Subclasses must provide
    # new_watcher(), returning the watcher object handed to capture_events().

    def check_events(self, callable, expected):
        # Run *callable* under the profile hook and fail, showing both
        # sequences pretty-printed, unless the captured events equal
        # *expected*.
        events = capture_events(callable, self.new_watcher())
        if events != expected:
            self.fail("Expected events:\n%s\nReceived events:\n%s"
                      % (pprint.pformat(expected), pprint.pformat(events)))
97
98
class ProfileHookTestCase(TestCaseBase):
    # Event-sequence tests using a plain HookWatcher.
    #
    # Expectations are (frameno, event, ident) tuples.  Frame numbers start
    # at 1 here; frame 0 is presumably the protect() wrapper, whose events
    # capture_events() trims off both ends of the log.

    def new_watcher(self):
        return HookWatcher()

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_exception(self):
        # An uncaught exception still shows up as a plain call/return pair.
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_nested_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_nested_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              # This isn't what I expected:
                              # (0, 'exception', protect_ident),
                              # I expected this again:
                              (1, 'return', f_ident),
                              ])

    def test_exception_in_except_clause(self):
        # f is called twice from g (once in the try body, once in the
        # except clause); each call gets its own frame number.
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            except:
                try:
                    f(p)
                except:
                    pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (3, 'call', f_ident),
                              (3, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_exception_propagation(self):
        # The finally clause logs a custom marker against g's own frame
        # (add_event() without a frame uses its caller's frame).
        def f(p):
            1/0
        def g(p):
            try:
                f(p)
            finally:
                p.add_event("falling through")
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'falling through', g_ident),
                              (1, 'return', g_ident),
                              ])

    def test_raise_twice(self):
        def f(p):
            try:
                1/0
            except:
                1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise_reraise(self):
        def f(p):
            try:
                1/0
            except:
                raise
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_raise(self):
        def f(p):
            raise Exception()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        # The exception raised in f unwinds through g, h, i and j; every
        # frame on the way is expected to report a 'return'.
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    def test_generator(self):
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; the generator is exhausted and
                              # this is reported as one more call/return
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])

    def test_stop_iteration(self):
        # NOTE(review): the body is identical to test_generator; f contains
        # no explicit raise.
        def f():
            for i in range(2):
                yield i
        def g(p):
            for i in f():
                pass
        f_ident = ident(f)
        g_ident = ident(g)
        self.check_events(g, [(1, 'call', g_ident),
                              # call the iterator twice to generate values
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              # once more; the generator is exhausted
                              (2, 'call', f_ident),
                              (2, 'return', f_ident),
                              (1, 'return', g_ident),
                              ])
278
279
class ProfileSimulatorTestCase(TestCaseBase):
    # Event-sequence tests using a ProfileSimulator, which fails the test
    # if it ever receives an 'exception' event.

    def new_watcher(self):
        return ProfileSimulator(self)

    def test_simple(self):
        def f(p):
            pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_basic_exception(self):
        def f(p):
            1/0
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_caught_exception(self):
        def f(p):
            try:
                1/0
            except:
                pass
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident),
                              ])

    def test_distant_exception(self):
        # The exception raised in f unwinds through g, h, i and j; every
        # frame on the way is expected to report a 'return'.
        def f():
            1/0
        def g():
            f()
        def h():
            g()
        def i():
            h()
        def j(p):
            i()
        f_ident = ident(f)
        g_ident = ident(g)
        h_ident = ident(h)
        i_ident = ident(i)
        j_ident = ident(j)
        self.check_events(j, [(1, 'call', j_ident),
                              (2, 'call', i_ident),
                              (3, 'call', h_ident),
                              (4, 'call', g_ident),
                              (5, 'call', f_ident),
                              (5, 'return', f_ident),
                              (4, 'return', g_ident),
                              (3, 'return', h_ident),
                              (2, 'return', i_ident),
                              (1, 'return', j_ident),
                              ])

    # bpo-34125: profiling method_descriptor with **kwargs
    def test_unbound_method(self):
        kwargs = {}
        def f(p):
            dict.get({}, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_no_args(self):
        def f(p):
            dict.get()
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34126)
    def test_unbound_method_invalid_args(self):
        def f(p):
            dict.get(print, 42)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    def test_unbound_method_no_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(**kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])

    # Test an invalid call (bpo-34125)
    def test_unbound_method_invalid_keyword_args(self):
        kwargs = {}
        def f(p):
            dict.get(print, 42, **kwargs)
        f_ident = ident(f)
        self.check_events(f, [(1, 'call', f_ident),
                              (1, 'return', f_ident)])
379
380
def ident(function):
    """Return ``(co_firstlineno, co_name)`` identifying a function or frame.

    An object with an ``f_code`` attribute (such as a frame) is identified
    through that code object; anything else must provide ``__code__``.
    """
    if hasattr(function, "f_code"):
        code = function.f_code
    else:
        code = function.__code__
    return code.co_firstlineno, code.co_name
387
388
def protect(f, p):
    """Call ``f(p)`` and swallow anything it raises.

    The bare ``except`` is deliberate: the profiled callables raise on
    purpose and nothing may escape into the capture machinery.
    """
    try:
        f(p)
    except:
        pass
392
# (co_firstlineno, co_name) of protect(), computed once at import time.
protect_ident = ident(protect)
394
395
def capture_events(callable, p=None):
    """Run ``callable(p)`` under a profile hook and return the logged events.

    *p* is the watcher whose ``callback`` is installed with
    sys.setprofile(); a fresh HookWatcher is used when it is omitted.  The
    call goes through protect(), so exceptions raised by *callable* are
    swallowed.  The first and last logged events are dropped from the
    result (presumably the call/return of protect() itself).
    """
    if p is None:
        p = HookWatcher()
    # Disable the garbage collector. This prevents __del__s from showing up in
    # traces.
    old_gc = gc.isenabled()
    gc.disable()
    try:
        sys.setprofile(p.callback)
        try:
            protect(callable, p)
        finally:
            # Always uninstall the hook, even if something escapes protect().
            sys.setprofile(None)
    finally:
        if old_gc:
            gc.enable()
    return p.get_events()[1:-1]
411
412
def show_events(callable):
    """Pretty-print the events captured while running *callable*.

    Debugging aid: uses capture_events() with its default watcher.
    """
    import pprint
    pprint.pprint(capture_events(callable))
416
417
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
420