• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2import unittest.mock
3from test.support import (verbose, refcount_test,
4                          cpython_only)
5from test.support.import_helper import import_module
6from test.support.os_helper import temp_dir, TESTFN, unlink
7from test.support.script_helper import assert_python_ok, make_script
8from test.support import threading_helper
9
10import gc
11import sys
12import sysconfig
13import textwrap
14import threading
15import time
16import weakref
17
18try:
19    from _testcapi import with_tp_del
20except ImportError:
21    def with_tp_del(cls):
22        class C(object):
23            def __new__(cls, *args, **kwargs):
24                raise TypeError('requires _testcapi.with_tp_del')
25        return C
26
27try:
28    from _testcapi import ContainerNoGC
29except ImportError:
30    ContainerNoGC = None
31
32### Support code
33###############################################################################
34
35# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
36# cyclic gc.
37
38# An instance of C1055820 has a self-loop, so becomes cyclic trash when
39# unreachable.
40class C1055820(object):
41    def __init__(self, i):
42        self.i = i
43        self.loop = self
44
45class GC_Detector(object):
46    # Create an instance I.  Then gc hasn't happened again so long as
47    # I.gc_happened is false.
48
49    def __init__(self):
50        self.gc_happened = False
51
52        def it_happened(ignored):
53            self.gc_happened = True
54
55        # Create a piece of cyclic trash that triggers it_happened when
56        # gc collects it.
57        self.wr = weakref.ref(C1055820(666), it_happened)
58
59@with_tp_del
60class Uncollectable(object):
61    """Create a reference cycle with multiple __del__ methods.
62
63    An object in a reference cycle will never have zero references,
64    and so must be garbage collected.  If one or more objects in the
65    cycle have __del__ methods, the gc refuses to guess an order,
66    and leaves the cycle uncollected."""
67    def __init__(self, partner=None):
68        if partner is None:
69            self.partner = Uncollectable(partner=self)
70        else:
71            self.partner = partner
72    def __tp_del__(self):
73        pass
74
75if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
76    BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
77else:
78    # Usually, sys.gettotalrefcount() is only present if Python has been
79    # compiled in debug mode. If it's missing, expect that Python has
80    # been released in release mode: with NDEBUG defined.
81    BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
82
83### Tests
84###############################################################################
85
86class GCTests(unittest.TestCase):
87    def test_list(self):
88        l = []
89        l.append(l)
90        gc.collect()
91        del l
92        self.assertEqual(gc.collect(), 1)
93
94    def test_dict(self):
95        d = {}
96        d[1] = d
97        gc.collect()
98        del d
99        self.assertEqual(gc.collect(), 1)
100
101    def test_tuple(self):
102        # since tuples are immutable we close the loop with a list
103        l = []
104        t = (l,)
105        l.append(t)
106        gc.collect()
107        del t
108        del l
109        self.assertEqual(gc.collect(), 2)
110
111    def test_class(self):
112        class A:
113            pass
114        A.a = A
115        gc.collect()
116        del A
117        self.assertNotEqual(gc.collect(), 0)
118
119    def test_newstyleclass(self):
120        class A(object):
121            pass
122        gc.collect()
123        del A
124        self.assertNotEqual(gc.collect(), 0)
125
126    def test_instance(self):
127        class A:
128            pass
129        a = A()
130        a.a = a
131        gc.collect()
132        del a
133        self.assertNotEqual(gc.collect(), 0)
134
135    def test_newinstance(self):
136        class A(object):
137            pass
138        a = A()
139        a.a = a
140        gc.collect()
141        del a
142        self.assertNotEqual(gc.collect(), 0)
143        class B(list):
144            pass
145        class C(B, A):
146            pass
147        a = C()
148        a.a = a
149        gc.collect()
150        del a
151        self.assertNotEqual(gc.collect(), 0)
152        del B, C
153        self.assertNotEqual(gc.collect(), 0)
154        A.a = A()
155        del A
156        self.assertNotEqual(gc.collect(), 0)
157        self.assertEqual(gc.collect(), 0)
158
159    def test_method(self):
160        # Tricky: self.__init__ is a bound method, it references the instance.
161        class A:
162            def __init__(self):
163                self.init = self.__init__
164        a = A()
165        gc.collect()
166        del a
167        self.assertNotEqual(gc.collect(), 0)
168
169    @cpython_only
170    def test_legacy_finalizer(self):
171        # A() is uncollectable if it is part of a cycle, make sure it shows up
172        # in gc.garbage.
173        @with_tp_del
174        class A:
175            def __tp_del__(self): pass
176        class B:
177            pass
178        a = A()
179        a.a = a
180        id_a = id(a)
181        b = B()
182        b.b = b
183        gc.collect()
184        del a
185        del b
186        self.assertNotEqual(gc.collect(), 0)
187        for obj in gc.garbage:
188            if id(obj) == id_a:
189                del obj.a
190                break
191        else:
192            self.fail("didn't find obj in garbage (finalizer)")
193        gc.garbage.remove(obj)
194
195    @cpython_only
196    def test_legacy_finalizer_newclass(self):
197        # A() is uncollectable if it is part of a cycle, make sure it shows up
198        # in gc.garbage.
199        @with_tp_del
200        class A(object):
201            def __tp_del__(self): pass
202        class B(object):
203            pass
204        a = A()
205        a.a = a
206        id_a = id(a)
207        b = B()
208        b.b = b
209        gc.collect()
210        del a
211        del b
212        self.assertNotEqual(gc.collect(), 0)
213        for obj in gc.garbage:
214            if id(obj) == id_a:
215                del obj.a
216                break
217        else:
218            self.fail("didn't find obj in garbage (finalizer)")
219        gc.garbage.remove(obj)
220
221    def test_function(self):
222        # Tricky: f -> d -> f, code should call d.clear() after the exec to
223        # break the cycle.
224        d = {}
225        exec("def f(): pass\n", d)
226        gc.collect()
227        del d
228        self.assertEqual(gc.collect(), 2)
229
230    @refcount_test
231    def test_frame(self):
232        def f():
233            frame = sys._getframe()
234        gc.collect()
235        f()
236        self.assertEqual(gc.collect(), 1)
237
238    def test_saveall(self):
239        # Verify that cyclic garbage like lists show up in gc.garbage if the
240        # SAVEALL option is enabled.
241
242        # First make sure we don't save away other stuff that just happens to
243        # be waiting for collection.
244        gc.collect()
245        # if this fails, someone else created immortal trash
246        self.assertEqual(gc.garbage, [])
247
248        L = []
249        L.append(L)
250        id_L = id(L)
251
252        debug = gc.get_debug()
253        gc.set_debug(debug | gc.DEBUG_SAVEALL)
254        del L
255        gc.collect()
256        gc.set_debug(debug)
257
258        self.assertEqual(len(gc.garbage), 1)
259        obj = gc.garbage.pop()
260        self.assertEqual(id(obj), id_L)
261
262    def test_del(self):
263        # __del__ methods can trigger collection, make this to happen
264        thresholds = gc.get_threshold()
265        gc.enable()
266        gc.set_threshold(1)
267
268        class A:
269            def __del__(self):
270                dir(self)
271        a = A()
272        del a
273
274        gc.disable()
275        gc.set_threshold(*thresholds)
276
277    def test_del_newclass(self):
278        # __del__ methods can trigger collection, make this to happen
279        thresholds = gc.get_threshold()
280        gc.enable()
281        gc.set_threshold(1)
282
283        class A(object):
284            def __del__(self):
285                dir(self)
286        a = A()
287        del a
288
289        gc.disable()
290        gc.set_threshold(*thresholds)
291
292    # The following two tests are fragile:
293    # They precisely count the number of allocations,
294    # which is highly implementation-dependent.
295    # For example, disposed tuples are not freed, but reused.
296    # To minimize variations, though, we first store the get_count() results
297    # and check them at the end.
298    @refcount_test
299    def test_get_count(self):
300        gc.collect()
301        a, b, c = gc.get_count()
302        x = []
303        d, e, f = gc.get_count()
304        self.assertEqual((b, c), (0, 0))
305        self.assertEqual((e, f), (0, 0))
306        # This is less fragile than asserting that a equals 0.
307        self.assertLess(a, 5)
308        # Between the two calls to get_count(), at least one object was
309        # created (the list).
310        self.assertGreater(d, a)
311
312    @refcount_test
313    def test_collect_generations(self):
314        gc.collect()
315        # This object will "trickle" into generation N + 1 after
316        # each call to collect(N)
317        x = []
318        gc.collect(0)
319        # x is now in gen 1
320        a, b, c = gc.get_count()
321        gc.collect(1)
322        # x is now in gen 2
323        d, e, f = gc.get_count()
324        gc.collect(2)
325        # x is now in gen 3
326        g, h, i = gc.get_count()
327        # We don't check a, d, g since their exact values depends on
328        # internal implementation details of the interpreter.
329        self.assertEqual((b, c), (1, 0))
330        self.assertEqual((e, f), (0, 1))
331        self.assertEqual((h, i), (0, 0))
332
333    def test_trashcan(self):
334        class Ouch:
335            n = 0
336            def __del__(self):
337                Ouch.n = Ouch.n + 1
338                if Ouch.n % 17 == 0:
339                    gc.collect()
340
341        # "trashcan" is a hack to prevent stack overflow when deallocating
342        # very deeply nested tuples etc.  It works in part by abusing the
343        # type pointer and refcount fields, and that can yield horrible
344        # problems when gc tries to traverse the structures.
345        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
346        # most likely die via segfault.
347
348        # Note:  In 2.3 the possibility for compiling without cyclic gc was
349        # removed, and that in turn allows the trashcan mechanism to work
350        # via much simpler means (e.g., it never abuses the type pointer or
351        # refcount fields anymore).  Since it's much less likely to cause a
352        # problem now, the various constants in this expensive (we force a lot
353        # of full collections) test are cut back from the 2.2 version.
354        gc.enable()
355        N = 150
356        for count in range(2):
357            t = []
358            for i in range(N):
359                t = [t, Ouch()]
360            u = []
361            for i in range(N):
362                u = [u, Ouch()]
363            v = {}
364            for i in range(N):
365                v = {1: v, 2: Ouch()}
366        gc.disable()
367
368    def test_trashcan_threads(self):
369        # Issue #13992: trashcan mechanism should be thread-safe
370        NESTING = 60
371        N_THREADS = 2
372
373        def sleeper_gen():
374            """A generator that releases the GIL when closed or dealloc'ed."""
375            try:
376                yield
377            finally:
378                time.sleep(0.000001)
379
380        class C(list):
381            # Appending to a list is atomic, which avoids the use of a lock.
382            inits = []
383            dels = []
384            def __init__(self, alist):
385                self[:] = alist
386                C.inits.append(None)
387            def __del__(self):
388                # This __del__ is called by subtype_dealloc().
389                C.dels.append(None)
390                # `g` will release the GIL when garbage-collected.  This
391                # helps assert subtype_dealloc's behaviour when threads
392                # switch in the middle of it.
393                g = sleeper_gen()
394                next(g)
395                # Now that __del__ is finished, subtype_dealloc will proceed
396                # to call list_dealloc, which also uses the trashcan mechanism.
397
398        def make_nested():
399            """Create a sufficiently nested container object so that the
400            trashcan mechanism is invoked when deallocating it."""
401            x = C([])
402            for i in range(NESTING):
403                x = [C([x])]
404            del x
405
406        def run_thread():
407            """Exercise make_nested() in a loop."""
408            while not exit:
409                make_nested()
410
411        old_switchinterval = sys.getswitchinterval()
412        sys.setswitchinterval(1e-5)
413        try:
414            exit = []
415            threads = []
416            for i in range(N_THREADS):
417                t = threading.Thread(target=run_thread)
418                threads.append(t)
419            with threading_helper.start_threads(threads, lambda: exit.append(1)):
420                time.sleep(1.0)
421        finally:
422            sys.setswitchinterval(old_switchinterval)
423        gc.collect()
424        self.assertEqual(len(C.inits), len(C.dels))
425
426    def test_boom(self):
427        class Boom:
428            def __getattr__(self, someattribute):
429                del self.attr
430                raise AttributeError
431
432        a = Boom()
433        b = Boom()
434        a.attr = b
435        b.attr = a
436
437        gc.collect()
438        garbagelen = len(gc.garbage)
439        del a, b
440        # a<->b are in a trash cycle now.  Collection will invoke
441        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
442        # __getattr__ deletes the internal "attr" attributes as a side effect.
443        # That causes the trash cycle to get reclaimed via refcounts falling to
444        # 0, thus mutating the trash graph as a side effect of merely asking
445        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
446        # Now __getattr__ isn't called.
447        self.assertEqual(gc.collect(), 4)
448        self.assertEqual(len(gc.garbage), garbagelen)
449
450    def test_boom2(self):
451        class Boom2:
452            def __init__(self):
453                self.x = 0
454
455            def __getattr__(self, someattribute):
456                self.x += 1
457                if self.x > 1:
458                    del self.attr
459                raise AttributeError
460
461        a = Boom2()
462        b = Boom2()
463        a.attr = b
464        b.attr = a
465
466        gc.collect()
467        garbagelen = len(gc.garbage)
468        del a, b
469        # Much like test_boom(), except that __getattr__ doesn't break the
470        # cycle until the second time gc checks for __del__.  As of 2.3b1,
471        # there isn't a second time, so this simply cleans up the trash cycle.
472        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
473        # reclaimed this way.
474        self.assertEqual(gc.collect(), 4)
475        self.assertEqual(len(gc.garbage), garbagelen)
476
477    def test_boom_new(self):
478        # boom__new and boom2_new are exactly like boom and boom2, except use
479        # new-style classes.
480
481        class Boom_New(object):
482            def __getattr__(self, someattribute):
483                del self.attr
484                raise AttributeError
485
486        a = Boom_New()
487        b = Boom_New()
488        a.attr = b
489        b.attr = a
490
491        gc.collect()
492        garbagelen = len(gc.garbage)
493        del a, b
494        self.assertEqual(gc.collect(), 4)
495        self.assertEqual(len(gc.garbage), garbagelen)
496
497    def test_boom2_new(self):
498        class Boom2_New(object):
499            def __init__(self):
500                self.x = 0
501
502            def __getattr__(self, someattribute):
503                self.x += 1
504                if self.x > 1:
505                    del self.attr
506                raise AttributeError
507
508        a = Boom2_New()
509        b = Boom2_New()
510        a.attr = b
511        b.attr = a
512
513        gc.collect()
514        garbagelen = len(gc.garbage)
515        del a, b
516        self.assertEqual(gc.collect(), 4)
517        self.assertEqual(len(gc.garbage), garbagelen)
518
519    def test_get_referents(self):
520        alist = [1, 3, 5]
521        got = gc.get_referents(alist)
522        got.sort()
523        self.assertEqual(got, alist)
524
525        atuple = tuple(alist)
526        got = gc.get_referents(atuple)
527        got.sort()
528        self.assertEqual(got, alist)
529
530        adict = {1: 3, 5: 7}
531        expected = [1, 3, 5, 7]
532        got = gc.get_referents(adict)
533        got.sort()
534        self.assertEqual(got, expected)
535
536        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
537        got.sort()
538        self.assertEqual(got, [0, 0] + list(range(5)))
539
540        self.assertEqual(gc.get_referents(1, 'a', 4j), [])
541
542    def test_is_tracked(self):
543        # Atomic built-in types are not tracked, user-defined objects and
544        # mutable containers are.
545        # NOTE: types with special optimizations (e.g. tuple) have tests
546        # in their own test files instead.
547        self.assertFalse(gc.is_tracked(None))
548        self.assertFalse(gc.is_tracked(1))
549        self.assertFalse(gc.is_tracked(1.0))
550        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
551        self.assertFalse(gc.is_tracked(True))
552        self.assertFalse(gc.is_tracked(False))
553        self.assertFalse(gc.is_tracked(b"a"))
554        self.assertFalse(gc.is_tracked("a"))
555        self.assertFalse(gc.is_tracked(bytearray(b"a")))
556        self.assertFalse(gc.is_tracked(type))
557        self.assertFalse(gc.is_tracked(int))
558        self.assertFalse(gc.is_tracked(object))
559        self.assertFalse(gc.is_tracked(object()))
560
561        class UserClass:
562            pass
563
564        class UserInt(int):
565            pass
566
567        # Base class is object; no extra fields.
568        class UserClassSlots:
569            __slots__ = ()
570
571        # Base class is fixed size larger than object; no extra fields.
572        class UserFloatSlots(float):
573            __slots__ = ()
574
575        # Base class is variable size; no extra fields.
576        class UserIntSlots(int):
577            __slots__ = ()
578
579        self.assertTrue(gc.is_tracked(gc))
580        self.assertTrue(gc.is_tracked(UserClass))
581        self.assertTrue(gc.is_tracked(UserClass()))
582        self.assertTrue(gc.is_tracked(UserInt()))
583        self.assertTrue(gc.is_tracked([]))
584        self.assertTrue(gc.is_tracked(set()))
585        self.assertTrue(gc.is_tracked(UserClassSlots()))
586        self.assertTrue(gc.is_tracked(UserFloatSlots()))
587        self.assertTrue(gc.is_tracked(UserIntSlots()))
588
589    def test_is_finalized(self):
590        # Objects not tracked by the always gc return false
591        self.assertFalse(gc.is_finalized(3))
592
593        storage = []
594        class Lazarus:
595            def __del__(self):
596                storage.append(self)
597
598        lazarus = Lazarus()
599        self.assertFalse(gc.is_finalized(lazarus))
600
601        del lazarus
602        gc.collect()
603
604        lazarus = storage.pop()
605        self.assertTrue(gc.is_finalized(lazarus))
606
607    def test_bug1055820b(self):
608        # Corresponds to temp2b.py in the bug report.
609
610        ouch = []
611        def callback(ignored):
612            ouch[:] = [wr() for wr in WRs]
613
614        Cs = [C1055820(i) for i in range(2)]
615        WRs = [weakref.ref(c, callback) for c in Cs]
616        c = None
617
618        gc.collect()
619        self.assertEqual(len(ouch), 0)
620        # Make the two instances trash, and collect again.  The bug was that
621        # the callback materialized a strong reference to an instance, but gc
622        # cleared the instance's dict anyway.
623        Cs = None
624        gc.collect()
625        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
626        for x in ouch:
627            # If the callback resurrected one of these guys, the instance
628            # would be damaged, with an empty __dict__.
629            self.assertEqual(x, None)
630
631    def test_bug21435(self):
632        # This is a poor test - its only virtue is that it happened to
633        # segfault on Tim's Windows box before the patch for 21435 was
634        # applied.  That's a nasty bug relying on specific pieces of cyclic
635        # trash appearing in exactly the right order in finalize_garbage()'s
636        # input list.
637        # But there's no reliable way to force that order from Python code,
638        # so over time chances are good this test won't really be testing much
639        # of anything anymore.  Still, if it blows up, there's _some_
640        # problem ;-)
641        gc.collect()
642
643        class A:
644            pass
645
646        class B:
647            def __init__(self, x):
648                self.x = x
649
650            def __del__(self):
651                self.attr = None
652
653        def do_work():
654            a = A()
655            b = B(A())
656
657            a.attr = b
658            b.attr = a
659
660        do_work()
661        gc.collect() # this blows up (bad C pointer) when it fails
662
663    @cpython_only
664    def test_garbage_at_shutdown(self):
665        import subprocess
666        code = """if 1:
667            import gc
668            import _testcapi
669            @_testcapi.with_tp_del
670            class X:
671                def __init__(self, name):
672                    self.name = name
673                def __repr__(self):
674                    return "<X %%r>" %% self.name
675                def __tp_del__(self):
676                    pass
677
678            x = X('first')
679            x.x = x
680            x.y = X('second')
681            del x
682            gc.set_debug(%s)
683        """
684        def run_command(code):
685            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
686                stdout=subprocess.PIPE,
687                stderr=subprocess.PIPE)
688            stdout, stderr = p.communicate()
689            p.stdout.close()
690            p.stderr.close()
691            self.assertEqual(p.returncode, 0)
692            self.assertEqual(stdout, b"")
693            return stderr
694
695        stderr = run_command(code % "0")
696        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
697                      b"shutdown; use", stderr)
698        self.assertNotIn(b"<X 'first'>", stderr)
699        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
700        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
701        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
702                      b"shutdown", stderr)
703        self.assertTrue(
704            (b"[<X 'first'>, <X 'second'>]" in stderr) or
705            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
706        # With DEBUG_SAVEALL, no additional message should get printed
707        # (because gc.garbage also contains normally reclaimable cyclic
708        # references, and its elements get printed at runtime anyway).
709        stderr = run_command(code % "gc.DEBUG_SAVEALL")
710        self.assertNotIn(b"uncollectable objects at shutdown", stderr)
711
712    def test_gc_main_module_at_shutdown(self):
713        # Create a reference cycle through the __main__ module and check
714        # it gets collected at interpreter shutdown.
715        code = """if 1:
716            class C:
717                def __del__(self):
718                    print('__del__ called')
719            l = [C()]
720            l.append(l)
721            """
722        rc, out, err = assert_python_ok('-c', code)
723        self.assertEqual(out.strip(), b'__del__ called')
724
725    def test_gc_ordinary_module_at_shutdown(self):
726        # Same as above, but with a non-__main__ module.
727        with temp_dir() as script_dir:
728            module = """if 1:
729                class C:
730                    def __del__(self):
731                        print('__del__ called')
732                l = [C()]
733                l.append(l)
734                """
735            code = """if 1:
736                import sys
737                sys.path.insert(0, %r)
738                import gctest
739                """ % (script_dir,)
740            make_script(script_dir, 'gctest', module)
741            rc, out, err = assert_python_ok('-c', code)
742            self.assertEqual(out.strip(), b'__del__ called')
743
744    def test_global_del_SystemExit(self):
745        code = """if 1:
746            class ClassWithDel:
747                def __del__(self):
748                    print('__del__ called')
749            a = ClassWithDel()
750            a.link = a
751            raise SystemExit(0)"""
752        self.addCleanup(unlink, TESTFN)
753        with open(TESTFN, 'w', encoding="utf-8") as script:
754            script.write(code)
755        rc, out, err = assert_python_ok(TESTFN)
756        self.assertEqual(out.strip(), b'__del__ called')
757
758    def test_get_stats(self):
759        stats = gc.get_stats()
760        self.assertEqual(len(stats), 3)
761        for st in stats:
762            self.assertIsInstance(st, dict)
763            self.assertEqual(set(st),
764                             {"collected", "collections", "uncollectable"})
765            self.assertGreaterEqual(st["collected"], 0)
766            self.assertGreaterEqual(st["collections"], 0)
767            self.assertGreaterEqual(st["uncollectable"], 0)
768        # Check that collection counts are incremented correctly
769        if gc.isenabled():
770            self.addCleanup(gc.enable)
771            gc.disable()
772        old = gc.get_stats()
773        gc.collect(0)
774        new = gc.get_stats()
775        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
776        self.assertEqual(new[1]["collections"], old[1]["collections"])
777        self.assertEqual(new[2]["collections"], old[2]["collections"])
778        gc.collect(2)
779        new = gc.get_stats()
780        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
781        self.assertEqual(new[1]["collections"], old[1]["collections"])
782        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
783
784    def test_freeze(self):
785        gc.freeze()
786        self.assertGreater(gc.get_freeze_count(), 0)
787        gc.unfreeze()
788        self.assertEqual(gc.get_freeze_count(), 0)
789
790    def test_get_objects(self):
791        gc.collect()
792        l = []
793        l.append(l)
794        self.assertTrue(
795                any(l is element for element in gc.get_objects(generation=0))
796        )
797        self.assertFalse(
798                any(l is element for element in  gc.get_objects(generation=1))
799        )
800        self.assertFalse(
801                any(l is element for element in gc.get_objects(generation=2))
802        )
803        gc.collect(generation=0)
804        self.assertFalse(
805                any(l is element for element in gc.get_objects(generation=0))
806        )
807        self.assertTrue(
808                any(l is element for element in  gc.get_objects(generation=1))
809        )
810        self.assertFalse(
811                any(l is element for element in gc.get_objects(generation=2))
812        )
813        gc.collect(generation=1)
814        self.assertFalse(
815                any(l is element for element in gc.get_objects(generation=0))
816        )
817        self.assertFalse(
818                any(l is element for element in  gc.get_objects(generation=1))
819        )
820        self.assertTrue(
821                any(l is element for element in gc.get_objects(generation=2))
822        )
823        gc.collect(generation=2)
824        self.assertFalse(
825                any(l is element for element in gc.get_objects(generation=0))
826        )
827        self.assertFalse(
828                any(l is element for element in  gc.get_objects(generation=1))
829        )
830        self.assertTrue(
831                any(l is element for element in gc.get_objects(generation=2))
832        )
833        del l
834        gc.collect()
835
836    def test_get_objects_arguments(self):
837        gc.collect()
838        self.assertEqual(len(gc.get_objects()),
839                         len(gc.get_objects(generation=None)))
840
841        self.assertRaises(ValueError, gc.get_objects, 1000)
842        self.assertRaises(ValueError, gc.get_objects, -1000)
843        self.assertRaises(TypeError, gc.get_objects, "1")
844        self.assertRaises(TypeError, gc.get_objects, 1.234)
845
846    def test_resurrection_only_happens_once_per_object(self):
847        class A:  # simple self-loop
848            def __init__(self):
849                self.me = self
850
851        class Lazarus(A):
852            resurrected = 0
853            resurrected_instances = []
854
855            def __del__(self):
856                Lazarus.resurrected += 1
857                Lazarus.resurrected_instances.append(self)
858
859        gc.collect()
860        gc.disable()
861
862        # We start with 0 resurrections
863        laz = Lazarus()
864        self.assertEqual(Lazarus.resurrected, 0)
865
866        # Deleting the instance and triggering a collection
867        # resurrects the object
868        del laz
869        gc.collect()
870        self.assertEqual(Lazarus.resurrected, 1)
871        self.assertEqual(len(Lazarus.resurrected_instances), 1)
872
873        # Clearing the references and forcing a collection
874        # should not resurrect the object again.
875        Lazarus.resurrected_instances.clear()
876        self.assertEqual(Lazarus.resurrected, 1)
877        gc.collect()
878        self.assertEqual(Lazarus.resurrected, 1)
879
880        gc.enable()
881
882    def test_resurrection_is_transitive(self):
883        class Cargo:
884            def __init__(self):
885                self.me = self
886
887        class Lazarus:
888            resurrected_instances = []
889
890            def __del__(self):
891                Lazarus.resurrected_instances.append(self)
892
893        gc.collect()
894        gc.disable()
895
896        laz = Lazarus()
897        cargo = Cargo()
898        cargo_id = id(cargo)
899
900        # Create a cycle between cargo and laz
901        laz.cargo = cargo
902        cargo.laz = laz
903
904        # Drop the references, force a collection and check that
905        # everything was resurrected.
906        del laz, cargo
907        gc.collect()
908        self.assertEqual(len(Lazarus.resurrected_instances), 1)
909        instance = Lazarus.resurrected_instances.pop()
910        self.assertTrue(hasattr(instance, "cargo"))
911        self.assertEqual(id(instance.cargo), cargo_id)
912
913        gc.collect()
914        gc.enable()
915
916    def test_resurrection_does_not_block_cleanup_of_other_objects(self):
917
918        # When a finalizer resurrects objects, stats were reporting them as
919        # having been collected.  This affected both collect()'s return
920        # value and the dicts returned by get_stats().
921        N = 100
922
923        class A:  # simple self-loop
924            def __init__(self):
925                self.me = self
926
927        class Z(A):  # resurrecting __del__
928            def __del__(self):
929                zs.append(self)
930
931        zs = []
932
933        def getstats():
934            d = gc.get_stats()[-1]
935            return d['collected'], d['uncollectable']
936
937        gc.collect()
938        gc.disable()
939
940        # No problems if just collecting A() instances.
941        oldc, oldnc = getstats()
942        for i in range(N):
943            A()
944        t = gc.collect()
945        c, nc = getstats()
946        self.assertEqual(t, 2*N) # instance object & its dict
947        self.assertEqual(c - oldc, 2*N)
948        self.assertEqual(nc - oldnc, 0)
949
950        # But Z() is not actually collected.
951        oldc, oldnc = c, nc
952        Z()
953        # Nothing is collected - Z() is merely resurrected.
954        t = gc.collect()
955        c, nc = getstats()
956        self.assertEqual(t, 0)
957        self.assertEqual(c - oldc, 0)
958        self.assertEqual(nc - oldnc, 0)
959
960        # Z() should not prevent anything else from being collected.
961        oldc, oldnc = c, nc
962        for i in range(N):
963            A()
964        Z()
965        t = gc.collect()
966        c, nc = getstats()
967        self.assertEqual(t, 2*N)
968        self.assertEqual(c - oldc, 2*N)
969        self.assertEqual(nc - oldnc, 0)
970
971        # The A() trash should have been reclaimed already but the
972        # 2 copies of Z are still in zs (and the associated dicts).
973        oldc, oldnc = c, nc
974        zs.clear()
975        t = gc.collect()
976        c, nc = getstats()
977        self.assertEqual(t, 4)
978        self.assertEqual(c - oldc, 4)
979        self.assertEqual(nc - oldnc, 0)
980
981        gc.enable()
982
983    @unittest.skipIf(ContainerNoGC is None,
984                     'requires ContainerNoGC extension type')
985    def test_trash_weakref_clear(self):
986        # Test that trash weakrefs are properly cleared (bpo-38006).
987        #
988        # Structure we are creating:
989        #
990        #   Z <- Y <- A--+--> WZ -> C
991        #             ^  |
992        #             +--+
993        # where:
994        #   WZ is a weakref to Z with callback C
995        #   Y doesn't implement tp_traverse
996        #   A contains a reference to itself, Y and WZ
997        #
998        # A, Y, Z, WZ are all trash.  The GC doesn't know that Z is trash
999        # because Y does not implement tp_traverse.  To show the bug, WZ needs
1000        # to live long enough so that Z is deallocated before it.  Then, if
1001        # gcmodule is buggy, when Z is being deallocated, C will run.
1002        #
1003        # To ensure WZ lives long enough, we put it in a second reference
1004        # cycle.  That trick only works due to the ordering of the GC prev/next
1005        # linked lists.  So, this test is a bit fragile.
1006        #
1007        # The bug reported in bpo-38006 is caused because the GC did not
1008        # clear WZ before starting the process of calling tp_clear on the
1009        # trash.  Normally, handle_weakrefs() would find the weakref via Z and
1010        # clear it.  However, since the GC cannot find Z, WR is not cleared and
1011        # it can execute during delete_garbage().  That can lead to disaster
1012        # since the callback might tinker with objects that have already had
1013        # tp_clear called on them (leaving them in possibly invalid states).
1014
1015        callback = unittest.mock.Mock()
1016
1017        class A:
1018            __slots__ = ['a', 'y', 'wz']
1019
1020        class Z:
1021            pass
1022
1023        # setup required object graph, as described above
1024        a = A()
1025        a.a = a
1026        a.y = ContainerNoGC(Z())
1027        a.wz = weakref.ref(a.y.value, callback)
1028        # create second cycle to keep WZ alive longer
1029        wr_cycle = [a.wz]
1030        wr_cycle.append(wr_cycle)
1031        # ensure trash unrelated to this test is gone
1032        gc.collect()
1033        gc.disable()
1034        # release references and create trash
1035        del a, wr_cycle
1036        gc.collect()
1037        # if called, it means there is a bug in the GC.  The weakref should be
1038        # cleared before Z dies.
1039        callback.assert_not_called()
1040        gc.enable()
1041
1042
1043class GCCallbackTests(unittest.TestCase):
1044    def setUp(self):
1045        # Save gc state and disable it.
1046        self.enabled = gc.isenabled()
1047        gc.disable()
1048        self.debug = gc.get_debug()
1049        gc.set_debug(0)
1050        gc.callbacks.append(self.cb1)
1051        gc.callbacks.append(self.cb2)
1052        self.othergarbage = []
1053
1054    def tearDown(self):
1055        # Restore gc state
1056        del self.visit
1057        gc.callbacks.remove(self.cb1)
1058        gc.callbacks.remove(self.cb2)
1059        gc.set_debug(self.debug)
1060        if self.enabled:
1061            gc.enable()
1062        # destroy any uncollectables
1063        gc.collect()
1064        for obj in gc.garbage:
1065            if isinstance(obj, Uncollectable):
1066                obj.partner = None
1067        del gc.garbage[:]
1068        del self.othergarbage
1069        gc.collect()
1070
1071    def preclean(self):
1072        # Remove all fluff from the system.  Invoke this function
1073        # manually rather than through self.setUp() for maximum
1074        # safety.
1075        self.visit = []
1076        gc.collect()
1077        garbage, gc.garbage[:] = gc.garbage[:], []
1078        self.othergarbage.append(garbage)
1079        self.visit = []
1080
1081    def cb1(self, phase, info):
1082        self.visit.append((1, phase, dict(info)))
1083
1084    def cb2(self, phase, info):
1085        self.visit.append((2, phase, dict(info)))
1086        if phase == "stop" and hasattr(self, "cleanup"):
1087            # Clean Uncollectable from garbage
1088            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
1089            gc.garbage[:] = [e for e in gc.garbage
1090                             if not isinstance(e, Uncollectable)]
1091            for e in uc:
1092                e.partner = None
1093
1094    def test_collect(self):
1095        self.preclean()
1096        gc.collect()
1097        # Algorithmically verify the contents of self.visit
1098        # because it is long and tortuous.
1099
1100        # Count the number of visits to each callback
1101        n = [v[0] for v in self.visit]
1102        n1 = [i for i in n if i == 1]
1103        n2 = [i for i in n if i == 2]
1104        self.assertEqual(n1, [1]*2)
1105        self.assertEqual(n2, [2]*2)
1106
1107        # Count that we got the right number of start and stop callbacks.
1108        n = [v[1] for v in self.visit]
1109        n1 = [i for i in n if i == "start"]
1110        n2 = [i for i in n if i == "stop"]
1111        self.assertEqual(n1, ["start"]*2)
1112        self.assertEqual(n2, ["stop"]*2)
1113
1114        # Check that we got the right info dict for all callbacks
1115        for v in self.visit:
1116            info = v[2]
1117            self.assertTrue("generation" in info)
1118            self.assertTrue("collected" in info)
1119            self.assertTrue("uncollectable" in info)
1120
1121    def test_collect_generation(self):
1122        self.preclean()
1123        gc.collect(2)
1124        for v in self.visit:
1125            info = v[2]
1126            self.assertEqual(info["generation"], 2)
1127
1128    @cpython_only
1129    def test_collect_garbage(self):
1130        self.preclean()
1131        # Each of these cause four objects to be garbage: Two
1132        # Uncollectables and their instance dicts.
1133        Uncollectable()
1134        Uncollectable()
1135        C1055820(666)
1136        gc.collect()
1137        for v in self.visit:
1138            if v[1] != "stop":
1139                continue
1140            info = v[2]
1141            self.assertEqual(info["collected"], 2)
1142            self.assertEqual(info["uncollectable"], 8)
1143
1144        # We should now have the Uncollectables in gc.garbage
1145        self.assertEqual(len(gc.garbage), 4)
1146        for e in gc.garbage:
1147            self.assertIsInstance(e, Uncollectable)
1148
1149        # Now, let our callback handle the Uncollectable instances
1150        self.cleanup=True
1151        self.visit = []
1152        gc.garbage[:] = []
1153        gc.collect()
1154        for v in self.visit:
1155            if v[1] != "stop":
1156                continue
1157            info = v[2]
1158            self.assertEqual(info["collected"], 0)
1159            self.assertEqual(info["uncollectable"], 4)
1160
1161        # Uncollectables should be gone
1162        self.assertEqual(len(gc.garbage), 0)
1163
1164
1165    @unittest.skipIf(BUILD_WITH_NDEBUG,
1166                     'built with -NDEBUG')
1167    def test_refcount_errors(self):
1168        self.preclean()
1169        # Verify the "handling" of objects with broken refcounts
1170
1171        # Skip the test if ctypes is not available
1172        import_module("ctypes")
1173
1174        import subprocess
1175        code = textwrap.dedent('''
1176            from test.support import gc_collect, SuppressCrashReport
1177
1178            a = [1, 2, 3]
1179            b = [a]
1180
1181            # Avoid coredump when Py_FatalError() calls abort()
1182            SuppressCrashReport().__enter__()
1183
1184            # Simulate the refcount of "a" being too low (compared to the
1185            # references held on it by live data), but keeping it above zero
1186            # (to avoid deallocating it):
1187            import ctypes
1188            ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
1189
1190            # The garbage collector should now have a fatal error
1191            # when it reaches the broken object
1192            gc_collect()
1193        ''')
1194        p = subprocess.Popen([sys.executable, "-c", code],
1195                             stdout=subprocess.PIPE,
1196                             stderr=subprocess.PIPE)
1197        stdout, stderr = p.communicate()
1198        p.stdout.close()
1199        p.stderr.close()
1200        # Verify that stderr has a useful error message:
1201        self.assertRegex(stderr,
1202            br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
1203        self.assertRegex(stderr,
1204            br'refcount is too small')
1205        # "address : 0x7fb5062efc18"
1206        # "address : 7FB5062EFC18"
1207        address_regex = br'[0-9a-fA-Fx]+'
1208        self.assertRegex(stderr,
1209            br'object address  : ' + address_regex)
1210        self.assertRegex(stderr,
1211            br'object refcount : 1')
1212        self.assertRegex(stderr,
1213            br'object type     : ' + address_regex)
1214        self.assertRegex(stderr,
1215            br'object type name: list')
1216        self.assertRegex(stderr,
1217            br'object repr     : \[1, 2, 3\]')
1218
1219
1220class GCTogglingTests(unittest.TestCase):
1221    def setUp(self):
1222        gc.enable()
1223
1224    def tearDown(self):
1225        gc.disable()
1226
1227    def test_bug1055820c(self):
1228        # Corresponds to temp2c.py in the bug report.  This is pretty
1229        # elaborate.
1230
1231        c0 = C1055820(0)
1232        # Move c0 into generation 2.
1233        gc.collect()
1234
1235        c1 = C1055820(1)
1236        c1.keep_c0_alive = c0
1237        del c0.loop # now only c1 keeps c0 alive
1238
1239        c2 = C1055820(2)
1240        c2wr = weakref.ref(c2) # no callback!
1241
1242        ouch = []
1243        def callback(ignored):
1244            ouch[:] = [c2wr()]
1245
1246        # The callback gets associated with a wr on an object in generation 2.
1247        c0wr = weakref.ref(c0, callback)
1248
1249        c0 = c1 = c2 = None
1250
1251        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
1252        # generation 2.  The only thing keeping it alive is that c1 points to
1253        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
1254        # global weakref to c2 (c2wr), but that weakref has no callback.
1255        # There's also a global weakref to c0 (c0wr), and that does have a
1256        # callback, and that callback references c2 via c2wr().
1257        #
1258        #               c0 has a wr with callback, which references c2wr
1259        #               ^
1260        #               |
1261        #               |     Generation 2 above dots
1262        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1263        #               |     Generation 0 below dots
1264        #               |
1265        #               |
1266        #            ^->c1   ^->c2 has a wr but no callback
1267        #            |  |    |  |
1268        #            <--v    <--v
1269        #
1270        # So this is the nightmare:  when generation 0 gets collected, we see
1271        # that c2 has a callback-free weakref, and c1 doesn't even have a
1272        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
1273        # the only object that has a weakref with a callback.  gc clears c1
1274        # and c2.  Clearing c1 has the side effect of dropping the refcount on
1275        # c0 to 0, so c0 goes away (despite that it's in an older generation)
1276        # and c0's wr callback triggers.  That in turn materializes a reference
1277        # to c2 via c2wr(), but c2 gets cleared anyway by gc.
1278
1279        # We want to let gc happen "naturally", to preserve the distinction
1280        # between generations.
1281        junk = []
1282        i = 0
1283        detector = GC_Detector()
1284        while not detector.gc_happened:
1285            i += 1
1286            if i > 10000:
1287                self.fail("gc didn't happen after 10000 iterations")
1288            self.assertEqual(len(ouch), 0)
1289            junk.append([])  # this will eventually trigger gc
1290
1291        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
1292        for x in ouch:
1293            # If the callback resurrected c2, the instance would be damaged,
1294            # with an empty __dict__.
1295            self.assertEqual(x, None)
1296
1297    def test_bug1055820d(self):
1298        # Corresponds to temp2d.py in the bug report.  This is very much like
1299        # test_bug1055820c, but uses a __del__ method instead of a weakref
1300        # callback to sneak in a resurrection of cyclic trash.
1301
1302        ouch = []
1303        class D(C1055820):
1304            def __del__(self):
1305                ouch[:] = [c2wr()]
1306
1307        d0 = D(0)
1308        # Move all the above into generation 2.
1309        gc.collect()
1310
1311        c1 = C1055820(1)
1312        c1.keep_d0_alive = d0
1313        del d0.loop # now only c1 keeps d0 alive
1314
1315        c2 = C1055820(2)
1316        c2wr = weakref.ref(c2) # no callback!
1317
1318        d0 = c1 = c2 = None
1319
1320        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
1321        # generation 2.  The only thing keeping it alive is that c1 points to
1322        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
1323        # a global weakref to c2 (c2wr), but that weakref has no callback.
1324        # There are no other weakrefs.
1325        #
1326        #               d0 has a __del__ method that references c2wr
1327        #               ^
1328        #               |
1329        #               |     Generation 2 above dots
1330        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1331        #               |     Generation 0 below dots
1332        #               |
1333        #               |
1334        #            ^->c1   ^->c2 has a wr but no callback
1335        #            |  |    |  |
1336        #            <--v    <--v
1337        #
1338        # So this is the nightmare:  when generation 0 gets collected, we see
1339        # that c2 has a callback-free weakref, and c1 doesn't even have a
1340        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
1341        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
1342        # on d0 to 0, so d0 goes away (despite that it's in an older
1343        # generation) and d0's __del__ triggers.  That in turn materializes
1344        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
1345
1346        # We want to let gc happen "naturally", to preserve the distinction
1347        # between generations.
1348        detector = GC_Detector()
1349        junk = []
1350        i = 0
1351        while not detector.gc_happened:
1352            i += 1
1353            if i > 10000:
1354                self.fail("gc didn't happen after 10000 iterations")
1355            self.assertEqual(len(ouch), 0)
1356            junk.append([])  # this will eventually trigger gc
1357
1358        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
1359        for x in ouch:
1360            # If __del__ resurrected c2, the instance would be damaged, with an
1361            # empty __dict__.
1362            self.assertEqual(x, None)
1363
1364
1365class PythonFinalizationTests(unittest.TestCase):
1366    def test_ast_fini(self):
1367        # bpo-44184: Regression test for subtype_dealloc() when deallocating
1368        # an AST instance also destroy its AST type: subtype_dealloc() must
1369        # not access the type memory after deallocating the instance, since
1370        # the type memory can be freed as well. The test is also related to
1371        # _PyAST_Fini() which clears references to AST types.
1372        code = textwrap.dedent("""
1373            import ast
1374            import codecs
1375
1376            # Small AST tree to keep their AST types alive
1377            tree = ast.parse("def f(x, y): return 2*x-y")
1378            x = [tree]
1379            x.append(x)
1380
1381            # Put the cycle somewhere to survive until the last GC collection.
1382            # Codec search functions are only cleared at the end of
1383            # interpreter_clear().
1384            def search_func(encoding):
1385                return None
1386            search_func.a = x
1387            codecs.register(search_func)
1388        """)
1389        assert_python_ok("-c", code)
1390
1391
1392def setUpModule():
1393    global enabled, debug
1394    enabled = gc.isenabled()
1395    gc.disable()
1396    assert not gc.isenabled()
1397    debug = gc.get_debug()
1398    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1399    gc.collect() # Delete 2nd generation garbage
1400
1401
1402def tearDownModule():
1403    gc.set_debug(debug)
1404    # test gc.enable() even if GC is disabled by default
1405    if verbose:
1406        print("restoring automatic collection")
1407    # make sure to always test gc.enable()
1408    gc.enable()
1409    assert gc.isenabled()
1410    if not enabled:
1411        gc.disable()
1412
1413
1414if __name__ == "__main__":
1415    unittest.main()
1416