• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import pprint
3import sys
4import unittest
5
6from test import test_support
7
8class TestGetProfile(unittest.TestCase):
9    def setUp(self):
10        sys.setprofile(None)
11
12    def tearDown(self):
13        sys.setprofile(None)
14
15    def test_empty(self):
16        self.assertIsNone(sys.getprofile())
17
18    def test_setget(self):
19        def fn(*args):
20            pass
21
22        sys.setprofile(fn)
23        self.assertIs(sys.getprofile(), fn)
24
25class HookWatcher:
26    def __init__(self):
27        self.frames = []
28        self.events = []
29
30    def callback(self, frame, event, arg):
31        if (event == "call"
32            or event == "return"
33            or event == "exception"):
34            self.add_event(event, frame)
35
36    def add_event(self, event, frame=None):
37        """Add an event to the log."""
38        if frame is None:
39            frame = sys._getframe(1)
40
41        try:
42            frameno = self.frames.index(frame)
43        except ValueError:
44            frameno = len(self.frames)
45            self.frames.append(frame)
46
47        self.events.append((frameno, event, ident(frame)))
48
49    def get_events(self):
50        """Remove calls to add_event()."""
51        disallowed = [ident(self.add_event.im_func), ident(ident)]
52        self.frames = None
53
54        return [item for item in self.events if item[2] not in disallowed]
55
56
57class ProfileSimulator(HookWatcher):
58    def __init__(self, testcase):
59        self.testcase = testcase
60        self.stack = []
61        HookWatcher.__init__(self)
62
63    def callback(self, frame, event, arg):
64        # Callback registered with sys.setprofile()/sys.settrace()
65        self.dispatch[event](self, frame)
66
67    def trace_call(self, frame):
68        self.add_event('call', frame)
69        self.stack.append(frame)
70
71    def trace_return(self, frame):
72        self.add_event('return', frame)
73        self.stack.pop()
74
75    def trace_exception(self, frame):
76        self.testcase.fail(
77            "the profiler should never receive exception events")
78
79    def trace_pass(self, frame):
80        pass
81
82    dispatch = {
83        'call': trace_call,
84        'exception': trace_exception,
85        'return': trace_return,
86        'c_call': trace_pass,
87        'c_return': trace_pass,
88        'c_exception': trace_pass,
89        }
90
91
92class TestCaseBase(unittest.TestCase):
93    def check_events(self, callable, expected):
94        events = capture_events(callable, self.new_watcher())
95        if events != expected:
96            self.fail("Expected events:\n%s\nReceived events:\n%s"
97                      % (pprint.pformat(expected), pprint.pformat(events)))
98
99
100class ProfileHookTestCase(TestCaseBase):
101    def new_watcher(self):
102        return HookWatcher()
103
104    def test_simple(self):
105        def f(p):
106            pass
107        f_ident = ident(f)
108        self.check_events(f, [(1, 'call', f_ident),
109                              (1, 'return', f_ident),
110                              ])
111
112    def test_exception(self):
113        def f(p):
114            1./0
115        f_ident = ident(f)
116        self.check_events(f, [(1, 'call', f_ident),
117                              (1, 'return', f_ident),
118                              ])
119
120    def test_caught_exception(self):
121        def f(p):
122            try: 1./0
123            except: pass
124        f_ident = ident(f)
125        self.check_events(f, [(1, 'call', f_ident),
126                              (1, 'return', f_ident),
127                              ])
128
129    def test_caught_nested_exception(self):
130        def f(p):
131            try: 1./0
132            except: pass
133        f_ident = ident(f)
134        self.check_events(f, [(1, 'call', f_ident),
135                              (1, 'return', f_ident),
136                              ])
137
138    def test_nested_exception(self):
139        def f(p):
140            1./0
141        f_ident = ident(f)
142        self.check_events(f, [(1, 'call', f_ident),
143                              # This isn't what I expected:
144                              # (0, 'exception', protect_ident),
145                              # I expected this again:
146                              (1, 'return', f_ident),
147                              ])
148
149    def test_exception_in_except_clause(self):
150        def f(p):
151            1./0
152        def g(p):
153            try:
154                f(p)
155            except:
156                try: f(p)
157                except: pass
158        f_ident = ident(f)
159        g_ident = ident(g)
160        self.check_events(g, [(1, 'call', g_ident),
161                              (2, 'call', f_ident),
162                              (2, 'return', f_ident),
163                              (3, 'call', f_ident),
164                              (3, 'return', f_ident),
165                              (1, 'return', g_ident),
166                              ])
167
168    def test_exception_propagation(self):
169        def f(p):
170            1./0
171        def g(p):
172            try: f(p)
173            finally: p.add_event("falling through")
174        f_ident = ident(f)
175        g_ident = ident(g)
176        self.check_events(g, [(1, 'call', g_ident),
177                              (2, 'call', f_ident),
178                              (2, 'return', f_ident),
179                              (1, 'falling through', g_ident),
180                              (1, 'return', g_ident),
181                              ])
182
183    def test_raise_twice(self):
184        def f(p):
185            try: 1./0
186            except: 1./0
187        f_ident = ident(f)
188        self.check_events(f, [(1, 'call', f_ident),
189                              (1, 'return', f_ident),
190                              ])
191
192    def test_raise_reraise(self):
193        def f(p):
194            try: 1./0
195            except: raise
196        f_ident = ident(f)
197        self.check_events(f, [(1, 'call', f_ident),
198                              (1, 'return', f_ident),
199                              ])
200
201    def test_raise(self):
202        def f(p):
203            raise Exception()
204        f_ident = ident(f)
205        self.check_events(f, [(1, 'call', f_ident),
206                              (1, 'return', f_ident),
207                              ])
208
209    def test_distant_exception(self):
210        def f():
211            1./0
212        def g():
213            f()
214        def h():
215            g()
216        def i():
217            h()
218        def j(p):
219            i()
220        f_ident = ident(f)
221        g_ident = ident(g)
222        h_ident = ident(h)
223        i_ident = ident(i)
224        j_ident = ident(j)
225        self.check_events(j, [(1, 'call', j_ident),
226                              (2, 'call', i_ident),
227                              (3, 'call', h_ident),
228                              (4, 'call', g_ident),
229                              (5, 'call', f_ident),
230                              (5, 'return', f_ident),
231                              (4, 'return', g_ident),
232                              (3, 'return', h_ident),
233                              (2, 'return', i_ident),
234                              (1, 'return', j_ident),
235                              ])
236
237    def test_generator(self):
238        def f():
239            for i in range(2):
240                yield i
241        def g(p):
242            for i in f():
243                pass
244        f_ident = ident(f)
245        g_ident = ident(g)
246        self.check_events(g, [(1, 'call', g_ident),
247                              # call the iterator twice to generate values
248                              (2, 'call', f_ident),
249                              (2, 'return', f_ident),
250                              (2, 'call', f_ident),
251                              (2, 'return', f_ident),
252                              # once more; returns end-of-iteration with
253                              # actually raising an exception
254                              (2, 'call', f_ident),
255                              (2, 'return', f_ident),
256                              (1, 'return', g_ident),
257                              ])
258
259    def test_stop_iteration(self):
260        def f():
261            for i in range(2):
262                yield i
263            raise StopIteration
264        def g(p):
265            for i in f():
266                pass
267        f_ident = ident(f)
268        g_ident = ident(g)
269        self.check_events(g, [(1, 'call', g_ident),
270                              # call the iterator twice to generate values
271                              (2, 'call', f_ident),
272                              (2, 'return', f_ident),
273                              (2, 'call', f_ident),
274                              (2, 'return', f_ident),
275                              # once more to hit the raise:
276                              (2, 'call', f_ident),
277                              (2, 'return', f_ident),
278                              (1, 'return', g_ident),
279                              ])
280
281
282class ProfileSimulatorTestCase(TestCaseBase):
283    def new_watcher(self):
284        return ProfileSimulator(self)
285
286    def test_simple(self):
287        def f(p):
288            pass
289        f_ident = ident(f)
290        self.check_events(f, [(1, 'call', f_ident),
291                              (1, 'return', f_ident),
292                              ])
293
294    def test_basic_exception(self):
295        def f(p):
296            1./0
297        f_ident = ident(f)
298        self.check_events(f, [(1, 'call', f_ident),
299                              (1, 'return', f_ident),
300                              ])
301
302    def test_caught_exception(self):
303        def f(p):
304            try: 1./0
305            except: pass
306        f_ident = ident(f)
307        self.check_events(f, [(1, 'call', f_ident),
308                              (1, 'return', f_ident),
309                              ])
310
311    def test_distant_exception(self):
312        def f():
313            1./0
314        def g():
315            f()
316        def h():
317            g()
318        def i():
319            h()
320        def j(p):
321            i()
322        f_ident = ident(f)
323        g_ident = ident(g)
324        h_ident = ident(h)
325        i_ident = ident(i)
326        j_ident = ident(j)
327        self.check_events(j, [(1, 'call', j_ident),
328                              (2, 'call', i_ident),
329                              (3, 'call', h_ident),
330                              (4, 'call', g_ident),
331                              (5, 'call', f_ident),
332                              (5, 'return', f_ident),
333                              (4, 'return', g_ident),
334                              (3, 'return', h_ident),
335                              (2, 'return', i_ident),
336                              (1, 'return', j_ident),
337                              ])
338
339
340def ident(function):
341    if hasattr(function, "f_code"):
342        code = function.f_code
343    else:
344        code = function.func_code
345    return code.co_firstlineno, code.co_name
346
347
348def protect(f, p):
349    try: f(p)
350    except: pass
351
352protect_ident = ident(protect)
353
354
355def capture_events(callable, p=None):
356    if p is None:
357        p = HookWatcher()
358    # Disable the garbage collector. This prevents __del__s from showing up in
359    # traces.
360    old_gc = gc.isenabled()
361    gc.disable()
362    try:
363        sys.setprofile(p.callback)
364        protect(callable, p)
365        sys.setprofile(None)
366    finally:
367        if old_gc:
368            gc.enable()
369    return p.get_events()[1:-1]
370
371
372def show_events(callable):
373    import pprint
374    pprint.pprint(capture_events(callable))
375
376
377def test_main():
378    test_support.run_unittest(
379        TestGetProfile,
380        ProfileHookTestCase,
381        ProfileSimulatorTestCase
382    )
383
384
385if __name__ == "__main__":
386    test_main()
387