1""" 2Tests for object finalization semantics, as outlined in PEP 442. 3""" 4 5import contextlib 6import gc 7import unittest 8import weakref 9 10try: 11 from _testcapi import with_tp_del 12except ImportError: 13 def with_tp_del(cls): 14 class C(object): 15 def __new__(cls, *args, **kwargs): 16 raise TypeError('requires _testcapi.with_tp_del') 17 return C 18 19from test import support 20 21 22class NonGCSimpleBase: 23 """ 24 The base class for all the objects under test, equipped with various 25 testing features. 26 """ 27 28 survivors = [] 29 del_calls = [] 30 tp_del_calls = [] 31 errors = [] 32 33 _cleaning = False 34 35 __slots__ = () 36 37 @classmethod 38 def _cleanup(cls): 39 cls.survivors.clear() 40 cls.errors.clear() 41 gc.garbage.clear() 42 gc.collect() 43 cls.del_calls.clear() 44 cls.tp_del_calls.clear() 45 46 @classmethod 47 @contextlib.contextmanager 48 def test(cls): 49 """ 50 A context manager to use around all finalization tests. 51 """ 52 with support.disable_gc(): 53 cls.del_calls.clear() 54 cls.tp_del_calls.clear() 55 NonGCSimpleBase._cleaning = False 56 try: 57 yield 58 if cls.errors: 59 raise cls.errors[0] 60 finally: 61 NonGCSimpleBase._cleaning = True 62 cls._cleanup() 63 64 def check_sanity(self): 65 """ 66 Check the object is sane (non-broken). 67 """ 68 69 def __del__(self): 70 """ 71 PEP 442 finalizer. Record that this was called, check the 72 object is in a sane state, and invoke a side effect. 73 """ 74 try: 75 if not self._cleaning: 76 self.del_calls.append(id(self)) 77 self.check_sanity() 78 self.side_effect() 79 except Exception as e: 80 self.errors.append(e) 81 82 def side_effect(self): 83 """ 84 A side effect called on destruction. 85 """ 86 87 88class SimpleBase(NonGCSimpleBase): 89 90 def __init__(self): 91 self.id_ = id(self) 92 93 def check_sanity(self): 94 assert self.id_ == id(self) 95 96 97class NonGC(NonGCSimpleBase): 98 __slots__ = () 99 100class NonGCResurrector(NonGCSimpleBase): 101 __slots__ = () 102 103 def side_effect(self): 104 """ 105 Resurrect self by storing self in a class-wide list. 106 """ 107 self.survivors.append(self) 108 109class Simple(SimpleBase): 110 pass 111 112class SimpleResurrector(NonGCResurrector, SimpleBase): 113 pass 114 115 116class TestBase: 117 118 def setUp(self): 119 self.old_garbage = gc.garbage[:] 120 gc.garbage[:] = [] 121 122 def tearDown(self): 123 # None of the tests here should put anything in gc.garbage 124 try: 125 self.assertEqual(gc.garbage, []) 126 finally: 127 del self.old_garbage 128 gc.collect() 129 130 def assert_del_calls(self, ids): 131 self.assertEqual(sorted(SimpleBase.del_calls), sorted(ids)) 132 133 def assert_tp_del_calls(self, ids): 134 self.assertEqual(sorted(SimpleBase.tp_del_calls), sorted(ids)) 135 136 def assert_survivors(self, ids): 137 self.assertEqual(sorted(id(x) for x in SimpleBase.survivors), sorted(ids)) 138 139 def assert_garbage(self, ids): 140 self.assertEqual(sorted(id(x) for x in gc.garbage), sorted(ids)) 141 142 def clear_survivors(self): 143 SimpleBase.survivors.clear() 144 145 146class SimpleFinalizationTest(TestBase, unittest.TestCase): 147 """ 148 Test finalization without refcycles. 149 """ 150 151 def test_simple(self): 152 with SimpleBase.test(): 153 s = Simple() 154 ids = [id(s)] 155 wr = weakref.ref(s) 156 del s 157 gc.collect() 158 self.assert_del_calls(ids) 159 self.assert_survivors([]) 160 self.assertIs(wr(), None) 161 gc.collect() 162 self.assert_del_calls(ids) 163 self.assert_survivors([]) 164 165 def test_simple_resurrect(self): 166 with SimpleBase.test(): 167 s = SimpleResurrector() 168 ids = [id(s)] 169 wr = weakref.ref(s) 170 del s 171 gc.collect() 172 self.assert_del_calls(ids) 173 self.assert_survivors(ids) 174 self.assertIsNot(wr(), None) 175 self.clear_survivors() 176 gc.collect() 177 self.assert_del_calls(ids) 178 self.assert_survivors([]) 179 self.assertIs(wr(), None) 180 181 def test_non_gc(self): 182 with SimpleBase.test(): 183 s = NonGC() 184 self.assertFalse(gc.is_tracked(s)) 185 ids = [id(s)] 186 del s 187 gc.collect() 188 self.assert_del_calls(ids) 189 self.assert_survivors([]) 190 gc.collect() 191 self.assert_del_calls(ids) 192 self.assert_survivors([]) 193 194 def test_non_gc_resurrect(self): 195 with SimpleBase.test(): 196 s = NonGCResurrector() 197 self.assertFalse(gc.is_tracked(s)) 198 ids = [id(s)] 199 del s 200 gc.collect() 201 self.assert_del_calls(ids) 202 self.assert_survivors(ids) 203 self.clear_survivors() 204 gc.collect() 205 self.assert_del_calls(ids * 2) 206 self.assert_survivors(ids) 207 208 209class SelfCycleBase: 210 211 def __init__(self): 212 super().__init__() 213 self.ref = self 214 215 def check_sanity(self): 216 super().check_sanity() 217 assert self.ref is self 218 219class SimpleSelfCycle(SelfCycleBase, Simple): 220 pass 221 222class SelfCycleResurrector(SelfCycleBase, SimpleResurrector): 223 pass 224 225class SuicidalSelfCycle(SelfCycleBase, Simple): 226 227 def side_effect(self): 228 """ 229 Explicitly break the reference cycle. 230 """ 231 self.ref = None 232 233 234class SelfCycleFinalizationTest(TestBase, unittest.TestCase): 235 """ 236 Test finalization of an object having a single cyclic reference to 237 itself. 238 """ 239 240 def test_simple(self): 241 with SimpleBase.test(): 242 s = SimpleSelfCycle() 243 ids = [id(s)] 244 wr = weakref.ref(s) 245 del s 246 gc.collect() 247 self.assert_del_calls(ids) 248 self.assert_survivors([]) 249 self.assertIs(wr(), None) 250 gc.collect() 251 self.assert_del_calls(ids) 252 self.assert_survivors([]) 253 254 def test_simple_resurrect(self): 255 # Test that __del__ can resurrect the object being finalized. 256 with SimpleBase.test(): 257 s = SelfCycleResurrector() 258 ids = [id(s)] 259 wr = weakref.ref(s) 260 del s 261 gc.collect() 262 self.assert_del_calls(ids) 263 self.assert_survivors(ids) 264 # XXX is this desirable? 265 self.assertIs(wr(), None) 266 # When trying to destroy the object a second time, __del__ 267 # isn't called anymore (and the object isn't resurrected). 268 self.clear_survivors() 269 gc.collect() 270 self.assert_del_calls(ids) 271 self.assert_survivors([]) 272 self.assertIs(wr(), None) 273 274 def test_simple_suicide(self): 275 # Test the GC is able to deal with an object that kills its last 276 # reference during __del__. 277 with SimpleBase.test(): 278 s = SuicidalSelfCycle() 279 ids = [id(s)] 280 wr = weakref.ref(s) 281 del s 282 gc.collect() 283 self.assert_del_calls(ids) 284 self.assert_survivors([]) 285 self.assertIs(wr(), None) 286 gc.collect() 287 self.assert_del_calls(ids) 288 self.assert_survivors([]) 289 self.assertIs(wr(), None) 290 291 292class ChainedBase: 293 294 def chain(self, left): 295 self.suicided = False 296 self.left = left 297 left.right = self 298 299 def check_sanity(self): 300 super().check_sanity() 301 if self.suicided: 302 assert self.left is None 303 assert self.right is None 304 else: 305 left = self.left 306 if left.suicided: 307 assert left.right is None 308 else: 309 assert left.right is self 310 right = self.right 311 if right.suicided: 312 assert right.left is None 313 else: 314 assert right.left is self 315 316class SimpleChained(ChainedBase, Simple): 317 pass 318 319class ChainedResurrector(ChainedBase, SimpleResurrector): 320 pass 321 322class SuicidalChained(ChainedBase, Simple): 323 324 def side_effect(self): 325 """ 326 Explicitly break the reference cycle. 327 """ 328 self.suicided = True 329 self.left = None 330 self.right = None 331 332 333class CycleChainFinalizationTest(TestBase, unittest.TestCase): 334 """ 335 Test finalization of a cyclic chain. These tests are similar in 336 spirit to the self-cycle tests above, but the collectable object 337 graph isn't trivial anymore. 338 """ 339 340 def build_chain(self, classes): 341 nodes = [cls() for cls in classes] 342 for i in range(len(nodes)): 343 nodes[i].chain(nodes[i-1]) 344 return nodes 345 346 def check_non_resurrecting_chain(self, classes): 347 N = len(classes) 348 with SimpleBase.test(): 349 nodes = self.build_chain(classes) 350 ids = [id(s) for s in nodes] 351 wrs = [weakref.ref(s) for s in nodes] 352 del nodes 353 gc.collect() 354 self.assert_del_calls(ids) 355 self.assert_survivors([]) 356 self.assertEqual([wr() for wr in wrs], [None] * N) 357 gc.collect() 358 self.assert_del_calls(ids) 359 360 def check_resurrecting_chain(self, classes): 361 N = len(classes) 362 with SimpleBase.test(): 363 nodes = self.build_chain(classes) 364 N = len(nodes) 365 ids = [id(s) for s in nodes] 366 survivor_ids = [id(s) for s in nodes if isinstance(s, SimpleResurrector)] 367 wrs = [weakref.ref(s) for s in nodes] 368 del nodes 369 gc.collect() 370 self.assert_del_calls(ids) 371 self.assert_survivors(survivor_ids) 372 # XXX desirable? 373 self.assertEqual([wr() for wr in wrs], [None] * N) 374 self.clear_survivors() 375 gc.collect() 376 self.assert_del_calls(ids) 377 self.assert_survivors([]) 378 379 def test_homogenous(self): 380 self.check_non_resurrecting_chain([SimpleChained] * 3) 381 382 def test_homogenous_resurrect(self): 383 self.check_resurrecting_chain([ChainedResurrector] * 3) 384 385 def test_homogenous_suicidal(self): 386 self.check_non_resurrecting_chain([SuicidalChained] * 3) 387 388 def test_heterogenous_suicidal_one(self): 389 self.check_non_resurrecting_chain([SuicidalChained, SimpleChained] * 2) 390 391 def test_heterogenous_suicidal_two(self): 392 self.check_non_resurrecting_chain( 393 [SuicidalChained] * 2 + [SimpleChained] * 2) 394 395 def test_heterogenous_resurrect_one(self): 396 self.check_resurrecting_chain([ChainedResurrector, SimpleChained] * 2) 397 398 def test_heterogenous_resurrect_two(self): 399 self.check_resurrecting_chain( 400 [ChainedResurrector, SimpleChained, SuicidalChained] * 2) 401 402 def test_heterogenous_resurrect_three(self): 403 self.check_resurrecting_chain( 404 [ChainedResurrector] * 2 + [SimpleChained] * 2 + [SuicidalChained] * 2) 405 406 407# NOTE: the tp_del slot isn't automatically inherited, so we have to call 408# with_tp_del() for each instantiated class. 409 410class LegacyBase(SimpleBase): 411 412 def __del__(self): 413 try: 414 # Do not invoke side_effect here, since we are now exercising 415 # the tp_del slot. 416 if not self._cleaning: 417 self.del_calls.append(id(self)) 418 self.check_sanity() 419 except Exception as e: 420 self.errors.append(e) 421 422 def __tp_del__(self): 423 """ 424 Legacy (pre-PEP 442) finalizer, mapped to a tp_del slot. 425 """ 426 try: 427 if not self._cleaning: 428 self.tp_del_calls.append(id(self)) 429 self.check_sanity() 430 self.side_effect() 431 except Exception as e: 432 self.errors.append(e) 433 434@with_tp_del 435class Legacy(LegacyBase): 436 pass 437 438@with_tp_del 439class LegacyResurrector(LegacyBase): 440 441 def side_effect(self): 442 """ 443 Resurrect self by storing self in a class-wide list. 444 """ 445 self.survivors.append(self) 446 447@with_tp_del 448class LegacySelfCycle(SelfCycleBase, LegacyBase): 449 pass 450 451 452@support.cpython_only 453class LegacyFinalizationTest(TestBase, unittest.TestCase): 454 """ 455 Test finalization of objects with a tp_del. 456 """ 457 458 def tearDown(self): 459 # These tests need to clean up a bit more, since they create 460 # uncollectable objects. 461 gc.garbage.clear() 462 gc.collect() 463 super().tearDown() 464 465 def test_legacy(self): 466 with SimpleBase.test(): 467 s = Legacy() 468 ids = [id(s)] 469 wr = weakref.ref(s) 470 del s 471 gc.collect() 472 self.assert_del_calls(ids) 473 self.assert_tp_del_calls(ids) 474 self.assert_survivors([]) 475 self.assertIs(wr(), None) 476 gc.collect() 477 self.assert_del_calls(ids) 478 self.assert_tp_del_calls(ids) 479 480 def test_legacy_resurrect(self): 481 with SimpleBase.test(): 482 s = LegacyResurrector() 483 ids = [id(s)] 484 wr = weakref.ref(s) 485 del s 486 gc.collect() 487 self.assert_del_calls(ids) 488 self.assert_tp_del_calls(ids) 489 self.assert_survivors(ids) 490 # weakrefs are cleared before tp_del is called. 491 self.assertIs(wr(), None) 492 self.clear_survivors() 493 gc.collect() 494 self.assert_del_calls(ids) 495 self.assert_tp_del_calls(ids * 2) 496 self.assert_survivors(ids) 497 self.assertIs(wr(), None) 498 499 def test_legacy_self_cycle(self): 500 # Self-cycles with legacy finalizers end up in gc.garbage. 501 with SimpleBase.test(): 502 s = LegacySelfCycle() 503 ids = [id(s)] 504 wr = weakref.ref(s) 505 del s 506 gc.collect() 507 self.assert_del_calls([]) 508 self.assert_tp_del_calls([]) 509 self.assert_survivors([]) 510 self.assert_garbage(ids) 511 self.assertIsNot(wr(), None) 512 # Break the cycle to allow collection 513 gc.garbage[0].ref = None 514 self.assert_garbage([]) 515 self.assertIs(wr(), None) 516 517 518if __name__ == "__main__": 519 unittest.main() 520