• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""
2Tests for object finalization semantics, as outlined in PEP 442.
3"""
4
5import contextlib
6import gc
7import unittest
8import weakref
9
10try:
11    from _testcapi import with_tp_del
12except ImportError:
13    def with_tp_del(cls):
14        class C(object):
15            def __new__(cls, *args, **kwargs):
16                raise TypeError('requires _testcapi.with_tp_del')
17        return C
18
19from test import support
20
21
22class NonGCSimpleBase:
23    """
24    The base class for all the objects under test, equipped with various
25    testing features.
26    """
27
28    survivors = []
29    del_calls = []
30    tp_del_calls = []
31    errors = []
32
33    _cleaning = False
34
35    __slots__ = ()
36
37    @classmethod
38    def _cleanup(cls):
39        cls.survivors.clear()
40        cls.errors.clear()
41        gc.garbage.clear()
42        gc.collect()
43        cls.del_calls.clear()
44        cls.tp_del_calls.clear()
45
46    @classmethod
47    @contextlib.contextmanager
48    def test(cls):
49        """
50        A context manager to use around all finalization tests.
51        """
52        with support.disable_gc():
53            cls.del_calls.clear()
54            cls.tp_del_calls.clear()
55            NonGCSimpleBase._cleaning = False
56            try:
57                yield
58                if cls.errors:
59                    raise cls.errors[0]
60            finally:
61                NonGCSimpleBase._cleaning = True
62                cls._cleanup()
63
64    def check_sanity(self):
65        """
66        Check the object is sane (non-broken).
67        """
68
69    def __del__(self):
70        """
71        PEP 442 finalizer.  Record that this was called, check the
72        object is in a sane state, and invoke a side effect.
73        """
74        try:
75            if not self._cleaning:
76                self.del_calls.append(id(self))
77                self.check_sanity()
78                self.side_effect()
79        except Exception as e:
80            self.errors.append(e)
81
82    def side_effect(self):
83        """
84        A side effect called on destruction.
85        """
86
87
88class SimpleBase(NonGCSimpleBase):
89
90    def __init__(self):
91        self.id_ = id(self)
92
93    def check_sanity(self):
94        assert self.id_ == id(self)
95
96
97class NonGC(NonGCSimpleBase):
98    __slots__ = ()
99
100class NonGCResurrector(NonGCSimpleBase):
101    __slots__ = ()
102
103    def side_effect(self):
104        """
105        Resurrect self by storing self in a class-wide list.
106        """
107        self.survivors.append(self)
108
109class Simple(SimpleBase):
110    pass
111
112class SimpleResurrector(NonGCResurrector, SimpleBase):
113    pass
114
115
116class TestBase:
117
118    def setUp(self):
119        self.old_garbage = gc.garbage[:]
120        gc.garbage[:] = []
121
122    def tearDown(self):
123        # None of the tests here should put anything in gc.garbage
124        try:
125            self.assertEqual(gc.garbage, [])
126        finally:
127            del self.old_garbage
128            gc.collect()
129
130    def assert_del_calls(self, ids):
131        self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids))
132
133    def assert_tp_del_calls(self, ids):
134        self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids))
135
136    def assert_survivors(self, ids):
137        self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids))
138
139    def assert_garbage(self, ids):
140        self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids))
141
142    def clear_survivors(self):
143        SimpleBase.survivors.clear()
144
145
146class SimpleFinalizationTest(TestBase, unittest.TestCase):
147    """
148    Test finalization without refcycles.
149    """
150
151    def test_simple(self):
152        with SimpleBase.test():
153            s = Simple()
154            ids = [id(s)]
155            wr = weakref.ref(s)
156            del s
157            gc.collect()
158            self.assert_del_calls(ids)
159            self.assert_survivors([])
160            self.assertIs(wr(), None)
161            gc.collect()
162            self.assert_del_calls(ids)
163            self.assert_survivors([])
164
165    def test_simple_resurrect(self):
166        with SimpleBase.test():
167            s = SimpleResurrector()
168            ids = [id(s)]
169            wr = weakref.ref(s)
170            del s
171            gc.collect()
172            self.assert_del_calls(ids)
173            self.assert_survivors(ids)
174            self.assertIsNot(wr(), None)
175            self.clear_survivors()
176            gc.collect()
177            self.assert_del_calls(ids)
178            self.assert_survivors([])
179        self.assertIs(wr(), None)
180
181    def test_non_gc(self):
182        with SimpleBase.test():
183            s = NonGC()
184            self.assertFalse(gc.is_tracked(s))
185            ids = [id(s)]
186            del s
187            gc.collect()
188            self.assert_del_calls(ids)
189            self.assert_survivors([])
190            gc.collect()
191            self.assert_del_calls(ids)
192            self.assert_survivors([])
193
194    def test_non_gc_resurrect(self):
195        with SimpleBase.test():
196            s = NonGCResurrector()
197            self.assertFalse(gc.is_tracked(s))
198            ids = [id(s)]
199            del s
200            gc.collect()
201            self.assert_del_calls(ids)
202            self.assert_survivors(ids)
203            self.clear_survivors()
204            gc.collect()
205            self.assert_del_calls(ids * 2)
206            self.assert_survivors(ids)
207
208
209class SelfCycleBase:
210
211    def __init__(self):
212        super().__init__()
213        self.ref = self
214
215    def check_sanity(self):
216        super().check_sanity()
217        assert self.ref is self
218
219class SimpleSelfCycle(SelfCycleBase, Simple):
220    pass
221
222class SelfCycleResurrector(SelfCycleBase, SimpleResurrector):
223    pass
224
225class SuicidalSelfCycle(SelfCycleBase, Simple):
226
227    def side_effect(self):
228        """
229        Explicitly break the reference cycle.
230        """
231        self.ref = None
232
233
234class SelfCycleFinalizationTest(TestBase, unittest.TestCase):
235    """
236    Test finalization of an object having a single cyclic reference to
237    itself.
238    """
239
240    def test_simple(self):
241        with SimpleBase.test():
242            s = SimpleSelfCycle()
243            ids = [id(s)]
244            wr = weakref.ref(s)
245            del s
246            gc.collect()
247            self.assert_del_calls(ids)
248            self.assert_survivors([])
249            self.assertIs(wr(), None)
250            gc.collect()
251            self.assert_del_calls(ids)
252            self.assert_survivors([])
253
254    def test_simple_resurrect(self):
255        # Test that __del__ can resurrect the object being finalized.
256        with SimpleBase.test():
257            s = SelfCycleResurrector()
258            ids = [id(s)]
259            wr = weakref.ref(s)
260            del s
261            gc.collect()
262            self.assert_del_calls(ids)
263            self.assert_survivors(ids)
264            # XXX is this desirable?
265            self.assertIs(wr(), None)
266            # When trying to destroy the object a second time, __del__
267            # isn't called anymore (and the object isn't resurrected).
268            self.clear_survivors()
269            gc.collect()
270            self.assert_del_calls(ids)
271            self.assert_survivors([])
272            self.assertIs(wr(), None)
273
274    def test_simple_suicide(self):
275        # Test the GC is able to deal with an object that kills its last
276        # reference during __del__.
277        with SimpleBase.test():
278            s = SuicidalSelfCycle()
279            ids = [id(s)]
280            wr = weakref.ref(s)
281            del s
282            gc.collect()
283            self.assert_del_calls(ids)
284            self.assert_survivors([])
285            self.assertIs(wr(), None)
286            gc.collect()
287            self.assert_del_calls(ids)
288            self.assert_survivors([])
289            self.assertIs(wr(), None)
290
291
292class ChainedBase:
293
294    def chain(self, left):
295        self.suicided = False
296        self.left = left
297        left.right = self
298
299    def check_sanity(self):
300        super().check_sanity()
301        if self.suicided:
302            assert self.left is None
303            assert self.right is None
304        else:
305            left = self.left
306            if left.suicided:
307                assert left.right is None
308            else:
309                assert left.right is self
310            right = self.right
311            if right.suicided:
312                assert right.left is None
313            else:
314                assert right.left is self
315
316class SimpleChained(ChainedBase, Simple):
317    pass
318
319class ChainedResurrector(ChainedBase, SimpleResurrector):
320    pass
321
322class SuicidalChained(ChainedBase, Simple):
323
324    def side_effect(self):
325        """
326        Explicitly break the reference cycle.
327        """
328        self.suicided = True
329        self.left = None
330        self.right = None
331
332
333class CycleChainFinalizationTest(TestBase, unittest.TestCase):
334    """
335    Test finalization of a cyclic chain.  These tests are similar in
336    spirit to the self-cycle tests above, but the collectable object
337    graph isn't trivial anymore.
338    """
339
340    def build_chain(self, classes):
341        nodes = [cls() for cls in classes]
342        for i in range(len(nodes)):
343            nodes[i].chain(nodes[i-1])
344        return nodes
345
346    def check_non_resurrecting_chain(self, classes):
347        N = len(classes)
348        with SimpleBase.test():
349            nodes = self.build_chain(classes)
350            ids = [id(s) for s in nodes]
351            wrs = [weakref.ref(s) for s in nodes]
352            del nodes
353            gc.collect()
354            self.assert_del_calls(ids)
355            self.assert_survivors([])
356            self.assertEqual([wr() for wr in wrs], [None] * N)
357            gc.collect()
358            self.assert_del_calls(ids)
359
360    def check_resurrecting_chain(self, classes):
361        N = len(classes)
362        with SimpleBase.test():
363            nodes = self.build_chain(classes)
364            N = len(nodes)
365            ids = [id(s) for s in nodes]
366            survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)]
367            wrs = [weakref.ref(s) for s in nodes]
368            del nodes
369            gc.collect()
370            self.assert_del_calls(ids)
371            self.assert_survivors(survivor_ids)
372            # XXX desirable?
373            self.assertEqual([wr() for wr in wrs], [None] * N)
374            self.clear_survivors()
375            gc.collect()
376            self.assert_del_calls(ids)
377            self.assert_survivors([])
378
379    def test_homogenous(self):
380        self.check_non_resurrecting_chain([SimpleChained] * 3)
381
382    def test_homogenous_resurrect(self):
383        self.check_resurrecting_chain([ChainedResurrector] * 3)
384
385    def test_homogenous_suicidal(self):
386        self.check_non_resurrecting_chain([SuicidalChained] * 3)
387
388    def test_heterogenous_suicidal_one(self):
389        self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2)
390
391    def test_heterogenous_suicidal_two(self):
392        self.check_non_resurrecting_chain(
393            [SuicidalChained] * 2 + [SimpleChained] * 2)
394
395    def test_heterogenous_resurrect_one(self):
396        self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2)
397
398    def test_heterogenous_resurrect_two(self):
399        self.check_resurrecting_chain(
400            [ChainedResurrector, SimpleChained, SuicidalChained] * 2)
401
402    def test_heterogenous_resurrect_three(self):
403        self.check_resurrecting_chain(
404            [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2)
405
406
407# NOTE: the tp_del slot isn't automatically inherited, so we have to call
408# with_tp_del() for each instantiated class.
409
410class LegacyBase(SimpleBase):
411
412    def __del__(self):
413        try:
414            # Do not invoke side_effect here, since we are now exercising
415            # the tp_del slot.
416            if not self._cleaning:
417                self.del_calls.append(id(self))
418                self.check_sanity()
419        except Exception as e:
420            self.errors.append(e)
421
422    def __tp_del__(self):
423        """
424        Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot.
425        """
426        try:
427            if not self._cleaning:
428                self.tp_del_calls.append(id(self))
429                self.check_sanity()
430                self.side_effect()
431        except Exception as e:
432            self.errors.append(e)
433
434@with_tp_del
435class Legacy(LegacyBase):
436    pass
437
438@with_tp_del
439class LegacyResurrector(LegacyBase):
440
441    def side_effect(self):
442        """
443        Resurrect self by storing self in a class-wide list.
444        """
445        self.survivors.append(self)
446
447@with_tp_del
448class LegacySelfCycle(SelfCycleBase, LegacyBase):
449    pass
450
451
452@support.cpython_only
453class LegacyFinalizationTest(TestBase, unittest.TestCase):
454    """
455    Test finalization of objects with a tp_del.
456    """
457
458    def tearDown(self):
459        # These tests need to clean up a bit more, since they create
460        # uncollectable objects.
461        gc.garbage.clear()
462        gc.collect()
463        super().tearDown()
464
465    def test_legacy(self):
466        with SimpleBase.test():
467            s = Legacy()
468            ids = [id(s)]
469            wr = weakref.ref(s)
470            del s
471            gc.collect()
472            self.assert_del_calls(ids)
473            self.assert_tp_del_calls(ids)
474            self.assert_survivors([])
475            self.assertIs(wr(), None)
476            gc.collect()
477            self.assert_del_calls(ids)
478            self.assert_tp_del_calls(ids)
479
480    def test_legacy_resurrect(self):
481        with SimpleBase.test():
482            s = LegacyResurrector()
483            ids = [id(s)]
484            wr = weakref.ref(s)
485            del s
486            gc.collect()
487            self.assert_del_calls(ids)
488            self.assert_tp_del_calls(ids)
489            self.assert_survivors(ids)
490            # weakrefs are cleared before tp_del is called.
491            self.assertIs(wr(), None)
492            self.clear_survivors()
493            gc.collect()
494            self.assert_del_calls(ids)
495            self.assert_tp_del_calls(ids * 2)
496            self.assert_survivors(ids)
497        self.assertIs(wr(), None)
498
499    def test_legacy_self_cycle(self):
500        # Self-cycles with legacy finalizers end up in gc.garbage.
501        with SimpleBase.test():
502            s = LegacySelfCycle()
503            ids = [id(s)]
504            wr = weakref.ref(s)
505            del s
506            gc.collect()
507            self.assert_del_calls([])
508            self.assert_tp_del_calls([])
509            self.assert_survivors([])
510            self.assert_garbage(ids)
511            self.assertIsNot(wr(), None)
512            # Break the cycle to allow collection
513            gc.garbage[0].ref = None
514        self.assert_garbage([])
515        self.assertIs(wr(), None)
516
517
518if __name__ == "__main__":
519    unittest.main()
520