1import contextlib 2import os 3import sys 4import tracemalloc 5import unittest 6from unittest.mock import patch 7from test.support.script_helper import (assert_python_ok, assert_python_failure, 8 interpreter_requires_environment) 9from test import support 10try: 11 import threading 12except ImportError: 13 threading = None 14try: 15 import _testcapi 16except ImportError: 17 _testcapi = None 18 19 20EMPTY_STRING_SIZE = sys.getsizeof(b'') 21 22 23def get_frames(nframe, lineno_delta): 24 frames = [] 25 frame = sys._getframe(1) 26 for index in range(nframe): 27 code = frame.f_code 28 lineno = frame.f_lineno + lineno_delta 29 frames.append((code.co_filename, lineno)) 30 lineno_delta = 0 31 frame = frame.f_back 32 if frame is None: 33 break 34 return tuple(frames) 35 36def allocate_bytes(size): 37 nframe = tracemalloc.get_traceback_limit() 38 bytes_len = (size - EMPTY_STRING_SIZE) 39 frames = get_frames(nframe, 1) 40 data = b'x' * bytes_len 41 return data, tracemalloc.Traceback(frames) 42 43def create_snapshots(): 44 traceback_limit = 2 45 46 # _tracemalloc._get_traces() returns a list of (domain, size, 47 # traceback_frames) tuples. traceback_frames is a tuple of (filename, 48 # line_number) tuples. 49 raw_traces = [ 50 (0, 10, (('a.py', 2), ('b.py', 4))), 51 (0, 10, (('a.py', 2), ('b.py', 4))), 52 (0, 10, (('a.py', 2), ('b.py', 4))), 53 54 (1, 2, (('a.py', 5), ('b.py', 4))), 55 56 (2, 66, (('b.py', 1),)), 57 58 (3, 7, (('<unknown>', 0),)), 59 ] 60 snapshot = tracemalloc.Snapshot(raw_traces, traceback_limit) 61 62 raw_traces2 = [ 63 (0, 10, (('a.py', 2), ('b.py', 4))), 64 (0, 10, (('a.py', 2), ('b.py', 4))), 65 (0, 10, (('a.py', 2), ('b.py', 4))), 66 67 (2, 2, (('a.py', 5), ('b.py', 4))), 68 (2, 5000, (('a.py', 5), ('b.py', 4))), 69 70 (4, 400, (('c.py', 578),)), 71 ] 72 snapshot2 = tracemalloc.Snapshot(raw_traces2, traceback_limit) 73 74 return (snapshot, snapshot2) 75 76def frame(filename, lineno): 77 return tracemalloc._Frame((filename, lineno)) 78 79def traceback(*frames): 80 return tracemalloc.Traceback(frames) 81 82def traceback_lineno(filename, lineno): 83 return traceback((filename, lineno)) 84 85def traceback_filename(filename): 86 return traceback_lineno(filename, 0) 87 88 89class TestTracemallocEnabled(unittest.TestCase): 90 def setUp(self): 91 if tracemalloc.is_tracing(): 92 self.skipTest("tracemalloc must be stopped before the test") 93 94 tracemalloc.start(1) 95 96 def tearDown(self): 97 tracemalloc.stop() 98 99 def test_get_tracemalloc_memory(self): 100 data = [allocate_bytes(123) for count in range(1000)] 101 size = tracemalloc.get_tracemalloc_memory() 102 self.assertGreaterEqual(size, 0) 103 104 tracemalloc.clear_traces() 105 size2 = tracemalloc.get_tracemalloc_memory() 106 self.assertGreaterEqual(size2, 0) 107 self.assertLessEqual(size2, size) 108 109 def test_get_object_traceback(self): 110 tracemalloc.clear_traces() 111 obj_size = 12345 112 obj, obj_traceback = allocate_bytes(obj_size) 113 traceback = tracemalloc.get_object_traceback(obj) 114 self.assertEqual(traceback, obj_traceback) 115 116 def test_set_traceback_limit(self): 117 obj_size = 10 118 119 tracemalloc.stop() 120 self.assertRaises(ValueError, tracemalloc.start, -1) 121 122 tracemalloc.stop() 123 tracemalloc.start(10) 124 obj2, obj2_traceback = allocate_bytes(obj_size) 125 traceback = tracemalloc.get_object_traceback(obj2) 126 self.assertEqual(len(traceback), 10) 127 self.assertEqual(traceback, obj2_traceback) 128 129 tracemalloc.stop() 130 tracemalloc.start(1) 131 obj, obj_traceback = allocate_bytes(obj_size) 132 traceback = tracemalloc.get_object_traceback(obj) 133 self.assertEqual(len(traceback), 1) 134 self.assertEqual(traceback, obj_traceback) 135 136 def find_trace(self, traces, traceback): 137 for trace in traces: 138 if trace[2] == traceback._frames: 139 return trace 140 141 self.fail("trace not found") 142 143 def test_get_traces(self): 144 tracemalloc.clear_traces() 145 obj_size = 12345 146 obj, obj_traceback = allocate_bytes(obj_size) 147 148 traces = tracemalloc._get_traces() 149 trace = self.find_trace(traces, obj_traceback) 150 151 self.assertIsInstance(trace, tuple) 152 domain, size, traceback = trace 153 self.assertEqual(size, obj_size) 154 self.assertEqual(traceback, obj_traceback._frames) 155 156 tracemalloc.stop() 157 self.assertEqual(tracemalloc._get_traces(), []) 158 159 def test_get_traces_intern_traceback(self): 160 # dummy wrappers to get more useful and identical frames in the traceback 161 def allocate_bytes2(size): 162 return allocate_bytes(size) 163 def allocate_bytes3(size): 164 return allocate_bytes2(size) 165 def allocate_bytes4(size): 166 return allocate_bytes3(size) 167 168 # Ensure that two identical tracebacks are not duplicated 169 tracemalloc.stop() 170 tracemalloc.start(4) 171 obj_size = 123 172 obj1, obj1_traceback = allocate_bytes4(obj_size) 173 obj2, obj2_traceback = allocate_bytes4(obj_size) 174 175 traces = tracemalloc._get_traces() 176 177 trace1 = self.find_trace(traces, obj1_traceback) 178 trace2 = self.find_trace(traces, obj2_traceback) 179 domain1, size1, traceback1 = trace1 180 domain2, size2, traceback2 = trace2 181 self.assertIs(traceback2, traceback1) 182 183 def test_get_traced_memory(self): 184 # Python allocates some internals objects, so the test must tolerate 185 # a small difference between the expected size and the real usage 186 max_error = 2048 187 188 # allocate one object 189 obj_size = 1024 * 1024 190 tracemalloc.clear_traces() 191 obj, obj_traceback = allocate_bytes(obj_size) 192 size, peak_size = tracemalloc.get_traced_memory() 193 self.assertGreaterEqual(size, obj_size) 194 self.assertGreaterEqual(peak_size, size) 195 196 self.assertLessEqual(size - obj_size, max_error) 197 self.assertLessEqual(peak_size - size, max_error) 198 199 # destroy the object 200 obj = None 201 size2, peak_size2 = tracemalloc.get_traced_memory() 202 self.assertLess(size2, size) 203 self.assertGreaterEqual(size - size2, obj_size - max_error) 204 self.assertGreaterEqual(peak_size2, peak_size) 205 206 # clear_traces() must reset traced memory counters 207 tracemalloc.clear_traces() 208 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 209 210 # allocate another object 211 obj, obj_traceback = allocate_bytes(obj_size) 212 size, peak_size = tracemalloc.get_traced_memory() 213 self.assertGreaterEqual(size, obj_size) 214 215 # stop() also resets traced memory counters 216 tracemalloc.stop() 217 self.assertEqual(tracemalloc.get_traced_memory(), (0, 0)) 218 219 def test_clear_traces(self): 220 obj, obj_traceback = allocate_bytes(123) 221 traceback = tracemalloc.get_object_traceback(obj) 222 self.assertIsNotNone(traceback) 223 224 tracemalloc.clear_traces() 225 traceback2 = tracemalloc.get_object_traceback(obj) 226 self.assertIsNone(traceback2) 227 228 def test_is_tracing(self): 229 tracemalloc.stop() 230 self.assertFalse(tracemalloc.is_tracing()) 231 232 tracemalloc.start() 233 self.assertTrue(tracemalloc.is_tracing()) 234 235 def test_snapshot(self): 236 obj, source = allocate_bytes(123) 237 238 # take a snapshot 239 snapshot = tracemalloc.take_snapshot() 240 241 # write on disk 242 snapshot.dump(support.TESTFN) 243 self.addCleanup(support.unlink, support.TESTFN) 244 245 # load from disk 246 snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) 247 self.assertEqual(snapshot2.traces, snapshot.traces) 248 249 # tracemalloc must be tracing memory allocations to take a snapshot 250 tracemalloc.stop() 251 with self.assertRaises(RuntimeError) as cm: 252 tracemalloc.take_snapshot() 253 self.assertEqual(str(cm.exception), 254 "the tracemalloc module must be tracing memory " 255 "allocations to take a snapshot") 256 257 def test_snapshot_save_attr(self): 258 # take a snapshot with a new attribute 259 snapshot = tracemalloc.take_snapshot() 260 snapshot.test_attr = "new" 261 snapshot.dump(support.TESTFN) 262 self.addCleanup(support.unlink, support.TESTFN) 263 264 # load() should recreate the attribute 265 snapshot2 = tracemalloc.Snapshot.load(support.TESTFN) 266 self.assertEqual(snapshot2.test_attr, "new") 267 268 def fork_child(self): 269 if not tracemalloc.is_tracing(): 270 return 2 271 272 obj_size = 12345 273 obj, obj_traceback = allocate_bytes(obj_size) 274 traceback = tracemalloc.get_object_traceback(obj) 275 if traceback is None: 276 return 3 277 278 # everything is fine 279 return 0 280 281 @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()') 282 def test_fork(self): 283 # check that tracemalloc is still working after fork 284 pid = os.fork() 285 if not pid: 286 # child 287 exitcode = 1 288 try: 289 exitcode = self.fork_child() 290 finally: 291 os._exit(exitcode) 292 else: 293 pid2, status = os.waitpid(pid, 0) 294 self.assertTrue(os.WIFEXITED(status)) 295 exitcode = os.WEXITSTATUS(status) 296 self.assertEqual(exitcode, 0) 297 298 299class TestSnapshot(unittest.TestCase): 300 maxDiff = 4000 301 302 def test_create_snapshot(self): 303 raw_traces = [(0, 5, (('a.py', 2),))] 304 305 with contextlib.ExitStack() as stack: 306 stack.enter_context(patch.object(tracemalloc, 'is_tracing', 307 return_value=True)) 308 stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit', 309 return_value=5)) 310 stack.enter_context(patch.object(tracemalloc, '_get_traces', 311 return_value=raw_traces)) 312 313 snapshot = tracemalloc.take_snapshot() 314 self.assertEqual(snapshot.traceback_limit, 5) 315 self.assertEqual(len(snapshot.traces), 1) 316 trace = snapshot.traces[0] 317 self.assertEqual(trace.size, 5) 318 self.assertEqual(len(trace.traceback), 1) 319 self.assertEqual(trace.traceback[0].filename, 'a.py') 320 self.assertEqual(trace.traceback[0].lineno, 2) 321 322 def test_filter_traces(self): 323 snapshot, snapshot2 = create_snapshots() 324 filter1 = tracemalloc.Filter(False, "b.py") 325 filter2 = tracemalloc.Filter(True, "a.py", 2) 326 filter3 = tracemalloc.Filter(True, "a.py", 5) 327 328 original_traces = list(snapshot.traces._traces) 329 330 # exclude b.py 331 snapshot3 = snapshot.filter_traces((filter1,)) 332 self.assertEqual(snapshot3.traces._traces, [ 333 (0, 10, (('a.py', 2), ('b.py', 4))), 334 (0, 10, (('a.py', 2), ('b.py', 4))), 335 (0, 10, (('a.py', 2), ('b.py', 4))), 336 (1, 2, (('a.py', 5), ('b.py', 4))), 337 (3, 7, (('<unknown>', 0),)), 338 ]) 339 340 # filter_traces() must not touch the original snapshot 341 self.assertEqual(snapshot.traces._traces, original_traces) 342 343 # only include two lines of a.py 344 snapshot4 = snapshot3.filter_traces((filter2, filter3)) 345 self.assertEqual(snapshot4.traces._traces, [ 346 (0, 10, (('a.py', 2), ('b.py', 4))), 347 (0, 10, (('a.py', 2), ('b.py', 4))), 348 (0, 10, (('a.py', 2), ('b.py', 4))), 349 (1, 2, (('a.py', 5), ('b.py', 4))), 350 ]) 351 352 # No filter: just duplicate the snapshot 353 snapshot5 = snapshot.filter_traces(()) 354 self.assertIsNot(snapshot5, snapshot) 355 self.assertIsNot(snapshot5.traces, snapshot.traces) 356 self.assertEqual(snapshot5.traces, snapshot.traces) 357 358 self.assertRaises(TypeError, snapshot.filter_traces, filter1) 359 360 def test_filter_traces_domain(self): 361 snapshot, snapshot2 = create_snapshots() 362 filter1 = tracemalloc.Filter(False, "a.py", domain=1) 363 filter2 = tracemalloc.Filter(True, "a.py", domain=1) 364 365 original_traces = list(snapshot.traces._traces) 366 367 # exclude a.py of domain 1 368 snapshot3 = snapshot.filter_traces((filter1,)) 369 self.assertEqual(snapshot3.traces._traces, [ 370 (0, 10, (('a.py', 2), ('b.py', 4))), 371 (0, 10, (('a.py', 2), ('b.py', 4))), 372 (0, 10, (('a.py', 2), ('b.py', 4))), 373 (2, 66, (('b.py', 1),)), 374 (3, 7, (('<unknown>', 0),)), 375 ]) 376 377 # include domain 1 378 snapshot3 = snapshot.filter_traces((filter1,)) 379 self.assertEqual(snapshot3.traces._traces, [ 380 (0, 10, (('a.py', 2), ('b.py', 4))), 381 (0, 10, (('a.py', 2), ('b.py', 4))), 382 (0, 10, (('a.py', 2), ('b.py', 4))), 383 (2, 66, (('b.py', 1),)), 384 (3, 7, (('<unknown>', 0),)), 385 ]) 386 387 def test_filter_traces_domain_filter(self): 388 snapshot, snapshot2 = create_snapshots() 389 filter1 = tracemalloc.DomainFilter(False, domain=3) 390 filter2 = tracemalloc.DomainFilter(True, domain=3) 391 392 # exclude domain 2 393 snapshot3 = snapshot.filter_traces((filter1,)) 394 self.assertEqual(snapshot3.traces._traces, [ 395 (0, 10, (('a.py', 2), ('b.py', 4))), 396 (0, 10, (('a.py', 2), ('b.py', 4))), 397 (0, 10, (('a.py', 2), ('b.py', 4))), 398 (1, 2, (('a.py', 5), ('b.py', 4))), 399 (2, 66, (('b.py', 1),)), 400 ]) 401 402 # include domain 2 403 snapshot3 = snapshot.filter_traces((filter2,)) 404 self.assertEqual(snapshot3.traces._traces, [ 405 (3, 7, (('<unknown>', 0),)), 406 ]) 407 408 def test_snapshot_group_by_line(self): 409 snapshot, snapshot2 = create_snapshots() 410 tb_0 = traceback_lineno('<unknown>', 0) 411 tb_a_2 = traceback_lineno('a.py', 2) 412 tb_a_5 = traceback_lineno('a.py', 5) 413 tb_b_1 = traceback_lineno('b.py', 1) 414 tb_c_578 = traceback_lineno('c.py', 578) 415 416 # stats per file and line 417 stats1 = snapshot.statistics('lineno') 418 self.assertEqual(stats1, [ 419 tracemalloc.Statistic(tb_b_1, 66, 1), 420 tracemalloc.Statistic(tb_a_2, 30, 3), 421 tracemalloc.Statistic(tb_0, 7, 1), 422 tracemalloc.Statistic(tb_a_5, 2, 1), 423 ]) 424 425 # stats per file and line (2) 426 stats2 = snapshot2.statistics('lineno') 427 self.assertEqual(stats2, [ 428 tracemalloc.Statistic(tb_a_5, 5002, 2), 429 tracemalloc.Statistic(tb_c_578, 400, 1), 430 tracemalloc.Statistic(tb_a_2, 30, 3), 431 ]) 432 433 # stats diff per file and line 434 statistics = snapshot2.compare_to(snapshot, 'lineno') 435 self.assertEqual(statistics, [ 436 tracemalloc.StatisticDiff(tb_a_5, 5002, 5000, 2, 1), 437 tracemalloc.StatisticDiff(tb_c_578, 400, 400, 1, 1), 438 tracemalloc.StatisticDiff(tb_b_1, 0, -66, 0, -1), 439 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 440 tracemalloc.StatisticDiff(tb_a_2, 30, 0, 3, 0), 441 ]) 442 443 def test_snapshot_group_by_file(self): 444 snapshot, snapshot2 = create_snapshots() 445 tb_0 = traceback_filename('<unknown>') 446 tb_a = traceback_filename('a.py') 447 tb_b = traceback_filename('b.py') 448 tb_c = traceback_filename('c.py') 449 450 # stats per file 451 stats1 = snapshot.statistics('filename') 452 self.assertEqual(stats1, [ 453 tracemalloc.Statistic(tb_b, 66, 1), 454 tracemalloc.Statistic(tb_a, 32, 4), 455 tracemalloc.Statistic(tb_0, 7, 1), 456 ]) 457 458 # stats per file (2) 459 stats2 = snapshot2.statistics('filename') 460 self.assertEqual(stats2, [ 461 tracemalloc.Statistic(tb_a, 5032, 5), 462 tracemalloc.Statistic(tb_c, 400, 1), 463 ]) 464 465 # stats diff per file 466 diff = snapshot2.compare_to(snapshot, 'filename') 467 self.assertEqual(diff, [ 468 tracemalloc.StatisticDiff(tb_a, 5032, 5000, 5, 1), 469 tracemalloc.StatisticDiff(tb_c, 400, 400, 1, 1), 470 tracemalloc.StatisticDiff(tb_b, 0, -66, 0, -1), 471 tracemalloc.StatisticDiff(tb_0, 0, -7, 0, -1), 472 ]) 473 474 def test_snapshot_group_by_traceback(self): 475 snapshot, snapshot2 = create_snapshots() 476 477 # stats per file 478 tb1 = traceback(('a.py', 2), ('b.py', 4)) 479 tb2 = traceback(('a.py', 5), ('b.py', 4)) 480 tb3 = traceback(('b.py', 1)) 481 tb4 = traceback(('<unknown>', 0)) 482 stats1 = snapshot.statistics('traceback') 483 self.assertEqual(stats1, [ 484 tracemalloc.Statistic(tb3, 66, 1), 485 tracemalloc.Statistic(tb1, 30, 3), 486 tracemalloc.Statistic(tb4, 7, 1), 487 tracemalloc.Statistic(tb2, 2, 1), 488 ]) 489 490 # stats per file (2) 491 tb5 = traceback(('c.py', 578)) 492 stats2 = snapshot2.statistics('traceback') 493 self.assertEqual(stats2, [ 494 tracemalloc.Statistic(tb2, 5002, 2), 495 tracemalloc.Statistic(tb5, 400, 1), 496 tracemalloc.Statistic(tb1, 30, 3), 497 ]) 498 499 # stats diff per file 500 diff = snapshot2.compare_to(snapshot, 'traceback') 501 self.assertEqual(diff, [ 502 tracemalloc.StatisticDiff(tb2, 5002, 5000, 2, 1), 503 tracemalloc.StatisticDiff(tb5, 400, 400, 1, 1), 504 tracemalloc.StatisticDiff(tb3, 0, -66, 0, -1), 505 tracemalloc.StatisticDiff(tb4, 0, -7, 0, -1), 506 tracemalloc.StatisticDiff(tb1, 30, 0, 3, 0), 507 ]) 508 509 self.assertRaises(ValueError, 510 snapshot.statistics, 'traceback', cumulative=True) 511 512 def test_snapshot_group_by_cumulative(self): 513 snapshot, snapshot2 = create_snapshots() 514 tb_0 = traceback_filename('<unknown>') 515 tb_a = traceback_filename('a.py') 516 tb_b = traceback_filename('b.py') 517 tb_a_2 = traceback_lineno('a.py', 2) 518 tb_a_5 = traceback_lineno('a.py', 5) 519 tb_b_1 = traceback_lineno('b.py', 1) 520 tb_b_4 = traceback_lineno('b.py', 4) 521 522 # per file 523 stats = snapshot.statistics('filename', True) 524 self.assertEqual(stats, [ 525 tracemalloc.Statistic(tb_b, 98, 5), 526 tracemalloc.Statistic(tb_a, 32, 4), 527 tracemalloc.Statistic(tb_0, 7, 1), 528 ]) 529 530 # per line 531 stats = snapshot.statistics('lineno', True) 532 self.assertEqual(stats, [ 533 tracemalloc.Statistic(tb_b_1, 66, 1), 534 tracemalloc.Statistic(tb_b_4, 32, 4), 535 tracemalloc.Statistic(tb_a_2, 30, 3), 536 tracemalloc.Statistic(tb_0, 7, 1), 537 tracemalloc.Statistic(tb_a_5, 2, 1), 538 ]) 539 540 def test_trace_format(self): 541 snapshot, snapshot2 = create_snapshots() 542 trace = snapshot.traces[0] 543 self.assertEqual(str(trace), 'a.py:2: 10 B') 544 traceback = trace.traceback 545 self.assertEqual(str(traceback), 'a.py:2') 546 frame = traceback[0] 547 self.assertEqual(str(frame), 'a.py:2') 548 549 def test_statistic_format(self): 550 snapshot, snapshot2 = create_snapshots() 551 stats = snapshot.statistics('lineno') 552 stat = stats[0] 553 self.assertEqual(str(stat), 554 'b.py:1: size=66 B, count=1, average=66 B') 555 556 def test_statistic_diff_format(self): 557 snapshot, snapshot2 = create_snapshots() 558 stats = snapshot2.compare_to(snapshot, 'lineno') 559 stat = stats[0] 560 self.assertEqual(str(stat), 561 'a.py:5: size=5002 B (+5000 B), count=2 (+1), average=2501 B') 562 563 def test_slices(self): 564 snapshot, snapshot2 = create_snapshots() 565 self.assertEqual(snapshot.traces[:2], 566 (snapshot.traces[0], snapshot.traces[1])) 567 568 traceback = snapshot.traces[0].traceback 569 self.assertEqual(traceback[:2], 570 (traceback[0], traceback[1])) 571 572 def test_format_traceback(self): 573 snapshot, snapshot2 = create_snapshots() 574 def getline(filename, lineno): 575 return ' <%s, %s>' % (filename, lineno) 576 with unittest.mock.patch('tracemalloc.linecache.getline', 577 side_effect=getline): 578 tb = snapshot.traces[0].traceback 579 self.assertEqual(tb.format(), 580 [' File "a.py", line 2', 581 ' <a.py, 2>', 582 ' File "b.py", line 4', 583 ' <b.py, 4>']) 584 585 self.assertEqual(tb.format(limit=1), 586 [' File "a.py", line 2', 587 ' <a.py, 2>']) 588 589 self.assertEqual(tb.format(limit=-1), 590 []) 591 592 593class TestFilters(unittest.TestCase): 594 maxDiff = 2048 595 596 def test_filter_attributes(self): 597 # test default values 598 f = tracemalloc.Filter(True, "abc") 599 self.assertEqual(f.inclusive, True) 600 self.assertEqual(f.filename_pattern, "abc") 601 self.assertIsNone(f.lineno) 602 self.assertEqual(f.all_frames, False) 603 604 # test custom values 605 f = tracemalloc.Filter(False, "test.py", 123, True) 606 self.assertEqual(f.inclusive, False) 607 self.assertEqual(f.filename_pattern, "test.py") 608 self.assertEqual(f.lineno, 123) 609 self.assertEqual(f.all_frames, True) 610 611 # parameters passed by keyword 612 f = tracemalloc.Filter(inclusive=False, filename_pattern="test.py", lineno=123, all_frames=True) 613 self.assertEqual(f.inclusive, False) 614 self.assertEqual(f.filename_pattern, "test.py") 615 self.assertEqual(f.lineno, 123) 616 self.assertEqual(f.all_frames, True) 617 618 # read-only attribute 619 self.assertRaises(AttributeError, setattr, f, "filename_pattern", "abc") 620 621 def test_filter_match(self): 622 # filter without line number 623 f = tracemalloc.Filter(True, "abc") 624 self.assertTrue(f._match_frame("abc", 0)) 625 self.assertTrue(f._match_frame("abc", 5)) 626 self.assertTrue(f._match_frame("abc", 10)) 627 self.assertFalse(f._match_frame("12356", 0)) 628 self.assertFalse(f._match_frame("12356", 5)) 629 self.assertFalse(f._match_frame("12356", 10)) 630 631 f = tracemalloc.Filter(False, "abc") 632 self.assertFalse(f._match_frame("abc", 0)) 633 self.assertFalse(f._match_frame("abc", 5)) 634 self.assertFalse(f._match_frame("abc", 10)) 635 self.assertTrue(f._match_frame("12356", 0)) 636 self.assertTrue(f._match_frame("12356", 5)) 637 self.assertTrue(f._match_frame("12356", 10)) 638 639 # filter with line number > 0 640 f = tracemalloc.Filter(True, "abc", 5) 641 self.assertFalse(f._match_frame("abc", 0)) 642 self.assertTrue(f._match_frame("abc", 5)) 643 self.assertFalse(f._match_frame("abc", 10)) 644 self.assertFalse(f._match_frame("12356", 0)) 645 self.assertFalse(f._match_frame("12356", 5)) 646 self.assertFalse(f._match_frame("12356", 10)) 647 648 f = tracemalloc.Filter(False, "abc", 5) 649 self.assertTrue(f._match_frame("abc", 0)) 650 self.assertFalse(f._match_frame("abc", 5)) 651 self.assertTrue(f._match_frame("abc", 10)) 652 self.assertTrue(f._match_frame("12356", 0)) 653 self.assertTrue(f._match_frame("12356", 5)) 654 self.assertTrue(f._match_frame("12356", 10)) 655 656 # filter with line number 0 657 f = tracemalloc.Filter(True, "abc", 0) 658 self.assertTrue(f._match_frame("abc", 0)) 659 self.assertFalse(f._match_frame("abc", 5)) 660 self.assertFalse(f._match_frame("abc", 10)) 661 self.assertFalse(f._match_frame("12356", 0)) 662 self.assertFalse(f._match_frame("12356", 5)) 663 self.assertFalse(f._match_frame("12356", 10)) 664 665 f = tracemalloc.Filter(False, "abc", 0) 666 self.assertFalse(f._match_frame("abc", 0)) 667 self.assertTrue(f._match_frame("abc", 5)) 668 self.assertTrue(f._match_frame("abc", 10)) 669 self.assertTrue(f._match_frame("12356", 0)) 670 self.assertTrue(f._match_frame("12356", 5)) 671 self.assertTrue(f._match_frame("12356", 10)) 672 673 def test_filter_match_filename(self): 674 def fnmatch(inclusive, filename, pattern): 675 f = tracemalloc.Filter(inclusive, pattern) 676 return f._match_frame(filename, 0) 677 678 self.assertTrue(fnmatch(True, "abc", "abc")) 679 self.assertFalse(fnmatch(True, "12356", "abc")) 680 self.assertFalse(fnmatch(True, "<unknown>", "abc")) 681 682 self.assertFalse(fnmatch(False, "abc", "abc")) 683 self.assertTrue(fnmatch(False, "12356", "abc")) 684 self.assertTrue(fnmatch(False, "<unknown>", "abc")) 685 686 def test_filter_match_filename_joker(self): 687 def fnmatch(filename, pattern): 688 filter = tracemalloc.Filter(True, pattern) 689 return filter._match_frame(filename, 0) 690 691 # empty string 692 self.assertFalse(fnmatch('abc', '')) 693 self.assertFalse(fnmatch('', 'abc')) 694 self.assertTrue(fnmatch('', '')) 695 self.assertTrue(fnmatch('', '*')) 696 697 # no * 698 self.assertTrue(fnmatch('abc', 'abc')) 699 self.assertFalse(fnmatch('abc', 'abcd')) 700 self.assertFalse(fnmatch('abc', 'def')) 701 702 # a* 703 self.assertTrue(fnmatch('abc', 'a*')) 704 self.assertTrue(fnmatch('abc', 'abc*')) 705 self.assertFalse(fnmatch('abc', 'b*')) 706 self.assertFalse(fnmatch('abc', 'abcd*')) 707 708 # a*b 709 self.assertTrue(fnmatch('abc', 'a*c')) 710 self.assertTrue(fnmatch('abcdcx', 'a*cx')) 711 self.assertFalse(fnmatch('abb', 'a*c')) 712 self.assertFalse(fnmatch('abcdce', 'a*cx')) 713 714 # a*b*c 715 self.assertTrue(fnmatch('abcde', 'a*c*e')) 716 self.assertTrue(fnmatch('abcbdefeg', 'a*bd*eg')) 717 self.assertFalse(fnmatch('abcdd', 'a*c*e')) 718 self.assertFalse(fnmatch('abcbdefef', 'a*bd*eg')) 719 720 # replace .pyc suffix with .py 721 self.assertTrue(fnmatch('a.pyc', 'a.py')) 722 self.assertTrue(fnmatch('a.py', 'a.pyc')) 723 724 if os.name == 'nt': 725 # case insensitive 726 self.assertTrue(fnmatch('aBC', 'ABc')) 727 self.assertTrue(fnmatch('aBcDe', 'Ab*dE')) 728 729 self.assertTrue(fnmatch('a.pyc', 'a.PY')) 730 self.assertTrue(fnmatch('a.py', 'a.PYC')) 731 else: 732 # case sensitive 733 self.assertFalse(fnmatch('aBC', 'ABc')) 734 self.assertFalse(fnmatch('aBcDe', 'Ab*dE')) 735 736 self.assertFalse(fnmatch('a.pyc', 'a.PY')) 737 self.assertFalse(fnmatch('a.py', 'a.PYC')) 738 739 if os.name == 'nt': 740 # normalize alternate separator "/" to the standard separator "\" 741 self.assertTrue(fnmatch(r'a/b', r'a\b')) 742 self.assertTrue(fnmatch(r'a\b', r'a/b')) 743 self.assertTrue(fnmatch(r'a/b\c', r'a\b/c')) 744 self.assertTrue(fnmatch(r'a/b/c', r'a\b\c')) 745 else: 746 # there is no alternate separator 747 self.assertFalse(fnmatch(r'a/b', r'a\b')) 748 self.assertFalse(fnmatch(r'a\b', r'a/b')) 749 self.assertFalse(fnmatch(r'a/b\c', r'a\b/c')) 750 self.assertFalse(fnmatch(r'a/b/c', r'a\b\c')) 751 752 # as of 3.5, .pyo is no longer munged to .py 753 self.assertFalse(fnmatch('a.pyo', 'a.py')) 754 755 def test_filter_match_trace(self): 756 t1 = (("a.py", 2), ("b.py", 3)) 757 t2 = (("b.py", 4), ("b.py", 5)) 758 t3 = (("c.py", 5), ('<unknown>', 0)) 759 unknown = (('<unknown>', 0),) 760 761 f = tracemalloc.Filter(True, "b.py", all_frames=True) 762 self.assertTrue(f._match_traceback(t1)) 763 self.assertTrue(f._match_traceback(t2)) 764 self.assertFalse(f._match_traceback(t3)) 765 self.assertFalse(f._match_traceback(unknown)) 766 767 f = tracemalloc.Filter(True, "b.py", all_frames=False) 768 self.assertFalse(f._match_traceback(t1)) 769 self.assertTrue(f._match_traceback(t2)) 770 self.assertFalse(f._match_traceback(t3)) 771 self.assertFalse(f._match_traceback(unknown)) 772 773 f = tracemalloc.Filter(False, "b.py", all_frames=True) 774 self.assertFalse(f._match_traceback(t1)) 775 self.assertFalse(f._match_traceback(t2)) 776 self.assertTrue(f._match_traceback(t3)) 777 self.assertTrue(f._match_traceback(unknown)) 778 779 f = tracemalloc.Filter(False, "b.py", all_frames=False) 780 self.assertTrue(f._match_traceback(t1)) 781 self.assertFalse(f._match_traceback(t2)) 782 self.assertTrue(f._match_traceback(t3)) 783 self.assertTrue(f._match_traceback(unknown)) 784 785 f = tracemalloc.Filter(False, "<unknown>", all_frames=False) 786 self.assertTrue(f._match_traceback(t1)) 787 self.assertTrue(f._match_traceback(t2)) 788 self.assertTrue(f._match_traceback(t3)) 789 self.assertFalse(f._match_traceback(unknown)) 790 791 f = tracemalloc.Filter(True, "<unknown>", all_frames=True) 792 self.assertFalse(f._match_traceback(t1)) 793 self.assertFalse(f._match_traceback(t2)) 794 self.assertTrue(f._match_traceback(t3)) 795 self.assertTrue(f._match_traceback(unknown)) 796 797 f = tracemalloc.Filter(False, "<unknown>", all_frames=True) 798 self.assertTrue(f._match_traceback(t1)) 799 self.assertTrue(f._match_traceback(t2)) 800 self.assertFalse(f._match_traceback(t3)) 801 self.assertFalse(f._match_traceback(unknown)) 802 803 804class TestCommandLine(unittest.TestCase): 805 def test_env_var_disabled_by_default(self): 806 # not tracing by default 807 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 808 ok, stdout, stderr = assert_python_ok('-c', code) 809 stdout = stdout.rstrip() 810 self.assertEqual(stdout, b'False') 811 812 @unittest.skipIf(interpreter_requires_environment(), 813 'Cannot run -E tests when PYTHON env vars are required.') 814 def test_env_var_ignored_with_E(self): 815 """PYTHON* environment variables must be ignored when -E is present.""" 816 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 817 ok, stdout, stderr = assert_python_ok('-E', '-c', code, PYTHONTRACEMALLOC='1') 818 stdout = stdout.rstrip() 819 self.assertEqual(stdout, b'False') 820 821 def test_env_var_enabled_at_startup(self): 822 # tracing at startup 823 code = 'import tracemalloc; print(tracemalloc.is_tracing())' 824 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='1') 825 stdout = stdout.rstrip() 826 self.assertEqual(stdout, b'True') 827 828 def test_env_limit(self): 829 # start and set the number of frames 830 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 831 ok, stdout, stderr = assert_python_ok('-c', code, PYTHONTRACEMALLOC='10') 832 stdout = stdout.rstrip() 833 self.assertEqual(stdout, b'10') 834 835 def test_env_var_invalid(self): 836 for nframe in (-1, 0, 2**30): 837 with self.subTest(nframe=nframe): 838 with support.SuppressCrashReport(): 839 ok, stdout, stderr = assert_python_failure( 840 '-c', 'pass', 841 PYTHONTRACEMALLOC=str(nframe)) 842 self.assertIn(b'PYTHONTRACEMALLOC: invalid ' 843 b'number of frames', 844 stderr) 845 846 def test_sys_xoptions(self): 847 for xoptions, nframe in ( 848 ('tracemalloc', 1), 849 ('tracemalloc=1', 1), 850 ('tracemalloc=15', 15), 851 ): 852 with self.subTest(xoptions=xoptions, nframe=nframe): 853 code = 'import tracemalloc; print(tracemalloc.get_traceback_limit())' 854 ok, stdout, stderr = assert_python_ok('-X', xoptions, '-c', code) 855 stdout = stdout.rstrip() 856 self.assertEqual(stdout, str(nframe).encode('ascii')) 857 858 def test_sys_xoptions_invalid(self): 859 for nframe in (-1, 0, 2**30): 860 with self.subTest(nframe=nframe): 861 with support.SuppressCrashReport(): 862 args = ('-X', 'tracemalloc=%s' % nframe, '-c', 'pass') 863 ok, stdout, stderr = assert_python_failure(*args) 864 self.assertIn(b'-X tracemalloc=NFRAME: invalid ' 865 b'number of frames', 866 stderr) 867 868 def test_pymem_alloc0(self): 869 # Issue #21639: Check that PyMem_Malloc(0) with tracemalloc enabled 870 # does not crash. 871 code = 'import _testcapi; _testcapi.test_pymem_alloc0(); 1' 872 assert_python_ok('-X', 'tracemalloc', '-c', code) 873 874 875@unittest.skipIf(_testcapi is None, 'need _testcapi') 876class TestCAPI(unittest.TestCase): 877 maxDiff = 80 * 20 878 879 def setUp(self): 880 if tracemalloc.is_tracing(): 881 self.skipTest("tracemalloc must be stopped before the test") 882 883 self.domain = 5 884 self.size = 123 885 self.obj = allocate_bytes(self.size)[0] 886 887 # for the type "object", id(obj) is the address of its memory block. 888 # This type is not tracked by the garbage collector 889 self.ptr = id(self.obj) 890 891 def tearDown(self): 892 tracemalloc.stop() 893 894 def get_traceback(self): 895 frames = _testcapi.tracemalloc_get_traceback(self.domain, self.ptr) 896 if frames is not None: 897 return tracemalloc.Traceback(frames) 898 else: 899 return None 900 901 def track(self, release_gil=False, nframe=1): 902 frames = get_frames(nframe, 2) 903 _testcapi.tracemalloc_track(self.domain, self.ptr, self.size, 904 release_gil) 905 return frames 906 907 def untrack(self): 908 _testcapi.tracemalloc_untrack(self.domain, self.ptr) 909 910 def get_traced_memory(self): 911 # Get the traced size in the domain 912 snapshot = tracemalloc.take_snapshot() 913 domain_filter = tracemalloc.DomainFilter(True, self.domain) 914 snapshot = snapshot.filter_traces([domain_filter]) 915 return sum(trace.size for trace in snapshot.traces) 916 917 def check_track(self, release_gil): 918 nframe = 5 919 tracemalloc.start(nframe) 920 921 size = tracemalloc.get_traced_memory()[0] 922 923 frames = self.track(release_gil, nframe) 924 self.assertEqual(self.get_traceback(), 925 tracemalloc.Traceback(frames)) 926 927 self.assertEqual(self.get_traced_memory(), self.size) 928 929 def test_track(self): 930 self.check_track(False) 931 932 def test_track_without_gil(self): 933 # check that calling _PyTraceMalloc_Track() without holding the GIL 934 # works too 935 self.check_track(True) 936 937 def test_track_already_tracked(self): 938 nframe = 5 939 tracemalloc.start(nframe) 940 941 # track a first time 942 self.track() 943 944 # calling _PyTraceMalloc_Track() must remove the old trace and add 945 # a new trace with the new traceback 946 frames = self.track(nframe=nframe) 947 self.assertEqual(self.get_traceback(), 948 tracemalloc.Traceback(frames)) 949 950 def test_untrack(self): 951 tracemalloc.start() 952 953 self.track() 954 self.assertIsNotNone(self.get_traceback()) 955 self.assertEqual(self.get_traced_memory(), self.size) 956 957 # untrack must remove the trace 958 self.untrack() 959 self.assertIsNone(self.get_traceback()) 960 self.assertEqual(self.get_traced_memory(), 0) 961 962 # calling _PyTraceMalloc_Untrack() multiple times must not crash 963 self.untrack() 964 self.untrack() 965 966 def test_stop_track(self): 967 tracemalloc.start() 968 tracemalloc.stop() 969 970 with self.assertRaises(RuntimeError): 971 self.track() 972 self.assertIsNone(self.get_traceback()) 973 974 def test_stop_untrack(self): 975 tracemalloc.start() 976 self.track() 977 978 tracemalloc.stop() 979 with self.assertRaises(RuntimeError): 980 self.untrack() 981 982 983def test_main(): 984 support.run_unittest( 985 TestTracemallocEnabled, 986 TestSnapshot, 987 TestFilters, 988 TestCommandLine, 989 TestCAPI, 990 ) 991 992if __name__ == "__main__": 993 test_main() 994