1import gc 2import sys 3import unittest 4import collections 5import weakref 6import operator 7import contextlib 8import copy 9import threading 10import time 11import random 12 13from test import support 14from test.support import script_helper, ALWAYS_EQ 15from test.support import gc_collect 16 17# Used in ReferencesTestCase.test_ref_created_during_del() . 18ref_from_del = None 19 20# Used by FinalizeTestCase as a global that may be replaced by None 21# when the interpreter shuts down. 22_global_var = 'foobar' 23 24class C: 25 def method(self): 26 pass 27 28 29class Callable: 30 bar = None 31 32 def __call__(self, x): 33 self.bar = x 34 35 36def create_function(): 37 def f(): pass 38 return f 39 40def create_bound_method(): 41 return C().method 42 43 44class Object: 45 def __init__(self, arg): 46 self.arg = arg 47 def __repr__(self): 48 return "<Object %r>" % self.arg 49 def __eq__(self, other): 50 if isinstance(other, Object): 51 return self.arg == other.arg 52 return NotImplemented 53 def __lt__(self, other): 54 if isinstance(other, Object): 55 return self.arg < other.arg 56 return NotImplemented 57 def __hash__(self): 58 return hash(self.arg) 59 def some_method(self): 60 return 4 61 def other_method(self): 62 return 5 63 64 65class RefCycle: 66 def __init__(self): 67 self.cycle = self 68 69 70class TestBase(unittest.TestCase): 71 72 def setUp(self): 73 self.cbcalled = 0 74 75 def callback(self, ref): 76 self.cbcalled += 1 77 78 79@contextlib.contextmanager 80def collect_in_thread(period=0.0001): 81 """ 82 Ensure GC collections happen in a different thread, at a high frequency. 83 """ 84 please_stop = False 85 86 def collect(): 87 while not please_stop: 88 time.sleep(period) 89 gc.collect() 90 91 with support.disable_gc(): 92 t = threading.Thread(target=collect) 93 t.start() 94 try: 95 yield 96 finally: 97 please_stop = True 98 t.join() 99 100 101class ReferencesTestCase(TestBase): 102 103 def test_basic_ref(self): 104 self.check_basic_ref(C) 105 self.check_basic_ref(create_function) 106 self.check_basic_ref(create_bound_method) 107 108 # Just make sure the tp_repr handler doesn't raise an exception. 109 # Live reference: 110 o = C() 111 wr = weakref.ref(o) 112 repr(wr) 113 # Dead reference: 114 del o 115 repr(wr) 116 117 def test_basic_callback(self): 118 self.check_basic_callback(C) 119 self.check_basic_callback(create_function) 120 self.check_basic_callback(create_bound_method) 121 122 @support.cpython_only 123 def test_cfunction(self): 124 import _testcapi 125 create_cfunction = _testcapi.create_cfunction 126 f = create_cfunction() 127 wr = weakref.ref(f) 128 self.assertIs(wr(), f) 129 del f 130 self.assertIsNone(wr()) 131 self.check_basic_ref(create_cfunction) 132 self.check_basic_callback(create_cfunction) 133 134 def test_multiple_callbacks(self): 135 o = C() 136 ref1 = weakref.ref(o, self.callback) 137 ref2 = weakref.ref(o, self.callback) 138 del o 139 gc_collect() # For PyPy or other GCs. 140 self.assertIsNone(ref1(), "expected reference to be invalidated") 141 self.assertIsNone(ref2(), "expected reference to be invalidated") 142 self.assertEqual(self.cbcalled, 2, 143 "callback not called the right number of times") 144 145 def test_multiple_selfref_callbacks(self): 146 # Make sure all references are invalidated before callbacks are called 147 # 148 # What's important here is that we're using the first 149 # reference in the callback invoked on the second reference 150 # (the most recently created ref is cleaned up first). This 151 # tests that all references to the object are invalidated 152 # before any of the callbacks are invoked, so that we only 153 # have one invocation of _weakref.c:cleanup_helper() active 154 # for a particular object at a time. 155 # 156 def callback(object, self=self): 157 self.ref() 158 c = C() 159 self.ref = weakref.ref(c, callback) 160 ref1 = weakref.ref(c, callback) 161 del c 162 163 def test_constructor_kwargs(self): 164 c = C() 165 self.assertRaises(TypeError, weakref.ref, c, callback=None) 166 167 def test_proxy_ref(self): 168 o = C() 169 o.bar = 1 170 ref1 = weakref.proxy(o, self.callback) 171 ref2 = weakref.proxy(o, self.callback) 172 del o 173 gc_collect() # For PyPy or other GCs. 174 175 def check(proxy): 176 proxy.bar 177 178 self.assertRaises(ReferenceError, check, ref1) 179 self.assertRaises(ReferenceError, check, ref2) 180 ref3 = weakref.proxy(C()) 181 gc_collect() # For PyPy or other GCs. 182 self.assertRaises(ReferenceError, bool, ref3) 183 self.assertEqual(self.cbcalled, 2) 184 185 def check_basic_ref(self, factory): 186 o = factory() 187 ref = weakref.ref(o) 188 self.assertIsNotNone(ref(), 189 "weak reference to live object should be live") 190 o2 = ref() 191 self.assertIs(o, o2, 192 "<ref>() should return original object if live") 193 194 def check_basic_callback(self, factory): 195 self.cbcalled = 0 196 o = factory() 197 ref = weakref.ref(o, self.callback) 198 del o 199 gc_collect() # For PyPy or other GCs. 200 self.assertEqual(self.cbcalled, 1, 201 "callback did not properly set 'cbcalled'") 202 self.assertIsNone(ref(), 203 "ref2 should be dead after deleting object reference") 204 205 def test_ref_reuse(self): 206 o = C() 207 ref1 = weakref.ref(o) 208 # create a proxy to make sure that there's an intervening creation 209 # between these two; it should make no difference 210 proxy = weakref.proxy(o) 211 ref2 = weakref.ref(o) 212 self.assertIs(ref1, ref2, 213 "reference object w/out callback should be re-used") 214 215 o = C() 216 proxy = weakref.proxy(o) 217 ref1 = weakref.ref(o) 218 ref2 = weakref.ref(o) 219 self.assertIs(ref1, ref2, 220 "reference object w/out callback should be re-used") 221 self.assertEqual(weakref.getweakrefcount(o), 2, 222 "wrong weak ref count for object") 223 del proxy 224 gc_collect() # For PyPy or other GCs. 225 self.assertEqual(weakref.getweakrefcount(o), 1, 226 "wrong weak ref count for object after deleting proxy") 227 228 def test_proxy_reuse(self): 229 o = C() 230 proxy1 = weakref.proxy(o) 231 ref = weakref.ref(o) 232 proxy2 = weakref.proxy(o) 233 self.assertIs(proxy1, proxy2, 234 "proxy object w/out callback should have been re-used") 235 236 def test_basic_proxy(self): 237 o = C() 238 self.check_proxy(o, weakref.proxy(o)) 239 240 L = collections.UserList() 241 p = weakref.proxy(L) 242 self.assertFalse(p, "proxy for empty UserList should be false") 243 p.append(12) 244 self.assertEqual(len(L), 1) 245 self.assertTrue(p, "proxy for non-empty UserList should be true") 246 p[:] = [2, 3] 247 self.assertEqual(len(L), 2) 248 self.assertEqual(len(p), 2) 249 self.assertIn(3, p, "proxy didn't support __contains__() properly") 250 p[1] = 5 251 self.assertEqual(L[1], 5) 252 self.assertEqual(p[1], 5) 253 L2 = collections.UserList(L) 254 p2 = weakref.proxy(L2) 255 self.assertEqual(p, p2) 256 ## self.assertEqual(repr(L2), repr(p2)) 257 L3 = collections.UserList(range(10)) 258 p3 = weakref.proxy(L3) 259 self.assertEqual(L3[:], p3[:]) 260 self.assertEqual(L3[5:], p3[5:]) 261 self.assertEqual(L3[:5], p3[:5]) 262 self.assertEqual(L3[2:5], p3[2:5]) 263 264 def test_proxy_unicode(self): 265 # See bug 5037 266 class C(object): 267 def __str__(self): 268 return "string" 269 def __bytes__(self): 270 return b"bytes" 271 instance = C() 272 self.assertIn("__bytes__", dir(weakref.proxy(instance))) 273 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") 274 275 def test_proxy_index(self): 276 class C: 277 def __index__(self): 278 return 10 279 o = C() 280 p = weakref.proxy(o) 281 self.assertEqual(operator.index(p), 10) 282 283 def test_proxy_div(self): 284 class C: 285 def __floordiv__(self, other): 286 return 42 287 def __ifloordiv__(self, other): 288 return 21 289 o = C() 290 p = weakref.proxy(o) 291 self.assertEqual(p // 5, 42) 292 p //= 5 293 self.assertEqual(p, 21) 294 295 def test_proxy_matmul(self): 296 class C: 297 def __matmul__(self, other): 298 return 1729 299 def __rmatmul__(self, other): 300 return -163 301 def __imatmul__(self, other): 302 return 561 303 o = C() 304 p = weakref.proxy(o) 305 self.assertEqual(p @ 5, 1729) 306 self.assertEqual(5 @ p, -163) 307 p @= 5 308 self.assertEqual(p, 561) 309 310 # The PyWeakref_* C API is documented as allowing either NULL or 311 # None as the value for the callback, where either means "no 312 # callback". The "no callback" ref and proxy objects are supposed 313 # to be shared so long as they exist by all callers so long as 314 # they are active. In Python 2.3.3 and earlier, this guarantee 315 # was not honored, and was broken in different ways for 316 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 317 318 def test_shared_ref_without_callback(self): 319 self.check_shared_without_callback(weakref.ref) 320 321 def test_shared_proxy_without_callback(self): 322 self.check_shared_without_callback(weakref.proxy) 323 324 def check_shared_without_callback(self, makeref): 325 o = Object(1) 326 p1 = makeref(o, None) 327 p2 = makeref(o, None) 328 self.assertIs(p1, p2, "both callbacks were None in the C API") 329 del p1, p2 330 p1 = makeref(o) 331 p2 = makeref(o, None) 332 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 333 del p1, p2 334 p1 = makeref(o) 335 p2 = makeref(o) 336 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 337 del p1, p2 338 p1 = makeref(o, None) 339 p2 = makeref(o) 340 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 341 342 def test_callable_proxy(self): 343 o = Callable() 344 ref1 = weakref.proxy(o) 345 346 self.check_proxy(o, ref1) 347 348 self.assertIs(type(ref1), weakref.CallableProxyType, 349 "proxy is not of callable type") 350 ref1('twinkies!') 351 self.assertEqual(o.bar, 'twinkies!', 352 "call through proxy not passed through to original") 353 ref1(x='Splat.') 354 self.assertEqual(o.bar, 'Splat.', 355 "call through proxy not passed through to original") 356 357 # expect due to too few args 358 self.assertRaises(TypeError, ref1) 359 360 # expect due to too many args 361 self.assertRaises(TypeError, ref1, 1, 2, 3) 362 363 def check_proxy(self, o, proxy): 364 o.foo = 1 365 self.assertEqual(proxy.foo, 1, 366 "proxy does not reflect attribute addition") 367 o.foo = 2 368 self.assertEqual(proxy.foo, 2, 369 "proxy does not reflect attribute modification") 370 del o.foo 371 self.assertFalse(hasattr(proxy, 'foo'), 372 "proxy does not reflect attribute removal") 373 374 proxy.foo = 1 375 self.assertEqual(o.foo, 1, 376 "object does not reflect attribute addition via proxy") 377 proxy.foo = 2 378 self.assertEqual(o.foo, 2, 379 "object does not reflect attribute modification via proxy") 380 del proxy.foo 381 self.assertFalse(hasattr(o, 'foo'), 382 "object does not reflect attribute removal via proxy") 383 384 def test_proxy_deletion(self): 385 # Test clearing of SF bug #762891 386 class Foo: 387 result = None 388 def __delitem__(self, accessor): 389 self.result = accessor 390 g = Foo() 391 f = weakref.proxy(g) 392 del f[0] 393 self.assertEqual(f.result, 0) 394 395 def test_proxy_bool(self): 396 # Test clearing of SF bug #1170766 397 class List(list): pass 398 lyst = List() 399 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 400 401 def test_proxy_iter(self): 402 # Test fails with a debug build of the interpreter 403 # (see bpo-38395). 404 405 obj = None 406 407 class MyObj: 408 def __iter__(self): 409 nonlocal obj 410 del obj 411 return NotImplemented 412 413 obj = MyObj() 414 p = weakref.proxy(obj) 415 with self.assertRaises(TypeError): 416 # "blech" in p calls MyObj.__iter__ through the proxy, 417 # without keeping a reference to the real object, so it 418 # can be killed in the middle of the call 419 "blech" in p 420 421 def test_proxy_next(self): 422 arr = [4, 5, 6] 423 def iterator_func(): 424 yield from arr 425 it = iterator_func() 426 427 class IteratesWeakly: 428 def __iter__(self): 429 return weakref.proxy(it) 430 431 weak_it = IteratesWeakly() 432 433 # Calls proxy.__next__ 434 self.assertEqual(list(weak_it), [4, 5, 6]) 435 436 def test_proxy_bad_next(self): 437 # bpo-44720: PyIter_Next() shouldn't be called if the reference 438 # isn't an iterator. 439 440 not_an_iterator = lambda: 0 441 442 class A: 443 def __iter__(self): 444 return weakref.proxy(not_an_iterator) 445 a = A() 446 447 msg = "Weakref proxy referenced a non-iterator" 448 with self.assertRaisesRegex(TypeError, msg): 449 list(a) 450 451 def test_proxy_reversed(self): 452 class MyObj: 453 def __len__(self): 454 return 3 455 def __reversed__(self): 456 return iter('cba') 457 458 obj = MyObj() 459 self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba") 460 461 def test_proxy_hash(self): 462 class MyObj: 463 def __hash__(self): 464 return 42 465 466 obj = MyObj() 467 with self.assertRaises(TypeError): 468 hash(weakref.proxy(obj)) 469 470 class MyObj: 471 __hash__ = None 472 473 obj = MyObj() 474 with self.assertRaises(TypeError): 475 hash(weakref.proxy(obj)) 476 477 def test_getweakrefcount(self): 478 o = C() 479 ref1 = weakref.ref(o) 480 ref2 = weakref.ref(o, self.callback) 481 self.assertEqual(weakref.getweakrefcount(o), 2, 482 "got wrong number of weak reference objects") 483 484 proxy1 = weakref.proxy(o) 485 proxy2 = weakref.proxy(o, self.callback) 486 self.assertEqual(weakref.getweakrefcount(o), 4, 487 "got wrong number of weak reference objects") 488 489 del ref1, ref2, proxy1, proxy2 490 gc_collect() # For PyPy or other GCs. 491 self.assertEqual(weakref.getweakrefcount(o), 0, 492 "weak reference objects not unlinked from" 493 " referent when discarded.") 494 495 # assumes ints do not support weakrefs 496 self.assertEqual(weakref.getweakrefcount(1), 0, 497 "got wrong number of weak reference objects for int") 498 499 def test_getweakrefs(self): 500 o = C() 501 ref1 = weakref.ref(o, self.callback) 502 ref2 = weakref.ref(o, self.callback) 503 del ref1 504 gc_collect() # For PyPy or other GCs. 505 self.assertEqual(weakref.getweakrefs(o), [ref2], 506 "list of refs does not match") 507 508 o = C() 509 ref1 = weakref.ref(o, self.callback) 510 ref2 = weakref.ref(o, self.callback) 511 del ref2 512 gc_collect() # For PyPy or other GCs. 513 self.assertEqual(weakref.getweakrefs(o), [ref1], 514 "list of refs does not match") 515 516 del ref1 517 gc_collect() # For PyPy or other GCs. 518 self.assertEqual(weakref.getweakrefs(o), [], 519 "list of refs not cleared") 520 521 # assumes ints do not support weakrefs 522 self.assertEqual(weakref.getweakrefs(1), [], 523 "list of refs does not match for int") 524 525 def test_newstyle_number_ops(self): 526 class F(float): 527 pass 528 f = F(2.0) 529 p = weakref.proxy(f) 530 self.assertEqual(p + 1.0, 3.0) 531 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 532 533 def test_callbacks_protected(self): 534 # Callbacks protected from already-set exceptions? 535 # Regression test for SF bug #478534. 536 class BogusError(Exception): 537 pass 538 data = {} 539 def remove(k): 540 del data[k] 541 def encapsulate(): 542 f = lambda : () 543 data[weakref.ref(f, remove)] = None 544 raise BogusError 545 try: 546 encapsulate() 547 except BogusError: 548 pass 549 else: 550 self.fail("exception not properly restored") 551 try: 552 encapsulate() 553 except BogusError: 554 pass 555 else: 556 self.fail("exception not properly restored") 557 558 def test_sf_bug_840829(self): 559 # "weakref callbacks and gc corrupt memory" 560 # subtype_dealloc erroneously exposed a new-style instance 561 # already in the process of getting deallocated to gc, 562 # causing double-deallocation if the instance had a weakref 563 # callback that triggered gc. 564 # If the bug exists, there probably won't be an obvious symptom 565 # in a release build. In a debug build, a segfault will occur 566 # when the second attempt to remove the instance from the "list 567 # of all objects" occurs. 568 569 import gc 570 571 class C(object): 572 pass 573 574 c = C() 575 wr = weakref.ref(c, lambda ignore: gc.collect()) 576 del c 577 578 # There endeth the first part. It gets worse. 579 del wr 580 581 c1 = C() 582 c1.i = C() 583 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 584 585 c2 = C() 586 c2.c1 = c1 587 del c1 # still alive because c2 points to it 588 589 # Now when subtype_dealloc gets called on c2, it's not enough just 590 # that c2 is immune from gc while the weakref callbacks associated 591 # with c2 execute (there are none in this 2nd half of the test, btw). 592 # subtype_dealloc goes on to call the base classes' deallocs too, 593 # so any gc triggered by weakref callbacks associated with anything 594 # torn down by a base class dealloc can also trigger double 595 # deallocation of c2. 596 del c2 597 598 def test_callback_in_cycle_1(self): 599 import gc 600 601 class J(object): 602 pass 603 604 class II(object): 605 def acallback(self, ignore): 606 self.J 607 608 I = II() 609 I.J = J 610 I.wr = weakref.ref(J, I.acallback) 611 612 # Now J and II are each in a self-cycle (as all new-style class 613 # objects are, since their __mro__ points back to them). I holds 614 # both a weak reference (I.wr) and a strong reference (I.J) to class 615 # J. I is also in a cycle (I.wr points to a weakref that references 616 # I.acallback). When we del these three, they all become trash, but 617 # the cycles prevent any of them from getting cleaned up immediately. 618 # Instead they have to wait for cyclic gc to deduce that they're 619 # trash. 620 # 621 # gc used to call tp_clear on all of them, and the order in which 622 # it does that is pretty accidental. The exact order in which we 623 # built up these things manages to provoke gc into running tp_clear 624 # in just the right order (I last). Calling tp_clear on II leaves 625 # behind an insane class object (its __mro__ becomes NULL). Calling 626 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 627 # just then because of the strong reference from I.J. Calling 628 # tp_clear on I starts to clear I's __dict__, and just happens to 629 # clear I.J first -- I.wr is still intact. That removes the last 630 # reference to J, which triggers the weakref callback. The callback 631 # tries to do "self.J", and instances of new-style classes look up 632 # attributes ("J") in the class dict first. The class (II) wants to 633 # search II.__mro__, but that's NULL. The result was a segfault in 634 # a release build, and an assert failure in a debug build. 635 del I, J, II 636 gc.collect() 637 638 def test_callback_in_cycle_2(self): 639 import gc 640 641 # This is just like test_callback_in_cycle_1, except that II is an 642 # old-style class. The symptom is different then: an instance of an 643 # old-style class looks in its own __dict__ first. 'J' happens to 644 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 645 # __dict__, so the attribute isn't found. The difference is that 646 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 647 # __mro__), so no segfault occurs. Instead it got: 648 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 649 # Exception exceptions.AttributeError: 650 # "II instance has no attribute 'J'" in <bound method II.acallback 651 # of <?.II instance at 0x00B9B4B8>> ignored 652 653 class J(object): 654 pass 655 656 class II: 657 def acallback(self, ignore): 658 self.J 659 660 I = II() 661 I.J = J 662 I.wr = weakref.ref(J, I.acallback) 663 664 del I, J, II 665 gc.collect() 666 667 def test_callback_in_cycle_3(self): 668 import gc 669 670 # This one broke the first patch that fixed the last two. In this 671 # case, the objects reachable from the callback aren't also reachable 672 # from the object (c1) *triggering* the callback: you can get to 673 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 674 # got tp_clear'ed by the time the c2.cb callback got invoked. 675 676 class C: 677 def cb(self, ignore): 678 self.me 679 self.c1 680 self.wr 681 682 c1, c2 = C(), C() 683 684 c2.me = c2 685 c2.c1 = c1 686 c2.wr = weakref.ref(c1, c2.cb) 687 688 del c1, c2 689 gc.collect() 690 691 def test_callback_in_cycle_4(self): 692 import gc 693 694 # Like test_callback_in_cycle_3, except c2 and c1 have different 695 # classes. c2's class (C) isn't reachable from c1 then, so protecting 696 # objects reachable from the dying object (c1) isn't enough to stop 697 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 698 # The result was a segfault (C.__mro__ was NULL when the callback 699 # tried to look up self.me). 700 701 class C(object): 702 def cb(self, ignore): 703 self.me 704 self.c1 705 self.wr 706 707 class D: 708 pass 709 710 c1, c2 = D(), C() 711 712 c2.me = c2 713 c2.c1 = c1 714 c2.wr = weakref.ref(c1, c2.cb) 715 716 del c1, c2, C, D 717 gc.collect() 718 719 def test_callback_in_cycle_resurrection(self): 720 import gc 721 722 # Do something nasty in a weakref callback: resurrect objects 723 # from dead cycles. For this to be attempted, the weakref and 724 # its callback must also be part of the cyclic trash (else the 725 # objects reachable via the callback couldn't be in cyclic trash 726 # to begin with -- the callback would act like an external root). 727 # But gc clears trash weakrefs with callbacks early now, which 728 # disables the callbacks, so the callbacks shouldn't get called 729 # at all (and so nothing actually gets resurrected). 730 731 alist = [] 732 class C(object): 733 def __init__(self, value): 734 self.attribute = value 735 736 def acallback(self, ignore): 737 alist.append(self.c) 738 739 c1, c2 = C(1), C(2) 740 c1.c = c2 741 c2.c = c1 742 c1.wr = weakref.ref(c2, c1.acallback) 743 c2.wr = weakref.ref(c1, c2.acallback) 744 745 def C_went_away(ignore): 746 alist.append("C went away") 747 wr = weakref.ref(C, C_went_away) 748 749 del c1, c2, C # make them all trash 750 self.assertEqual(alist, []) # del isn't enough to reclaim anything 751 752 gc.collect() 753 # c1.wr and c2.wr were part of the cyclic trash, so should have 754 # been cleared without their callbacks executing. OTOH, the weakref 755 # to C is bound to a function local (wr), and wasn't trash, so that 756 # callback should have been invoked when C went away. 757 self.assertEqual(alist, ["C went away"]) 758 # The remaining weakref should be dead now (its callback ran). 759 self.assertEqual(wr(), None) 760 761 del alist[:] 762 gc.collect() 763 self.assertEqual(alist, []) 764 765 def test_callbacks_on_callback(self): 766 import gc 767 768 # Set up weakref callbacks *on* weakref callbacks. 769 alist = [] 770 def safe_callback(ignore): 771 alist.append("safe_callback called") 772 773 class C(object): 774 def cb(self, ignore): 775 alist.append("cb called") 776 777 c, d = C(), C() 778 c.other = d 779 d.other = c 780 callback = c.cb 781 c.wr = weakref.ref(d, callback) # this won't trigger 782 d.wr = weakref.ref(callback, d.cb) # ditto 783 external_wr = weakref.ref(callback, safe_callback) # but this will 784 self.assertIs(external_wr(), callback) 785 786 # The weakrefs attached to c and d should get cleared, so that 787 # C.cb is never called. But external_wr isn't part of the cyclic 788 # trash, and no cyclic trash is reachable from it, so safe_callback 789 # should get invoked when the bound method object callback (c.cb) 790 # -- which is itself a callback, and also part of the cyclic trash -- 791 # gets reclaimed at the end of gc. 792 793 del callback, c, d, C 794 self.assertEqual(alist, []) # del isn't enough to clean up cycles 795 gc.collect() 796 self.assertEqual(alist, ["safe_callback called"]) 797 self.assertEqual(external_wr(), None) 798 799 del alist[:] 800 gc.collect() 801 self.assertEqual(alist, []) 802 803 def test_gc_during_ref_creation(self): 804 self.check_gc_during_creation(weakref.ref) 805 806 def test_gc_during_proxy_creation(self): 807 self.check_gc_during_creation(weakref.proxy) 808 809 def check_gc_during_creation(self, makeref): 810 thresholds = gc.get_threshold() 811 gc.set_threshold(1, 1, 1) 812 gc.collect() 813 class A: 814 pass 815 816 def callback(*args): 817 pass 818 819 referenced = A() 820 821 a = A() 822 a.a = a 823 a.wr = makeref(referenced) 824 825 try: 826 # now make sure the object and the ref get labeled as 827 # cyclic trash: 828 a = A() 829 weakref.ref(referenced, callback) 830 831 finally: 832 gc.set_threshold(*thresholds) 833 834 def test_ref_created_during_del(self): 835 # Bug #1377858 836 # A weakref created in an object's __del__() would crash the 837 # interpreter when the weakref was cleaned up since it would refer to 838 # non-existent memory. This test should not segfault the interpreter. 839 class Target(object): 840 def __del__(self): 841 global ref_from_del 842 ref_from_del = weakref.ref(self) 843 844 w = Target() 845 846 def test_init(self): 847 # Issue 3634 848 # <weakref to class>.__init__() doesn't check errors correctly 849 r = weakref.ref(Exception) 850 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 851 # No exception should be raised here 852 gc.collect() 853 854 def test_classes(self): 855 # Check that classes are weakrefable. 856 class A(object): 857 pass 858 l = [] 859 weakref.ref(int) 860 a = weakref.ref(A, l.append) 861 A = None 862 gc.collect() 863 self.assertEqual(a(), None) 864 self.assertEqual(l, [a]) 865 866 def test_equality(self): 867 # Alive weakrefs defer equality testing to their underlying object. 868 x = Object(1) 869 y = Object(1) 870 z = Object(2) 871 a = weakref.ref(x) 872 b = weakref.ref(y) 873 c = weakref.ref(z) 874 d = weakref.ref(x) 875 # Note how we directly test the operators here, to stress both 876 # __eq__ and __ne__. 877 self.assertTrue(a == b) 878 self.assertFalse(a != b) 879 self.assertFalse(a == c) 880 self.assertTrue(a != c) 881 self.assertTrue(a == d) 882 self.assertFalse(a != d) 883 self.assertFalse(a == x) 884 self.assertTrue(a != x) 885 self.assertTrue(a == ALWAYS_EQ) 886 self.assertFalse(a != ALWAYS_EQ) 887 del x, y, z 888 gc.collect() 889 for r in a, b, c: 890 # Sanity check 891 self.assertIs(r(), None) 892 # Dead weakrefs compare by identity: whether `a` and `d` are the 893 # same weakref object is an implementation detail, since they pointed 894 # to the same original object and didn't have a callback. 895 # (see issue #16453). 896 self.assertFalse(a == b) 897 self.assertTrue(a != b) 898 self.assertFalse(a == c) 899 self.assertTrue(a != c) 900 self.assertEqual(a == d, a is d) 901 self.assertEqual(a != d, a is not d) 902 903 def test_ordering(self): 904 # weakrefs cannot be ordered, even if the underlying objects can. 905 ops = [operator.lt, operator.gt, operator.le, operator.ge] 906 x = Object(1) 907 y = Object(1) 908 a = weakref.ref(x) 909 b = weakref.ref(y) 910 for op in ops: 911 self.assertRaises(TypeError, op, a, b) 912 # Same when dead. 913 del x, y 914 gc.collect() 915 for op in ops: 916 self.assertRaises(TypeError, op, a, b) 917 918 def test_hashing(self): 919 # Alive weakrefs hash the same as the underlying object 920 x = Object(42) 921 y = Object(42) 922 a = weakref.ref(x) 923 b = weakref.ref(y) 924 self.assertEqual(hash(a), hash(42)) 925 del x, y 926 gc.collect() 927 # Dead weakrefs: 928 # - retain their hash is they were hashed when alive; 929 # - otherwise, cannot be hashed. 930 self.assertEqual(hash(a), hash(42)) 931 self.assertRaises(TypeError, hash, b) 932 933 def test_trashcan_16602(self): 934 # Issue #16602: when a weakref's target was part of a long 935 # deallocation chain, the trashcan mechanism could delay clearing 936 # of the weakref and make the target object visible from outside 937 # code even though its refcount had dropped to 0. A crash ensued. 938 class C: 939 def __init__(self, parent): 940 if not parent: 941 return 942 wself = weakref.ref(self) 943 def cb(wparent): 944 o = wself() 945 self.wparent = weakref.ref(parent, cb) 946 947 d = weakref.WeakKeyDictionary() 948 root = c = C(None) 949 for n in range(100): 950 d[c] = c = C(c) 951 del root 952 gc.collect() 953 954 def test_callback_attribute(self): 955 x = Object(1) 956 callback = lambda ref: None 957 ref1 = weakref.ref(x, callback) 958 self.assertIs(ref1.__callback__, callback) 959 960 ref2 = weakref.ref(x) 961 self.assertIsNone(ref2.__callback__) 962 963 def test_callback_attribute_after_deletion(self): 964 x = Object(1) 965 ref = weakref.ref(x, self.callback) 966 self.assertIsNotNone(ref.__callback__) 967 del x 968 support.gc_collect() 969 self.assertIsNone(ref.__callback__) 970 971 def test_set_callback_attribute(self): 972 x = Object(1) 973 callback = lambda ref: None 974 ref1 = weakref.ref(x, callback) 975 with self.assertRaises(AttributeError): 976 ref1.__callback__ = lambda ref: None 977 978 def test_callback_gcs(self): 979 class ObjectWithDel(Object): 980 def __del__(self): pass 981 x = ObjectWithDel(1) 982 ref1 = weakref.ref(x, lambda ref: support.gc_collect()) 983 del x 984 support.gc_collect() 985 986 987class SubclassableWeakrefTestCase(TestBase): 988 989 def test_subclass_refs(self): 990 class MyRef(weakref.ref): 991 def __init__(self, ob, callback=None, value=42): 992 self.value = value 993 super().__init__(ob, callback) 994 def __call__(self): 995 self.called = True 996 return super().__call__() 997 o = Object("foo") 998 mr = MyRef(o, value=24) 999 self.assertIs(mr(), o) 1000 self.assertTrue(mr.called) 1001 self.assertEqual(mr.value, 24) 1002 del o 1003 gc_collect() # For PyPy or other GCs. 1004 self.assertIsNone(mr()) 1005 self.assertTrue(mr.called) 1006 1007 def test_subclass_refs_dont_replace_standard_refs(self): 1008 class MyRef(weakref.ref): 1009 pass 1010 o = Object(42) 1011 r1 = MyRef(o) 1012 r2 = weakref.ref(o) 1013 self.assertIsNot(r1, r2) 1014 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 1015 self.assertEqual(weakref.getweakrefcount(o), 2) 1016 r3 = MyRef(o) 1017 self.assertEqual(weakref.getweakrefcount(o), 3) 1018 refs = weakref.getweakrefs(o) 1019 self.assertEqual(len(refs), 3) 1020 self.assertIs(r2, refs[0]) 1021 self.assertIn(r1, refs[1:]) 1022 self.assertIn(r3, refs[1:]) 1023 1024 def test_subclass_refs_dont_conflate_callbacks(self): 1025 class MyRef(weakref.ref): 1026 pass 1027 o = Object(42) 1028 r1 = MyRef(o, id) 1029 r2 = MyRef(o, str) 1030 self.assertIsNot(r1, r2) 1031 refs = weakref.getweakrefs(o) 1032 self.assertIn(r1, refs) 1033 self.assertIn(r2, refs) 1034 1035 def test_subclass_refs_with_slots(self): 1036 class MyRef(weakref.ref): 1037 __slots__ = "slot1", "slot2" 1038 def __new__(type, ob, callback, slot1, slot2): 1039 return weakref.ref.__new__(type, ob, callback) 1040 def __init__(self, ob, callback, slot1, slot2): 1041 self.slot1 = slot1 1042 self.slot2 = slot2 1043 def meth(self): 1044 return self.slot1 + self.slot2 1045 o = Object(42) 1046 r = MyRef(o, None, "abc", "def") 1047 self.assertEqual(r.slot1, "abc") 1048 self.assertEqual(r.slot2, "def") 1049 self.assertEqual(r.meth(), "abcdef") 1050 self.assertFalse(hasattr(r, "__dict__")) 1051 1052 def test_subclass_refs_with_cycle(self): 1053 """Confirm https://bugs.python.org/issue3100 is fixed.""" 1054 # An instance of a weakref subclass can have attributes. 1055 # If such a weakref holds the only strong reference to the object, 1056 # deleting the weakref will delete the object. In this case, 1057 # the callback must not be called, because the ref object is 1058 # being deleted. 1059 class MyRef(weakref.ref): 1060 pass 1061 1062 # Use a local callback, for "regrtest -R::" 1063 # to detect refcounting problems 1064 def callback(w): 1065 self.cbcalled += 1 1066 1067 o = C() 1068 r1 = MyRef(o, callback) 1069 r1.o = o 1070 del o 1071 1072 del r1 # Used to crash here 1073 1074 self.assertEqual(self.cbcalled, 0) 1075 1076 # Same test, with two weakrefs to the same object 1077 # (since code paths are different) 1078 o = C() 1079 r1 = MyRef(o, callback) 1080 r2 = MyRef(o, callback) 1081 r1.r = r2 1082 r2.o = o 1083 del o 1084 del r2 1085 1086 del r1 # Used to crash here 1087 1088 self.assertEqual(self.cbcalled, 0) 1089 1090 1091class WeakMethodTestCase(unittest.TestCase): 1092 1093 def _subclass(self): 1094 """Return an Object subclass overriding `some_method`.""" 1095 class C(Object): 1096 def some_method(self): 1097 return 6 1098 return C 1099 1100 def test_alive(self): 1101 o = Object(1) 1102 r = weakref.WeakMethod(o.some_method) 1103 self.assertIsInstance(r, weakref.ReferenceType) 1104 self.assertIsInstance(r(), type(o.some_method)) 1105 self.assertIs(r().__self__, o) 1106 self.assertIs(r().__func__, o.some_method.__func__) 1107 self.assertEqual(r()(), 4) 1108 1109 def test_object_dead(self): 1110 o = Object(1) 1111 r = weakref.WeakMethod(o.some_method) 1112 del o 1113 gc.collect() 1114 self.assertIs(r(), None) 1115 1116 def test_method_dead(self): 1117 C = self._subclass() 1118 o = C(1) 1119 r = weakref.WeakMethod(o.some_method) 1120 del C.some_method 1121 gc.collect() 1122 self.assertIs(r(), None) 1123 1124 def test_callback_when_object_dead(self): 1125 # Test callback behaviour when object dies first. 1126 C = self._subclass() 1127 calls = [] 1128 def cb(arg): 1129 calls.append(arg) 1130 o = C(1) 1131 r = weakref.WeakMethod(o.some_method, cb) 1132 del o 1133 gc.collect() 1134 self.assertEqual(calls, [r]) 1135 # Callback is only called once. 1136 C.some_method = Object.some_method 1137 gc.collect() 1138 self.assertEqual(calls, [r]) 1139 1140 def test_callback_when_method_dead(self): 1141 # Test callback behaviour when method dies first. 1142 C = self._subclass() 1143 calls = [] 1144 def cb(arg): 1145 calls.append(arg) 1146 o = C(1) 1147 r = weakref.WeakMethod(o.some_method, cb) 1148 del C.some_method 1149 gc.collect() 1150 self.assertEqual(calls, [r]) 1151 # Callback is only called once. 1152 del o 1153 gc.collect() 1154 self.assertEqual(calls, [r]) 1155 1156 @support.cpython_only 1157 def test_no_cycles(self): 1158 # A WeakMethod doesn't create any reference cycle to itself. 1159 o = Object(1) 1160 def cb(_): 1161 pass 1162 r = weakref.WeakMethod(o.some_method, cb) 1163 wr = weakref.ref(r) 1164 del r 1165 self.assertIs(wr(), None) 1166 1167 def test_equality(self): 1168 def _eq(a, b): 1169 self.assertTrue(a == b) 1170 self.assertFalse(a != b) 1171 def _ne(a, b): 1172 self.assertTrue(a != b) 1173 self.assertFalse(a == b) 1174 x = Object(1) 1175 y = Object(1) 1176 a = weakref.WeakMethod(x.some_method) 1177 b = weakref.WeakMethod(y.some_method) 1178 c = weakref.WeakMethod(x.other_method) 1179 d = weakref.WeakMethod(y.other_method) 1180 # Objects equal, same method 1181 _eq(a, b) 1182 _eq(c, d) 1183 # Objects equal, different method 1184 _ne(a, c) 1185 _ne(a, d) 1186 _ne(b, c) 1187 _ne(b, d) 1188 # Objects unequal, same or different method 1189 z = Object(2) 1190 e = weakref.WeakMethod(z.some_method) 1191 f = weakref.WeakMethod(z.other_method) 1192 _ne(a, e) 1193 _ne(a, f) 1194 _ne(b, e) 1195 _ne(b, f) 1196 # Compare with different types 1197 _ne(a, x.some_method) 1198 _eq(a, ALWAYS_EQ) 1199 del x, y, z 1200 gc.collect() 1201 # Dead WeakMethods compare by identity 1202 refs = a, b, c, d, e, f 1203 for q in refs: 1204 for r in refs: 1205 self.assertEqual(q == r, q is r) 1206 self.assertEqual(q != r, q is not r) 1207 1208 def test_hashing(self): 1209 # Alive WeakMethods are hashable if the underlying object is 1210 # hashable. 1211 x = Object(1) 1212 y = Object(1) 1213 a = weakref.WeakMethod(x.some_method) 1214 b = weakref.WeakMethod(y.some_method) 1215 c = weakref.WeakMethod(y.other_method) 1216 # Since WeakMethod objects are equal, the hashes should be equal. 1217 self.assertEqual(hash(a), hash(b)) 1218 ha = hash(a) 1219 # Dead WeakMethods retain their old hash value 1220 del x, y 1221 gc.collect() 1222 self.assertEqual(hash(a), ha) 1223 self.assertEqual(hash(b), ha) 1224 # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. 1225 self.assertRaises(TypeError, hash, c) 1226 1227 1228class MappingTestCase(TestBase): 1229 1230 COUNT = 10 1231 1232 def check_len_cycles(self, dict_type, cons): 1233 N = 20 1234 items = [RefCycle() for i in range(N)] 1235 dct = dict_type(cons(o) for o in items) 1236 # Keep an iterator alive 1237 it = dct.items() 1238 try: 1239 next(it) 1240 except StopIteration: 1241 pass 1242 del items 1243 gc.collect() 1244 n1 = len(dct) 1245 del it 1246 gc.collect() 1247 n2 = len(dct) 1248 # one item may be kept alive inside the iterator 1249 self.assertIn(n1, (0, 1)) 1250 self.assertEqual(n2, 0) 1251 1252 def test_weak_keyed_len_cycles(self): 1253 self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1254 1255 def test_weak_valued_len_cycles(self): 1256 self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) 1257 1258 def check_len_race(self, dict_type, cons): 1259 # Extended sanity checks for len() in the face of cyclic collection 1260 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 1261 for th in range(1, 100): 1262 N = 20 1263 gc.collect(0) 1264 gc.set_threshold(th, th, th) 1265 items = [RefCycle() for i in range(N)] 1266 dct = dict_type(cons(o) for o in items) 1267 del items 1268 # All items will be collected at next garbage collection pass 1269 it = dct.items() 1270 try: 1271 next(it) 1272 except StopIteration: 1273 pass 1274 n1 = len(dct) 1275 del it 1276 n2 = len(dct) 1277 self.assertGreaterEqual(n1, 0) 1278 self.assertLessEqual(n1, N) 1279 self.assertGreaterEqual(n2, 0) 1280 self.assertLessEqual(n2, n1) 1281 1282 def test_weak_keyed_len_race(self): 1283 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1284 1285 def test_weak_valued_len_race(self): 1286 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 1287 1288 def test_weak_values(self): 1289 # 1290 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1291 # 1292 dict, objects = self.make_weak_valued_dict() 1293 for o in objects: 1294 self.assertEqual(weakref.getweakrefcount(o), 1) 1295 self.assertIs(o, dict[o.arg], 1296 "wrong object returned by weak dict!") 1297 items1 = list(dict.items()) 1298 items2 = list(dict.copy().items()) 1299 items1.sort() 1300 items2.sort() 1301 self.assertEqual(items1, items2, 1302 "cloning of weak-valued dictionary did not work!") 1303 del items1, items2 1304 self.assertEqual(len(dict), self.COUNT) 1305 del objects[0] 1306 gc_collect() # For PyPy or other GCs. 1307 self.assertEqual(len(dict), self.COUNT - 1, 1308 "deleting object did not cause dictionary update") 1309 del objects, o 1310 gc_collect() # For PyPy or other GCs. 1311 self.assertEqual(len(dict), 0, 1312 "deleting the values did not clear the dictionary") 1313 # regression on SF bug #447152: 1314 dict = weakref.WeakValueDictionary() 1315 self.assertRaises(KeyError, dict.__getitem__, 1) 1316 dict[2] = C() 1317 gc_collect() # For PyPy or other GCs. 1318 self.assertRaises(KeyError, dict.__getitem__, 2) 1319 1320 def test_weak_keys(self): 1321 # 1322 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1323 # len(d), k in d. 1324 # 1325 dict, objects = self.make_weak_keyed_dict() 1326 for o in objects: 1327 self.assertEqual(weakref.getweakrefcount(o), 1, 1328 "wrong number of weak references to %r!" % o) 1329 self.assertIs(o.arg, dict[o], 1330 "wrong object returned by weak dict!") 1331 items1 = dict.items() 1332 items2 = dict.copy().items() 1333 self.assertEqual(set(items1), set(items2), 1334 "cloning of weak-keyed dictionary did not work!") 1335 del items1, items2 1336 self.assertEqual(len(dict), self.COUNT) 1337 del objects[0] 1338 gc_collect() # For PyPy or other GCs. 1339 self.assertEqual(len(dict), (self.COUNT - 1), 1340 "deleting object did not cause dictionary update") 1341 del objects, o 1342 gc_collect() # For PyPy or other GCs. 1343 self.assertEqual(len(dict), 0, 1344 "deleting the keys did not clear the dictionary") 1345 o = Object(42) 1346 dict[o] = "What is the meaning of the universe?" 1347 self.assertIn(o, dict) 1348 self.assertNotIn(34, dict) 1349 1350 def test_weak_keyed_iters(self): 1351 dict, objects = self.make_weak_keyed_dict() 1352 self.check_iters(dict) 1353 1354 # Test keyrefs() 1355 refs = dict.keyrefs() 1356 self.assertEqual(len(refs), len(objects)) 1357 objects2 = list(objects) 1358 for wr in refs: 1359 ob = wr() 1360 self.assertIn(ob, dict) 1361 self.assertIn(ob, dict) 1362 self.assertEqual(ob.arg, dict[ob]) 1363 objects2.remove(ob) 1364 self.assertEqual(len(objects2), 0) 1365 1366 # Test iterkeyrefs() 1367 objects2 = list(objects) 1368 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1369 for wr in dict.keyrefs(): 1370 ob = wr() 1371 self.assertIn(ob, dict) 1372 self.assertIn(ob, dict) 1373 self.assertEqual(ob.arg, dict[ob]) 1374 objects2.remove(ob) 1375 self.assertEqual(len(objects2), 0) 1376 1377 def test_weak_valued_iters(self): 1378 dict, objects = self.make_weak_valued_dict() 1379 self.check_iters(dict) 1380 1381 # Test valuerefs() 1382 refs = dict.valuerefs() 1383 self.assertEqual(len(refs), len(objects)) 1384 objects2 = list(objects) 1385 for wr in refs: 1386 ob = wr() 1387 self.assertEqual(ob, dict[ob.arg]) 1388 self.assertEqual(ob.arg, dict[ob.arg].arg) 1389 objects2.remove(ob) 1390 self.assertEqual(len(objects2), 0) 1391 1392 # Test itervaluerefs() 1393 objects2 = list(objects) 1394 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1395 for wr in dict.itervaluerefs(): 1396 ob = wr() 1397 self.assertEqual(ob, dict[ob.arg]) 1398 self.assertEqual(ob.arg, dict[ob.arg].arg) 1399 objects2.remove(ob) 1400 self.assertEqual(len(objects2), 0) 1401 1402 def check_iters(self, dict): 1403 # item iterator: 1404 items = list(dict.items()) 1405 for item in dict.items(): 1406 items.remove(item) 1407 self.assertFalse(items, "items() did not touch all items") 1408 1409 # key iterator, via __iter__(): 1410 keys = list(dict.keys()) 1411 for k in dict: 1412 keys.remove(k) 1413 self.assertFalse(keys, "__iter__() did not touch all keys") 1414 1415 # key iterator, via iterkeys(): 1416 keys = list(dict.keys()) 1417 for k in dict.keys(): 1418 keys.remove(k) 1419 self.assertFalse(keys, "iterkeys() did not touch all keys") 1420 1421 # value iterator: 1422 values = list(dict.values()) 1423 for v in dict.values(): 1424 values.remove(v) 1425 self.assertFalse(values, 1426 "itervalues() did not touch all values") 1427 1428 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1429 n = len(dict) 1430 it = iter(getattr(dict, iter_name)()) 1431 next(it) # Trigger internal iteration 1432 # Destroy an object 1433 del objects[-1] 1434 gc.collect() # just in case 1435 # We have removed either the first consumed object, or another one 1436 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1437 del it 1438 # The removal has been committed 1439 self.assertEqual(len(dict), n - 1) 1440 1441 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1442 # Check that we can explicitly mutate the weak dict without 1443 # interfering with delayed removal. 1444 # `testcontext` should create an iterator, destroy one of the 1445 # weakref'ed objects and then return a new key/value pair corresponding 1446 # to the destroyed object. 1447 with testcontext() as (k, v): 1448 self.assertNotIn(k, dict) 1449 with testcontext() as (k, v): 1450 self.assertRaises(KeyError, dict.__delitem__, k) 1451 self.assertNotIn(k, dict) 1452 with testcontext() as (k, v): 1453 self.assertRaises(KeyError, dict.pop, k) 1454 self.assertNotIn(k, dict) 1455 with testcontext() as (k, v): 1456 dict[k] = v 1457 self.assertEqual(dict[k], v) 1458 ddict = copy.copy(dict) 1459 with testcontext() as (k, v): 1460 dict.update(ddict) 1461 self.assertEqual(dict, ddict) 1462 with testcontext() as (k, v): 1463 dict.clear() 1464 self.assertEqual(len(dict), 0) 1465 1466 def check_weak_del_and_len_while_iterating(self, dict, testcontext): 1467 # Check that len() works when both iterating and removing keys 1468 # explicitly through various means (.pop(), .clear()...), while 1469 # implicit mutation is deferred because an iterator is alive. 1470 # (each call to testcontext() should schedule one item for removal 1471 # for this test to work properly) 1472 o = Object(123456) 1473 with testcontext(): 1474 n = len(dict) 1475 # Since underlying dict is ordered, first item is popped 1476 dict.pop(next(dict.keys())) 1477 self.assertEqual(len(dict), n - 1) 1478 dict[o] = o 1479 self.assertEqual(len(dict), n) 1480 # last item in objects is removed from dict in context shutdown 1481 with testcontext(): 1482 self.assertEqual(len(dict), n - 1) 1483 # Then, (o, o) is popped 1484 dict.popitem() 1485 self.assertEqual(len(dict), n - 2) 1486 with testcontext(): 1487 self.assertEqual(len(dict), n - 3) 1488 del dict[next(dict.keys())] 1489 self.assertEqual(len(dict), n - 4) 1490 with testcontext(): 1491 self.assertEqual(len(dict), n - 5) 1492 dict.popitem() 1493 self.assertEqual(len(dict), n - 6) 1494 with testcontext(): 1495 dict.clear() 1496 self.assertEqual(len(dict), 0) 1497 self.assertEqual(len(dict), 0) 1498 1499 def test_weak_keys_destroy_while_iterating(self): 1500 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1501 dict, objects = self.make_weak_keyed_dict() 1502 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1503 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1504 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1505 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') 1506 dict, objects = self.make_weak_keyed_dict() 1507 @contextlib.contextmanager 1508 def testcontext(): 1509 try: 1510 it = iter(dict.items()) 1511 next(it) 1512 # Schedule a key/value for removal and recreate it 1513 v = objects.pop().arg 1514 gc.collect() # just in case 1515 yield Object(v), v 1516 finally: 1517 it = None # should commit all removals 1518 gc.collect() 1519 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1520 # Issue #21173: len() fragile when keys are both implicitly and 1521 # explicitly removed. 1522 dict, objects = self.make_weak_keyed_dict() 1523 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1524 1525 def test_weak_values_destroy_while_iterating(self): 1526 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1527 dict, objects = self.make_weak_valued_dict() 1528 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1529 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1530 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1531 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1532 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') 1533 dict, objects = self.make_weak_valued_dict() 1534 @contextlib.contextmanager 1535 def testcontext(): 1536 try: 1537 it = iter(dict.items()) 1538 next(it) 1539 # Schedule a key/value for removal and recreate it 1540 k = objects.pop().arg 1541 gc.collect() # just in case 1542 yield k, Object(k) 1543 finally: 1544 it = None # should commit all removals 1545 gc.collect() 1546 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1547 dict, objects = self.make_weak_valued_dict() 1548 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1549 1550 def test_make_weak_keyed_dict_from_dict(self): 1551 o = Object(3) 1552 dict = weakref.WeakKeyDictionary({o:364}) 1553 self.assertEqual(dict[o], 364) 1554 1555 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1556 o = Object(3) 1557 dict = weakref.WeakKeyDictionary({o:364}) 1558 dict2 = weakref.WeakKeyDictionary(dict) 1559 self.assertEqual(dict[o], 364) 1560 1561 def make_weak_keyed_dict(self): 1562 dict = weakref.WeakKeyDictionary() 1563 objects = list(map(Object, range(self.COUNT))) 1564 for o in objects: 1565 dict[o] = o.arg 1566 return dict, objects 1567 1568 def test_make_weak_valued_dict_from_dict(self): 1569 o = Object(3) 1570 dict = weakref.WeakValueDictionary({364:o}) 1571 self.assertEqual(dict[364], o) 1572 1573 def test_make_weak_valued_dict_from_weak_valued_dict(self): 1574 o = Object(3) 1575 dict = weakref.WeakValueDictionary({364:o}) 1576 dict2 = weakref.WeakValueDictionary(dict) 1577 self.assertEqual(dict[364], o) 1578 1579 def test_make_weak_valued_dict_misc(self): 1580 # errors 1581 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1582 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1583 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1584 # special keyword arguments 1585 o = Object(3) 1586 for kw in 'self', 'dict', 'other', 'iterable': 1587 d = weakref.WeakValueDictionary(**{kw: o}) 1588 self.assertEqual(list(d.keys()), [kw]) 1589 self.assertEqual(d[kw], o) 1590 1591 def make_weak_valued_dict(self): 1592 dict = weakref.WeakValueDictionary() 1593 objects = list(map(Object, range(self.COUNT))) 1594 for o in objects: 1595 dict[o.arg] = o 1596 return dict, objects 1597 1598 def check_popitem(self, klass, key1, value1, key2, value2): 1599 weakdict = klass() 1600 weakdict[key1] = value1 1601 weakdict[key2] = value2 1602 self.assertEqual(len(weakdict), 2) 1603 k, v = weakdict.popitem() 1604 self.assertEqual(len(weakdict), 1) 1605 if k is key1: 1606 self.assertIs(v, value1) 1607 else: 1608 self.assertIs(v, value2) 1609 k, v = weakdict.popitem() 1610 self.assertEqual(len(weakdict), 0) 1611 if k is key1: 1612 self.assertIs(v, value1) 1613 else: 1614 self.assertIs(v, value2) 1615 1616 def test_weak_valued_dict_popitem(self): 1617 self.check_popitem(weakref.WeakValueDictionary, 1618 "key1", C(), "key2", C()) 1619 1620 def test_weak_keyed_dict_popitem(self): 1621 self.check_popitem(weakref.WeakKeyDictionary, 1622 C(), "value 1", C(), "value 2") 1623 1624 def check_setdefault(self, klass, key, value1, value2): 1625 self.assertIsNot(value1, value2, 1626 "invalid test" 1627 " -- value parameters must be distinct objects") 1628 weakdict = klass() 1629 o = weakdict.setdefault(key, value1) 1630 self.assertIs(o, value1) 1631 self.assertIn(key, weakdict) 1632 self.assertIs(weakdict.get(key), value1) 1633 self.assertIs(weakdict[key], value1) 1634 1635 o = weakdict.setdefault(key, value2) 1636 self.assertIs(o, value1) 1637 self.assertIn(key, weakdict) 1638 self.assertIs(weakdict.get(key), value1) 1639 self.assertIs(weakdict[key], value1) 1640 1641 def test_weak_valued_dict_setdefault(self): 1642 self.check_setdefault(weakref.WeakValueDictionary, 1643 "key", C(), C()) 1644 1645 def test_weak_keyed_dict_setdefault(self): 1646 self.check_setdefault(weakref.WeakKeyDictionary, 1647 C(), "value 1", "value 2") 1648 1649 def check_update(self, klass, dict): 1650 # 1651 # This exercises d.update(), len(d), d.keys(), k in d, 1652 # d.get(), d[]. 1653 # 1654 weakdict = klass() 1655 weakdict.update(dict) 1656 self.assertEqual(len(weakdict), len(dict)) 1657 for k in weakdict.keys(): 1658 self.assertIn(k, dict, "mysterious new key appeared in weak dict") 1659 v = dict.get(k) 1660 self.assertIs(v, weakdict[k]) 1661 self.assertIs(v, weakdict.get(k)) 1662 for k in dict.keys(): 1663 self.assertIn(k, weakdict, "original key disappeared in weak dict") 1664 v = dict[k] 1665 self.assertIs(v, weakdict[k]) 1666 self.assertIs(v, weakdict.get(k)) 1667 1668 def test_weak_valued_dict_update(self): 1669 self.check_update(weakref.WeakValueDictionary, 1670 {1: C(), 'a': C(), C(): C()}) 1671 # errors 1672 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1673 d = weakref.WeakValueDictionary() 1674 self.assertRaises(TypeError, d.update, {}, {}) 1675 self.assertRaises(TypeError, d.update, (), ()) 1676 self.assertEqual(list(d.keys()), []) 1677 # special keyword arguments 1678 o = Object(3) 1679 for kw in 'self', 'dict', 'other', 'iterable': 1680 d = weakref.WeakValueDictionary() 1681 d.update(**{kw: o}) 1682 self.assertEqual(list(d.keys()), [kw]) 1683 self.assertEqual(d[kw], o) 1684 1685 def test_weak_valued_union_operators(self): 1686 a = C() 1687 b = C() 1688 c = C() 1689 wvd1 = weakref.WeakValueDictionary({1: a}) 1690 wvd2 = weakref.WeakValueDictionary({1: b, 2: a}) 1691 wvd3 = wvd1.copy() 1692 d1 = {1: c, 3: b} 1693 pairs = [(5, c), (6, b)] 1694 1695 tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries 1696 self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2)) 1697 self.assertIs(type(tmp1), weakref.WeakValueDictionary) 1698 wvd1 |= wvd2 1699 self.assertEqual(wvd1, tmp1) 1700 1701 tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping 1702 self.assertEqual(dict(tmp2), dict(wvd2) | d1) 1703 self.assertIs(type(tmp2), weakref.WeakValueDictionary) 1704 wvd2 |= d1 1705 self.assertEqual(wvd2, tmp2) 1706 1707 tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value 1708 tmp3 |= pairs 1709 self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs)) 1710 self.assertIs(type(tmp3), weakref.WeakValueDictionary) 1711 1712 tmp4 = d1 | wvd3 # Testing .__ror__ 1713 self.assertEqual(dict(tmp4), d1 | dict(wvd3)) 1714 self.assertIs(type(tmp4), weakref.WeakValueDictionary) 1715 1716 del a 1717 self.assertNotIn(2, tmp1) 1718 self.assertNotIn(2, tmp2) 1719 self.assertNotIn(1, tmp3) 1720 self.assertNotIn(1, tmp4) 1721 1722 def test_weak_keyed_dict_update(self): 1723 self.check_update(weakref.WeakKeyDictionary, 1724 {C(): 1, C(): 2, C(): 3}) 1725 1726 def test_weak_keyed_delitem(self): 1727 d = weakref.WeakKeyDictionary() 1728 o1 = Object('1') 1729 o2 = Object('2') 1730 d[o1] = 'something' 1731 d[o2] = 'something' 1732 self.assertEqual(len(d), 2) 1733 del d[o1] 1734 self.assertEqual(len(d), 1) 1735 self.assertEqual(list(d.keys()), [o2]) 1736 1737 def test_weak_keyed_union_operators(self): 1738 o1 = C() 1739 o2 = C() 1740 o3 = C() 1741 wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2}) 1742 wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4}) 1743 wkd3 = wkd1.copy() 1744 d1 = {o2: '5', o3: '6'} 1745 pairs = [(o2, 7), (o3, 8)] 1746 1747 tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries 1748 self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2)) 1749 self.assertIs(type(tmp1), weakref.WeakKeyDictionary) 1750 wkd1 |= wkd2 1751 self.assertEqual(wkd1, tmp1) 1752 1753 tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping 1754 self.assertEqual(dict(tmp2), dict(wkd2) | d1) 1755 self.assertIs(type(tmp2), weakref.WeakKeyDictionary) 1756 wkd2 |= d1 1757 self.assertEqual(wkd2, tmp2) 1758 1759 tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value 1760 tmp3 |= pairs 1761 self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs)) 1762 self.assertIs(type(tmp3), weakref.WeakKeyDictionary) 1763 1764 tmp4 = d1 | wkd3 # Testing .__ror__ 1765 self.assertEqual(dict(tmp4), d1 | dict(wkd3)) 1766 self.assertIs(type(tmp4), weakref.WeakKeyDictionary) 1767 1768 del o1 1769 self.assertNotIn(4, tmp1.values()) 1770 self.assertNotIn(4, tmp2.values()) 1771 self.assertNotIn(1, tmp3.values()) 1772 self.assertNotIn(1, tmp4.values()) 1773 1774 def test_weak_valued_delitem(self): 1775 d = weakref.WeakValueDictionary() 1776 o1 = Object('1') 1777 o2 = Object('2') 1778 d['something'] = o1 1779 d['something else'] = o2 1780 self.assertEqual(len(d), 2) 1781 del d['something'] 1782 self.assertEqual(len(d), 1) 1783 self.assertEqual(list(d.items()), [('something else', o2)]) 1784 1785 def test_weak_keyed_bad_delitem(self): 1786 d = weakref.WeakKeyDictionary() 1787 o = Object('1') 1788 # An attempt to delete an object that isn't there should raise 1789 # KeyError. It didn't before 2.3. 1790 self.assertRaises(KeyError, d.__delitem__, o) 1791 self.assertRaises(KeyError, d.__getitem__, o) 1792 1793 # If a key isn't of a weakly referencable type, __getitem__ and 1794 # __setitem__ raise TypeError. __delitem__ should too. 1795 self.assertRaises(TypeError, d.__delitem__, 13) 1796 self.assertRaises(TypeError, d.__getitem__, 13) 1797 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1798 1799 def test_weak_keyed_cascading_deletes(self): 1800 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1801 # over the keys via self.data.iterkeys(). If things vanished from 1802 # the dict during this (or got added), that caused a RuntimeError. 1803 1804 d = weakref.WeakKeyDictionary() 1805 mutate = False 1806 1807 class C(object): 1808 def __init__(self, i): 1809 self.value = i 1810 def __hash__(self): 1811 return hash(self.value) 1812 def __eq__(self, other): 1813 if mutate: 1814 # Side effect that mutates the dict, by removing the 1815 # last strong reference to a key. 1816 del objs[-1] 1817 return self.value == other.value 1818 1819 objs = [C(i) for i in range(4)] 1820 for o in objs: 1821 d[o] = o.value 1822 del o # now the only strong references to keys are in objs 1823 # Find the order in which iterkeys sees the keys. 1824 objs = list(d.keys()) 1825 # Reverse it, so that the iteration implementation of __delitem__ 1826 # has to keep looping to find the first object we delete. 1827 objs.reverse() 1828 1829 # Turn on mutation in C.__eq__. The first time through the loop, 1830 # under the iterkeys() business the first comparison will delete 1831 # the last item iterkeys() would see, and that causes a 1832 # RuntimeError: dictionary changed size during iteration 1833 # when the iterkeys() loop goes around to try comparing the next 1834 # key. After this was fixed, it just deletes the last object *our* 1835 # "for o in obj" loop would have gotten to. 1836 mutate = True 1837 count = 0 1838 for o in objs: 1839 count += 1 1840 del d[o] 1841 gc_collect() # For PyPy or other GCs. 1842 self.assertEqual(len(d), 0) 1843 self.assertEqual(count, 2) 1844 1845 def test_make_weak_valued_dict_repr(self): 1846 dict = weakref.WeakValueDictionary() 1847 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1848 1849 def test_make_weak_keyed_dict_repr(self): 1850 dict = weakref.WeakKeyDictionary() 1851 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1852 1853 def test_threaded_weak_valued_setdefault(self): 1854 d = weakref.WeakValueDictionary() 1855 with collect_in_thread(): 1856 for i in range(100000): 1857 x = d.setdefault(10, RefCycle()) 1858 self.assertIsNot(x, None) # we never put None in there! 1859 del x 1860 1861 def test_threaded_weak_valued_pop(self): 1862 d = weakref.WeakValueDictionary() 1863 with collect_in_thread(): 1864 for i in range(100000): 1865 d[10] = RefCycle() 1866 x = d.pop(10, 10) 1867 self.assertIsNot(x, None) # we never put None in there! 1868 1869 def test_threaded_weak_valued_consistency(self): 1870 # Issue #28427: old keys should not remove new values from 1871 # WeakValueDictionary when collecting from another thread. 1872 d = weakref.WeakValueDictionary() 1873 with collect_in_thread(): 1874 for i in range(200000): 1875 o = RefCycle() 1876 d[10] = o 1877 # o is still alive, so the dict can't be empty 1878 self.assertEqual(len(d), 1) 1879 o = None # lose ref 1880 1881 def check_threaded_weak_dict_copy(self, type_, deepcopy): 1882 # `type_` should be either WeakKeyDictionary or WeakValueDictionary. 1883 # `deepcopy` should be either True or False. 1884 exc = [] 1885 1886 class DummyKey: 1887 def __init__(self, ctr): 1888 self.ctr = ctr 1889 1890 class DummyValue: 1891 def __init__(self, ctr): 1892 self.ctr = ctr 1893 1894 def dict_copy(d, exc): 1895 try: 1896 if deepcopy is True: 1897 _ = copy.deepcopy(d) 1898 else: 1899 _ = d.copy() 1900 except Exception as ex: 1901 exc.append(ex) 1902 1903 def pop_and_collect(lst): 1904 gc_ctr = 0 1905 while lst: 1906 i = random.randint(0, len(lst) - 1) 1907 gc_ctr += 1 1908 lst.pop(i) 1909 if gc_ctr % 10000 == 0: 1910 gc.collect() # just in case 1911 1912 self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) 1913 1914 d = type_() 1915 keys = [] 1916 values = [] 1917 # Initialize d with many entries 1918 for i in range(70000): 1919 k, v = DummyKey(i), DummyValue(i) 1920 keys.append(k) 1921 values.append(v) 1922 d[k] = v 1923 del k 1924 del v 1925 1926 t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) 1927 if type_ is weakref.WeakKeyDictionary: 1928 t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) 1929 else: # weakref.WeakValueDictionary 1930 t_collect = threading.Thread(target=pop_and_collect, args=(values,)) 1931 1932 t_copy.start() 1933 t_collect.start() 1934 1935 t_copy.join() 1936 t_collect.join() 1937 1938 # Test exceptions 1939 if exc: 1940 raise exc[0] 1941 1942 def test_threaded_weak_key_dict_copy(self): 1943 # Issue #35615: Weakref keys or values getting GC'ed during dict 1944 # copying should not result in a crash. 1945 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) 1946 1947 def test_threaded_weak_key_dict_deepcopy(self): 1948 # Issue #35615: Weakref keys or values getting GC'ed during dict 1949 # copying should not result in a crash. 1950 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) 1951 1952 def test_threaded_weak_value_dict_copy(self): 1953 # Issue #35615: Weakref keys or values getting GC'ed during dict 1954 # copying should not result in a crash. 1955 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) 1956 1957 def test_threaded_weak_value_dict_deepcopy(self): 1958 # Issue #35615: Weakref keys or values getting GC'ed during dict 1959 # copying should not result in a crash. 1960 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) 1961 1962 @support.cpython_only 1963 def test_remove_closure(self): 1964 d = weakref.WeakValueDictionary() 1965 self.assertIsNone(d._remove.__closure__) 1966 1967 1968from test import mapping_tests 1969 1970class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1971 """Check that WeakValueDictionary conforms to the mapping protocol""" 1972 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1973 type2test = weakref.WeakValueDictionary 1974 def _reference(self): 1975 return self.__ref.copy() 1976 1977class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1978 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1979 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1980 type2test = weakref.WeakKeyDictionary 1981 def _reference(self): 1982 return self.__ref.copy() 1983 1984 1985class FinalizeTestCase(unittest.TestCase): 1986 1987 class A: 1988 pass 1989 1990 def _collect_if_necessary(self): 1991 # we create no ref-cycles so in CPython no gc should be needed 1992 if sys.implementation.name != 'cpython': 1993 support.gc_collect() 1994 1995 def test_finalize(self): 1996 def add(x,y,z): 1997 res.append(x + y + z) 1998 return x + y + z 1999 2000 a = self.A() 2001 2002 res = [] 2003 f = weakref.finalize(a, add, 67, 43, z=89) 2004 self.assertEqual(f.alive, True) 2005 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) 2006 self.assertEqual(f(), 199) 2007 self.assertEqual(f(), None) 2008 self.assertEqual(f(), None) 2009 self.assertEqual(f.peek(), None) 2010 self.assertEqual(f.detach(), None) 2011 self.assertEqual(f.alive, False) 2012 self.assertEqual(res, [199]) 2013 2014 res = [] 2015 f = weakref.finalize(a, add, 67, 43, 89) 2016 self.assertEqual(f.peek(), (a, add, (67,43,89), {})) 2017 self.assertEqual(f.detach(), (a, add, (67,43,89), {})) 2018 self.assertEqual(f(), None) 2019 self.assertEqual(f(), None) 2020 self.assertEqual(f.peek(), None) 2021 self.assertEqual(f.detach(), None) 2022 self.assertEqual(f.alive, False) 2023 self.assertEqual(res, []) 2024 2025 res = [] 2026 f = weakref.finalize(a, add, x=67, y=43, z=89) 2027 del a 2028 self._collect_if_necessary() 2029 self.assertEqual(f(), None) 2030 self.assertEqual(f(), None) 2031 self.assertEqual(f.peek(), None) 2032 self.assertEqual(f.detach(), None) 2033 self.assertEqual(f.alive, False) 2034 self.assertEqual(res, [199]) 2035 2036 def test_arg_errors(self): 2037 def fin(*args, **kwargs): 2038 res.append((args, kwargs)) 2039 2040 a = self.A() 2041 2042 res = [] 2043 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) 2044 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) 2045 f() 2046 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) 2047 2048 with self.assertRaises(TypeError): 2049 weakref.finalize(a, func=fin, arg=1) 2050 with self.assertRaises(TypeError): 2051 weakref.finalize(obj=a, func=fin, arg=1) 2052 self.assertRaises(TypeError, weakref.finalize, a) 2053 self.assertRaises(TypeError, weakref.finalize) 2054 2055 def test_order(self): 2056 a = self.A() 2057 res = [] 2058 2059 f1 = weakref.finalize(a, res.append, 'f1') 2060 f2 = weakref.finalize(a, res.append, 'f2') 2061 f3 = weakref.finalize(a, res.append, 'f3') 2062 f4 = weakref.finalize(a, res.append, 'f4') 2063 f5 = weakref.finalize(a, res.append, 'f5') 2064 2065 # make sure finalizers can keep themselves alive 2066 del f1, f4 2067 2068 self.assertTrue(f2.alive) 2069 self.assertTrue(f3.alive) 2070 self.assertTrue(f5.alive) 2071 2072 self.assertTrue(f5.detach()) 2073 self.assertFalse(f5.alive) 2074 2075 f5() # nothing because previously unregistered 2076 res.append('A') 2077 f3() # => res.append('f3') 2078 self.assertFalse(f3.alive) 2079 res.append('B') 2080 f3() # nothing because previously called 2081 res.append('C') 2082 del a 2083 self._collect_if_necessary() 2084 # => res.append('f4') 2085 # => res.append('f2') 2086 # => res.append('f1') 2087 self.assertFalse(f2.alive) 2088 res.append('D') 2089 f2() # nothing because previously called by gc 2090 2091 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] 2092 self.assertEqual(res, expected) 2093 2094 def test_all_freed(self): 2095 # we want a weakrefable subclass of weakref.finalize 2096 class MyFinalizer(weakref.finalize): 2097 pass 2098 2099 a = self.A() 2100 res = [] 2101 def callback(): 2102 res.append(123) 2103 f = MyFinalizer(a, callback) 2104 2105 wr_callback = weakref.ref(callback) 2106 wr_f = weakref.ref(f) 2107 del callback, f 2108 2109 self.assertIsNotNone(wr_callback()) 2110 self.assertIsNotNone(wr_f()) 2111 2112 del a 2113 self._collect_if_necessary() 2114 2115 self.assertIsNone(wr_callback()) 2116 self.assertIsNone(wr_f()) 2117 self.assertEqual(res, [123]) 2118 2119 @classmethod 2120 def run_in_child(cls): 2121 def error(): 2122 # Create an atexit finalizer from inside a finalizer called 2123 # at exit. This should be the next to be run. 2124 g1 = weakref.finalize(cls, print, 'g1') 2125 print('f3 error') 2126 1/0 2127 2128 # cls should stay alive till atexit callbacks run 2129 f1 = weakref.finalize(cls, print, 'f1', _global_var) 2130 f2 = weakref.finalize(cls, print, 'f2', _global_var) 2131 f3 = weakref.finalize(cls, error) 2132 f4 = weakref.finalize(cls, print, 'f4', _global_var) 2133 2134 assert f1.atexit == True 2135 f2.atexit = False 2136 assert f3.atexit == True 2137 assert f4.atexit == True 2138 2139 def test_atexit(self): 2140 prog = ('from test.test_weakref import FinalizeTestCase;'+ 2141 'FinalizeTestCase.run_in_child()') 2142 rc, out, err = script_helper.assert_python_ok('-c', prog) 2143 out = out.decode('ascii').splitlines() 2144 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 2145 self.assertTrue(b'ZeroDivisionError' in err) 2146 2147 2148libreftest = """ Doctest for examples in the library reference: weakref.rst 2149 2150>>> from test.support import gc_collect 2151>>> import weakref 2152>>> class Dict(dict): 2153... pass 2154... 2155>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 2156>>> r = weakref.ref(obj) 2157>>> print(r() is obj) 2158True 2159 2160>>> import weakref 2161>>> class Object: 2162... pass 2163... 2164>>> o = Object() 2165>>> r = weakref.ref(o) 2166>>> o2 = r() 2167>>> o is o2 2168True 2169>>> del o, o2 2170>>> gc_collect() # For PyPy or other GCs. 2171>>> print(r()) 2172None 2173 2174>>> import weakref 2175>>> class ExtendedRef(weakref.ref): 2176... def __init__(self, ob, callback=None, **annotations): 2177... super().__init__(ob, callback) 2178... self.__counter = 0 2179... for k, v in annotations.items(): 2180... setattr(self, k, v) 2181... def __call__(self): 2182... '''Return a pair containing the referent and the number of 2183... times the reference has been called. 2184... ''' 2185... ob = super().__call__() 2186... if ob is not None: 2187... self.__counter += 1 2188... ob = (ob, self.__counter) 2189... return ob 2190... 2191>>> class A: # not in docs from here, just testing the ExtendedRef 2192... pass 2193... 2194>>> a = A() 2195>>> r = ExtendedRef(a, foo=1, bar="baz") 2196>>> r.foo 21971 2198>>> r.bar 2199'baz' 2200>>> r()[1] 22011 2202>>> r()[1] 22032 2204>>> r()[0] is a 2205True 2206 2207 2208>>> import weakref 2209>>> _id2obj_dict = weakref.WeakValueDictionary() 2210>>> def remember(obj): 2211... oid = id(obj) 2212... _id2obj_dict[oid] = obj 2213... return oid 2214... 2215>>> def id2obj(oid): 2216... return _id2obj_dict[oid] 2217... 2218>>> a = A() # from here, just testing 2219>>> a_id = remember(a) 2220>>> id2obj(a_id) is a 2221True 2222>>> del a 2223>>> gc_collect() # For PyPy or other GCs. 2224>>> try: 2225... id2obj(a_id) 2226... except KeyError: 2227... print('OK') 2228... else: 2229... print('WeakValueDictionary error') 2230OK 2231 2232""" 2233 2234__test__ = {'libreftest' : libreftest} 2235 2236def test_main(): 2237 support.run_unittest( 2238 ReferencesTestCase, 2239 WeakMethodTestCase, 2240 MappingTestCase, 2241 WeakValueDictionaryTestCase, 2242 WeakKeyDictionaryTestCase, 2243 SubclassableWeakrefTestCase, 2244 FinalizeTestCase, 2245 ) 2246 support.run_doctest(sys.modules[__name__]) 2247 2248 2249if __name__ == "__main__": 2250 test_main() 2251