• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import os
3import sys
4import tracemalloc
5import unittest
6from unittest.mock import patch
7from test.support.script_helper import (assert_python_ok, assert_python_failure,
8                                        interpreter_requires_environment)
9from test import support
10from test.support import os_helper
11from test.support import force_not_colorized
12
13try:
14    import _testcapi
15    import _testinternalcapi
16except ImportError:
17    _testcapi = None
18    _testinternalcapi = None
19
20
21EMPTY_STRING_SIZE = sys.getsizeof(b'')
22INVALID_NFRAME = (-1, 2**30)
23
24
25def get_frames(nframe, lineno_delta):
26    frames = []
27    frame = sys._getframe(1)
28    for index in range(nframe):
29        code = frame.f_code
30        lineno = frame.f_lineno + lineno_delta
31        frames.append((code.co_filename, lineno))
32        lineno_delta = 0
33        frame = frame.f_back
34        if frame is None:
35            break
36    return tuple(frames)
37
38def allocate_bytes(size):
39    nframe = tracemalloc.get_traceback_limit()
40    bytes_len = (size - EMPTY_STRING_SIZE)
41    frames = get_frames(nframe, 1)
42    data = b'x' * bytes_len
43    return data, tracemalloc.Traceback(frames, min(len(frames), nframe))
44
45def create_snapshots():
46    traceback_limit = 2
47
48    # _tracemalloc._get_traces() returns a list of (domain, size,
49    # traceback_frames) tuples. traceback_frames is a tuple of (filename,
50    # line_number) tuples.
51    raw_traces = [
52        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
53        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
54        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
55
56        (1, 2, (('a.py', 5), ('b.py', 4)), 3),
57
58        (2, 66, (('b.py', 1),), 1),
59
60        (3, 7, (('<unknown>', 0),), 1),
61    ]
62    snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)
63
64    raw_traces2 = [
65        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
66        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
67        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
68
69        (2, 2, (('a.py', 5), ('b.py', 4)), 3),
70        (2, 5000, (('a.py', 5), ('b.py', 4)), 3),
71
72        (4, 400, (('c.py', 578),), 1),
73    ]
74    snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)
75
76    return (snapshot, snapshot2)
77
78def frame(filename, lineno):
79    return tracemalloc._Frame((filename, lineno))
80
81def traceback(*frames):
82    return tracemalloc.Traceback(frames)
83
84def traceback_lineno(filename, lineno):
85    return traceback((filename, lineno))
86
87def traceback_filename(filename):
88    return traceback_lineno(filename, 0)
89
90
91class TestTraceback(unittest.TestCase):
92    def test_repr(self):
93        def get_repr(*args) -> str:
94            return repr(tracemalloc.Traceback(*args))
95
96        self.assertEqual(get_repr(()), "<Traceback ()>")
97        self.assertEqual(get_repr((), 0), "<Traceback () total_nframe=0>")
98
99        frames = (("f1", 1), ("f2", 2))
100        exp_repr_frames = (
101            "(<Frame filename='f2' lineno=2>,"
102            " <Frame filename='f1' lineno=1>)"
103        )
104        self.assertEqual(get_repr(frames),
105                         f"<Traceback {exp_repr_frames}>")
106        self.assertEqual(get_repr(frames, 2),
107                         f"<Traceback {exp_repr_frames} total_nframe=2>")
108
109
110class TestTracemallocEnabled(unittest.TestCase):
111    def setUp(self):
112        if tracemalloc.is_tracing():
113            self.skipTest("tracemalloc must be stopped before the test")
114
115        tracemalloc.start(1)
116
117    def tearDown(self):
118        tracemalloc.stop()
119
120    def test_get_tracemalloc_memory(self):
121        data = [allocate_bytes(123) for count in range(1000)]
122        size = tracemalloc.get_tracemalloc_memory()
123        self.assertGreaterEqual(size, 0)
124
125        tracemalloc.clear_traces()
126        size2 = tracemalloc.get_tracemalloc_memory()
127        self.assertGreaterEqual(size2, 0)
128        self.assertLessEqual(size2, size)
129
130    def test_get_object_traceback(self):
131        tracemalloc.clear_traces()
132        obj_size = 12345
133        obj, obj_traceback = allocate_bytes(obj_size)
134        traceback = tracemalloc.get_object_traceback(obj)
135        self.assertEqual(traceback, obj_traceback)
136
137    def test_new_reference(self):
138        tracemalloc.clear_traces()
139        # gc.collect() indirectly calls PyList_ClearFreeList()
140        support.gc_collect()
141
142        # Create a list and "destroy it": put it in the PyListObject free list
143        obj = []
144        obj = None
145
146        # Create a list which should reuse the previously created empty list
147        obj = []
148
149        nframe = tracemalloc.get_traceback_limit()
150        frames = get_frames(nframe, -3)
151        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))
152
153        traceback = tracemalloc.get_object_traceback(obj)
154        self.assertIsNotNone(traceback)
155        self.assertEqual(traceback, obj_traceback)
156
157    def test_set_traceback_limit(self):
158        obj_size = 10
159
160        tracemalloc.stop()
161        self.assertRaises(ValueError, tracemalloc.start, -1)
162
163        tracemalloc.stop()
164        tracemalloc.start(10)
165        obj2, obj2_traceback = allocate_bytes(obj_size)
166        traceback = tracemalloc.get_object_traceback(obj2)
167        self.assertEqual(len(traceback), 10)
168        self.assertEqual(traceback, obj2_traceback)
169
170        tracemalloc.stop()
171        tracemalloc.start(1)
172        obj, obj_traceback = allocate_bytes(obj_size)
173        traceback = tracemalloc.get_object_traceback(obj)
174        self.assertEqual(len(traceback), 1)
175        self.assertEqual(traceback, obj_traceback)
176
177    def find_trace(self, traces, traceback, size):
178        # filter also by size to ignore the memory allocated by
179        # _PyRefchain_Trace() if Python is built with Py_TRACE_REFS.
180        for trace in traces:
181            if trace[2] == traceback._frames and trace[1] == size:
182                return trace
183
184        self.fail("trace not found")
185
186    def test_get_traces(self):
187        tracemalloc.clear_traces()
188        obj_size = 12345
189        obj, obj_traceback = allocate_bytes(obj_size)
190
191        traces = tracemalloc._get_traces()
192        trace = self.find_trace(traces, obj_traceback, obj_size)
193
194        self.assertIsInstance(trace, tuple)
195        domain, size, traceback, length = trace
196        self.assertEqual(traceback, obj_traceback._frames)
197
198        tracemalloc.stop()
199        self.assertEqual(tracemalloc._get_traces(), [])
200
201    def test_get_traces_intern_traceback(self):
202        # dummy wrappers to get more useful and identical frames in the traceback
203        def allocate_bytes2(size):
204            return allocate_bytes(size)
205        def allocate_bytes3(size):
206            return allocate_bytes2(size)
207        def allocate_bytes4(size):
208            return allocate_bytes3(size)
209
210        # Ensure that two identical tracebacks are not duplicated
211        tracemalloc.stop()
212        tracemalloc.start(4)
213        obj1_size = 123
214        obj2_size = 125
215        obj1, obj1_traceback = allocate_bytes4(obj1_size)
216        obj2, obj2_traceback = allocate_bytes4(obj2_size)
217
218        traces = tracemalloc._get_traces()
219
220        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
221        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))
222
223        trace1 = self.find_trace(traces, obj1_traceback, obj1_size)
224        trace2 = self.find_trace(traces, obj2_traceback, obj2_size)
225        domain1, size1, traceback1, length1 = trace1
226        domain2, size2, traceback2, length2 = trace2
227        self.assertIs(traceback2, traceback1)
228
229    def test_get_traced_memory(self):
230        # Python allocates some internals objects, so the test must tolerate
231        # a small difference between the expected size and the real usage
232        max_error = 2048
233
234        # allocate one object
235        obj_size = 1024 * 1024
236        tracemalloc.clear_traces()
237        obj, obj_traceback = allocate_bytes(obj_size)
238        size, peak_size = tracemalloc.get_traced_memory()
239        self.assertGreaterEqual(size, obj_size)
240        self.assertGreaterEqual(peak_size, size)
241
242        self.assertLessEqual(size - obj_size, max_error)
243        self.assertLessEqual(peak_size - size, max_error)
244
245        # destroy the object
246        obj = None
247        size2, peak_size2 = tracemalloc.get_traced_memory()
248        self.assertLess(size2, size)
249        self.assertGreaterEqual(size - size2, obj_size - max_error)
250        self.assertGreaterEqual(peak_size2, peak_size)
251
252        # clear_traces() must reset traced memory counters
253        tracemalloc.clear_traces()
254        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
255
256        # allocate another object
257        obj, obj_traceback = allocate_bytes(obj_size)
258        size, peak_size = tracemalloc.get_traced_memory()
259        self.assertGreaterEqual(size, obj_size)
260
261        # stop() also resets traced memory counters
262        tracemalloc.stop()
263        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))
264
265    def test_clear_traces(self):
266        obj, obj_traceback = allocate_bytes(123)
267        traceback = tracemalloc.get_object_traceback(obj)
268        self.assertIsNotNone(traceback)
269
270        tracemalloc.clear_traces()
271        traceback2 = tracemalloc.get_object_traceback(obj)
272        self.assertIsNone(traceback2)
273
274    def test_reset_peak(self):
275        # Python allocates some internals objects, so the test must tolerate
276        # a small difference between the expected size and the real usage
277        tracemalloc.clear_traces()
278
279        # Example: allocate a large piece of memory, temporarily
280        large_sum = sum(list(range(100000)))
281        size1, peak1 = tracemalloc.get_traced_memory()
282
283        # reset_peak() resets peak to traced memory: peak2 < peak1
284        tracemalloc.reset_peak()
285        size2, peak2 = tracemalloc.get_traced_memory()
286        self.assertGreaterEqual(peak2, size2)
287        self.assertLess(peak2, peak1)
288
289        # check that peak continue to be updated if new memory is allocated:
290        # peak3 > peak2
291        obj_size = 1024 * 1024
292        obj, obj_traceback = allocate_bytes(obj_size)
293        size3, peak3 = tracemalloc.get_traced_memory()
294        self.assertGreaterEqual(peak3, size3)
295        self.assertGreater(peak3, peak2)
296        self.assertGreaterEqual(peak3 - peak2, obj_size)
297
298    def test_is_tracing(self):
299        tracemalloc.stop()
300        self.assertFalse(tracemalloc.is_tracing())
301
302        tracemalloc.start()
303        self.assertTrue(tracemalloc.is_tracing())
304
305    def test_snapshot(self):
306        obj, source = allocate_bytes(123)
307
308        # take a snapshot
309        snapshot = tracemalloc.take_snapshot()
310
311        # This can vary
312        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)
313
314        # write on disk
315        snapshot.dump(os_helper.TESTFN)
316        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
317
318        # load from disk
319        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
320        self.assertEqual(snapshot2.traces, snapshot.traces)
321
322        # tracemalloc must be tracing memory allocations to take a snapshot
323        tracemalloc.stop()
324        with self.assertRaises(RuntimeError) as cm:
325            tracemalloc.take_snapshot()
326        self.assertEqual(str(cm.exception),
327                         "the tracemalloc module must be tracing memory "
328                         "allocations to take a snapshot")
329
330    def test_snapshot_save_attr(self):
331        # take a snapshot with a new attribute
332        snapshot = tracemalloc.take_snapshot()
333        snapshot.test_attr = "new"
334        snapshot.dump(os_helper.TESTFN)
335        self.addCleanup(os_helper.unlink, os_helper.TESTFN)
336
337        # load() should recreate the attribute
338        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
339        self.assertEqual(snapshot2.test_attr, "new")
340
341    def fork_child(self):
342        if not tracemalloc.is_tracing():
343            return 2
344
345        obj_size = 12345
346        obj, obj_traceback = allocate_bytes(obj_size)
347        traceback = tracemalloc.get_object_traceback(obj)
348        if traceback is None:
349            return 3
350
351        # everything is fine
352        return 0
353
354    @support.requires_fork()
355    def test_fork(self):
356        # check that tracemalloc is still working after fork
357        pid = os.fork()
358        if not pid:
359            # child
360            exitcode = 1
361            try:
362                exitcode = self.fork_child()
363            finally:
364                os._exit(exitcode)
365        else:
366            support.wait_process(pid, exitcode=0)
367
368    def test_no_incomplete_frames(self):
369        tracemalloc.stop()
370        tracemalloc.start(8)
371
372        def f(x):
373            def g():
374                return x
375            return g
376
377        obj = f(0).__closure__[0]
378        traceback = tracemalloc.get_object_traceback(obj)
379        self.assertIn("test_tracemalloc", traceback[-1].filename)
380        self.assertNotIn("test_tracemalloc", traceback[-2].filename)
381
382
383class TestSnapshot(unittest.TestCase):
384    maxDiff = 4000
385
386    def test_create_snapshot(self):
387        raw_traces = [(0, 5, (('a.py', 2),), 10)]
388
389        with contextlib.ExitStack() as stack:
390            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
391                                             return_value=True))
392            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
393                                             return_value=5))
394            stack.enter_context(patch.object(tracemalloc, '_get_traces',
395                                             return_value=raw_traces))
396
397            snapshot = tracemalloc.take_snapshot()
398            self.assertEqual(snapshot.traceback_limit, 5)
399            self.assertEqual(len(snapshot.traces), 1)
400            trace = snapshot.traces[0]
401            self.assertEqual(trace.size, 5)
402            self.assertEqual(trace.traceback.total_nframe, 10)
403            self.assertEqual(len(trace.traceback), 1)
404            self.assertEqual(trace.traceback[0].filename, 'a.py')
405            self.assertEqual(trace.traceback[0].lineno, 2)
406
407    def test_filter_traces(self):
408        snapshot, snapshot2 = create_snapshots()
409        filter1 = tracemalloc.Filter(False, "b.py")
410        filter2 = tracemalloc.Filter(True, "a.py", 2)
411        filter3 = tracemalloc.Filter(True, "a.py", 5)
412
413        original_traces = list(snapshot.traces._traces)
414
415        # exclude b.py
416        snapshot3 = snapshot.filter_traces((filter1,))
417        self.assertEqual(snapshot3.traces._traces, [
418            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
419            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
420            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
421            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
422            (3, 7, (('<unknown>', 0),), 1),
423        ])
424
425        # filter_traces() must not touch the original snapshot
426        self.assertEqual(snapshot.traces._traces, original_traces)
427
428        # only include two lines of a.py
429        snapshot4 = snapshot3.filter_traces((filter2, filter3))
430        self.assertEqual(snapshot4.traces._traces, [
431            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
432            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
433            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
434            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
435        ])
436
437        # No filter: just duplicate the snapshot
438        snapshot5 = snapshot.filter_traces(())
439        self.assertIsNot(snapshot5, snapshot)
440        self.assertIsNot(snapshot5.traces, snapshot.traces)
441        self.assertEqual(snapshot5.traces, snapshot.traces)
442
443        self.assertRaises(TypeError, snapshot.filter_traces, filter1)
444
445    def test_filter_traces_domain(self):
446        snapshot, snapshot2 = create_snapshots()
447        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
448        filter2 = tracemalloc.Filter(True, "a.py", domain=1)
449
450        original_traces = list(snapshot.traces._traces)
451
452        # exclude a.py of domain 1
453        snapshot3 = snapshot.filter_traces((filter1,))
454        self.assertEqual(snapshot3.traces._traces, [
455            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
456            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
457            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
458            (2, 66, (('b.py', 1),), 1),
459            (3, 7, (('<unknown>', 0),), 1),
460        ])
461
462        # include domain 1
463        snapshot3 = snapshot.filter_traces((filter1,))
464        self.assertEqual(snapshot3.traces._traces, [
465            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
466            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
467            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
468            (2, 66, (('b.py', 1),), 1),
469            (3, 7, (('<unknown>', 0),), 1),
470        ])
471
472    def test_filter_traces_domain_filter(self):
473        snapshot, snapshot2 = create_snapshots()
474        filter1 = tracemalloc.DomainFilter(False, domain=3)
475        filter2 = tracemalloc.DomainFilter(True, domain=3)
476
477        # exclude domain 2
478        snapshot3 = snapshot.filter_traces((filter1,))
479        self.assertEqual(snapshot3.traces._traces, [
480            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
481            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
482            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
483            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
484            (2, 66, (('b.py', 1),), 1),
485        ])
486
487        # include domain 2
488        snapshot3 = snapshot.filter_traces((filter2,))
489        self.assertEqual(snapshot3.traces._traces, [
490            (3, 7, (('<unknown>', 0),), 1),
491        ])
492
493    def test_snapshot_group_by_line(self):
494        snapshot, snapshot2 = create_snapshots()
495        tb_0 = traceback_lineno('<unknown>', 0)
496        tb_a_2 = traceback_lineno('a.py', 2)
497        tb_a_5 = traceback_lineno('a.py', 5)
498        tb_b_1 = traceback_lineno('b.py', 1)
499        tb_c_578 = traceback_lineno('c.py', 578)
500
501        # stats per file and line
502        stats1 = snapshot.statistics('lineno')
503        self.assertEqual(stats1, [
504            tracemalloc.Statistic(tb_b_1, 66, 1),
505            tracemalloc.Statistic(tb_a_2, 30, 3),
506            tracemalloc.Statistic(tb_0, 7, 1),
507            tracemalloc.Statistic(tb_a_5, 2, 1),
508        ])
509
510        # stats per file and line (2)
511        stats2 = snapshot2.statistics('lineno')
512        self.assertEqual(stats2, [
513            tracemalloc.Statistic(tb_a_5, 5002, 2),
514            tracemalloc.Statistic(tb_c_578, 400, 1),
515            tracemalloc.Statistic(tb_a_2, 30, 3),
516        ])
517
518        # stats diff per file and line
519        statistics = snapshot2.compare_to(snapshot, 'lineno')
520        self.assertEqual(statistics, [
521            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
522            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
523            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
524            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
525            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
526        ])
527
528    def test_snapshot_group_by_file(self):
529        snapshot, snapshot2 = create_snapshots()
530        tb_0 = traceback_filename('<unknown>')
531        tb_a = traceback_filename('a.py')
532        tb_b = traceback_filename('b.py')
533        tb_c = traceback_filename('c.py')
534
535        # stats per file
536        stats1 = snapshot.statistics('filename')
537        self.assertEqual(stats1, [
538            tracemalloc.Statistic(tb_b, 66, 1),
539            tracemalloc.Statistic(tb_a, 32, 4),
540            tracemalloc.Statistic(tb_0, 7, 1),
541        ])
542
543        # stats per file (2)
544        stats2 = snapshot2.statistics('filename')
545        self.assertEqual(stats2, [
546            tracemalloc.Statistic(tb_a, 5032, 5),
547            tracemalloc.Statistic(tb_c, 400, 1),
548        ])
549
550        # stats diff per file
551        diff = snapshot2.compare_to(snapshot, 'filename')
552        self.assertEqual(diff, [
553            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
554            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
555            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
556            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
557        ])
558
559    def test_snapshot_group_by_traceback(self):
560        snapshot, snapshot2 = create_snapshots()
561
562        # stats per file
563        tb1 = traceback(('a.py', 2), ('b.py', 4))
564        tb2 = traceback(('a.py', 5), ('b.py', 4))
565        tb3 = traceback(('b.py', 1))
566        tb4 = traceback(('<unknown>', 0))
567        stats1 = snapshot.statistics('traceback')
568        self.assertEqual(stats1, [
569            tracemalloc.Statistic(tb3, 66, 1),
570            tracemalloc.Statistic(tb1, 30, 3),
571            tracemalloc.Statistic(tb4, 7, 1),
572            tracemalloc.Statistic(tb2, 2, 1),
573        ])
574
575        # stats per file (2)
576        tb5 = traceback(('c.py', 578))
577        stats2 = snapshot2.statistics('traceback')
578        self.assertEqual(stats2, [
579            tracemalloc.Statistic(tb2, 5002, 2),
580            tracemalloc.Statistic(tb5, 400, 1),
581            tracemalloc.Statistic(tb1, 30, 3),
582        ])
583
584        # stats diff per file
585        diff = snapshot2.compare_to(snapshot, 'traceback')
586        self.assertEqual(diff, [
587            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
588            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
589            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
590            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
591            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
592        ])
593
594        self.assertRaises(ValueError,
595                          snapshot.statistics, 'traceback', cumulative=True)
596
597    def test_snapshot_group_by_cumulative(self):
598        snapshot, snapshot2 = create_snapshots()
599        tb_0 = traceback_filename('<unknown>')
600        tb_a = traceback_filename('a.py')
601        tb_b = traceback_filename('b.py')
602        tb_a_2 = traceback_lineno('a.py', 2)
603        tb_a_5 = traceback_lineno('a.py', 5)
604        tb_b_1 = traceback_lineno('b.py', 1)
605        tb_b_4 = traceback_lineno('b.py', 4)
606
607        # per file
608        stats = snapshot.statistics('filename', True)
609        self.assertEqual(stats, [
610            tracemalloc.Statistic(tb_b, 98, 5),
611            tracemalloc.Statistic(tb_a, 32, 4),
612            tracemalloc.Statistic(tb_0, 7, 1),
613        ])
614
615        # per line
616        stats = snapshot.statistics('lineno', True)
617        self.assertEqual(stats, [
618            tracemalloc.Statistic(tb_b_1, 66, 1),
619            tracemalloc.Statistic(tb_b_4, 32, 4),
620            tracemalloc.Statistic(tb_a_2, 30, 3),
621            tracemalloc.Statistic(tb_0, 7, 1),
622            tracemalloc.Statistic(tb_a_5, 2, 1),
623        ])
624
625    def test_trace_format(self):
626        snapshot, snapshot2 = create_snapshots()
627        trace = snapshot.traces[0]
628        self.assertEqual(str(trace), 'b.py:4: 10 B')
629        traceback = trace.traceback
630        self.assertEqual(str(traceback), 'b.py:4')
631        frame = traceback[0]
632        self.assertEqual(str(frame), 'b.py:4')
633
634    def test_statistic_format(self):
635        snapshot, snapshot2 = create_snapshots()
636        stats = snapshot.statistics('lineno')
637        stat = stats[0]
638        self.assertEqual(str(stat),
639                         'b.py:1: size=66 B, count=1, average=66 B')
640
641    def test_statistic_diff_format(self):
642        snapshot, snapshot2 = create_snapshots()
643        stats = snapshot2.compare_to(snapshot, 'lineno')
644        stat = stats[0]
645        self.assertEqual(str(stat),
646                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')
647
648    def test_slices(self):
649        snapshot, snapshot2 = create_snapshots()
650        self.assertEqual(snapshot.traces[:2],
651                         (snapshot.traces[0], snapshot.traces[1]))
652
653        traceback = snapshot.traces[0].traceback
654        self.assertEqual(traceback[:2],
655                         (traceback[0], traceback[1]))
656
657    def test_format_traceback(self):
658        snapshot, snapshot2 = create_snapshots()
659        def getline(filename, lineno):
660            return '  <%s, %s>' % (filename, lineno)
661        with unittest.mock.patch('tracemalloc.linecache.getline',
662                                 side_effect=getline):
663            tb = snapshot.traces[0].traceback
664            self.assertEqual(tb.format(),
665                             ['  File "b.py", line 4',
666                              '    <b.py, 4>',
667                              '  File "a.py", line 2',
668                              '    <a.py, 2>'])
669
670            self.assertEqual(tb.format(limit=1),
671                             ['  File "a.py", line 2',
672                              '    <a.py, 2>'])
673
674            self.assertEqual(tb.format(limit=-1),
675                             ['  File "b.py", line 4',
676                              '    <b.py, 4>'])
677
678            self.assertEqual(tb.format(most_recent_first=True),
679                             ['  File "a.py", line 2',
680                              '    <a.py, 2>',
681                              '  File "b.py", line 4',
682                              '    <b.py, 4>'])
683
684            self.assertEqual(tb.format(limit=1, most_recent_first=True),
685                             ['  File "a.py", line 2',
686                              '    <a.py, 2>'])
687
688            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
689                             ['  File "b.py", line 4',
690                              '    <b.py, 4>'])
691
692
693class TestFilters(unittest.TestCase):
694    maxDiff = 2048
695
696    def test_filter_attributes(self):
697        # test default values
698        f = tracemalloc.Filter(True, "abc")
699        self.assertEqual(f.inclusive, True)
700        self.assertEqual(f.filename_pattern, "abc")
701        self.assertIsNone(f.lineno)
702        self.assertEqual(f.all_frames, False)
703
704        # test custom values
705        f = tracemalloc.Filter(False, "test.py", 123, True)
706        self.assertEqual(f.inclusive, False)
707        self.assertEqual(f.filename_pattern, "test.py")
708        self.assertEqual(f.lineno, 123)
709        self.assertEqual(f.all_frames, True)
710
711        # parameters passed by keyword
712        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
713        self.assertEqual(f.inclusive, False)
714        self.assertEqual(f.filename_pattern, "test.py")
715        self.assertEqual(f.lineno, 123)
716        self.assertEqual(f.all_frames, True)
717
718        # read-only attribute
719        self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")
720
721    def test_filter_match(self):
722        # filter without line number
723        f = tracemalloc.Filter(True, "abc")
724        self.assertTrue(f._match_frame("abc", 0))
725        self.assertTrue(f._match_frame("abc", 5))
726        self.assertTrue(f._match_frame("abc", 10))
727        self.assertFalse(f._match_frame("12356", 0))
728        self.assertFalse(f._match_frame("12356", 5))
729        self.assertFalse(f._match_frame("12356", 10))
730
731        f = tracemalloc.Filter(False, "abc")
732        self.assertFalse(f._match_frame("abc", 0))
733        self.assertFalse(f._match_frame("abc", 5))
734        self.assertFalse(f._match_frame("abc", 10))
735        self.assertTrue(f._match_frame("12356", 0))
736        self.assertTrue(f._match_frame("12356", 5))
737        self.assertTrue(f._match_frame("12356", 10))
738
739        # filter with line number > 0
740        f = tracemalloc.Filter(True, "abc", 5)
741        self.assertFalse(f._match_frame("abc", 0))
742        self.assertTrue(f._match_frame("abc", 5))
743        self.assertFalse(f._match_frame("abc", 10))
744        self.assertFalse(f._match_frame("12356", 0))
745        self.assertFalse(f._match_frame("12356", 5))
746        self.assertFalse(f._match_frame("12356", 10))
747
748        f = tracemalloc.Filter(False, "abc", 5)
749        self.assertTrue(f._match_frame("abc", 0))
750        self.assertFalse(f._match_frame("abc", 5))
751        self.assertTrue(f._match_frame("abc", 10))
752        self.assertTrue(f._match_frame("12356", 0))
753        self.assertTrue(f._match_frame("12356", 5))
754        self.assertTrue(f._match_frame("12356", 10))
755
756        # filter with line number 0
757        f = tracemalloc.Filter(True, "abc", 0)
758        self.assertTrue(f._match_frame("abc", 0))
759        self.assertFalse(f._match_frame("abc", 5))
760        self.assertFalse(f._match_frame("abc", 10))
761        self.assertFalse(f._match_frame("12356", 0))
762        self.assertFalse(f._match_frame("12356", 5))
763        self.assertFalse(f._match_frame("12356", 10))
764
765        f = tracemalloc.Filter(False, "abc", 0)
766        self.assertFalse(f._match_frame("abc", 0))
767        self.assertTrue(f._match_frame("abc", 5))
768        self.assertTrue(f._match_frame("abc", 10))
769        self.assertTrue(f._match_frame("12356", 0))
770        self.assertTrue(f._match_frame("12356", 5))
771        self.assertTrue(f._match_frame("12356", 10))
772
773    def test_filter_match_filename(self):
774        def fnmatch(inclusive, filename, pattern):
775            f = tracemalloc.Filter(inclusive, pattern)
776            return f._match_frame(filename, 0)
777
778        self.assertTrue(fnmatch(True, "abc", "abc"))
779        self.assertFalse(fnmatch(True, "12356", "abc"))
780        self.assertFalse(fnmatch(True, "<unknown>", "abc"))
781
782        self.assertFalse(fnmatch(False, "abc", "abc"))
783        self.assertTrue(fnmatch(False, "12356", "abc"))
784        self.assertTrue(fnmatch(False, "<unknown>", "abc"))
785
786    def test_filter_match_filename_joker(self):
787        def fnmatch(filename, pattern):
788            filter = tracemalloc.Filter(True, pattern)
789            return filter._match_frame(filename, 0)
790
791        # empty string
792        self.assertFalse(fnmatch('abc', ''))
793        self.assertFalse(fnmatch('', 'abc'))
794        self.assertTrue(fnmatch('', ''))
795        self.assertTrue(fnmatch('', '*'))
796
797        # no *
798        self.assertTrue(fnmatch('abc', 'abc'))
799        self.assertFalse(fnmatch('abc', 'abcd'))
800        self.assertFalse(fnmatch('abc', 'def'))
801
802        # a*
803        self.assertTrue(fnmatch('abc', 'a*'))
804        self.assertTrue(fnmatch('abc', 'abc*'))
805        self.assertFalse(fnmatch('abc', 'b*'))
806        self.assertFalse(fnmatch('abc', 'abcd*'))
807
808        # a*b
809        self.assertTrue(fnmatch('abc', 'a*c'))
810        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
811        self.assertFalse(fnmatch('abb', 'a*c'))
812        self.assertFalse(fnmatch('abcdce', 'a*cx'))
813
814        # a*b*c
815        self.assertTrue(fnmatch('abcde', 'a*c*e'))
816        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
817        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
818        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))
819
820        # replace .pyc suffix with .py
821        self.assertTrue(fnmatch('a.pyc', 'a.py'))
822        self.assertTrue(fnmatch('a.py', 'a.pyc'))
823
824        if os.name == 'nt':
825            # case insensitive
826            self.assertTrue(fnmatch('aBC', 'ABc'))
827            self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))
828
829            self.assertTrue(fnmatch('a.pyc', 'a.PY'))
830            self.assertTrue(fnmatch('a.py', 'a.PYC'))
831        else:
832            # case sensitive
833            self.assertFalse(fnmatch('aBC', 'ABc'))
834            self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))
835
836            self.assertFalse(fnmatch('a.pyc', 'a.PY'))
837            self.assertFalse(fnmatch('a.py', 'a.PYC'))
838
839        if os.name == 'nt':
840            # normalize alternate separator "/" to the standard separator "\"
841            self.assertTrue(fnmatch(r'a/b', r'a\b'))
842            self.assertTrue(fnmatch(r'a\b', r'a/b'))
843            self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
844            self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
845        else:
846            # there is no alternate separator
847            self.assertFalse(fnmatch(r'a/b', r'a\b'))
848            self.assertFalse(fnmatch(r'a\b', r'a/b'))
849            self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
850            self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))
851
852        # as of 3.5, .pyo is no longer munged to .py
853        self.assertFalse(fnmatch('a.pyo', 'a.py'))
854
855    def test_filter_match_trace(self):
856        t1 = (("a.py", 2), ("b.py", 3))
857        t2 = (("b.py", 4), ("b.py", 5))
858        t3 = (("c.py", 5), ('<unknown>', 0))
859        unknown = (('<unknown>', 0),)
860
861        f = tracemalloc.Filter(True, "b.py", all_frames=True)
862        self.assertTrue(f._match_traceback(t1))
863        self.assertTrue(f._match_traceback(t2))
864        self.assertFalse(f._match_traceback(t3))
865        self.assertFalse(f._match_traceback(unknown))
866
867        f = tracemalloc.Filter(True, "b.py", all_frames=False)
868        self.assertFalse(f._match_traceback(t1))
869        self.assertTrue(f._match_traceback(t2))
870        self.assertFalse(f._match_traceback(t3))
871        self.assertFalse(f._match_traceback(unknown))
872
873        f = tracemalloc.Filter(False, "b.py", all_frames=True)
874        self.assertFalse(f._match_traceback(t1))
875        self.assertFalse(f._match_traceback(t2))
876        self.assertTrue(f._match_traceback(t3))
877        self.assertTrue(f._match_traceback(unknown))
878
879        f = tracemalloc.Filter(False, "b.py", all_frames=False)
880        self.assertTrue(f._match_traceback(t1))
881        self.assertFalse(f._match_traceback(t2))
882        self.assertTrue(f._match_traceback(t3))
883        self.assertTrue(f._match_traceback(unknown))
884
885        f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
886        self.assertTrue(f._match_traceback(t1))
887        self.assertTrue(f._match_traceback(t2))
888        self.assertTrue(f._match_traceback(t3))
889        self.assertFalse(f._match_traceback(unknown))
890
891        f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
892        self.assertFalse(f._match_traceback(t1))
893        self.assertFalse(f._match_traceback(t2))
894        self.assertTrue(f._match_traceback(t3))
895        self.assertTrue(f._match_traceback(unknown))
896
897        f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
898        self.assertTrue(f._match_traceback(t1))
899        self.assertTrue(f._match_traceback(t2))
900        self.assertFalse(f._match_traceback(t3))
901        self.assertFalse(f._match_traceback(unknown))
902
903
904class TestCommandLine(unittest.TestCase):
905    def test_env_var_disabled_by_default(self):
906        # not tracing by default
907        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
908        ok, stdout, stderr = assert_python_ok('-c', code)
909        stdout = stdout.rstrip()
910        self.assertEqual(stdout, b'False')
911
912    @unittest.skipIf(interpreter_requires_environment(),
913                     'Cannot run -E tests when PYTHON env vars are required.')
914    def test_env_var_ignored_with_E(self):
915        """PYTHON* environment variables must be ignored when -E is present."""
916        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
917        ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
918        stdout = stdout.rstrip()
919        self.assertEqual(stdout, b'False')
920
921    def test_env_var_disabled(self):
922        # tracing at startup
923        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
924        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0')
925        stdout = stdout.rstrip()
926        self.assertEqual(stdout, b'False')
927
928    def test_env_var_enabled_at_startup(self):
929        # tracing at startup
930        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
931        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
932        stdout = stdout.rstrip()
933        self.assertEqual(stdout, b'True')
934
935    def test_env_limit(self):
936        # start and set the number of frames
937        code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
938        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
939        stdout = stdout.rstrip()
940        self.assertEqual(stdout, b'10')
941
942    @force_not_colorized
943    def check_env_var_invalid(self, nframe):
944        with support.SuppressCrashReport():
945            ok, stdout, stderr = assert_python_failure(
946                '-c', 'pass',
947                PYTHONTRACEMALLOC=str(nframe))
948
949        if b'ValueError: the number of frames must be in range' in stderr:
950            return
951        if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
952            return
953        self.fail(f"unexpected output: {stderr!a}")
954
955
956    def test_env_var_invalid(self):
957        for nframe in INVALID_NFRAME:
958            with self.subTest(nframe=nframe):
959                self.check_env_var_invalid(nframe)
960
961    def test_sys_xoptions(self):
962        for xoptions, nframe in (
963            ('tracemalloc', 1),
964            ('tracemalloc=1', 1),
965            ('tracemalloc=15', 15),
966        ):
967            with self.subTest(xoptions=xoptions, nframe=nframe):
968                code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
969                ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
970                stdout = stdout.rstrip()
971                self.assertEqual(stdout, str(nframe).encode('ascii'))
972
973    def check_sys_xoptions_invalid(self, nframe):
974        args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
975        with support.SuppressCrashReport():
976            ok, stdout, stderr = assert_python_failure(*args)
977
978        if b'ValueError: the number of frames must be in range' in stderr:
979            return
980        if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr:
981            return
982        self.fail(f"unexpected output: {stderr!a}")
983
984    def test_sys_xoptions_invalid(self):
985        for nframe in INVALID_NFRAME:
986            with self.subTest(nframe=nframe):
987                self.check_sys_xoptions_invalid(nframe)
988
989    @unittest.skipIf(_testcapi is None, 'need _testcapi')
990    def test_pymem_alloc0(self):
991        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
992        # does not crash.
993        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
994        assert_python_ok('-X', 'tracemalloc', '-c', code)
995
996
997@unittest.skipIf(_testcapi is None, 'need _testcapi')
998class TestCAPI(unittest.TestCase):
999    maxDiff = 80 * 20
1000
1001    def setUp(self):
1002        if tracemalloc.is_tracing():
1003            self.skipTest("tracemalloc must be stopped before the test")
1004
1005        self.domain = 5
1006        self.size = 123
1007        self.obj = allocate_bytes(self.size)[0]
1008
1009        # for the type "object", id(obj) is the address of its memory block.
1010        # This type is not tracked by the garbage collector
1011        self.ptr = id(self.obj)
1012
1013    def tearDown(self):
1014        tracemalloc.stop()
1015
1016    def get_traceback(self):
1017        frames = _testinternalcapi._PyTraceMalloc_GetTraceback(self.domain, self.ptr)
1018        if frames is not None:
1019            return tracemalloc.Traceback(frames)
1020        else:
1021            return None
1022
1023    def track(self, release_gil=False, nframe=1):
1024        frames = get_frames(nframe, 1)
1025        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
1026                                    release_gil)
1027        return frames
1028
1029    def untrack(self):
1030        _testcapi.tracemalloc_untrack(self.domain, self.ptr)
1031
1032    def get_traced_memory(self):
1033        # Get the traced size in the domain
1034        snapshot = tracemalloc.take_snapshot()
1035        domain_filter = tracemalloc.DomainFilter(True, self.domain)
1036        snapshot = snapshot.filter_traces([domain_filter])
1037        return sum(trace.size for trace in snapshot.traces)
1038
1039    def check_track(self, release_gil):
1040        nframe = 5
1041        tracemalloc.start(nframe)
1042
1043        size = tracemalloc.get_traced_memory()[0]
1044
1045        frames = self.track(release_gil, nframe)
1046        self.assertEqual(self.get_traceback(),
1047                         tracemalloc.Traceback(frames))
1048
1049        self.assertEqual(self.get_traced_memory(), self.size)
1050
1051    def test_track(self):
1052        self.check_track(False)
1053
1054    def test_track_without_gil(self):
1055        # check that calling _PyTraceMalloc_Track() without holding the GIL
1056        # works too
1057        self.check_track(True)
1058
1059    def test_track_already_tracked(self):
1060        nframe = 5
1061        tracemalloc.start(nframe)
1062
1063        # track a first time
1064        self.track()
1065
1066        # calling _PyTraceMalloc_Track() must remove the old trace and add
1067        # a new trace with the new traceback
1068        frames = self.track(nframe=nframe)
1069        self.assertEqual(self.get_traceback(),
1070                         tracemalloc.Traceback(frames))
1071
1072    def test_untrack(self):
1073        tracemalloc.start()
1074
1075        self.track()
1076        self.assertIsNotNone(self.get_traceback())
1077        self.assertEqual(self.get_traced_memory(), self.size)
1078
1079        # untrack must remove the trace
1080        self.untrack()
1081        self.assertIsNone(self.get_traceback())
1082        self.assertEqual(self.get_traced_memory(), 0)
1083
1084        # calling _PyTraceMalloc_Untrack() multiple times must not crash
1085        self.untrack()
1086        self.untrack()
1087
1088    def test_stop_track(self):
1089        tracemalloc.start()
1090        tracemalloc.stop()
1091
1092        with self.assertRaises(RuntimeError):
1093            self.track()
1094        self.assertIsNone(self.get_traceback())
1095
1096    def test_stop_untrack(self):
1097        tracemalloc.start()
1098        self.track()
1099
1100        tracemalloc.stop()
1101        with self.assertRaises(RuntimeError):
1102            self.untrack()
1103
1104
1105if __name__ == "__main__":
1106    unittest.main()
1107