import contextlib
import os
import sys
import tracemalloc
import unittest
from unittest.mock import patch
from test.support.script_helper import (assert_python_ok, assert_python_failure,
                                        interpreter_requires_environment)
from test import support
from test.support import os_helper
from test.support import force_not_colorized

try:
    import _testcapi
    import _testinternalcapi
except ImportError:
    _testcapi = None
    _testinternalcapi = None


# Size of an empty bytes object: allocate_bytes() subtracts it from the
# requested size to compute the payload length.
EMPTY_STRING_SIZE = sys.getsizeof(b'')
# Frame counts which must be rejected by PYTHONTRACEMALLOC and -X tracemalloc.
INVALID_NFRAME = (-1, 2**30)


def get_frames(nframe, lineno_delta):
    """Return up to *nframe* (filename, lineno) pairs for the caller's stack.

    The walk starts at the caller of get_frames() and follows f_back.
    *lineno_delta* is added to the line number of that first frame only, so
    the caller can point at a line located before or after the call.
    """
    frames = []
    frame = sys._getframe(1)
    for index in range(nframe):
        code = frame.f_code
        lineno = frame.f_lineno + lineno_delta
        frames.append((code.co_filename, lineno))
        # only the first (most recent) frame is shifted
        lineno_delta = 0
        frame = frame.f_back
        if frame is None:
            break
    return tuple(frames)

def allocate_bytes(size):
    """Allocate a bytes object and return (data, expected_traceback).

    The payload length is *size* minus EMPTY_STRING_SIZE. The expected
    traceback is built from the current stack, limited to the current
    tracemalloc traceback limit.
    """
    nframe = tracemalloc.get_traceback_limit()
    bytes_len = (size - EMPTY_STRING_SIZE)
    # lineno_delta=1: the expected traceback points at the NEXT line, so the
    # allocation must stay directly below the get_frames() call.
    frames = get_frames(nframe, 1)
    data = b'x' * bytes_len
    return data, tracemalloc.Traceback(frames, min(len(frames), nframe))

def create_snapshots():
    """Return two hand-written snapshots (snapshot, snapshot2) used as fixtures."""
    traceback_limit = 2

    # The raw traces below are (domain, size, traceback_frames, total_nframe)
    # tuples. traceback_frames is a tuple of (filename, line_number) tuples.
    raw_traces = [
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),

        (1, 2, (('a.py', 5), ('b.py', 4)), 3),

        (2, 66, (('b.py', 1),), 1),

        (3, 7, (('<unknown>', 0),), 1),
    ]
    snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit)

    raw_traces2 = [
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),
        (0, 10, (('a.py', 2), ('b.py', 4)), 3),

        (2, 2, (('a.py', 5), ('b.py', 4)), 3),
        (2, 5000, (('a.py', 5), ('b.py', 4)), 3),

        (4, 400, (('c.py', 578),), 1),
    ]
    snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit)

    return (snapshot, snapshot2)

def frame(filename, lineno):
    """Build a tracemalloc._Frame from a filename and a line number."""
    return tracemalloc._Frame((filename, lineno))

def traceback(*frames):
    """Build a tracemalloc.Traceback from (filename, lineno) tuples."""
    return tracemalloc.Traceback(frames)

def traceback_lineno(filename, lineno):
    """Build a single-frame Traceback for *filename* at *lineno*."""
    return traceback((filename, lineno))

def traceback_filename(filename):
    """Build a single-frame Traceback for *filename* with line number 0."""
    return traceback_lineno(filename, 0)


class TestTraceback(unittest.TestCase):
    def test_repr(self):
        def get_repr(*args) -> str:
            return repr(tracemalloc.Traceback(*args))

        self.assertEqual(get_repr(()), "<Traceback ()>")
        self.assertEqual(get_repr((), 0), "<Traceback () total_nframe=0>")

        frames = (("f1", 1), ("f2", 2))
        exp_repr_frames = (
            "(<Frame filename='f2' lineno=2>,"
            " <Frame filename='f1' lineno=1>)"
        )
        self.assertEqual(get_repr(frames),
                         f"<Traceback {exp_repr_frames}>")
        self.assertEqual(get_repr(frames, 2),
                         f"<Traceback {exp_repr_frames} total_nframe=2>")


class TestTracemallocEnabled(unittest.TestCase):
    # Each test runs with tracemalloc started with a limit of 1 frame
    # (see setUp) and stopped again in tearDown.

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        data = [allocate_bytes(123) for count in range(1000)]
        size = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size, 0)

        tracemalloc.clear_traces()
        size2 = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size2, 0)
        self.assertLessEqual(size2, size)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(traceback, obj_traceback)

    def test_new_reference(self):
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames, min(len(frames), nframe))

        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)

    def test_set_traceback_limit(self):
        obj_size = 10

        tracemalloc.stop()
        self.assertRaises(ValueError, tracemalloc.start, -1)

        tracemalloc.stop()
        tracemalloc.start(10)
        obj2, obj2_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj2)
        self.assertEqual(len(traceback), 10)
        self.assertEqual(traceback, obj2_traceback)

        tracemalloc.stop()
        tracemalloc.start(1)
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(len(traceback), 1)
        self.assertEqual(traceback, obj_traceback)

    def find_trace(self, traces, traceback, size):
        # filter also by size to ignore the memory allocated by
        # _PyRefchain_Trace() if Python is built with Py_TRACE_REFS.
        for trace in traces:
            if trace[2] == traceback._frames and trace[1] == size:
                return trace

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)

        traces = tracemalloc._get_traces()
        trace = self.find_trace(traces, obj_traceback, obj_size)

        self.assertIsInstance(trace, tuple)
        domain, size, traceback, length = trace
        self.assertEqual(traceback, obj_traceback._frames)

        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)
        def allocate_bytes3(size):
            return allocate_bytes2(size)
        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj1_size = 123
        obj2_size = 125
        obj1, obj1_traceback = allocate_bytes4(obj1_size)
        obj2, obj2_traceback = allocate_bytes4(obj2_size)

        traces = tracemalloc._get_traces()

        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))

        trace1 = self.find_trace(traces, obj1_traceback, obj1_size)
        trace2 = self.find_trace(traces, obj2_traceback, obj2_size)
        domain1, size1, traceback1, length1 = trace1
        domain2, size2, traceback2, length2 = trace2
        self.assertIs(traceback2, traceback1)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        max_error = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)
        self.assertGreaterEqual(peak_size, size)

        self.assertLessEqual(size - obj_size, max_error)
        self.assertLessEqual(peak_size - size, max_error)

        # destroy the object
        obj = None
        size2, peak_size2 = tracemalloc.get_traced_memory()
        self.assertLess(size2, size)
        self.assertGreaterEqual(size - size2, obj_size - max_error)
        self.assertGreaterEqual(peak_size2, peak_size)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)

        tracemalloc.clear_traces()
        traceback2 = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(traceback2)

    def test_reset_peak(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        tracemalloc.clear_traces()

        # Example: allocate a large piece of memory, temporarily
        large_sum = sum(list(range(100000)))
        size1, peak1 = tracemalloc.get_traced_memory()

        # reset_peak() resets peak to traced memory: peak2 < peak1
        tracemalloc.reset_peak()
        size2, peak2 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak2, size2)
        self.assertLess(peak2, peak1)

        # check that peak continue to be updated if new memory is allocated:
        # peak3 > peak2
        obj_size = 1024 * 1024
        obj, obj_traceback = allocate_bytes(obj_size)
        size3, peak3 = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(peak3, size3)
        self.assertGreater(peak3, peak2)
        self.assertGreaterEqual(peak3 - peak2, obj_size)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        snapshot = tracemalloc.take_snapshot()

        # This can vary
        self.assertGreater(snapshot.traces[1].traceback.total_nframe, 10)

        # write on disk
        snapshot.dump(os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # load from disk
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.traces, snapshot.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        snapshot = tracemalloc.take_snapshot()
        snapshot.test_attr = "new"
        snapshot.dump(os_helper.TESTFN)
        self.addCleanup(os_helper.unlink, os_helper.TESTFN)

        # load() should recreate the attribute
        snapshot2 = tracemalloc.Snapshot.load(os_helper.TESTFN)
        self.assertEqual(snapshot2.test_attr, "new")

    def fork_child(self):
        # Runs in the forked child: the return value is used as exit code.
        if not tracemalloc.is_tracing():
            return 2

        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        if traceback is None:
            return 3

        # everything is fine
        return 0

    @support.requires_fork()
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if not pid:
            # child
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)
        else:
            support.wait_process(pid, exitcode=0)

    def test_no_incomplete_frames(self):
        tracemalloc.stop()
        tracemalloc.start(8)

        def f(x):
            def g():
                return x
            return g

        obj = f(0).__closure__[0]
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIn("test_tracemalloc", traceback[-1].filename)
        self.assertNotIn("test_tracemalloc", traceback[-2].filename)


class TestSnapshot(unittest.TestCase):
    maxDiff = 4000

    def test_create_snapshot(self):
        raw_traces = [(0, 5, (('a.py', 2),), 10)]

        with contextlib.ExitStack() as stack:
            stack.enter_context(patch.object(tracemalloc, 'is_tracing',
                                             return_value=True))
            stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
                                             return_value=5))
            stack.enter_context(patch.object(tracemalloc, '_get_traces',
                                             return_value=raw_traces))

            snapshot = tracemalloc.take_snapshot()
            self.assertEqual(snapshot.traceback_limit, 5)
            self.assertEqual(len(snapshot.traces), 1)
            trace = snapshot.traces[0]
            self.assertEqual(trace.size, 5)
            self.assertEqual(trace.traceback.total_nframe, 10)
            self.assertEqual(len(trace.traceback), 1)
            self.assertEqual(trace.traceback[0].filename, 'a.py')
            self.assertEqual(trace.traceback[0].lineno, 2)

    def test_filter_traces(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "b.py")
        filter2 = tracemalloc.Filter(True, "a.py", 2)
        filter3 = tracemalloc.Filter(True, "a.py", 5)

        original_traces = list(snapshot.traces._traces)

        # exclude b.py
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

        # only include two lines of a.py
        snapshot4 = snapshot3.filter_traces((filter2, filter3))
        self.assertEqual(snapshot4.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # No filter: just duplicate the snapshot
        snapshot5 = snapshot.filter_traces(())
        self.assertIsNot(snapshot5, snapshot)
        self.assertIsNot(snapshot5.traces, snapshot.traces)
        self.assertEqual(snapshot5.traces, snapshot.traces)

        self.assertRaises(TypeError, snapshot.filter_traces, filter1)

    def test_filter_traces_domain(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.Filter(False, "a.py", domain=1)
        filter2 = tracemalloc.Filter(True, "a.py", domain=1)

        original_traces = list(snapshot.traces._traces)

        # exclude a.py of domain 1
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
            (3, 7, (('<unknown>', 0),), 1),
        ])

        # only include a.py of domain 1 (the inclusive filter was previously
        # created but never exercised: filter1 was checked twice)
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
        ])

        # filter_traces() must not touch the original snapshot
        self.assertEqual(snapshot.traces._traces, original_traces)

    def test_filter_traces_domain_filter(self):
        snapshot, snapshot2 = create_snapshots()
        filter1 = tracemalloc.DomainFilter(False, domain=3)
        filter2 = tracemalloc.DomainFilter(True, domain=3)

        # exclude domain 3
        snapshot3 = snapshot.filter_traces((filter1,))
        self.assertEqual(snapshot3.traces._traces, [
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (0, 10, (('a.py', 2), ('b.py', 4)), 3),
            (1, 2, (('a.py', 5), ('b.py', 4)), 3),
            (2, 66, (('b.py', 1),), 1),
        ])

        # include domain 3
        snapshot3 = snapshot.filter_traces((filter2,))
        self.assertEqual(snapshot3.traces._traces, [
            (3, 7, (('<unknown>', 0),), 1),
        ])

    def test_snapshot_group_by_line(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_lineno('<unknown>', 0)
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_c_578 = traceback_lineno('c.py', 578)

        # stats per file and line
        stats1 = snapshot.statistics('lineno')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

        # stats per file and line (2)
        stats2 = snapshot2.statistics('lineno')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a_5, 5002, 2),
            tracemalloc.Statistic(tb_c_578, 400, 1),
            tracemalloc.Statistic(tb_a_2, 30, 3),
        ])

        # stats diff per file and line
        statistics = snapshot2.compare_to(snapshot, 'lineno')
        self.assertEqual(statistics, [
            tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0),
        ])

    def test_snapshot_group_by_file(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_c = traceback_filename('c.py')

        # stats per file
        stats1 = snapshot.statistics('filename')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb_b, 66, 1),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # stats per file (2)
        stats2 = snapshot2.statistics('filename')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb_a, 5032, 5),
            tracemalloc.Statistic(tb_c, 400, 1),
        ])

        # stats diff per file
        diff = snapshot2.compare_to(snapshot, 'filename')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1),
            tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1),
        ])

    def test_snapshot_group_by_traceback(self):
        snapshot, snapshot2 = create_snapshots()

        # stats per traceback
        tb1 = traceback(('a.py', 2), ('b.py', 4))
        tb2 = traceback(('a.py', 5), ('b.py', 4))
        tb3 = traceback(('b.py', 1))
        tb4 = traceback(('<unknown>', 0))
        stats1 = snapshot.statistics('traceback')
        self.assertEqual(stats1, [
            tracemalloc.Statistic(tb3, 66, 1),
            tracemalloc.Statistic(tb1, 30, 3),
            tracemalloc.Statistic(tb4, 7, 1),
            tracemalloc.Statistic(tb2, 2, 1),
        ])

        # stats per traceback (2)
        tb5 = traceback(('c.py', 578))
        stats2 = snapshot2.statistics('traceback')
        self.assertEqual(stats2, [
            tracemalloc.Statistic(tb2, 5002, 2),
            tracemalloc.Statistic(tb5, 400, 1),
            tracemalloc.Statistic(tb1, 30, 3),
        ])

        # stats diff per traceback
        diff = snapshot2.compare_to(snapshot, 'traceback')
        self.assertEqual(diff, [
            tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1),
            tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1),
            tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1),
            tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1),
            tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0),
        ])

        self.assertRaises(ValueError,
                          snapshot.statistics, 'traceback', cumulative=True)

    def test_snapshot_group_by_cumulative(self):
        snapshot, snapshot2 = create_snapshots()
        tb_0 = traceback_filename('<unknown>')
        tb_a = traceback_filename('a.py')
        tb_b = traceback_filename('b.py')
        tb_a_2 = traceback_lineno('a.py', 2)
        tb_a_5 = traceback_lineno('a.py', 5)
        tb_b_1 = traceback_lineno('b.py', 1)
        tb_b_4 = traceback_lineno('b.py', 4)

        # per file
        stats = snapshot.statistics('filename', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b, 98, 5),
            tracemalloc.Statistic(tb_a, 32, 4),
            tracemalloc.Statistic(tb_0, 7, 1),
        ])

        # per line
        stats = snapshot.statistics('lineno', True)
        self.assertEqual(stats, [
            tracemalloc.Statistic(tb_b_1, 66, 1),
            tracemalloc.Statistic(tb_b_4, 32, 4),
            tracemalloc.Statistic(tb_a_2, 30, 3),
            tracemalloc.Statistic(tb_0, 7, 1),
            tracemalloc.Statistic(tb_a_5, 2, 1),
        ])

    def test_trace_format(self):
        snapshot, snapshot2 = create_snapshots()
        trace = snapshot.traces[0]
        self.assertEqual(str(trace), 'b.py:4: 10 B')
        traceback = trace.traceback
        self.assertEqual(str(traceback), 'b.py:4')
        frame = traceback[0]
        self.assertEqual(str(frame), 'b.py:4')

    def test_statistic_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot.statistics('lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'b.py:1: size=66 B, count=1, average=66 B')

    def test_statistic_diff_format(self):
        snapshot, snapshot2 = create_snapshots()
        stats = snapshot2.compare_to(snapshot, 'lineno')
        stat = stats[0]
        self.assertEqual(str(stat),
                         'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B')

    def test_slices(self):
        snapshot, snapshot2 = create_snapshots()
        self.assertEqual(snapshot.traces[:2],
                         (snapshot.traces[0], snapshot.traces[1]))

        traceback = snapshot.traces[0].traceback
        self.assertEqual(traceback[:2],
                         (traceback[0], traceback[1]))

    def test_format_traceback(self):
        snapshot, snapshot2 = create_snapshots()
        def getline(filename, lineno):
            return '  <%s, %s>' % (filename, lineno)
        # Traceback.format() emits '  File ...' lines (2 spaces) followed by
        # the stripped source line indented by 4 spaces.
        with patch('tracemalloc.linecache.getline',
                   side_effect=getline):
            tb = snapshot.traces[0].traceback
            self.assertEqual(tb.format(),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>',
                              '  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=1),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>',
                              '  File "b.py", line 4',
                              '    <b.py, 4>'])

            self.assertEqual(tb.format(limit=1, most_recent_first=True),
                             ['  File "a.py", line 2',
                              '    <a.py, 2>'])

            self.assertEqual(tb.format(limit=-1, most_recent_first=True),
                             ['  File "b.py", line 4',
                              '    <b.py, 4>'])


class TestFilters(unittest.TestCase):
    maxDiff = 2048

    def test_filter_attributes(self):
        # test default values
        f = tracemalloc.Filter(True, "abc")
        self.assertEqual(f.inclusive, True)
        self.assertEqual(f.filename_pattern, "abc")
        self.assertIsNone(f.lineno)
        self.assertEqual(f.all_frames, False)

        # test custom values
        f = tracemalloc.Filter(False, "test.py", 123, True)
        self.assertEqual(f.inclusive, False)
        self.assertEqual(f.filename_pattern, "test.py")
        self.assertEqual(f.lineno, 123)
        self.assertEqual(f.all_frames, True)

        # parameters passed by keyword
        f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True)
        self.assertEqual(f.inclusive, False)
        self.assertEqual(f.filename_pattern, "test.py")
        self.assertEqual(f.lineno, 123)
        self.assertEqual(f.all_frames, True)

        # read-only attribute
        self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc")

    def test_filter_match(self):
        # filter without line number
        f = tracemalloc.Filter(True, "abc")
        self.assertTrue(f._match_frame("abc", 0))
        self.assertTrue(f._match_frame("abc", 5))
        self.assertTrue(f._match_frame("abc", 10))
        self.assertFalse(f._match_frame("12356", 0))
        self.assertFalse(f._match_frame("12356", 5))
        self.assertFalse(f._match_frame("12356", 10))

        f = tracemalloc.Filter(False, "abc")
        self.assertFalse(f._match_frame("abc", 0))
        self.assertFalse(f._match_frame("abc", 5))
        self.assertFalse(f._match_frame("abc", 10))
        self.assertTrue(f._match_frame("12356", 0))
        self.assertTrue(f._match_frame("12356", 5))
        self.assertTrue(f._match_frame("12356", 10))

        # filter with line number > 0
        f = tracemalloc.Filter(True, "abc", 5)
        self.assertFalse(f._match_frame("abc", 0))
        self.assertTrue(f._match_frame("abc", 5))
        self.assertFalse(f._match_frame("abc", 10))
        self.assertFalse(f._match_frame("12356", 0))
        self.assertFalse(f._match_frame("12356", 5))
        self.assertFalse(f._match_frame("12356", 10))

        f = tracemalloc.Filter(False, "abc", 5)
        self.assertTrue(f._match_frame("abc", 0))
        self.assertFalse(f._match_frame("abc", 5))
        self.assertTrue(f._match_frame("abc", 10))
        self.assertTrue(f._match_frame("12356", 0))
        self.assertTrue(f._match_frame("12356", 5))
        self.assertTrue(f._match_frame("12356", 10))

        # filter with line number 0
        f = tracemalloc.Filter(True, "abc", 0)
        self.assertTrue(f._match_frame("abc", 0))
        self.assertFalse(f._match_frame("abc", 5))
        self.assertFalse(f._match_frame("abc", 10))
        self.assertFalse(f._match_frame("12356", 0))
        self.assertFalse(f._match_frame("12356", 5))
        self.assertFalse(f._match_frame("12356", 10))

        f = tracemalloc.Filter(False, "abc", 0)
        self.assertFalse(f._match_frame("abc", 0))
        self.assertTrue(f._match_frame("abc", 5))
        self.assertTrue(f._match_frame("abc", 10))
        self.assertTrue(f._match_frame("12356", 0))
        self.assertTrue(f._match_frame("12356", 5))
        self.assertTrue(f._match_frame("12356", 10))

    def test_filter_match_filename(self):
        def fnmatch(inclusive, filename, pattern):
            f = tracemalloc.Filter(inclusive, pattern)
            return f._match_frame(filename, 0)

        self.assertTrue(fnmatch(True, "abc", "abc"))
        self.assertFalse(fnmatch(True, "12356", "abc"))
        self.assertFalse(fnmatch(True, "<unknown>", "abc"))

        self.assertFalse(fnmatch(False, "abc", "abc"))
        self.assertTrue(fnmatch(False, "12356", "abc"))
        self.assertTrue(fnmatch(False, "<unknown>", "abc"))

    def test_filter_match_filename_joker(self):
        def fnmatch(filename, pattern):
            filter = tracemalloc.Filter(True, pattern)
            return filter._match_frame(filename, 0)

        # empty string
        self.assertFalse(fnmatch('abc', ''))
        self.assertFalse(fnmatch('', 'abc'))
        self.assertTrue(fnmatch('', ''))
        self.assertTrue(fnmatch('', '*'))

        # no *
        self.assertTrue(fnmatch('abc', 'abc'))
        self.assertFalse(fnmatch('abc', 'abcd'))
        self.assertFalse(fnmatch('abc', 'def'))

        # a*
        self.assertTrue(fnmatch('abc', 'a*'))
        self.assertTrue(fnmatch('abc', 'abc*'))
        self.assertFalse(fnmatch('abc', 'b*'))
        self.assertFalse(fnmatch('abc', 'abcd*'))

        # a*b
        self.assertTrue(fnmatch('abc', 'a*c'))
        self.assertTrue(fnmatch('abcdcx', 'a*cx'))
        self.assertFalse(fnmatch('abb', 'a*c'))
        self.assertFalse(fnmatch('abcdce', 'a*cx'))

        # a*b*c
        self.assertTrue(fnmatch('abcde', 'a*c*e'))
        self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg'))
        self.assertFalse(fnmatch('abcdd', 'a*c*e'))
        self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg'))

        # replace .pyc suffix with .py
        self.assertTrue(fnmatch('a.pyc', 'a.py'))
        self.assertTrue(fnmatch('a.py', 'a.pyc'))

        if os.name == 'nt':
            # case insensitive
            self.assertTrue(fnmatch('aBC', 'ABc'))
            self.assertTrue(fnmatch('aBcDe', 'Ab*dE'))

            self.assertTrue(fnmatch('a.pyc', 'a.PY'))
            self.assertTrue(fnmatch('a.py', 'a.PYC'))
        else:
            # case sensitive
            self.assertFalse(fnmatch('aBC', 'ABc'))
            self.assertFalse(fnmatch('aBcDe', 'Ab*dE'))

            self.assertFalse(fnmatch('a.pyc', 'a.PY'))
            self.assertFalse(fnmatch('a.py', 'a.PYC'))

        if os.name == 'nt':
            # normalize alternate separator "/" to the standard separator "\"
            self.assertTrue(fnmatch(r'a/b', r'a\b'))
            self.assertTrue(fnmatch(r'a\b', r'a/b'))
            self.assertTrue(fnmatch(r'a/b\c', r'a\b/c'))
            self.assertTrue(fnmatch(r'a/b/c', r'a\b\c'))
        else:
            # there is no alternate separator
            self.assertFalse(fnmatch(r'a/b', r'a\b'))
            self.assertFalse(fnmatch(r'a\b', r'a/b'))
            self.assertFalse(fnmatch(r'a/b\c', r'a\b/c'))
            self.assertFalse(fnmatch(r'a/b/c', r'a\b\c'))

        # as of 3.5, .pyo is no longer munged to .py
        self.assertFalse(fnmatch('a.pyo', 'a.py'))

    def test_filter_match_trace(self):
        t1 = (("a.py", 2), ("b.py", 3))
        t2 = (("b.py", 4), ("b.py", 5))
        t3 = (("c.py", 5), ('<unknown>', 0))
        unknown = (('<unknown>', 0),)

        f = tracemalloc.Filter(True, "b.py", all_frames=True)
        self.assertTrue(f._match_traceback(t1))
        self.assertTrue(f._match_traceback(t2))
        self.assertFalse(f._match_traceback(t3))
        self.assertFalse(f._match_traceback(unknown))

        f = tracemalloc.Filter(True, "b.py", all_frames=False)
        self.assertFalse(f._match_traceback(t1))
        self.assertTrue(f._match_traceback(t2))
        self.assertFalse(f._match_traceback(t3))
        self.assertFalse(f._match_traceback(unknown))

        f = tracemalloc.Filter(False, "b.py", all_frames=True)
        self.assertFalse(f._match_traceback(t1))
        self.assertFalse(f._match_traceback(t2))
        self.assertTrue(f._match_traceback(t3))
        self.assertTrue(f._match_traceback(unknown))

        f = tracemalloc.Filter(False, "b.py", all_frames=False)
        self.assertTrue(f._match_traceback(t1))
        self.assertFalse(f._match_traceback(t2))
        self.assertTrue(f._match_traceback(t3))
        self.assertTrue(f._match_traceback(unknown))

        f = tracemalloc.Filter(False, "<unknown>", all_frames=False)
        self.assertTrue(f._match_traceback(t1))
        self.assertTrue(f._match_traceback(t2))
        self.assertTrue(f._match_traceback(t3))
        self.assertFalse(f._match_traceback(unknown))

        f = tracemalloc.Filter(True, "<unknown>", all_frames=True)
        self.assertFalse(f._match_traceback(t1))
        self.assertFalse(f._match_traceback(t2))
        self.assertTrue(f._match_traceback(t3))
        self.assertTrue(f._match_traceback(unknown))

        f = tracemalloc.Filter(False, "<unknown>", all_frames=True)
        self.assertTrue(f._match_traceback(t1))
        self.assertTrue(f._match_traceback(t2))
        self.assertFalse(f._match_traceback(t3))
        self.assertFalse(f._match_traceback(unknown))


class TestCommandLine(unittest.TestCase):
    def test_env_var_disabled_by_default(self):
        # not tracing by default
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code)
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'False')

    @unittest.skipIf(interpreter_requires_environment(),
                     'Cannot run -E tests when PYTHON env vars are required.')
    def test_env_var_ignored_with_E(self):
        """PYTHON* environment variables must be ignored when -E is present."""
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'False')

    def test_env_var_disabled(self):
        # PYTHONTRACEMALLOC=0: not tracing at startup
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='0')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'False')

    def test_env_var_enabled_at_startup(self):
        # tracing at startup
        code = 'import tracemalloc; print(tracemalloc.is_tracing())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'True')

    def test_env_limit(self):
        # start and set the number of frames
        code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
        ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10')
        stdout = stdout.rstrip()
        self.assertEqual(stdout, b'10')

    @force_not_colorized
    def check_env_var_invalid(self, nframe):
        with support.SuppressCrashReport():
            ok, stdout, stderr = assert_python_failure(
                '-c', 'pass',
                PYTHONTRACEMALLOC=str(nframe))

        # either of the two error messages is accepted
        if b'ValueError: the number of frames must be in range' in stderr:
            return
        if b'PYTHONTRACEMALLOC: invalid number of frames' in stderr:
            return
        self.fail(f"unexpected output: {stderr!a}")


    def test_env_var_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_env_var_invalid(nframe)

    def test_sys_xoptions(self):
        for xoptions, nframe in (
            ('tracemalloc', 1),
            ('tracemalloc=1', 1),
            ('tracemalloc=15', 15),
        ):
            with self.subTest(xoptions=xoptions, nframe=nframe):
                code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())'
                ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code)
                stdout = stdout.rstrip()
                self.assertEqual(stdout, str(nframe).encode('ascii'))

    def check_sys_xoptions_invalid(self, nframe):
        args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass')
        with support.SuppressCrashReport():
            ok, stdout, stderr = assert_python_failure(*args)

        # either of the two error messages is accepted
        if b'ValueError: the number of frames must be in range' in stderr:
            return
        if b'-X tracemalloc=NFRAME: invalid number of frames' in stderr:
            return
        self.fail(f"unexpected output: {stderr!a}")

    def test_sys_xoptions_invalid(self):
        for nframe in INVALID_NFRAME:
            with self.subTest(nframe=nframe):
                self.check_sys_xoptions_invalid(nframe)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_pymem_alloc0(self):
        # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled
        # does not crash.
        code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1'
        assert_python_ok('-X', 'tracemalloc', '-c', code)


@unittest.skipIf(_testcapi is None, 'need _testcapi')
class TestCAPI(unittest.TestCase):
    maxDiff = 80 * 20

    def setUp(self):
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        self.domain = 5
        self.size = 123
        self.obj = allocate_bytes(self.size)[0]

        # for the type "object", id(obj) is the address of its memory block.
        # This type is not tracked by the garbage collector
        self.ptr = id(self.obj)

    def tearDown(self):
        tracemalloc.stop()

    def get_traceback(self):
        frames = _testinternalcapi._PyTraceMalloc_GetTraceback(self.domain, self.ptr)
        if frames is not None:
            return tracemalloc.Traceback(frames)
        else:
            return None

    def track(self, release_gil=False, nframe=1):
        # lineno_delta=1: the expected frames point at the NEXT line, so the
        # tracemalloc_track() call must stay directly below get_frames().
        frames = get_frames(nframe, 1)
        _testcapi.tracemalloc_track(self.domain, self.ptr, self.size,
                                    release_gil)
        return frames

    def untrack(self):
        _testcapi.tracemalloc_untrack(self.domain, self.ptr)

    def get_traced_memory(self):
        # Get the traced size in the domain
        snapshot = tracemalloc.take_snapshot()
        domain_filter = tracemalloc.DomainFilter(True, self.domain)
        snapshot = snapshot.filter_traces([domain_filter])
        return sum(trace.size for trace in snapshot.traces)

    def check_track(self, release_gil):
        nframe = 5
        tracemalloc.start(nframe)

        frames = self.track(release_gil, nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

        self.assertEqual(self.get_traced_memory(), self.size)

    def test_track(self):
        self.check_track(False)

    def test_track_without_gil(self):
        # check that calling _PyTraceMalloc_Track() without holding the GIL
        # works too
        self.check_track(True)

    def test_track_already_tracked(self):
        nframe = 5
        tracemalloc.start(nframe)

        # track a first time
        self.track()

        # calling _PyTraceMalloc_Track() must remove the old trace and add
        # a new trace with the new traceback
        frames = self.track(nframe=nframe)
        self.assertEqual(self.get_traceback(),
                         tracemalloc.Traceback(frames))

    def test_untrack(self):
        tracemalloc.start()

        self.track()
self.assertIsNotNone(self.get_traceback()) 1077 self.assertEqual(self.get_traced_memory(), self.size) 1078 1079 # untrack must remove the trace 1080 self.untrack() 1081 self.assertIsNone(self.get_traceback()) 1082 self.assertEqual(self.get_traced_memory(), 0) 1083 1084 # calling _PyTraceMalloc_Untrack() multiple times must not crash 1085 self.untrack() 1086 self.untrack() 1087 1088 def test_stop_track(self): 1089 tracemalloc.start() 1090 tracemalloc.stop() 1091 1092 with self.assertRaises(RuntimeError): 1093 self.track() 1094 self.assertIsNone(self.get_traceback()) 1095 1096 def test_stop_untrack(self): 1097 tracemalloc.start() 1098 self.track() 1099 1100 tracemalloc.stop() 1101 with self.assertRaises(RuntimeError): 1102 self.untrack() 1103 1104 1105if __name__ == "__main__": 1106 unittest.main() 1107