• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import contextlib
2import os
3import sys
4import tracemalloc
5import unittest
6from unittest.mock import patch
7from test.support.script_helper import (assert_python_ok, assert_python_failure,
8                                        interpreter_requires_environment)
9from test import support
10try:
11    import threading
12except ImportError:
13    threading = None
14try:
15    import _testcapi
16except ImportError:
17    _testcapi = None
18
19
20EMPTY_STRING_SIZE = sys.getsizeof(b'')
21
22
def get_frames(nframe, lineno_delta):
    """Return up to *nframe* ``(filename, lineno)`` pairs for the caller's stack.

    The walk starts at the frame that called this function and follows the
    ``f_back`` links outwards.  *lineno_delta* is added to the line number of
    the first (innermost) frame only; the outer frames are recorded with their
    current line number.  The walk stops early when the stack is exhausted.
    """
    collected = []
    current = sys._getframe(1)
    for _ in range(nframe):
        location = (current.f_code.co_filename,
                    current.f_lineno + lineno_delta)
        collected.append(location)
        # only the innermost frame is shifted
        lineno_delta = 0
        current = current.f_back
        if current is None:
            break
    return tuple(collected)
35
def allocate_bytes(size):
    """Allocate a bytes object and return it with the traceback expected for it.

    The payload length is ``size - EMPTY_STRING_SIZE``, where
    EMPTY_STRING_SIZE is ``sys.getsizeof(b'')``, so the object as a whole is
    presumably meant to occupy *size* bytes.

    Returns a ``(data, traceback)`` pair; the traceback is built from at most
    ``tracemalloc.get_traceback_limit()`` frames of the current call stack.
    """
    nframe = tracemalloc.get_traceback_limit()
    bytes_len = (size - EMPTY_STRING_SIZE)
    # NOTE: keep the next two statements on consecutive lines, with nothing
    # (not even a comment) between them: get_frames() records this frame one
    # line below its call site (lineno_delta=1), i.e. on the allocation line.
    frames = get_frames(nframe, 1)
    data = b'x' * bytes_len
    return data, tracemalloc.Traceback(frames)
42
def create_snapshots():
    """Build the two hand-written snapshots shared by the snapshot tests.

    Each raw trace has the shape returned by _tracemalloc._get_traces():
    a (domain, size, traceback_frames) tuple, where traceback_frames is a
    tuple of (filename, line_number) tuples.

    Returns a ``(snapshot, snapshot2)`` pair, both with a traceback limit of 2.
    """
    limit = 2

    # tracebacks that occur in more than one trace
    frames_a2 = (('a.py', 2), ('b.py', 4))
    frames_a5 = (('a.py', 5), ('b.py', 4))

    first = tracemalloc.Snapshot([
        (0, 10, frames_a2),
        (0, 10, frames_a2),
        (0, 10, frames_a2),

        (1, 2, frames_a5),

        (2, 66, (('b.py', 1),)),

        (3, 7, (('<unknown>', 0),)),
    ], limit)

    second = tracemalloc.Snapshot([
        (0, 10, frames_a2),
        (0, 10, frames_a2),
        (0, 10, frames_a2),

        (2, 2, frames_a5),
        (2, 5000, frames_a5),

        (4, 400, (('c.py', 578),)),
    ], limit)

    return (first, second)
75
76def frame(filename, lineno):
77    return tracemalloc._Frame((filename, lineno))
78
def traceback(*frames):
    """Build a tracemalloc.Traceback from (filename, lineno) pairs."""
    raw_frames = frames
    return tracemalloc.Traceback(raw_frames)
81
def traceback_lineno(filename, lineno):
    """Build a single-frame tracemalloc.Traceback for *filename*:*lineno*."""
    only_frame = (filename, lineno)
    return tracemalloc.Traceback((only_frame,))
84
def traceback_filename(filename):
    """Build a single-frame tracemalloc.Traceback for *filename* at line 0."""
    only_frame = (filename, 0)
    return tracemalloc.Traceback((only_frame,))
87
88
class TestTracemallocEnabled(unittest.TestCase):
    """Exercise the tracemalloc API while tracing is switched on."""

    def setUp(self):
        # Every test expects to control the tracing state itself, so bail out
        # if tracemalloc was already started by someone else.
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        # "data" keeps the allocations referenced while the usage is sampled
        data = [allocate_bytes(123) for count in range(1000)]
        used_before = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(used_before, 0)

        # clearing the traces must not increase tracemalloc's own memory usage
        tracemalloc.clear_traces()
        used_after = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(used_after, 0)
        self.assertLessEqual(used_after, used_before)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj, expected = allocate_bytes(12345)
        actual = tracemalloc.get_object_traceback(obj)
        self.assertEqual(actual, expected)

    def test_set_traceback_limit(self):
        obj_size = 10

        # a negative number of frames is rejected
        tracemalloc.stop()
        with self.assertRaises(ValueError):
            tracemalloc.start(-1)

        # the recorded traceback holds exactly as many frames as the limit
        for limit in (10, 1):
            tracemalloc.stop()
            tracemalloc.start(limit)
            obj, expected = allocate_bytes(obj_size)
            actual = tracemalloc.get_object_traceback(obj)
            self.assertEqual(len(actual), limit)
            self.assertEqual(actual, expected)

    def find_trace(self, traces, traceback):
        # Return the first raw trace whose frames equal those of "traceback";
        # fail the running test when there is none.
        for candidate in traces:
            frames = candidate[2]
            if frames == traceback._frames:
                return candidate

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        expected_size = 12345
        obj, expected_tb = allocate_bytes(expected_size)

        found = self.find_trace(tracemalloc._get_traces(), expected_tb)

        self.assertIsInstance(found, tuple)
        domain, size, frames = found
        self.assertEqual(size, expected_size)
        self.assertEqual(frames, expected_tb._frames)

        # once tracing is stopped there is nothing left to report
        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)
        def allocate_bytes3(size):
            return allocate_bytes2(size)
        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj_size = 123
        first_obj, first_tb = allocate_bytes4(obj_size)
        second_obj, second_tb = allocate_bytes4(obj_size)

        traces = tracemalloc._get_traces()

        _, _, first_frames = self.find_trace(traces, first_tb)
        _, _, second_frames = self.find_trace(traces, second_tb)
        self.assertIs(second_frames, first_frames)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        tolerance = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        current, peak = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(current, obj_size)
        self.assertGreaterEqual(peak, current)

        self.assertLessEqual(current - obj_size, tolerance)
        self.assertLessEqual(peak - current, tolerance)

        # destroy the object
        obj = None
        current_after, peak_after = tracemalloc.get_traced_memory()
        self.assertLess(current_after, current)
        self.assertGreaterEqual(current - current_after, obj_size - tolerance)
        self.assertGreaterEqual(peak_after, peak)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        current, peak = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(current, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        before = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(before)

        # the object is still alive, but its trace is gone
        tracemalloc.clear_traces()
        after = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(after)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        taken = tracemalloc.take_snapshot()

        # write on disk
        taken.dump(support.TESTFN)
        self.addCleanup(support.unlink, support.TESTFN)

        # load from disk
        loaded = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(loaded.traces, taken.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        taken = tracemalloc.take_snapshot()
        taken.test_attr = "new"
        taken.dump(support.TESTFN)
        self.addCleanup(support.unlink, support.TESTFN)

        # load() should recreate the attribute
        loaded = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(loaded.test_attr, "new")

    def fork_child(self):
        # Runs in the forked child; the return value becomes its exit code:
        # 2 = tracing is off, 3 = no traceback was recorded, 0 = success.
        if not tracemalloc.is_tracing():
            return 2

        obj, obj_traceback = allocate_bytes(12345)
        if tracemalloc.get_object_traceback(obj) is None:
            return 3

        # everything is fine
        return 0

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if pid == 0:
            # child: always leave through os._exit(), even on an exception
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)

        # parent: the child never gets here
        pid2, status = os.waitpid(pid, 0)
        self.assertTrue(os.WIFEXITED(status))
        self.assertEqual(os.WEXITSTATUS(status), 0)
297
298
class TestSnapshot(unittest.TestCase):
    """Tests for tracemalloc.Snapshot built from hand-written raw traces.

    Apart from test_create_snapshot(), every test works on the two fixed
    snapshots returned by create_snapshots(), so the expected values below
    are derived from that data and not from real allocations.
    """
    maxDiff = 4000

    def test_create_snapshot(self):
        # take_snapshot() must wrap whatever the low-level API reports, so the
        # three functions it relies on are replaced by canned values.
        raw_traces = [(0, 5, (('a.py', 2),))]

        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)

    def test_filter_traces(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "b.py")
        filter2 = tracemalloc.Filter(True, "a.py", 2)
        filter3 = tracemalloc.Filter(True, "a.py", 5)

        original_traces = list(snapshot.traces._traces)

        # exclude b.py
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (1, 2, (('a.py', 5), ('b.py', 4))),
            (3, 7, (('<unknown>', 0),)),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

        # only include two lines of a.py
        snapshot4 = snapshot3.filter_traces((filter2, filter3))
        self.assertEqual(snapshot4.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (1, 2, (('a.py', 5), ('b.py', 4))),
        ])

        # No filter: just duplicate the snapshot
        snapshot5 = snapshot.filter_traces(())
        self.assertIsNot(snapshot5, snapshot)
        self.assertIsNot(snapshot5.traces, snapshot.traces)
        self.assertEqual(snapshot5.traces, snapshot.traces)

        # a single filter is not accepted: an iterable of filters is required
        self.assertRaises(TypeError, snapshot.filter_traces, filter1)

    def test_filter_traces_domain(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
        filter2 = tracemalloc.Filter(True, "a.py", domain=1)

        # exclude a.py of domain 1
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (2, 66, (('b.py', 1),)),
            (3, 7, (('<unknown>', 0),)),
        ])

        # include a.py of domain 1: the a.py traces of domain 0 must be
        # dropped, only the single domain-1 trace is kept
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (1, 2, (('a.py', 5), ('b.py', 4))),
        ])

    def test_filter_traces_domain_filter(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.DomainFilter(False, domain=3)
        filter2 = tracemalloc.DomainFilter(True, domain=3)

        # exclude domain 3
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (0, 10, (('a.py', 2), ('b.py', 4))),
            (1, 2, (('a.py', 5), ('b.py', 4))),
            (2, 66, (('b.py', 1),)),
        ])

        # include domain 3
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (3, 7, (('<unknown>', 0),)),
        ])

    def test_snapshot_group_by_line(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_lineno('<unknown>', 0)
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_c_578 = traceback_lineno('c.py', 578)

        # stats per file and line
        stats1 = snapshot.statistics('lineno')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

        # stats per file and line (2)
        stats2 = snapshot2.statistics('lineno')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a_5, 5002, 2),
            tracemalloc.Statistic(tb_c_578, 400, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
        ])

        # stats diff per file and line
        statistics = snapshot2.compare_to(snapshot, 'lineno')
        self.assertEqual(statistics, [
            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
        ])

    def test_snapshot_group_by_file(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_c = traceback_filename('c.py')

        # stats per file
        stats1 = snapshot.statistics('filename')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b, 66, 1),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # stats per file (2)
        stats2 = snapshot2.statistics('filename')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a, 5032, 5),
            tracemalloc.Statistic(tb_c, 400, 1),
        ])

        # stats diff per file
        diff = snapshot2.compare_to(snapshot, 'filename')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
        ])

    def test_snapshot_group_by_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # stats per traceback
        tb1 = traceback(('a.py', 2), ('b.py', 4))
        tb2 = traceback(('a.py', 5), ('b.py', 4))
        tb3 = traceback(('b.py', 1))
        tb4 = traceback(('<unknown>', 0))
        stats1 = snapshot.statistics('traceback')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb3, 66, 1),
            tracemalloc.Statistic(tb1, 30, 3),
            tracemalloc.Statistic(tb4, 7, 1),
            tracemalloc.Statistic(tb2, 2, 1),
        ])

        # stats per traceback (2)
        tb5 = traceback(('c.py', 578))
        stats2 = snapshot2.statistics('traceback')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb2, 5002, 2),
            tracemalloc.Statistic(tb5, 400, 1),
            tracemalloc.Statistic(tb1, 30, 3),
        ])

        # stats diff per traceback
        diff = snapshot2.compare_to(snapshot, 'traceback')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
        ])

        # cumulative statistics are rejected when grouping by traceback
        self.assertRaises(ValueError,
                          snapshot.statistics, 'traceback', cumulative=True)

    def test_snapshot_group_by_cumulative(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_b_4 = traceback_lineno('b.py', 4)

        # per file
        stats = snapshot.statistics('filename', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b, 98, 5),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # per line
        stats = snapshot.statistics('lineno', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_b_4, 32, 4),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

    def test_trace_format(self):
        snapshot, snapshot2 = create_snapshots()
        trace = snapshot.traces[0]
        self.assertEqual(str(trace), 'a.py:2: 10 B')
        traceback = trace.traceback
        self.assertEqual(str(traceback), 'a.py:2')
        frame = traceback[0]
        self.assertEqual(str(frame), 'a.py:2')

    def test_statistic_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot.statistics('lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'b.py:1: size=66 B, count=1, average=66 B')

    def test_statistic_diff_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot2.compare_to(snapshot, 'lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')

    def test_slices(self):
        # slicing the traces or a traceback yields a tuple of the items
        snapshot, snapshot2 = create_snapshots()
        self.assertEqual(snapshot.traces[:2],
                         (snapshot.traces[0], snapshot.traces[1]))

        traceback = snapshot.traces[0].traceback
        self.assertEqual(traceback[:2],
                         (traceback[0], traceback[1]))

    def test_format_traceback(self):
        snapshot, snapshot2 = create_snapshots()
        # stand-in for linecache.getline(): the fake files do not exist, so
        # return a recognizable source line for every (filename, lineno)
        def getline(filename, lineno):
            return '  <%s, %s>' % (filename, lineno)
        with patch('tracemalloc.linecache.getline',
                   side_effect=getline):
            tb = snapshot.traces[0].traceback
            self.assertEqual(tb.format(),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>',
                              '  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(limit=1),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1),
                             [])
591
592
class TestFilters(unittest.TestCase):
    """Tests for the attributes and matching rules of tracemalloc.Filter."""
    maxDiff = 2048

    def test_filter_attributes(self):
        def check(f, inclusive, pattern, lineno, all_frames):
            self.assertEqual(f.inclusive, inclusive)
            self.assertEqual(f.filename_pattern, pattern)
            if lineno is None:
                self.assertIsNone(f.lineno)
            else:
                self.assertEqual(f.lineno, lineno)
            self.assertEqual(f.all_frames, all_frames)

        # test default values
        check(tracemalloc.Filter(True, "abc"), True, "abc", None, False)

        # test custom values
        check(tracemalloc.Filter(False, "test.py", 123, True),
              False, "test.py", 123, True)

        # parameters passed by keyword
        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
        check(f, False, "test.py", 123, True)

        # read-only attribute
        with self.assertRaises(AttributeError):
            f.filename_pattern = "abc"

    def test_filter_match(self):
        linenos = (0, 5, 10)
        # Each row: Filter arguments, expected results for "abc" at lines
        # 0/5/10, then expected results for "12356" at lines 0/5/10.
        cases = (
            # filter without line number
            ((True, "abc"),
             (True, True, True), (False, False, False)),
            ((False, "abc"),
             (False, False, False), (True, True, True)),
            # filter with line number > 0
            ((True, "abc", 5),
             (False, True, False), (False, False, False)),
            ((False, "abc", 5),
             (True, False, True), (True, True, True)),
            # filter with line number 0
            ((True, "abc", 0),
             (True, False, False), (False, False, False)),
            ((False, "abc", 0),
             (False, True, True), (True, True, True)),
        )

        for args, abc_expected, other_expected in cases:
            f = tracemalloc.Filter(*args)
            for filename, expected in (("abc", abc_expected),
                                       ("12356", other_expected)):
                for lineno, want in zip(linenos, expected):
                    check = self.assertTrue if want else self.assertFalse
                    check(f._match_frame(filename, lineno),
                          (args, filename, lineno))

    def test_filter_match_filename(self):
        # (inclusive, filename, pattern, expected)
        cases = (
            (True, "abc", "abc", True),
            (True, "12356", "abc", False),
            (True, "<unknown>", "abc", False),

            (False, "abc", "abc", False),
            (False, "12356", "abc", True),
            (False, "<unknown>", "abc", True),
        )

        for inclusive, filename, pattern, want in cases:
            f = tracemalloc.Filter(inclusive, pattern)
            check = self.assertTrue if want else self.assertFalse
            check(f._match_frame(filename, 0),
                  (inclusive, filename, pattern))

    def test_filter_match_filename_joker(self):
        def check(want, filename, pattern):
            matcher = tracemalloc.Filter(True, pattern)
            verify = self.assertTrue if want else self.assertFalse
            verify(matcher._match_frame(filename, 0), (filename, pattern))

        # (filename, pattern, expected), independent of the platform
        portable_cases = (
            # empty string
            ('abc', '', False),
            ('', 'abc', False),
            ('', '', True),
            ('', '*', True),

            # no *
            ('abc', 'abc', True),
            ('abc', 'abcd', False),
            ('abc', 'def', False),

            # a*
            ('abc', 'a*', True),
            ('abc', 'abc*', True),
            ('abc', 'b*', False),
            ('abc', 'abcd*', False),

            # a*b
            ('abc', 'a*c', True),
            ('abcdcx', 'a*cx', True),
            ('abb', 'a*c', False),
            ('abcdce', 'a*cx', False),

            # a*b*c
            ('abcde', 'a*c*e', True),
            ('abcbdefeg', 'a*bd*eg', True),
            ('abcdd', 'a*c*e', False),
            ('abcbdefef', 'a*bd*eg', False),

            # replace .pyc suffix with .py
            ('a.pyc', 'a.py', True),
            ('a.py', 'a.pyc', True),
        )
        for filename, pattern, want in portable_cases:
            check(want, filename, pattern)

        on_windows = (os.name == 'nt')

        # matching is case insensitive on Windows, case sensitive elsewhere
        for filename, pattern in (('aBC', 'ABc'),
                                  ('aBcDe', 'Ab*dE'),
                                  ('a.pyc', 'a.PY'),
                                  ('a.py', 'a.PYC')):
            check(on_windows, filename, pattern)

        # Windows normalizes the alternate separator "/" to the standard
        # separator "\"; elsewhere there is no alternate separator
        for filename, pattern in ((r'a/b', r'a\b'),
                                  (r'a\b', r'a/b'),
                                  (r'a/b\c', r'a\b/c'),
                                  (r'a/b/c', r'a\b\c')):
            check(on_windows, filename, pattern)

        # as of 3.5, .pyo is no longer munged to .py
        check(False, 'a.pyo', 'a.py')

    def test_filter_match_trace(self):
        t1 = (("a.py", 2), ("b.py", 3))
        t2 = (("b.py", 4), ("b.py", 5))
        t3 = (("c.py", 5), ('<unknown>', 0))
        unknown = (('<unknown>', 0),)
        tracebacks = (t1, t2, t3, unknown)

        # Each row: inclusive, pattern, all_frames, then the expected result
        # for t1, t2, t3 and unknown (in that order).
        cases = (
            (True, "b.py", True, (True, True, False, False)),
            (True, "b.py", False, (False, True, False, False)),
            (False, "b.py", True, (False, False, True, True)),
            (False, "b.py", False, (True, False, True, True)),
            (False, "<unknown>", False, (True, True, True, False)),
            (True, "<unknown>", True, (False, False, True, True)),
            (False, "<unknown>", True, (True, True, False, False)),
        )

        for inclusive, pattern, all_frames, expected in cases:
            f = tracemalloc.Filter(inclusive, pattern, all_frames=all_frames)
            for tb, want in zip(tracebacks, expected):
                check = self.assertTrue if want else self.assertFalse
                check(f._match_traceback(tb),
                      (inclusive, pattern, all_frames, tb))
802
803
class TestCommandLine(unittest.TestCase):
    # Check how tracemalloc is configured at interpreter startup, through the
    # PYTHONTRACEMALLOC environment variable and the -X tracemalloc option.
    # Each test runs Python code through assert_python_ok() or
    # assert_python_failure() and inspects the captured output.

    def test_env_var_disabled_by_default(self):
        # not tracing by default
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code)
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'False')

    @unittest.skipIf(interpreter_requires_environment(),
                     'Cannot run -E tests when PYTHON env vars are required.')
    def test_env_var_ignored_with_E(self):
        """PYTHON* environment variables must be ignored when -E is present."""
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'False')

    def test_env_var_enabled_at_startup(self):
        # tracing at startup
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'True')

    def test_env_limit(self):
        # start and set the number of frames
        code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'10')

    def test_env_var_invalid(self):
        # an out-of-range number of frames must make the interpreter fail
        # with an explicit error message on stderr
        for nframe in (-1, 0, 2**30):
            with self.subTest(nframe=nframe):
                with support.SuppressCrashReport():
                    ok, stdout, stderr = assert_python_failure(
                        '-c', 'pass',
                        PYTHONTRACEMALLOC=str(nframe))
                    self.assertIn(b'PYTHONTRACEMALLOC: invalid '
                                  b'number of frames',
                                  stderr)

    def test_sys_xoptions(self):
        # -X tracemalloc without a value must give a traceback limit of 1,
        # -X tracemalloc=N must give a limit of N
        for xoptions, nframe in (
            ('tracemalloc', 1),
            ('tracemalloc=1', 1),
            ('tracemalloc=15', 15),
        ):
            with self.subTest(xoptions=xoptions, nframe=nframe):
                code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
                ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
                stdout = stdout.rstrip()
                self.assertEqual(stdout, str(nframe).encode('ascii'))

    def test_sys_xoptions_invalid(self):
        # same invalid values as test_env_var_invalid(), passed through -X
        for nframe in (-1, 0, 2**30):
            with self.subTest(nframe=nframe):
                with support.SuppressCrashReport():
                    args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
                    ok, stdout, stderr = assert_python_failure(*args)
                    self.assertIn(b'-X tracemalloc=NFRAME: invalid '
                                  b'number of frames',
                                  stderr)

    # The executed code imports _testcapi: skip the test instead of failing
    # when the module is not available.
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
        # does not crash.
        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', code)
873
874
@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    # Exercise the tracemalloc_track(), tracemalloc_untrack() and
    # tracemalloc_get_traceback() helpers of _testcapi on a memory block
    # registered in a dedicated domain (self.domain).
    maxDiff = 80 * 20

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        # keep a reference to the object so that self.ptr stays valid
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        tracemalloc.stop()

    def get_traceback(self):
        # Return the traceback recorded for (self.domain, self.ptr) as a
        # tracemalloc.Traceback, or None if the helper reports no trace.
        frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr)
        if frames is None:
            return None
        return tracemalloc.Traceback(frames)

    def track(self, release_gil=False, nframe=1):
        # Track self.ptr and return the frames expected in its traceback.
        #
        # WARNING: the line number expected for this frame is computed by
        # get_frames() as "line of the get_frames() call + 2", which is the
        # last line of the tracemalloc_track() call below. Do not insert or
        # remove lines between these two statements, and do not reformat
        # the call.
        frames = get_frames(nframe, 2)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        # Track the block and check both the recorded traceback and the
        # size traced in the domain.
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
        self.assertIsNotNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), self.size)

        # untrack must remove the trace
        self.untrack()
        self.assertIsNone(self.get_traceback())
        self.assertEqual(self.get_traced_memory(), 0)

        # calling _PyTraceMalloc_Untrack() multiple times must not crash
        self.untrack()
        self.untrack()

    def test_stop_track(self):
        tracemalloc.start()
        tracemalloc.stop()

        # tracking while tracemalloc is stopped must fail and record nothing
        with self.assertRaises(RuntimeError):
            self.track()
        self.assertIsNone(self.get_traceback())

    def test_stop_untrack(self):
        tracemalloc.start()
        self.track()

        # untracking while tracemalloc is stopped must fail
        tracemalloc.stop()
        with self.assertRaises(RuntimeError):
            self.untrack()
981
982
def test_main():
    # Run every test case class of this module with support.run_unittest().
    test_cases = (
        TestTracemallocEnabled,
        TestSnapshot,
        TestFilters,
        TestCommandLine,
        TestCAPI,
    )
    support.run_unittest(*test_cases)

# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()
994