1import unittest 2from test.support import (verbose, refcount_test, run_unittest, 3 strip_python_stderr, cpython_only, start_threads, 4 temp_dir, requires_type_collecting, TESTFN, unlink, 5 import_module) 6from test.support.script_helper import assert_python_ok, make_script 7 8import gc 9import sys 10import sysconfig 11import textwrap 12import threading 13import time 14import weakref 15 16try: 17 from _testcapi import with_tp_del 18except ImportError: 19 def with_tp_del(cls): 20 class C(object): 21 def __new__(cls, *args, **kwargs): 22 raise TypeError('requires _testcapi.with_tp_del') 23 return C 24 25### Support code 26############################################################################### 27 28# Bug 1055820 has several tests of longstanding bugs involving weakrefs and 29# cyclic gc. 30 31# An instance of C1055820 has a self-loop, so becomes cyclic trash when 32# unreachable. 33class C1055820(object): 34 def __init__(self, i): 35 self.i = i 36 self.loop = self 37 38class GC_Detector(object): 39 # Create an instance I. Then gc hasn't happened again so long as 40 # I.gc_happened is false. 41 42 def __init__(self): 43 self.gc_happened = False 44 45 def it_happened(ignored): 46 self.gc_happened = True 47 48 # Create a piece of cyclic trash that triggers it_happened when 49 # gc collects it. 50 self.wr = weakref.ref(C1055820(666), it_happened) 51 52@with_tp_del 53class Uncollectable(object): 54 """Create a reference cycle with multiple __del__ methods. 55 56 An object in a reference cycle will never have zero references, 57 and so must be garbage collected. If one or more objects in the 58 cycle have __del__ methods, the gc refuses to guess an order, 59 and leaves the cycle uncollected.""" 60 def __init__(self, partner=None): 61 if partner is None: 62 self.partner = Uncollectable(partner=self) 63 else: 64 self.partner = partner 65 def __tp_del__(self): 66 pass 67 68if sysconfig.get_config_vars().get('PY_CFLAGS', ''): 69 BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS']) 70else: 71 # Usually, sys.gettotalrefcount() is only present if Python has been 72 # compiled in debug mode. If it's missing, expect that Python has 73 # been released in release mode: with NDEBUG defined. 74 BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount')) 75 76### Tests 77############################################################################### 78 79class GCTests(unittest.TestCase): 80 def test_list(self): 81 l = [] 82 l.append(l) 83 gc.collect() 84 del l 85 self.assertEqual(gc.collect(), 1) 86 87 def test_dict(self): 88 d = {} 89 d[1] = d 90 gc.collect() 91 del d 92 self.assertEqual(gc.collect(), 1) 93 94 def test_tuple(self): 95 # since tuples are immutable we close the loop with a list 96 l = [] 97 t = (l,) 98 l.append(t) 99 gc.collect() 100 del t 101 del l 102 self.assertEqual(gc.collect(), 2) 103 104 def test_class(self): 105 class A: 106 pass 107 A.a = A 108 gc.collect() 109 del A 110 self.assertNotEqual(gc.collect(), 0) 111 112 def test_newstyleclass(self): 113 class A(object): 114 pass 115 gc.collect() 116 del A 117 self.assertNotEqual(gc.collect(), 0) 118 119 def test_instance(self): 120 class A: 121 pass 122 a = A() 123 a.a = a 124 gc.collect() 125 del a 126 self.assertNotEqual(gc.collect(), 0) 127 128 @requires_type_collecting 129 def test_newinstance(self): 130 class A(object): 131 pass 132 a = A() 133 a.a = a 134 gc.collect() 135 del a 136 self.assertNotEqual(gc.collect(), 0) 137 class B(list): 138 pass 139 class C(B, A): 140 pass 141 a = C() 142 a.a = a 143 gc.collect() 144 del a 145 self.assertNotEqual(gc.collect(), 0) 146 del B, C 147 self.assertNotEqual(gc.collect(), 0) 148 A.a = A() 149 del A 150 self.assertNotEqual(gc.collect(), 0) 151 self.assertEqual(gc.collect(), 0) 152 153 def test_method(self): 154 # Tricky: self.__init__ is a bound method, it references the instance. 155 class A: 156 def __init__(self): 157 self.init = self.__init__ 158 a = A() 159 gc.collect() 160 del a 161 self.assertNotEqual(gc.collect(), 0) 162 163 @cpython_only 164 def test_legacy_finalizer(self): 165 # A() is uncollectable if it is part of a cycle, make sure it shows up 166 # in gc.garbage. 167 @with_tp_del 168 class A: 169 def __tp_del__(self): pass 170 class B: 171 pass 172 a = A() 173 a.a = a 174 id_a = id(a) 175 b = B() 176 b.b = b 177 gc.collect() 178 del a 179 del b 180 self.assertNotEqual(gc.collect(), 0) 181 for obj in gc.garbage: 182 if id(obj) == id_a: 183 del obj.a 184 break 185 else: 186 self.fail("didn't find obj in garbage (finalizer)") 187 gc.garbage.remove(obj) 188 189 @cpython_only 190 def test_legacy_finalizer_newclass(self): 191 # A() is uncollectable if it is part of a cycle, make sure it shows up 192 # in gc.garbage. 193 @with_tp_del 194 class A(object): 195 def __tp_del__(self): pass 196 class B(object): 197 pass 198 a = A() 199 a.a = a 200 id_a = id(a) 201 b = B() 202 b.b = b 203 gc.collect() 204 del a 205 del b 206 self.assertNotEqual(gc.collect(), 0) 207 for obj in gc.garbage: 208 if id(obj) == id_a: 209 del obj.a 210 break 211 else: 212 self.fail("didn't find obj in garbage (finalizer)") 213 gc.garbage.remove(obj) 214 215 def test_function(self): 216 # Tricky: f -> d -> f, code should call d.clear() after the exec to 217 # break the cycle. 218 d = {} 219 exec("def f(): pass\n", d) 220 gc.collect() 221 del d 222 self.assertEqual(gc.collect(), 2) 223 224 @refcount_test 225 def test_frame(self): 226 def f(): 227 frame = sys._getframe() 228 gc.collect() 229 f() 230 self.assertEqual(gc.collect(), 1) 231 232 def test_saveall(self): 233 # Verify that cyclic garbage like lists show up in gc.garbage if the 234 # SAVEALL option is enabled. 235 236 # First make sure we don't save away other stuff that just happens to 237 # be waiting for collection. 238 gc.collect() 239 # if this fails, someone else created immortal trash 240 self.assertEqual(gc.garbage, []) 241 242 L = [] 243 L.append(L) 244 id_L = id(L) 245 246 debug = gc.get_debug() 247 gc.set_debug(debug | gc.DEBUG_SAVEALL) 248 del L 249 gc.collect() 250 gc.set_debug(debug) 251 252 self.assertEqual(len(gc.garbage), 1) 253 obj = gc.garbage.pop() 254 self.assertEqual(id(obj), id_L) 255 256 def test_del(self): 257 # __del__ methods can trigger collection, make this to happen 258 thresholds = gc.get_threshold() 259 gc.enable() 260 gc.set_threshold(1) 261 262 class A: 263 def __del__(self): 264 dir(self) 265 a = A() 266 del a 267 268 gc.disable() 269 gc.set_threshold(*thresholds) 270 271 def test_del_newclass(self): 272 # __del__ methods can trigger collection, make this to happen 273 thresholds = gc.get_threshold() 274 gc.enable() 275 gc.set_threshold(1) 276 277 class A(object): 278 def __del__(self): 279 dir(self) 280 a = A() 281 del a 282 283 gc.disable() 284 gc.set_threshold(*thresholds) 285 286 # The following two tests are fragile: 287 # They precisely count the number of allocations, 288 # which is highly implementation-dependent. 289 # For example, disposed tuples are not freed, but reused. 290 # To minimize variations, though, we first store the get_count() results 291 # and check them at the end. 292 @refcount_test 293 def test_get_count(self): 294 gc.collect() 295 a, b, c = gc.get_count() 296 x = [] 297 d, e, f = gc.get_count() 298 self.assertEqual((b, c), (0, 0)) 299 self.assertEqual((e, f), (0, 0)) 300 # This is less fragile than asserting that a equals 0. 301 self.assertLess(a, 5) 302 # Between the two calls to get_count(), at least one object was 303 # created (the list). 304 self.assertGreater(d, a) 305 306 @refcount_test 307 def test_collect_generations(self): 308 gc.collect() 309 # This object will "trickle" into generation N + 1 after 310 # each call to collect(N) 311 x = [] 312 gc.collect(0) 313 # x is now in gen 1 314 a, b, c = gc.get_count() 315 gc.collect(1) 316 # x is now in gen 2 317 d, e, f = gc.get_count() 318 gc.collect(2) 319 # x is now in gen 3 320 g, h, i = gc.get_count() 321 # We don't check a, d, g since their exact values depends on 322 # internal implementation details of the interpreter. 323 self.assertEqual((b, c), (1, 0)) 324 self.assertEqual((e, f), (0, 1)) 325 self.assertEqual((h, i), (0, 0)) 326 327 def test_trashcan(self): 328 class Ouch: 329 n = 0 330 def __del__(self): 331 Ouch.n = Ouch.n + 1 332 if Ouch.n % 17 == 0: 333 gc.collect() 334 335 # "trashcan" is a hack to prevent stack overflow when deallocating 336 # very deeply nested tuples etc. It works in part by abusing the 337 # type pointer and refcount fields, and that can yield horrible 338 # problems when gc tries to traverse the structures. 339 # If this test fails (as it does in 2.0, 2.1 and 2.2), it will 340 # most likely die via segfault. 341 342 # Note: In 2.3 the possibility for compiling without cyclic gc was 343 # removed, and that in turn allows the trashcan mechanism to work 344 # via much simpler means (e.g., it never abuses the type pointer or 345 # refcount fields anymore). Since it's much less likely to cause a 346 # problem now, the various constants in this expensive (we force a lot 347 # of full collections) test are cut back from the 2.2 version. 348 gc.enable() 349 N = 150 350 for count in range(2): 351 t = [] 352 for i in range(N): 353 t = [t, Ouch()] 354 u = [] 355 for i in range(N): 356 u = [u, Ouch()] 357 v = {} 358 for i in range(N): 359 v = {1: v, 2: Ouch()} 360 gc.disable() 361 362 def test_trashcan_threads(self): 363 # Issue #13992: trashcan mechanism should be thread-safe 364 NESTING = 60 365 N_THREADS = 2 366 367 def sleeper_gen(): 368 """A generator that releases the GIL when closed or dealloc'ed.""" 369 try: 370 yield 371 finally: 372 time.sleep(0.000001) 373 374 class C(list): 375 # Appending to a list is atomic, which avoids the use of a lock. 376 inits = [] 377 dels = [] 378 def __init__(self, alist): 379 self[:] = alist 380 C.inits.append(None) 381 def __del__(self): 382 # This __del__ is called by subtype_dealloc(). 383 C.dels.append(None) 384 # `g` will release the GIL when garbage-collected. This 385 # helps assert subtype_dealloc's behaviour when threads 386 # switch in the middle of it. 387 g = sleeper_gen() 388 next(g) 389 # Now that __del__ is finished, subtype_dealloc will proceed 390 # to call list_dealloc, which also uses the trashcan mechanism. 391 392 def make_nested(): 393 """Create a sufficiently nested container object so that the 394 trashcan mechanism is invoked when deallocating it.""" 395 x = C([]) 396 for i in range(NESTING): 397 x = [C([x])] 398 del x 399 400 def run_thread(): 401 """Exercise make_nested() in a loop.""" 402 while not exit: 403 make_nested() 404 405 old_switchinterval = sys.getswitchinterval() 406 sys.setswitchinterval(1e-5) 407 try: 408 exit = [] 409 threads = [] 410 for i in range(N_THREADS): 411 t = threading.Thread(target=run_thread) 412 threads.append(t) 413 with start_threads(threads, lambda: exit.append(1)): 414 time.sleep(1.0) 415 finally: 416 sys.setswitchinterval(old_switchinterval) 417 gc.collect() 418 self.assertEqual(len(C.inits), len(C.dels)) 419 420 def test_boom(self): 421 class Boom: 422 def __getattr__(self, someattribute): 423 del self.attr 424 raise AttributeError 425 426 a = Boom() 427 b = Boom() 428 a.attr = b 429 b.attr = a 430 431 gc.collect() 432 garbagelen = len(gc.garbage) 433 del a, b 434 # a<->b are in a trash cycle now. Collection will invoke 435 # Boom.__getattr__ (to see whether a and b have __del__ methods), and 436 # __getattr__ deletes the internal "attr" attributes as a side effect. 437 # That causes the trash cycle to get reclaimed via refcounts falling to 438 # 0, thus mutating the trash graph as a side effect of merely asking 439 # whether __del__ exists. This used to (before 2.3b1) crash Python. 440 # Now __getattr__ isn't called. 441 self.assertEqual(gc.collect(), 4) 442 self.assertEqual(len(gc.garbage), garbagelen) 443 444 def test_boom2(self): 445 class Boom2: 446 def __init__(self): 447 self.x = 0 448 449 def __getattr__(self, someattribute): 450 self.x += 1 451 if self.x > 1: 452 del self.attr 453 raise AttributeError 454 455 a = Boom2() 456 b = Boom2() 457 a.attr = b 458 b.attr = a 459 460 gc.collect() 461 garbagelen = len(gc.garbage) 462 del a, b 463 # Much like test_boom(), except that __getattr__ doesn't break the 464 # cycle until the second time gc checks for __del__. As of 2.3b1, 465 # there isn't a second time, so this simply cleans up the trash cycle. 466 # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get 467 # reclaimed this way. 468 self.assertEqual(gc.collect(), 4) 469 self.assertEqual(len(gc.garbage), garbagelen) 470 471 def test_boom_new(self): 472 # boom__new and boom2_new are exactly like boom and boom2, except use 473 # new-style classes. 474 475 class Boom_New(object): 476 def __getattr__(self, someattribute): 477 del self.attr 478 raise AttributeError 479 480 a = Boom_New() 481 b = Boom_New() 482 a.attr = b 483 b.attr = a 484 485 gc.collect() 486 garbagelen = len(gc.garbage) 487 del a, b 488 self.assertEqual(gc.collect(), 4) 489 self.assertEqual(len(gc.garbage), garbagelen) 490 491 def test_boom2_new(self): 492 class Boom2_New(object): 493 def __init__(self): 494 self.x = 0 495 496 def __getattr__(self, someattribute): 497 self.x += 1 498 if self.x > 1: 499 del self.attr 500 raise AttributeError 501 502 a = Boom2_New() 503 b = Boom2_New() 504 a.attr = b 505 b.attr = a 506 507 gc.collect() 508 garbagelen = len(gc.garbage) 509 del a, b 510 self.assertEqual(gc.collect(), 4) 511 self.assertEqual(len(gc.garbage), garbagelen) 512 513 def test_get_referents(self): 514 alist = [1, 3, 5] 515 got = gc.get_referents(alist) 516 got.sort() 517 self.assertEqual(got, alist) 518 519 atuple = tuple(alist) 520 got = gc.get_referents(atuple) 521 got.sort() 522 self.assertEqual(got, alist) 523 524 adict = {1: 3, 5: 7} 525 expected = [1, 3, 5, 7] 526 got = gc.get_referents(adict) 527 got.sort() 528 self.assertEqual(got, expected) 529 530 got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0)) 531 got.sort() 532 self.assertEqual(got, [0, 0] + list(range(5))) 533 534 self.assertEqual(gc.get_referents(1, 'a', 4j), []) 535 536 def test_is_tracked(self): 537 # Atomic built-in types are not tracked, user-defined objects and 538 # mutable containers are. 539 # NOTE: types with special optimizations (e.g. tuple) have tests 540 # in their own test files instead. 541 self.assertFalse(gc.is_tracked(None)) 542 self.assertFalse(gc.is_tracked(1)) 543 self.assertFalse(gc.is_tracked(1.0)) 544 self.assertFalse(gc.is_tracked(1.0 + 5.0j)) 545 self.assertFalse(gc.is_tracked(True)) 546 self.assertFalse(gc.is_tracked(False)) 547 self.assertFalse(gc.is_tracked(b"a")) 548 self.assertFalse(gc.is_tracked("a")) 549 self.assertFalse(gc.is_tracked(bytearray(b"a"))) 550 self.assertFalse(gc.is_tracked(type)) 551 self.assertFalse(gc.is_tracked(int)) 552 self.assertFalse(gc.is_tracked(object)) 553 self.assertFalse(gc.is_tracked(object())) 554 555 class UserClass: 556 pass 557 558 class UserInt(int): 559 pass 560 561 # Base class is object; no extra fields. 562 class UserClassSlots: 563 __slots__ = () 564 565 # Base class is fixed size larger than object; no extra fields. 566 class UserFloatSlots(float): 567 __slots__ = () 568 569 # Base class is variable size; no extra fields. 570 class UserIntSlots(int): 571 __slots__ = () 572 573 self.assertTrue(gc.is_tracked(gc)) 574 self.assertTrue(gc.is_tracked(UserClass)) 575 self.assertTrue(gc.is_tracked(UserClass())) 576 self.assertTrue(gc.is_tracked(UserInt())) 577 self.assertTrue(gc.is_tracked([])) 578 self.assertTrue(gc.is_tracked(set())) 579 self.assertFalse(gc.is_tracked(UserClassSlots())) 580 self.assertFalse(gc.is_tracked(UserFloatSlots())) 581 self.assertFalse(gc.is_tracked(UserIntSlots())) 582 583 def test_bug1055820b(self): 584 # Corresponds to temp2b.py in the bug report. 585 586 ouch = [] 587 def callback(ignored): 588 ouch[:] = [wr() for wr in WRs] 589 590 Cs = [C1055820(i) for i in range(2)] 591 WRs = [weakref.ref(c, callback) for c in Cs] 592 c = None 593 594 gc.collect() 595 self.assertEqual(len(ouch), 0) 596 # Make the two instances trash, and collect again. The bug was that 597 # the callback materialized a strong reference to an instance, but gc 598 # cleared the instance's dict anyway. 599 Cs = None 600 gc.collect() 601 self.assertEqual(len(ouch), 2) # else the callbacks didn't run 602 for x in ouch: 603 # If the callback resurrected one of these guys, the instance 604 # would be damaged, with an empty __dict__. 605 self.assertEqual(x, None) 606 607 def test_bug21435(self): 608 # This is a poor test - its only virtue is that it happened to 609 # segfault on Tim's Windows box before the patch for 21435 was 610 # applied. That's a nasty bug relying on specific pieces of cyclic 611 # trash appearing in exactly the right order in finalize_garbage()'s 612 # input list. 613 # But there's no reliable way to force that order from Python code, 614 # so over time chances are good this test won't really be testing much 615 # of anything anymore. Still, if it blows up, there's _some_ 616 # problem ;-) 617 gc.collect() 618 619 class A: 620 pass 621 622 class B: 623 def __init__(self, x): 624 self.x = x 625 626 def __del__(self): 627 self.attr = None 628 629 def do_work(): 630 a = A() 631 b = B(A()) 632 633 a.attr = b 634 b.attr = a 635 636 do_work() 637 gc.collect() # this blows up (bad C pointer) when it fails 638 639 @cpython_only 640 def test_garbage_at_shutdown(self): 641 import subprocess 642 code = """if 1: 643 import gc 644 import _testcapi 645 @_testcapi.with_tp_del 646 class X: 647 def __init__(self, name): 648 self.name = name 649 def __repr__(self): 650 return "<X %%r>" %% self.name 651 def __tp_del__(self): 652 pass 653 654 x = X('first') 655 x.x = x 656 x.y = X('second') 657 del x 658 gc.set_debug(%s) 659 """ 660 def run_command(code): 661 p = subprocess.Popen([sys.executable, "-Wd", "-c", code], 662 stdout=subprocess.PIPE, 663 stderr=subprocess.PIPE) 664 stdout, stderr = p.communicate() 665 p.stdout.close() 666 p.stderr.close() 667 self.assertEqual(p.returncode, 0) 668 self.assertEqual(stdout.strip(), b"") 669 return strip_python_stderr(stderr) 670 671 stderr = run_command(code % "0") 672 self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " 673 b"shutdown; use", stderr) 674 self.assertNotIn(b"<X 'first'>", stderr) 675 # With DEBUG_UNCOLLECTABLE, the garbage list gets printed 676 stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE") 677 self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at " 678 b"shutdown", stderr) 679 self.assertTrue( 680 (b"[<X 'first'>, <X 'second'>]" in stderr) or 681 (b"[<X 'second'>, <X 'first'>]" in stderr), stderr) 682 # With DEBUG_SAVEALL, no additional message should get printed 683 # (because gc.garbage also contains normally reclaimable cyclic 684 # references, and its elements get printed at runtime anyway). 685 stderr = run_command(code % "gc.DEBUG_SAVEALL") 686 self.assertNotIn(b"uncollectable objects at shutdown", stderr) 687 688 @requires_type_collecting 689 def test_gc_main_module_at_shutdown(self): 690 # Create a reference cycle through the __main__ module and check 691 # it gets collected at interpreter shutdown. 692 code = """if 1: 693 class C: 694 def __del__(self): 695 print('__del__ called') 696 l = [C()] 697 l.append(l) 698 """ 699 rc, out, err = assert_python_ok('-c', code) 700 self.assertEqual(out.strip(), b'__del__ called') 701 702 @requires_type_collecting 703 def test_gc_ordinary_module_at_shutdown(self): 704 # Same as above, but with a non-__main__ module. 705 with temp_dir() as script_dir: 706 module = """if 1: 707 class C: 708 def __del__(self): 709 print('__del__ called') 710 l = [C()] 711 l.append(l) 712 """ 713 code = """if 1: 714 import sys 715 sys.path.insert(0, %r) 716 import gctest 717 """ % (script_dir,) 718 make_script(script_dir, 'gctest', module) 719 rc, out, err = assert_python_ok('-c', code) 720 self.assertEqual(out.strip(), b'__del__ called') 721 722 @requires_type_collecting 723 def test_global_del_SystemExit(self): 724 code = """if 1: 725 class ClassWithDel: 726 def __del__(self): 727 print('__del__ called') 728 a = ClassWithDel() 729 a.link = a 730 raise SystemExit(0)""" 731 self.addCleanup(unlink, TESTFN) 732 with open(TESTFN, 'w') as script: 733 script.write(code) 734 rc, out, err = assert_python_ok(TESTFN) 735 self.assertEqual(out.strip(), b'__del__ called') 736 737 def test_get_stats(self): 738 stats = gc.get_stats() 739 self.assertEqual(len(stats), 3) 740 for st in stats: 741 self.assertIsInstance(st, dict) 742 self.assertEqual(set(st), 743 {"collected", "collections", "uncollectable"}) 744 self.assertGreaterEqual(st["collected"], 0) 745 self.assertGreaterEqual(st["collections"], 0) 746 self.assertGreaterEqual(st["uncollectable"], 0) 747 # Check that collection counts are incremented correctly 748 if gc.isenabled(): 749 self.addCleanup(gc.enable) 750 gc.disable() 751 old = gc.get_stats() 752 gc.collect(0) 753 new = gc.get_stats() 754 self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) 755 self.assertEqual(new[1]["collections"], old[1]["collections"]) 756 self.assertEqual(new[2]["collections"], old[2]["collections"]) 757 gc.collect(2) 758 new = gc.get_stats() 759 self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) 760 self.assertEqual(new[1]["collections"], old[1]["collections"]) 761 self.assertEqual(new[2]["collections"], old[2]["collections"] + 1) 762 763 def test_freeze(self): 764 gc.freeze() 765 self.assertGreater(gc.get_freeze_count(), 0) 766 gc.unfreeze() 767 self.assertEqual(gc.get_freeze_count(), 0) 768 769 def test_get_objects(self): 770 gc.collect() 771 l = [] 772 l.append(l) 773 self.assertTrue( 774 any(l is element for element in gc.get_objects(generation=0)) 775 ) 776 self.assertFalse( 777 any(l is element for element in gc.get_objects(generation=1)) 778 ) 779 self.assertFalse( 780 any(l is element for element in gc.get_objects(generation=2)) 781 ) 782 gc.collect(generation=0) 783 self.assertFalse( 784 any(l is element for element in gc.get_objects(generation=0)) 785 ) 786 self.assertTrue( 787 any(l is element for element in gc.get_objects(generation=1)) 788 ) 789 self.assertFalse( 790 any(l is element for element in gc.get_objects(generation=2)) 791 ) 792 gc.collect(generation=1) 793 self.assertFalse( 794 any(l is element for element in gc.get_objects(generation=0)) 795 ) 796 self.assertFalse( 797 any(l is element for element in gc.get_objects(generation=1)) 798 ) 799 self.assertTrue( 800 any(l is element for element in gc.get_objects(generation=2)) 801 ) 802 gc.collect(generation=2) 803 self.assertFalse( 804 any(l is element for element in gc.get_objects(generation=0)) 805 ) 806 self.assertFalse( 807 any(l is element for element in gc.get_objects(generation=1)) 808 ) 809 self.assertTrue( 810 any(l is element for element in gc.get_objects(generation=2)) 811 ) 812 del l 813 gc.collect() 814 815 def test_get_objects_arguments(self): 816 gc.collect() 817 self.assertEqual(len(gc.get_objects()), 818 len(gc.get_objects(generation=None))) 819 820 self.assertRaises(ValueError, gc.get_objects, 1000) 821 self.assertRaises(ValueError, gc.get_objects, -1000) 822 self.assertRaises(TypeError, gc.get_objects, "1") 823 self.assertRaises(TypeError, gc.get_objects, 1.234) 824 825 def test_38379(self): 826 # When a finalizer resurrects objects, stats were reporting them as 827 # having been collected. This affected both collect()'s return 828 # value and the dicts returned by get_stats(). 829 N = 100 830 831 class A: # simple self-loop 832 def __init__(self): 833 self.me = self 834 835 class Z(A): # resurrecting __del__ 836 def __del__(self): 837 zs.append(self) 838 839 zs = [] 840 841 def getstats(): 842 d = gc.get_stats()[-1] 843 return d['collected'], d['uncollectable'] 844 845 gc.collect() 846 gc.disable() 847 848 # No problems if just collecting A() instances. 849 oldc, oldnc = getstats() 850 for i in range(N): 851 A() 852 t = gc.collect() 853 c, nc = getstats() 854 self.assertEqual(t, 2*N) # instance object & its dict 855 self.assertEqual(c - oldc, 2*N) 856 self.assertEqual(nc - oldnc, 0) 857 858 # But Z() is not actually collected. 859 oldc, oldnc = c, nc 860 Z() 861 # Nothing is collected - Z() is merely resurrected. 862 t = gc.collect() 863 c, nc = getstats() 864 #self.assertEqual(t, 2) # before 865 self.assertEqual(t, 0) # after 866 #self.assertEqual(c - oldc, 2) # before 867 self.assertEqual(c - oldc, 0) # after 868 self.assertEqual(nc - oldnc, 0) 869 870 # Unfortunately, a Z() prevents _anything_ from being collected. 871 # It should be possible to collect the A instances anyway, but 872 # that will require non-trivial code changes. 873 oldc, oldnc = c, nc 874 for i in range(N): 875 A() 876 Z() 877 # Z() prevents anything from being collected. 878 t = gc.collect() 879 c, nc = getstats() 880 #self.assertEqual(t, 2*N + 2) # before 881 self.assertEqual(t, 0) # after 882 #self.assertEqual(c - oldc, 2*N + 2) # before 883 self.assertEqual(c - oldc, 0) # after 884 self.assertEqual(nc - oldnc, 0) 885 886 # But the A() trash is reclaimed on the next run. 887 oldc, oldnc = c, nc 888 t = gc.collect() 889 c, nc = getstats() 890 self.assertEqual(t, 2*N) 891 self.assertEqual(c - oldc, 2*N) 892 self.assertEqual(nc - oldnc, 0) 893 894 gc.enable() 895 896class GCCallbackTests(unittest.TestCase): 897 def setUp(self): 898 # Save gc state and disable it. 899 self.enabled = gc.isenabled() 900 gc.disable() 901 self.debug = gc.get_debug() 902 gc.set_debug(0) 903 gc.callbacks.append(self.cb1) 904 gc.callbacks.append(self.cb2) 905 self.othergarbage = [] 906 907 def tearDown(self): 908 # Restore gc state 909 del self.visit 910 gc.callbacks.remove(self.cb1) 911 gc.callbacks.remove(self.cb2) 912 gc.set_debug(self.debug) 913 if self.enabled: 914 gc.enable() 915 # destroy any uncollectables 916 gc.collect() 917 for obj in gc.garbage: 918 if isinstance(obj, Uncollectable): 919 obj.partner = None 920 del gc.garbage[:] 921 del self.othergarbage 922 gc.collect() 923 924 def preclean(self): 925 # Remove all fluff from the system. Invoke this function 926 # manually rather than through self.setUp() for maximum 927 # safety. 928 self.visit = [] 929 gc.collect() 930 garbage, gc.garbage[:] = gc.garbage[:], [] 931 self.othergarbage.append(garbage) 932 self.visit = [] 933 934 def cb1(self, phase, info): 935 self.visit.append((1, phase, dict(info))) 936 937 def cb2(self, phase, info): 938 self.visit.append((2, phase, dict(info))) 939 if phase == "stop" and hasattr(self, "cleanup"): 940 # Clean Uncollectable from garbage 941 uc = [e for e in gc.garbage if isinstance(e, Uncollectable)] 942 gc.garbage[:] = [e for e in gc.garbage 943 if not isinstance(e, Uncollectable)] 944 for e in uc: 945 e.partner = None 946 947 def test_collect(self): 948 self.preclean() 949 gc.collect() 950 # Algorithmically verify the contents of self.visit 951 # because it is long and tortuous. 952 953 # Count the number of visits to each callback 954 n = [v[0] for v in self.visit] 955 n1 = [i for i in n if i == 1] 956 n2 = [i for i in n if i == 2] 957 self.assertEqual(n1, [1]*2) 958 self.assertEqual(n2, [2]*2) 959 960 # Count that we got the right number of start and stop callbacks. 961 n = [v[1] for v in self.visit] 962 n1 = [i for i in n if i == "start"] 963 n2 = [i for i in n if i == "stop"] 964 self.assertEqual(n1, ["start"]*2) 965 self.assertEqual(n2, ["stop"]*2) 966 967 # Check that we got the right info dict for all callbacks 968 for v in self.visit: 969 info = v[2] 970 self.assertTrue("generation" in info) 971 self.assertTrue("collected" in info) 972 self.assertTrue("uncollectable" in info) 973 974 def test_collect_generation(self): 975 self.preclean() 976 gc.collect(2) 977 for v in self.visit: 978 info = v[2] 979 self.assertEqual(info["generation"], 2) 980 981 @cpython_only 982 def test_collect_garbage(self): 983 self.preclean() 984 # Each of these cause four objects to be garbage: Two 985 # Uncollectables and their instance dicts. 986 Uncollectable() 987 Uncollectable() 988 C1055820(666) 989 gc.collect() 990 for v in self.visit: 991 if v[1] != "stop": 992 continue 993 info = v[2] 994 self.assertEqual(info["collected"], 2) 995 self.assertEqual(info["uncollectable"], 8) 996 997 # We should now have the Uncollectables in gc.garbage 998 self.assertEqual(len(gc.garbage), 4) 999 for e in gc.garbage: 1000 self.assertIsInstance(e, Uncollectable) 1001 1002 # Now, let our callback handle the Uncollectable instances 1003 self.cleanup=True 1004 self.visit = [] 1005 gc.garbage[:] = [] 1006 gc.collect() 1007 for v in self.visit: 1008 if v[1] != "stop": 1009 continue 1010 info = v[2] 1011 self.assertEqual(info["collected"], 0) 1012 self.assertEqual(info["uncollectable"], 4) 1013 1014 # Uncollectables should be gone 1015 self.assertEqual(len(gc.garbage), 0) 1016 1017 1018 @unittest.skipIf(BUILD_WITH_NDEBUG, 1019 'built with -NDEBUG') 1020 def test_refcount_errors(self): 1021 self.preclean() 1022 # Verify the "handling" of objects with broken refcounts 1023 1024 # Skip the test if ctypes is not available 1025 import_module("ctypes") 1026 1027 import subprocess 1028 code = textwrap.dedent(''' 1029 from test.support import gc_collect, SuppressCrashReport 1030 1031 a = [1, 2, 3] 1032 b = [a] 1033 1034 # Avoid coredump when Py_FatalError() calls abort() 1035 SuppressCrashReport().__enter__() 1036 1037 # Simulate the refcount of "a" being too low (compared to the 1038 # references held on it by live data), but keeping it above zero 1039 # (to avoid deallocating it): 1040 import ctypes 1041 ctypes.pythonapi.Py_DecRef(ctypes.py_object(a)) 1042 1043 # The garbage collector should now have a fatal error 1044 # when it reaches the broken object 1045 gc_collect() 1046 ''') 1047 p = subprocess.Popen([sys.executable, "-c", code], 1048 stdout=subprocess.PIPE, 1049 stderr=subprocess.PIPE) 1050 stdout, stderr = p.communicate() 1051 p.stdout.close() 1052 p.stderr.close() 1053 # Verify that stderr has a useful error message: 1054 self.assertRegex(stderr, 1055 br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.') 1056 self.assertRegex(stderr, 1057 br'refcount is too small') 1058 # "address : 0x7fb5062efc18" 1059 # "address : 7FB5062EFC18" 1060 address_regex = br'[0-9a-fA-Fx]+' 1061 self.assertRegex(stderr, 1062 br'object address : ' + address_regex) 1063 self.assertRegex(stderr, 1064 br'object refcount : 1') 1065 self.assertRegex(stderr, 1066 br'object type : ' + address_regex) 1067 self.assertRegex(stderr, 1068 br'object type name: list') 1069 self.assertRegex(stderr, 1070 br'object repr : \[1, 2, 3\]') 1071 1072 1073class GCTogglingTests(unittest.TestCase): 1074 def setUp(self): 1075 gc.enable() 1076 1077 def tearDown(self): 1078 gc.disable() 1079 1080 def test_bug1055820c(self): 1081 # Corresponds to temp2c.py in the bug report. This is pretty 1082 # elaborate. 1083 1084 c0 = C1055820(0) 1085 # Move c0 into generation 2. 1086 gc.collect() 1087 1088 c1 = C1055820(1) 1089 c1.keep_c0_alive = c0 1090 del c0.loop # now only c1 keeps c0 alive 1091 1092 c2 = C1055820(2) 1093 c2wr = weakref.ref(c2) # no callback! 1094 1095 ouch = [] 1096 def callback(ignored): 1097 ouch[:] = [c2wr()] 1098 1099 # The callback gets associated with a wr on an object in generation 2. 1100 c0wr = weakref.ref(c0, callback) 1101 1102 c0 = c1 = c2 = None 1103 1104 # What we've set up: c0, c1, and c2 are all trash now. c0 is in 1105 # generation 2. The only thing keeping it alive is that c1 points to 1106 # it. c1 and c2 are in generation 0, and are in self-loops. There's a 1107 # global weakref to c2 (c2wr), but that weakref has no callback. 1108 # There's also a global weakref to c0 (c0wr), and that does have a 1109 # callback, and that callback references c2 via c2wr(). 1110 # 1111 # c0 has a wr with callback, which references c2wr 1112 # ^ 1113 # | 1114 # | Generation 2 above dots 1115 #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . . 1116 # | Generation 0 below dots 1117 # | 1118 # | 1119 # ^->c1 ^->c2 has a wr but no callback 1120 # | | | | 1121 # <--v <--v 1122 # 1123 # So this is the nightmare: when generation 0 gets collected, we see 1124 # that c2 has a callback-free weakref, and c1 doesn't even have a 1125 # weakref. Collecting generation 0 doesn't see c0 at all, and c0 is 1126 # the only object that has a weakref with a callback. gc clears c1 1127 # and c2. Clearing c1 has the side effect of dropping the refcount on 1128 # c0 to 0, so c0 goes away (despite that it's in an older generation) 1129 # and c0's wr callback triggers. That in turn materializes a reference 1130 # to c2 via c2wr(), but c2 gets cleared anyway by gc. 1131 1132 # We want to let gc happen "naturally", to preserve the distinction 1133 # between generations. 1134 junk = [] 1135 i = 0 1136 detector = GC_Detector() 1137 while not detector.gc_happened: 1138 i += 1 1139 if i > 10000: 1140 self.fail("gc didn't happen after 10000 iterations") 1141 self.assertEqual(len(ouch), 0) 1142 junk.append([]) # this will eventually trigger gc 1143 1144 self.assertEqual(len(ouch), 1) # else the callback wasn't invoked 1145 for x in ouch: 1146 # If the callback resurrected c2, the instance would be damaged, 1147 # with an empty __dict__. 1148 self.assertEqual(x, None) 1149 1150 def test_bug1055820d(self): 1151 # Corresponds to temp2d.py in the bug report. This is very much like 1152 # test_bug1055820c, but uses a __del__ method instead of a weakref 1153 # callback to sneak in a resurrection of cyclic trash. 1154 1155 ouch = [] 1156 class D(C1055820): 1157 def __del__(self): 1158 ouch[:] = [c2wr()] 1159 1160 d0 = D(0) 1161 # Move all the above into generation 2. 1162 gc.collect() 1163 1164 c1 = C1055820(1) 1165 c1.keep_d0_alive = d0 1166 del d0.loop # now only c1 keeps d0 alive 1167 1168 c2 = C1055820(2) 1169 c2wr = weakref.ref(c2) # no callback! 1170 1171 d0 = c1 = c2 = None 1172 1173 # What we've set up: d0, c1, and c2 are all trash now. d0 is in 1174 # generation 2. The only thing keeping it alive is that c1 points to 1175 # it. c1 and c2 are in generation 0, and are in self-loops. There's 1176 # a global weakref to c2 (c2wr), but that weakref has no callback. 1177 # There are no other weakrefs. 1178 # 1179 # d0 has a __del__ method that references c2wr 1180 # ^ 1181 # | 1182 # | Generation 2 above dots 1183 #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . . 1184 # | Generation 0 below dots 1185 # | 1186 # | 1187 # ^->c1 ^->c2 has a wr but no callback 1188 # | | | | 1189 # <--v <--v 1190 # 1191 # So this is the nightmare: when generation 0 gets collected, we see 1192 # that c2 has a callback-free weakref, and c1 doesn't even have a 1193 # weakref. Collecting generation 0 doesn't see d0 at all. gc clears 1194 # c1 and c2. Clearing c1 has the side effect of dropping the refcount 1195 # on d0 to 0, so d0 goes away (despite that it's in an older 1196 # generation) and d0's __del__ triggers. That in turn materializes 1197 # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc. 1198 1199 # We want to let gc happen "naturally", to preserve the distinction 1200 # between generations. 1201 detector = GC_Detector() 1202 junk = [] 1203 i = 0 1204 while not detector.gc_happened: 1205 i += 1 1206 if i > 10000: 1207 self.fail("gc didn't happen after 10000 iterations") 1208 self.assertEqual(len(ouch), 0) 1209 junk.append([]) # this will eventually trigger gc 1210 1211 self.assertEqual(len(ouch), 1) # else __del__ wasn't invoked 1212 for x in ouch: 1213 # If __del__ resurrected c2, the instance would be damaged, with an 1214 # empty __dict__. 1215 self.assertEqual(x, None) 1216 1217def test_main(): 1218 enabled = gc.isenabled() 1219 gc.disable() 1220 assert not gc.isenabled() 1221 debug = gc.get_debug() 1222 gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak 1223 1224 try: 1225 gc.collect() # Delete 2nd generation garbage 1226 run_unittest(GCTests, GCTogglingTests, GCCallbackTests) 1227 finally: 1228 gc.set_debug(debug) 1229 # test gc.enable() even if GC is disabled by default 1230 if verbose: 1231 print("restoring automatic collection") 1232 # make sure to always test gc.enable() 1233 gc.enable() 1234 assert gc.isenabled() 1235 if not enabled: 1236 gc.disable() 1237 1238if __name__ == "__main__": 1239 test_main() 1240