• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test.support import (verbose, refcount_test, run_unittest,
3                          strip_python_stderr, cpython_only, start_threads,
4                          temp_dir, requires_type_collecting, TESTFN, unlink,
5                          import_module)
6from test.support.script_helper import assert_python_ok, make_script
7
8import gc
9import sys
10import sysconfig
11import textwrap
12import threading
13import time
14import weakref
15
16try:
17    from _testcapi import with_tp_del
18except ImportError:
19    def with_tp_del(cls):
20        class C(object):
21            def __new__(cls, *args, **kwargs):
22                raise TypeError('requires _testcapi.with_tp_del')
23        return C
24
25### Support code
26###############################################################################
27
28# Bug 1055820 has several tests of longstanding bugs involving weakrefs and
29# cyclic gc.
30
31# An instance of C1055820 has a self-loop, so becomes cyclic trash when
32# unreachable.
33class C1055820(object):
34    def __init__(self, i):
35        self.i = i
36        self.loop = self
37
38class GC_Detector(object):
39    # Create an instance I.  Then gc hasn't happened again so long as
40    # I.gc_happened is false.
41
42    def __init__(self):
43        self.gc_happened = False
44
45        def it_happened(ignored):
46            self.gc_happened = True
47
48        # Create a piece of cyclic trash that triggers it_happened when
49        # gc collects it.
50        self.wr = weakref.ref(C1055820(666), it_happened)
51
52@with_tp_del
53class Uncollectable(object):
54    """Create a reference cycle with multiple __del__ methods.
55
56    An object in a reference cycle will never have zero references,
57    and so must be garbage collected.  If one or more objects in the
58    cycle have __del__ methods, the gc refuses to guess an order,
59    and leaves the cycle uncollected."""
60    def __init__(self, partner=None):
61        if partner is None:
62            self.partner = Uncollectable(partner=self)
63        else:
64            self.partner = partner
65    def __tp_del__(self):
66        pass
67
68if sysconfig.get_config_vars().get('PY_CFLAGS', ''):
69    BUILD_WITH_NDEBUG = ('-DNDEBUG' in sysconfig.get_config_vars()['PY_CFLAGS'])
70else:
71    # Usually, sys.gettotalrefcount() is only present if Python has been
72    # compiled in debug mode. If it's missing, expect that Python has
73    # been released in release mode: with NDEBUG defined.
74    BUILD_WITH_NDEBUG = (not hasattr(sys, 'gettotalrefcount'))
75
76### Tests
77###############################################################################
78
79class GCTests(unittest.TestCase):
80    def test_list(self):
81        l = []
82        l.append(l)
83        gc.collect()
84        del l
85        self.assertEqual(gc.collect(), 1)
86
87    def test_dict(self):
88        d = {}
89        d[1] = d
90        gc.collect()
91        del d
92        self.assertEqual(gc.collect(), 1)
93
94    def test_tuple(self):
95        # since tuples are immutable we close the loop with a list
96        l = []
97        t = (l,)
98        l.append(t)
99        gc.collect()
100        del t
101        del l
102        self.assertEqual(gc.collect(), 2)
103
104    def test_class(self):
105        class A:
106            pass
107        A.a = A
108        gc.collect()
109        del A
110        self.assertNotEqual(gc.collect(), 0)
111
112    def test_newstyleclass(self):
113        class A(object):
114            pass
115        gc.collect()
116        del A
117        self.assertNotEqual(gc.collect(), 0)
118
119    def test_instance(self):
120        class A:
121            pass
122        a = A()
123        a.a = a
124        gc.collect()
125        del a
126        self.assertNotEqual(gc.collect(), 0)
127
128    @requires_type_collecting
129    def test_newinstance(self):
130        class A(object):
131            pass
132        a = A()
133        a.a = a
134        gc.collect()
135        del a
136        self.assertNotEqual(gc.collect(), 0)
137        class B(list):
138            pass
139        class C(B, A):
140            pass
141        a = C()
142        a.a = a
143        gc.collect()
144        del a
145        self.assertNotEqual(gc.collect(), 0)
146        del B, C
147        self.assertNotEqual(gc.collect(), 0)
148        A.a = A()
149        del A
150        self.assertNotEqual(gc.collect(), 0)
151        self.assertEqual(gc.collect(), 0)
152
153    def test_method(self):
154        # Tricky: self.__init__ is a bound method, it references the instance.
155        class A:
156            def __init__(self):
157                self.init = self.__init__
158        a = A()
159        gc.collect()
160        del a
161        self.assertNotEqual(gc.collect(), 0)
162
163    @cpython_only
164    def test_legacy_finalizer(self):
165        # A() is uncollectable if it is part of a cycle, make sure it shows up
166        # in gc.garbage.
167        @with_tp_del
168        class A:
169            def __tp_del__(self): pass
170        class B:
171            pass
172        a = A()
173        a.a = a
174        id_a = id(a)
175        b = B()
176        b.b = b
177        gc.collect()
178        del a
179        del b
180        self.assertNotEqual(gc.collect(), 0)
181        for obj in gc.garbage:
182            if id(obj) == id_a:
183                del obj.a
184                break
185        else:
186            self.fail("didn't find obj in garbage (finalizer)")
187        gc.garbage.remove(obj)
188
189    @cpython_only
190    def test_legacy_finalizer_newclass(self):
191        # A() is uncollectable if it is part of a cycle, make sure it shows up
192        # in gc.garbage.
193        @with_tp_del
194        class A(object):
195            def __tp_del__(self): pass
196        class B(object):
197            pass
198        a = A()
199        a.a = a
200        id_a = id(a)
201        b = B()
202        b.b = b
203        gc.collect()
204        del a
205        del b
206        self.assertNotEqual(gc.collect(), 0)
207        for obj in gc.garbage:
208            if id(obj) == id_a:
209                del obj.a
210                break
211        else:
212            self.fail("didn't find obj in garbage (finalizer)")
213        gc.garbage.remove(obj)
214
215    def test_function(self):
216        # Tricky: f -> d -> f, code should call d.clear() after the exec to
217        # break the cycle.
218        d = {}
219        exec("def f(): pass\n", d)
220        gc.collect()
221        del d
222        self.assertEqual(gc.collect(), 2)
223
224    @refcount_test
225    def test_frame(self):
226        def f():
227            frame = sys._getframe()
228        gc.collect()
229        f()
230        self.assertEqual(gc.collect(), 1)
231
232    def test_saveall(self):
233        # Verify that cyclic garbage like lists show up in gc.garbage if the
234        # SAVEALL option is enabled.
235
236        # First make sure we don't save away other stuff that just happens to
237        # be waiting for collection.
238        gc.collect()
239        # if this fails, someone else created immortal trash
240        self.assertEqual(gc.garbage, [])
241
242        L = []
243        L.append(L)
244        id_L = id(L)
245
246        debug = gc.get_debug()
247        gc.set_debug(debug | gc.DEBUG_SAVEALL)
248        del L
249        gc.collect()
250        gc.set_debug(debug)
251
252        self.assertEqual(len(gc.garbage), 1)
253        obj = gc.garbage.pop()
254        self.assertEqual(id(obj), id_L)
255
256    def test_del(self):
257        # __del__ methods can trigger collection, make this to happen
258        thresholds = gc.get_threshold()
259        gc.enable()
260        gc.set_threshold(1)
261
262        class A:
263            def __del__(self):
264                dir(self)
265        a = A()
266        del a
267
268        gc.disable()
269        gc.set_threshold(*thresholds)
270
271    def test_del_newclass(self):
272        # __del__ methods can trigger collection, make this to happen
273        thresholds = gc.get_threshold()
274        gc.enable()
275        gc.set_threshold(1)
276
277        class A(object):
278            def __del__(self):
279                dir(self)
280        a = A()
281        del a
282
283        gc.disable()
284        gc.set_threshold(*thresholds)
285
286    # The following two tests are fragile:
287    # They precisely count the number of allocations,
288    # which is highly implementation-dependent.
289    # For example, disposed tuples are not freed, but reused.
290    # To minimize variations, though, we first store the get_count() results
291    # and check them at the end.
292    @refcount_test
293    def test_get_count(self):
294        gc.collect()
295        a, b, c = gc.get_count()
296        x = []
297        d, e, f = gc.get_count()
298        self.assertEqual((b, c), (0, 0))
299        self.assertEqual((e, f), (0, 0))
300        # This is less fragile than asserting that a equals 0.
301        self.assertLess(a, 5)
302        # Between the two calls to get_count(), at least one object was
303        # created (the list).
304        self.assertGreater(d, a)
305
306    @refcount_test
307    def test_collect_generations(self):
308        gc.collect()
309        # This object will "trickle" into generation N + 1 after
310        # each call to collect(N)
311        x = []
312        gc.collect(0)
313        # x is now in gen 1
314        a, b, c = gc.get_count()
315        gc.collect(1)
316        # x is now in gen 2
317        d, e, f = gc.get_count()
318        gc.collect(2)
319        # x is now in gen 3
320        g, h, i = gc.get_count()
321        # We don't check a, d, g since their exact values depends on
322        # internal implementation details of the interpreter.
323        self.assertEqual((b, c), (1, 0))
324        self.assertEqual((e, f), (0, 1))
325        self.assertEqual((h, i), (0, 0))
326
327    def test_trashcan(self):
328        class Ouch:
329            n = 0
330            def __del__(self):
331                Ouch.n = Ouch.n + 1
332                if Ouch.n % 17 == 0:
333                    gc.collect()
334
335        # "trashcan" is a hack to prevent stack overflow when deallocating
336        # very deeply nested tuples etc.  It works in part by abusing the
337        # type pointer and refcount fields, and that can yield horrible
338        # problems when gc tries to traverse the structures.
339        # If this test fails (as it does in 2.0, 2.1 and 2.2), it will
340        # most likely die via segfault.
341
342        # Note:  In 2.3 the possibility for compiling without cyclic gc was
343        # removed, and that in turn allows the trashcan mechanism to work
344        # via much simpler means (e.g., it never abuses the type pointer or
345        # refcount fields anymore).  Since it's much less likely to cause a
346        # problem now, the various constants in this expensive (we force a lot
347        # of full collections) test are cut back from the 2.2 version.
348        gc.enable()
349        N = 150
350        for count in range(2):
351            t = []
352            for i in range(N):
353                t = [t, Ouch()]
354            u = []
355            for i in range(N):
356                u = [u, Ouch()]
357            v = {}
358            for i in range(N):
359                v = {1: v, 2: Ouch()}
360        gc.disable()
361
362    def test_trashcan_threads(self):
363        # Issue #13992: trashcan mechanism should be thread-safe
364        NESTING = 60
365        N_THREADS = 2
366
367        def sleeper_gen():
368            """A generator that releases the GIL when closed or dealloc'ed."""
369            try:
370                yield
371            finally:
372                time.sleep(0.000001)
373
374        class C(list):
375            # Appending to a list is atomic, which avoids the use of a lock.
376            inits = []
377            dels = []
378            def __init__(self, alist):
379                self[:] = alist
380                C.inits.append(None)
381            def __del__(self):
382                # This __del__ is called by subtype_dealloc().
383                C.dels.append(None)
384                # `g` will release the GIL when garbage-collected.  This
385                # helps assert subtype_dealloc's behaviour when threads
386                # switch in the middle of it.
387                g = sleeper_gen()
388                next(g)
389                # Now that __del__ is finished, subtype_dealloc will proceed
390                # to call list_dealloc, which also uses the trashcan mechanism.
391
392        def make_nested():
393            """Create a sufficiently nested container object so that the
394            trashcan mechanism is invoked when deallocating it."""
395            x = C([])
396            for i in range(NESTING):
397                x = [C([x])]
398            del x
399
400        def run_thread():
401            """Exercise make_nested() in a loop."""
402            while not exit:
403                make_nested()
404
405        old_switchinterval = sys.getswitchinterval()
406        sys.setswitchinterval(1e-5)
407        try:
408            exit = []
409            threads = []
410            for i in range(N_THREADS):
411                t = threading.Thread(target=run_thread)
412                threads.append(t)
413            with start_threads(threads, lambda: exit.append(1)):
414                time.sleep(1.0)
415        finally:
416            sys.setswitchinterval(old_switchinterval)
417        gc.collect()
418        self.assertEqual(len(C.inits), len(C.dels))
419
420    def test_boom(self):
421        class Boom:
422            def __getattr__(self, someattribute):
423                del self.attr
424                raise AttributeError
425
426        a = Boom()
427        b = Boom()
428        a.attr = b
429        b.attr = a
430
431        gc.collect()
432        garbagelen = len(gc.garbage)
433        del a, b
434        # a<->b are in a trash cycle now.  Collection will invoke
435        # Boom.__getattr__ (to see whether a and b have __del__ methods), and
436        # __getattr__ deletes the internal "attr" attributes as a side effect.
437        # That causes the trash cycle to get reclaimed via refcounts falling to
438        # 0, thus mutating the trash graph as a side effect of merely asking
439        # whether __del__ exists.  This used to (before 2.3b1) crash Python.
440        # Now __getattr__ isn't called.
441        self.assertEqual(gc.collect(), 4)
442        self.assertEqual(len(gc.garbage), garbagelen)
443
444    def test_boom2(self):
445        class Boom2:
446            def __init__(self):
447                self.x = 0
448
449            def __getattr__(self, someattribute):
450                self.x += 1
451                if self.x > 1:
452                    del self.attr
453                raise AttributeError
454
455        a = Boom2()
456        b = Boom2()
457        a.attr = b
458        b.attr = a
459
460        gc.collect()
461        garbagelen = len(gc.garbage)
462        del a, b
463        # Much like test_boom(), except that __getattr__ doesn't break the
464        # cycle until the second time gc checks for __del__.  As of 2.3b1,
465        # there isn't a second time, so this simply cleans up the trash cycle.
466        # We expect a, b, a.__dict__ and b.__dict__ (4 objects) to get
467        # reclaimed this way.
468        self.assertEqual(gc.collect(), 4)
469        self.assertEqual(len(gc.garbage), garbagelen)
470
471    def test_boom_new(self):
472        # boom__new and boom2_new are exactly like boom and boom2, except use
473        # new-style classes.
474
475        class Boom_New(object):
476            def __getattr__(self, someattribute):
477                del self.attr
478                raise AttributeError
479
480        a = Boom_New()
481        b = Boom_New()
482        a.attr = b
483        b.attr = a
484
485        gc.collect()
486        garbagelen = len(gc.garbage)
487        del a, b
488        self.assertEqual(gc.collect(), 4)
489        self.assertEqual(len(gc.garbage), garbagelen)
490
491    def test_boom2_new(self):
492        class Boom2_New(object):
493            def __init__(self):
494                self.x = 0
495
496            def __getattr__(self, someattribute):
497                self.x += 1
498                if self.x > 1:
499                    del self.attr
500                raise AttributeError
501
502        a = Boom2_New()
503        b = Boom2_New()
504        a.attr = b
505        b.attr = a
506
507        gc.collect()
508        garbagelen = len(gc.garbage)
509        del a, b
510        self.assertEqual(gc.collect(), 4)
511        self.assertEqual(len(gc.garbage), garbagelen)
512
513    def test_get_referents(self):
514        alist = [1, 3, 5]
515        got = gc.get_referents(alist)
516        got.sort()
517        self.assertEqual(got, alist)
518
519        atuple = tuple(alist)
520        got = gc.get_referents(atuple)
521        got.sort()
522        self.assertEqual(got, alist)
523
524        adict = {1: 3, 5: 7}
525        expected = [1, 3, 5, 7]
526        got = gc.get_referents(adict)
527        got.sort()
528        self.assertEqual(got, expected)
529
530        got = gc.get_referents([1, 2], {3: 4}, (0, 0, 0))
531        got.sort()
532        self.assertEqual(got, [0, 0] + list(range(5)))
533
534        self.assertEqual(gc.get_referents(1, 'a', 4j), [])
535
536    def test_is_tracked(self):
537        # Atomic built-in types are not tracked, user-defined objects and
538        # mutable containers are.
539        # NOTE: types with special optimizations (e.g. tuple) have tests
540        # in their own test files instead.
541        self.assertFalse(gc.is_tracked(None))
542        self.assertFalse(gc.is_tracked(1))
543        self.assertFalse(gc.is_tracked(1.0))
544        self.assertFalse(gc.is_tracked(1.0 + 5.0j))
545        self.assertFalse(gc.is_tracked(True))
546        self.assertFalse(gc.is_tracked(False))
547        self.assertFalse(gc.is_tracked(b"a"))
548        self.assertFalse(gc.is_tracked("a"))
549        self.assertFalse(gc.is_tracked(bytearray(b"a")))
550        self.assertFalse(gc.is_tracked(type))
551        self.assertFalse(gc.is_tracked(int))
552        self.assertFalse(gc.is_tracked(object))
553        self.assertFalse(gc.is_tracked(object()))
554
555        class UserClass:
556            pass
557
558        class UserInt(int):
559            pass
560
561        # Base class is object; no extra fields.
562        class UserClassSlots:
563            __slots__ = ()
564
565        # Base class is fixed size larger than object; no extra fields.
566        class UserFloatSlots(float):
567            __slots__ = ()
568
569        # Base class is variable size; no extra fields.
570        class UserIntSlots(int):
571            __slots__ = ()
572
573        self.assertTrue(gc.is_tracked(gc))
574        self.assertTrue(gc.is_tracked(UserClass))
575        self.assertTrue(gc.is_tracked(UserClass()))
576        self.assertTrue(gc.is_tracked(UserInt()))
577        self.assertTrue(gc.is_tracked([]))
578        self.assertTrue(gc.is_tracked(set()))
579        self.assertFalse(gc.is_tracked(UserClassSlots()))
580        self.assertFalse(gc.is_tracked(UserFloatSlots()))
581        self.assertFalse(gc.is_tracked(UserIntSlots()))
582
583    def test_bug1055820b(self):
584        # Corresponds to temp2b.py in the bug report.
585
586        ouch = []
587        def callback(ignored):
588            ouch[:] = [wr() for wr in WRs]
589
590        Cs = [C1055820(i) for i in range(2)]
591        WRs = [weakref.ref(c, callback) for c in Cs]
592        c = None
593
594        gc.collect()
595        self.assertEqual(len(ouch), 0)
596        # Make the two instances trash, and collect again.  The bug was that
597        # the callback materialized a strong reference to an instance, but gc
598        # cleared the instance's dict anyway.
599        Cs = None
600        gc.collect()
601        self.assertEqual(len(ouch), 2)  # else the callbacks didn't run
602        for x in ouch:
603            # If the callback resurrected one of these guys, the instance
604            # would be damaged, with an empty __dict__.
605            self.assertEqual(x, None)
606
607    def test_bug21435(self):
608        # This is a poor test - its only virtue is that it happened to
609        # segfault on Tim's Windows box before the patch for 21435 was
610        # applied.  That's a nasty bug relying on specific pieces of cyclic
611        # trash appearing in exactly the right order in finalize_garbage()'s
612        # input list.
613        # But there's no reliable way to force that order from Python code,
614        # so over time chances are good this test won't really be testing much
615        # of anything anymore.  Still, if it blows up, there's _some_
616        # problem ;-)
617        gc.collect()
618
619        class A:
620            pass
621
622        class B:
623            def __init__(self, x):
624                self.x = x
625
626            def __del__(self):
627                self.attr = None
628
629        def do_work():
630            a = A()
631            b = B(A())
632
633            a.attr = b
634            b.attr = a
635
636        do_work()
637        gc.collect() # this blows up (bad C pointer) when it fails
638
639    @cpython_only
640    def test_garbage_at_shutdown(self):
641        import subprocess
642        code = """if 1:
643            import gc
644            import _testcapi
645            @_testcapi.with_tp_del
646            class X:
647                def __init__(self, name):
648                    self.name = name
649                def __repr__(self):
650                    return "<X %%r>" %% self.name
651                def __tp_del__(self):
652                    pass
653
654            x = X('first')
655            x.x = x
656            x.y = X('second')
657            del x
658            gc.set_debug(%s)
659        """
660        def run_command(code):
661            p = subprocess.Popen([sys.executable, "-Wd", "-c", code],
662                stdout=subprocess.PIPE,
663                stderr=subprocess.PIPE)
664            stdout, stderr = p.communicate()
665            p.stdout.close()
666            p.stderr.close()
667            self.assertEqual(p.returncode, 0)
668            self.assertEqual(stdout.strip(), b"")
669            return strip_python_stderr(stderr)
670
671        stderr = run_command(code % "0")
672        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
673                      b"shutdown; use", stderr)
674        self.assertNotIn(b"<X 'first'>", stderr)
675        # With DEBUG_UNCOLLECTABLE, the garbage list gets printed
676        stderr = run_command(code % "gc.DEBUG_UNCOLLECTABLE")
677        self.assertIn(b"ResourceWarning: gc: 2 uncollectable objects at "
678                      b"shutdown", stderr)
679        self.assertTrue(
680            (b"[<X 'first'>, <X 'second'>]" in stderr) or
681            (b"[<X 'second'>, <X 'first'>]" in stderr), stderr)
682        # With DEBUG_SAVEALL, no additional message should get printed
683        # (because gc.garbage also contains normally reclaimable cyclic
684        # references, and its elements get printed at runtime anyway).
685        stderr = run_command(code % "gc.DEBUG_SAVEALL")
686        self.assertNotIn(b"uncollectable objects at shutdown", stderr)
687
688    @requires_type_collecting
689    def test_gc_main_module_at_shutdown(self):
690        # Create a reference cycle through the __main__ module and check
691        # it gets collected at interpreter shutdown.
692        code = """if 1:
693            class C:
694                def __del__(self):
695                    print('__del__ called')
696            l = [C()]
697            l.append(l)
698            """
699        rc, out, err = assert_python_ok('-c', code)
700        self.assertEqual(out.strip(), b'__del__ called')
701
702    @requires_type_collecting
703    def test_gc_ordinary_module_at_shutdown(self):
704        # Same as above, but with a non-__main__ module.
705        with temp_dir() as script_dir:
706            module = """if 1:
707                class C:
708                    def __del__(self):
709                        print('__del__ called')
710                l = [C()]
711                l.append(l)
712                """
713            code = """if 1:
714                import sys
715                sys.path.insert(0, %r)
716                import gctest
717                """ % (script_dir,)
718            make_script(script_dir, 'gctest', module)
719            rc, out, err = assert_python_ok('-c', code)
720            self.assertEqual(out.strip(), b'__del__ called')
721
722    @requires_type_collecting
723    def test_global_del_SystemExit(self):
724        code = """if 1:
725            class ClassWithDel:
726                def __del__(self):
727                    print('__del__ called')
728            a = ClassWithDel()
729            a.link = a
730            raise SystemExit(0)"""
731        self.addCleanup(unlink, TESTFN)
732        with open(TESTFN, 'w') as script:
733            script.write(code)
734        rc, out, err = assert_python_ok(TESTFN)
735        self.assertEqual(out.strip(), b'__del__ called')
736
737    def test_get_stats(self):
738        stats = gc.get_stats()
739        self.assertEqual(len(stats), 3)
740        for st in stats:
741            self.assertIsInstance(st, dict)
742            self.assertEqual(set(st),
743                             {"collected", "collections", "uncollectable"})
744            self.assertGreaterEqual(st["collected"], 0)
745            self.assertGreaterEqual(st["collections"], 0)
746            self.assertGreaterEqual(st["uncollectable"], 0)
747        # Check that collection counts are incremented correctly
748        if gc.isenabled():
749            self.addCleanup(gc.enable)
750            gc.disable()
751        old = gc.get_stats()
752        gc.collect(0)
753        new = gc.get_stats()
754        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
755        self.assertEqual(new[1]["collections"], old[1]["collections"])
756        self.assertEqual(new[2]["collections"], old[2]["collections"])
757        gc.collect(2)
758        new = gc.get_stats()
759        self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
760        self.assertEqual(new[1]["collections"], old[1]["collections"])
761        self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
762
763    def test_freeze(self):
764        gc.freeze()
765        self.assertGreater(gc.get_freeze_count(), 0)
766        gc.unfreeze()
767        self.assertEqual(gc.get_freeze_count(), 0)
768
769    def test_get_objects(self):
770        gc.collect()
771        l = []
772        l.append(l)
773        self.assertTrue(
774                any(l is element for element in gc.get_objects(generation=0))
775        )
776        self.assertFalse(
777                any(l is element for element in  gc.get_objects(generation=1))
778        )
779        self.assertFalse(
780                any(l is element for element in gc.get_objects(generation=2))
781        )
782        gc.collect(generation=0)
783        self.assertFalse(
784                any(l is element for element in gc.get_objects(generation=0))
785        )
786        self.assertTrue(
787                any(l is element for element in  gc.get_objects(generation=1))
788        )
789        self.assertFalse(
790                any(l is element for element in gc.get_objects(generation=2))
791        )
792        gc.collect(generation=1)
793        self.assertFalse(
794                any(l is element for element in gc.get_objects(generation=0))
795        )
796        self.assertFalse(
797                any(l is element for element in  gc.get_objects(generation=1))
798        )
799        self.assertTrue(
800                any(l is element for element in gc.get_objects(generation=2))
801        )
802        gc.collect(generation=2)
803        self.assertFalse(
804                any(l is element for element in gc.get_objects(generation=0))
805        )
806        self.assertFalse(
807                any(l is element for element in  gc.get_objects(generation=1))
808        )
809        self.assertTrue(
810                any(l is element for element in gc.get_objects(generation=2))
811        )
812        del l
813        gc.collect()
814
815    def test_get_objects_arguments(self):
816        gc.collect()
817        self.assertEqual(len(gc.get_objects()),
818                         len(gc.get_objects(generation=None)))
819
820        self.assertRaises(ValueError, gc.get_objects, 1000)
821        self.assertRaises(ValueError, gc.get_objects, -1000)
822        self.assertRaises(TypeError, gc.get_objects, "1")
823        self.assertRaises(TypeError, gc.get_objects, 1.234)
824
825    def test_38379(self):
826        # When a finalizer resurrects objects, stats were reporting them as
827        # having been collected.  This affected both collect()'s return
828        # value and the dicts returned by get_stats().
829        N = 100
830
831        class A:  # simple self-loop
832            def __init__(self):
833                self.me = self
834
835        class Z(A):  # resurrecting __del__
836            def __del__(self):
837                zs.append(self)
838
839        zs = []
840
841        def getstats():
842            d = gc.get_stats()[-1]
843            return d['collected'], d['uncollectable']
844
845        gc.collect()
846        gc.disable()
847
848        # No problems if just collecting A() instances.
849        oldc, oldnc = getstats()
850        for i in range(N):
851            A()
852        t = gc.collect()
853        c, nc = getstats()
854        self.assertEqual(t, 2*N) # instance object & its dict
855        self.assertEqual(c - oldc, 2*N)
856        self.assertEqual(nc - oldnc, 0)
857
858        # But Z() is not actually collected.
859        oldc, oldnc = c, nc
860        Z()
861        # Nothing is collected - Z() is merely resurrected.
862        t = gc.collect()
863        c, nc = getstats()
864        #self.assertEqual(t, 2)  # before
865        self.assertEqual(t, 0)  # after
866        #self.assertEqual(c - oldc, 2)   # before
867        self.assertEqual(c - oldc, 0)   # after
868        self.assertEqual(nc - oldnc, 0)
869
870        # Unfortunately, a Z() prevents _anything_ from being collected.
871        # It should be possible to collect the A instances anyway, but
872        # that will require non-trivial code changes.
873        oldc, oldnc = c, nc
874        for i in range(N):
875            A()
876        Z()
877        # Z() prevents anything from being collected.
878        t = gc.collect()
879        c, nc = getstats()
880        #self.assertEqual(t, 2*N + 2)  # before
881        self.assertEqual(t, 0)  # after
882        #self.assertEqual(c - oldc, 2*N + 2)   # before
883        self.assertEqual(c - oldc, 0)   # after
884        self.assertEqual(nc - oldnc, 0)
885
886        # But the A() trash is reclaimed on the next run.
887        oldc, oldnc = c, nc
888        t = gc.collect()
889        c, nc = getstats()
890        self.assertEqual(t, 2*N)
891        self.assertEqual(c - oldc, 2*N)
892        self.assertEqual(nc - oldnc, 0)
893
894        gc.enable()
895
896class GCCallbackTests(unittest.TestCase):
897    def setUp(self):
898        # Save gc state and disable it.
899        self.enabled = gc.isenabled()
900        gc.disable()
901        self.debug = gc.get_debug()
902        gc.set_debug(0)
903        gc.callbacks.append(self.cb1)
904        gc.callbacks.append(self.cb2)
905        self.othergarbage = []
906
907    def tearDown(self):
908        # Restore gc state
909        del self.visit
910        gc.callbacks.remove(self.cb1)
911        gc.callbacks.remove(self.cb2)
912        gc.set_debug(self.debug)
913        if self.enabled:
914            gc.enable()
915        # destroy any uncollectables
916        gc.collect()
917        for obj in gc.garbage:
918            if isinstance(obj, Uncollectable):
919                obj.partner = None
920        del gc.garbage[:]
921        del self.othergarbage
922        gc.collect()
923
924    def preclean(self):
925        # Remove all fluff from the system.  Invoke this function
926        # manually rather than through self.setUp() for maximum
927        # safety.
928        self.visit = []
929        gc.collect()
930        garbage, gc.garbage[:] = gc.garbage[:], []
931        self.othergarbage.append(garbage)
932        self.visit = []
933
934    def cb1(self, phase, info):
935        self.visit.append((1, phase, dict(info)))
936
937    def cb2(self, phase, info):
938        self.visit.append((2, phase, dict(info)))
939        if phase == "stop" and hasattr(self, "cleanup"):
940            # Clean Uncollectable from garbage
941            uc = [e for e in gc.garbage if isinstance(e, Uncollectable)]
942            gc.garbage[:] = [e for e in gc.garbage
943                             if not isinstance(e, Uncollectable)]
944            for e in uc:
945                e.partner = None
946
947    def test_collect(self):
948        self.preclean()
949        gc.collect()
950        # Algorithmically verify the contents of self.visit
951        # because it is long and tortuous.
952
953        # Count the number of visits to each callback
954        n = [v[0] for v in self.visit]
955        n1 = [i for i in n if i == 1]
956        n2 = [i for i in n if i == 2]
957        self.assertEqual(n1, [1]*2)
958        self.assertEqual(n2, [2]*2)
959
960        # Count that we got the right number of start and stop callbacks.
961        n = [v[1] for v in self.visit]
962        n1 = [i for i in n if i == "start"]
963        n2 = [i for i in n if i == "stop"]
964        self.assertEqual(n1, ["start"]*2)
965        self.assertEqual(n2, ["stop"]*2)
966
967        # Check that we got the right info dict for all callbacks
968        for v in self.visit:
969            info = v[2]
970            self.assertTrue("generation" in info)
971            self.assertTrue("collected" in info)
972            self.assertTrue("uncollectable" in info)
973
974    def test_collect_generation(self):
975        self.preclean()
976        gc.collect(2)
977        for v in self.visit:
978            info = v[2]
979            self.assertEqual(info["generation"], 2)
980
981    @cpython_only
982    def test_collect_garbage(self):
983        self.preclean()
984        # Each of these cause four objects to be garbage: Two
985        # Uncollectables and their instance dicts.
986        Uncollectable()
987        Uncollectable()
988        C1055820(666)
989        gc.collect()
990        for v in self.visit:
991            if v[1] != "stop":
992                continue
993            info = v[2]
994            self.assertEqual(info["collected"], 2)
995            self.assertEqual(info["uncollectable"], 8)
996
997        # We should now have the Uncollectables in gc.garbage
998        self.assertEqual(len(gc.garbage), 4)
999        for e in gc.garbage:
1000            self.assertIsInstance(e, Uncollectable)
1001
1002        # Now, let our callback handle the Uncollectable instances
1003        self.cleanup=True
1004        self.visit = []
1005        gc.garbage[:] = []
1006        gc.collect()
1007        for v in self.visit:
1008            if v[1] != "stop":
1009                continue
1010            info = v[2]
1011            self.assertEqual(info["collected"], 0)
1012            self.assertEqual(info["uncollectable"], 4)
1013
1014        # Uncollectables should be gone
1015        self.assertEqual(len(gc.garbage), 0)
1016
1017
1018    @unittest.skipIf(BUILD_WITH_NDEBUG,
1019                     'built with -NDEBUG')
1020    def test_refcount_errors(self):
1021        self.preclean()
1022        # Verify the "handling" of objects with broken refcounts
1023
1024        # Skip the test if ctypes is not available
1025        import_module("ctypes")
1026
1027        import subprocess
1028        code = textwrap.dedent('''
1029            from test.support import gc_collect, SuppressCrashReport
1030
1031            a = [1, 2, 3]
1032            b = [a]
1033
1034            # Avoid coredump when Py_FatalError() calls abort()
1035            SuppressCrashReport().__enter__()
1036
1037            # Simulate the refcount of "a" being too low (compared to the
1038            # references held on it by live data), but keeping it above zero
1039            # (to avoid deallocating it):
1040            import ctypes
1041            ctypes.pythonapi.Py_DecRef(ctypes.py_object(a))
1042
1043            # The garbage collector should now have a fatal error
1044            # when it reaches the broken object
1045            gc_collect()
1046        ''')
1047        p = subprocess.Popen([sys.executable, "-c", code],
1048                             stdout=subprocess.PIPE,
1049                             stderr=subprocess.PIPE)
1050        stdout, stderr = p.communicate()
1051        p.stdout.close()
1052        p.stderr.close()
1053        # Verify that stderr has a useful error message:
1054        self.assertRegex(stderr,
1055            br'gcmodule\.c:[0-9]+: gc_decref: Assertion "gc_get_refs\(g\) > 0" failed.')
1056        self.assertRegex(stderr,
1057            br'refcount is too small')
1058        # "address : 0x7fb5062efc18"
1059        # "address : 7FB5062EFC18"
1060        address_regex = br'[0-9a-fA-Fx]+'
1061        self.assertRegex(stderr,
1062            br'object address  : ' + address_regex)
1063        self.assertRegex(stderr,
1064            br'object refcount : 1')
1065        self.assertRegex(stderr,
1066            br'object type     : ' + address_regex)
1067        self.assertRegex(stderr,
1068            br'object type name: list')
1069        self.assertRegex(stderr,
1070            br'object repr     : \[1, 2, 3\]')
1071
1072
1073class GCTogglingTests(unittest.TestCase):
1074    def setUp(self):
1075        gc.enable()
1076
1077    def tearDown(self):
1078        gc.disable()
1079
1080    def test_bug1055820c(self):
1081        # Corresponds to temp2c.py in the bug report.  This is pretty
1082        # elaborate.
1083
1084        c0 = C1055820(0)
1085        # Move c0 into generation 2.
1086        gc.collect()
1087
1088        c1 = C1055820(1)
1089        c1.keep_c0_alive = c0
1090        del c0.loop # now only c1 keeps c0 alive
1091
1092        c2 = C1055820(2)
1093        c2wr = weakref.ref(c2) # no callback!
1094
1095        ouch = []
1096        def callback(ignored):
1097            ouch[:] = [c2wr()]
1098
1099        # The callback gets associated with a wr on an object in generation 2.
1100        c0wr = weakref.ref(c0, callback)
1101
1102        c0 = c1 = c2 = None
1103
1104        # What we've set up:  c0, c1, and c2 are all trash now.  c0 is in
1105        # generation 2.  The only thing keeping it alive is that c1 points to
1106        # it. c1 and c2 are in generation 0, and are in self-loops.  There's a
1107        # global weakref to c2 (c2wr), but that weakref has no callback.
1108        # There's also a global weakref to c0 (c0wr), and that does have a
1109        # callback, and that callback references c2 via c2wr().
1110        #
1111        #               c0 has a wr with callback, which references c2wr
1112        #               ^
1113        #               |
1114        #               |     Generation 2 above dots
1115        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1116        #               |     Generation 0 below dots
1117        #               |
1118        #               |
1119        #            ^->c1   ^->c2 has a wr but no callback
1120        #            |  |    |  |
1121        #            <--v    <--v
1122        #
1123        # So this is the nightmare:  when generation 0 gets collected, we see
1124        # that c2 has a callback-free weakref, and c1 doesn't even have a
1125        # weakref.  Collecting generation 0 doesn't see c0 at all, and c0 is
1126        # the only object that has a weakref with a callback.  gc clears c1
1127        # and c2.  Clearing c1 has the side effect of dropping the refcount on
1128        # c0 to 0, so c0 goes away (despite that it's in an older generation)
1129        # and c0's wr callback triggers.  That in turn materializes a reference
1130        # to c2 via c2wr(), but c2 gets cleared anyway by gc.
1131
1132        # We want to let gc happen "naturally", to preserve the distinction
1133        # between generations.
1134        junk = []
1135        i = 0
1136        detector = GC_Detector()
1137        while not detector.gc_happened:
1138            i += 1
1139            if i > 10000:
1140                self.fail("gc didn't happen after 10000 iterations")
1141            self.assertEqual(len(ouch), 0)
1142            junk.append([])  # this will eventually trigger gc
1143
1144        self.assertEqual(len(ouch), 1)  # else the callback wasn't invoked
1145        for x in ouch:
1146            # If the callback resurrected c2, the instance would be damaged,
1147            # with an empty __dict__.
1148            self.assertEqual(x, None)
1149
1150    def test_bug1055820d(self):
1151        # Corresponds to temp2d.py in the bug report.  This is very much like
1152        # test_bug1055820c, but uses a __del__ method instead of a weakref
1153        # callback to sneak in a resurrection of cyclic trash.
1154
1155        ouch = []
1156        class D(C1055820):
1157            def __del__(self):
1158                ouch[:] = [c2wr()]
1159
1160        d0 = D(0)
1161        # Move all the above into generation 2.
1162        gc.collect()
1163
1164        c1 = C1055820(1)
1165        c1.keep_d0_alive = d0
1166        del d0.loop # now only c1 keeps d0 alive
1167
1168        c2 = C1055820(2)
1169        c2wr = weakref.ref(c2) # no callback!
1170
1171        d0 = c1 = c2 = None
1172
1173        # What we've set up:  d0, c1, and c2 are all trash now.  d0 is in
1174        # generation 2.  The only thing keeping it alive is that c1 points to
1175        # it.  c1 and c2 are in generation 0, and are in self-loops.  There's
1176        # a global weakref to c2 (c2wr), but that weakref has no callback.
1177        # There are no other weakrefs.
1178        #
1179        #               d0 has a __del__ method that references c2wr
1180        #               ^
1181        #               |
1182        #               |     Generation 2 above dots
1183        #. . . . . . . .|. . . . . . . . . . . . . . . . . . . . . . . .
1184        #               |     Generation 0 below dots
1185        #               |
1186        #               |
1187        #            ^->c1   ^->c2 has a wr but no callback
1188        #            |  |    |  |
1189        #            <--v    <--v
1190        #
1191        # So this is the nightmare:  when generation 0 gets collected, we see
1192        # that c2 has a callback-free weakref, and c1 doesn't even have a
1193        # weakref.  Collecting generation 0 doesn't see d0 at all.  gc clears
1194        # c1 and c2.  Clearing c1 has the side effect of dropping the refcount
1195        # on d0 to 0, so d0 goes away (despite that it's in an older
1196        # generation) and d0's __del__ triggers.  That in turn materializes
1197        # a reference to c2 via c2wr(), but c2 gets cleared anyway by gc.
1198
1199        # We want to let gc happen "naturally", to preserve the distinction
1200        # between generations.
1201        detector = GC_Detector()
1202        junk = []
1203        i = 0
1204        while not detector.gc_happened:
1205            i += 1
1206            if i > 10000:
1207                self.fail("gc didn't happen after 10000 iterations")
1208            self.assertEqual(len(ouch), 0)
1209            junk.append([])  # this will eventually trigger gc
1210
1211        self.assertEqual(len(ouch), 1)  # else __del__ wasn't invoked
1212        for x in ouch:
1213            # If __del__ resurrected c2, the instance would be damaged, with an
1214            # empty __dict__.
1215            self.assertEqual(x, None)
1216
1217def test_main():
1218    enabled = gc.isenabled()
1219    gc.disable()
1220    assert not gc.isenabled()
1221    debug = gc.get_debug()
1222    gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak
1223
1224    try:
1225        gc.collect() # Delete 2nd generation garbage
1226        run_unittest(GCTests, GCTogglingTests, GCCallbackTests)
1227    finally:
1228        gc.set_debug(debug)
1229        # test gc.enable() even if GC is disabled by default
1230        if verbose:
1231            print("restoring automatic collection")
1232        # make sure to always test gc.enable()
1233        gc.enable()
1234        assert gc.isenabled()
1235        if not enabled:
1236            gc.disable()
1237
1238if __name__ == "__main__":
1239    test_main()
1240