1import gc 2import sys 3import unittest 4import collections 5import weakref 6import operator 7import contextlib 8import copy 9import threading 10import time 11import random 12 13from test import support 14from test.support import script_helper 15 16# Used in ReferencesTestCase.test_ref_created_during_del() . 17ref_from_del = None 18 19# Used by FinalizeTestCase as a global that may be replaced by None 20# when the interpreter shuts down. 21_global_var = 'foobar' 22 23class C: 24 def method(self): 25 pass 26 27 28class Callable: 29 bar = None 30 31 def __call__(self, x): 32 self.bar = x 33 34 35def create_function(): 36 def f(): pass 37 return f 38 39def create_bound_method(): 40 return C().method 41 42 43class Object: 44 def __init__(self, arg): 45 self.arg = arg 46 def __repr__(self): 47 return "<Object %r>" % self.arg 48 def __eq__(self, other): 49 if isinstance(other, Object): 50 return self.arg == other.arg 51 return NotImplemented 52 def __lt__(self, other): 53 if isinstance(other, Object): 54 return self.arg < other.arg 55 return NotImplemented 56 def __hash__(self): 57 return hash(self.arg) 58 def some_method(self): 59 return 4 60 def other_method(self): 61 return 5 62 63 64class RefCycle: 65 def __init__(self): 66 self.cycle = self 67 68 69class TestBase(unittest.TestCase): 70 71 def setUp(self): 72 self.cbcalled = 0 73 74 def callback(self, ref): 75 self.cbcalled += 1 76 77 78@contextlib.contextmanager 79def collect_in_thread(period=0.0001): 80 """ 81 Ensure GC collections happen in a different thread, at a high frequency. 82 """ 83 please_stop = False 84 85 def collect(): 86 while not please_stop: 87 time.sleep(period) 88 gc.collect() 89 90 with support.disable_gc(): 91 t = threading.Thread(target=collect) 92 t.start() 93 try: 94 yield 95 finally: 96 please_stop = True 97 t.join() 98 99 100class ReferencesTestCase(TestBase): 101 102 def test_basic_ref(self): 103 self.check_basic_ref(C) 104 self.check_basic_ref(create_function) 105 self.check_basic_ref(create_bound_method) 106 107 # Just make sure the tp_repr handler doesn't raise an exception. 108 # Live reference: 109 o = C() 110 wr = weakref.ref(o) 111 repr(wr) 112 # Dead reference: 113 del o 114 repr(wr) 115 116 def test_basic_callback(self): 117 self.check_basic_callback(C) 118 self.check_basic_callback(create_function) 119 self.check_basic_callback(create_bound_method) 120 121 @support.cpython_only 122 def test_cfunction(self): 123 import _testcapi 124 create_cfunction = _testcapi.create_cfunction 125 f = create_cfunction() 126 wr = weakref.ref(f) 127 self.assertIs(wr(), f) 128 del f 129 self.assertIsNone(wr()) 130 self.check_basic_ref(create_cfunction) 131 self.check_basic_callback(create_cfunction) 132 133 def test_multiple_callbacks(self): 134 o = C() 135 ref1 = weakref.ref(o, self.callback) 136 ref2 = weakref.ref(o, self.callback) 137 del o 138 self.assertIsNone(ref1(), "expected reference to be invalidated") 139 self.assertIsNone(ref2(), "expected reference to be invalidated") 140 self.assertEqual(self.cbcalled, 2, 141 "callback not called the right number of times") 142 143 def test_multiple_selfref_callbacks(self): 144 # Make sure all references are invalidated before callbacks are called 145 # 146 # What's important here is that we're using the first 147 # reference in the callback invoked on the second reference 148 # (the most recently created ref is cleaned up first). This 149 # tests that all references to the object are invalidated 150 # before any of the callbacks are invoked, so that we only 151 # have one invocation of _weakref.c:cleanup_helper() active 152 # for a particular object at a time. 153 # 154 def callback(object, self=self): 155 self.ref() 156 c = C() 157 self.ref = weakref.ref(c, callback) 158 ref1 = weakref.ref(c, callback) 159 del c 160 161 def test_constructor_kwargs(self): 162 c = C() 163 self.assertRaises(TypeError, weakref.ref, c, callback=None) 164 165 def test_proxy_ref(self): 166 o = C() 167 o.bar = 1 168 ref1 = weakref.proxy(o, self.callback) 169 ref2 = weakref.proxy(o, self.callback) 170 del o 171 172 def check(proxy): 173 proxy.bar 174 175 self.assertRaises(ReferenceError, check, ref1) 176 self.assertRaises(ReferenceError, check, ref2) 177 self.assertRaises(ReferenceError, bool, weakref.proxy(C())) 178 self.assertEqual(self.cbcalled, 2) 179 180 def check_basic_ref(self, factory): 181 o = factory() 182 ref = weakref.ref(o) 183 self.assertIsNotNone(ref(), 184 "weak reference to live object should be live") 185 o2 = ref() 186 self.assertIs(o, o2, 187 "<ref>() should return original object if live") 188 189 def check_basic_callback(self, factory): 190 self.cbcalled = 0 191 o = factory() 192 ref = weakref.ref(o, self.callback) 193 del o 194 self.assertEqual(self.cbcalled, 1, 195 "callback did not properly set 'cbcalled'") 196 self.assertIsNone(ref(), 197 "ref2 should be dead after deleting object reference") 198 199 def test_ref_reuse(self): 200 o = C() 201 ref1 = weakref.ref(o) 202 # create a proxy to make sure that there's an intervening creation 203 # between these two; it should make no difference 204 proxy = weakref.proxy(o) 205 ref2 = weakref.ref(o) 206 self.assertIs(ref1, ref2, 207 "reference object w/out callback should be re-used") 208 209 o = C() 210 proxy = weakref.proxy(o) 211 ref1 = weakref.ref(o) 212 ref2 = weakref.ref(o) 213 self.assertIs(ref1, ref2, 214 "reference object w/out callback should be re-used") 215 self.assertEqual(weakref.getweakrefcount(o), 2, 216 "wrong weak ref count for object") 217 del proxy 218 self.assertEqual(weakref.getweakrefcount(o), 1, 219 "wrong weak ref count for object after deleting proxy") 220 221 def test_proxy_reuse(self): 222 o = C() 223 proxy1 = weakref.proxy(o) 224 ref = weakref.ref(o) 225 proxy2 = weakref.proxy(o) 226 self.assertIs(proxy1, proxy2, 227 "proxy object w/out callback should have been re-used") 228 229 def test_basic_proxy(self): 230 o = C() 231 self.check_proxy(o, weakref.proxy(o)) 232 233 L = collections.UserList() 234 p = weakref.proxy(L) 235 self.assertFalse(p, "proxy for empty UserList should be false") 236 p.append(12) 237 self.assertEqual(len(L), 1) 238 self.assertTrue(p, "proxy for non-empty UserList should be true") 239 p[:] = [2, 3] 240 self.assertEqual(len(L), 2) 241 self.assertEqual(len(p), 2) 242 self.assertIn(3, p, "proxy didn't support __contains__() properly") 243 p[1] = 5 244 self.assertEqual(L[1], 5) 245 self.assertEqual(p[1], 5) 246 L2 = collections.UserList(L) 247 p2 = weakref.proxy(L2) 248 self.assertEqual(p, p2) 249 ## self.assertEqual(repr(L2), repr(p2)) 250 L3 = collections.UserList(range(10)) 251 p3 = weakref.proxy(L3) 252 self.assertEqual(L3[:], p3[:]) 253 self.assertEqual(L3[5:], p3[5:]) 254 self.assertEqual(L3[:5], p3[:5]) 255 self.assertEqual(L3[2:5], p3[2:5]) 256 257 def test_proxy_unicode(self): 258 # See bug 5037 259 class C(object): 260 def __str__(self): 261 return "string" 262 def __bytes__(self): 263 return b"bytes" 264 instance = C() 265 self.assertIn("__bytes__", dir(weakref.proxy(instance))) 266 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") 267 268 def test_proxy_index(self): 269 class C: 270 def __index__(self): 271 return 10 272 o = C() 273 p = weakref.proxy(o) 274 self.assertEqual(operator.index(p), 10) 275 276 def test_proxy_div(self): 277 class C: 278 def __floordiv__(self, other): 279 return 42 280 def __ifloordiv__(self, other): 281 return 21 282 o = C() 283 p = weakref.proxy(o) 284 self.assertEqual(p // 5, 42) 285 p //= 5 286 self.assertEqual(p, 21) 287 288 # The PyWeakref_* C API is documented as allowing either NULL or 289 # None as the value for the callback, where either means "no 290 # callback". The "no callback" ref and proxy objects are supposed 291 # to be shared so long as they exist by all callers so long as 292 # they are active. In Python 2.3.3 and earlier, this guarantee 293 # was not honored, and was broken in different ways for 294 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 295 296 def test_shared_ref_without_callback(self): 297 self.check_shared_without_callback(weakref.ref) 298 299 def test_shared_proxy_without_callback(self): 300 self.check_shared_without_callback(weakref.proxy) 301 302 def check_shared_without_callback(self, makeref): 303 o = Object(1) 304 p1 = makeref(o, None) 305 p2 = makeref(o, None) 306 self.assertIs(p1, p2, "both callbacks were None in the C API") 307 del p1, p2 308 p1 = makeref(o) 309 p2 = makeref(o, None) 310 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 311 del p1, p2 312 p1 = makeref(o) 313 p2 = makeref(o) 314 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 315 del p1, p2 316 p1 = makeref(o, None) 317 p2 = makeref(o) 318 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 319 320 def test_callable_proxy(self): 321 o = Callable() 322 ref1 = weakref.proxy(o) 323 324 self.check_proxy(o, ref1) 325 326 self.assertIs(type(ref1), weakref.CallableProxyType, 327 "proxy is not of callable type") 328 ref1('twinkies!') 329 self.assertEqual(o.bar, 'twinkies!', 330 "call through proxy not passed through to original") 331 ref1(x='Splat.') 332 self.assertEqual(o.bar, 'Splat.', 333 "call through proxy not passed through to original") 334 335 # expect due to too few args 336 self.assertRaises(TypeError, ref1) 337 338 # expect due to too many args 339 self.assertRaises(TypeError, ref1, 1, 2, 3) 340 341 def check_proxy(self, o, proxy): 342 o.foo = 1 343 self.assertEqual(proxy.foo, 1, 344 "proxy does not reflect attribute addition") 345 o.foo = 2 346 self.assertEqual(proxy.foo, 2, 347 "proxy does not reflect attribute modification") 348 del o.foo 349 self.assertFalse(hasattr(proxy, 'foo'), 350 "proxy does not reflect attribute removal") 351 352 proxy.foo = 1 353 self.assertEqual(o.foo, 1, 354 "object does not reflect attribute addition via proxy") 355 proxy.foo = 2 356 self.assertEqual(o.foo, 2, 357 "object does not reflect attribute modification via proxy") 358 del proxy.foo 359 self.assertFalse(hasattr(o, 'foo'), 360 "object does not reflect attribute removal via proxy") 361 362 def test_proxy_deletion(self): 363 # Test clearing of SF bug #762891 364 class Foo: 365 result = None 366 def __delitem__(self, accessor): 367 self.result = accessor 368 g = Foo() 369 f = weakref.proxy(g) 370 del f[0] 371 self.assertEqual(f.result, 0) 372 373 def test_proxy_bool(self): 374 # Test clearing of SF bug #1170766 375 class List(list): pass 376 lyst = List() 377 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 378 379 def test_getweakrefcount(self): 380 o = C() 381 ref1 = weakref.ref(o) 382 ref2 = weakref.ref(o, self.callback) 383 self.assertEqual(weakref.getweakrefcount(o), 2, 384 "got wrong number of weak reference objects") 385 386 proxy1 = weakref.proxy(o) 387 proxy2 = weakref.proxy(o, self.callback) 388 self.assertEqual(weakref.getweakrefcount(o), 4, 389 "got wrong number of weak reference objects") 390 391 del ref1, ref2, proxy1, proxy2 392 self.assertEqual(weakref.getweakrefcount(o), 0, 393 "weak reference objects not unlinked from" 394 " referent when discarded.") 395 396 # assumes ints do not support weakrefs 397 self.assertEqual(weakref.getweakrefcount(1), 0, 398 "got wrong number of weak reference objects for int") 399 400 def test_getweakrefs(self): 401 o = C() 402 ref1 = weakref.ref(o, self.callback) 403 ref2 = weakref.ref(o, self.callback) 404 del ref1 405 self.assertEqual(weakref.getweakrefs(o), [ref2], 406 "list of refs does not match") 407 408 o = C() 409 ref1 = weakref.ref(o, self.callback) 410 ref2 = weakref.ref(o, self.callback) 411 del ref2 412 self.assertEqual(weakref.getweakrefs(o), [ref1], 413 "list of refs does not match") 414 415 del ref1 416 self.assertEqual(weakref.getweakrefs(o), [], 417 "list of refs not cleared") 418 419 # assumes ints do not support weakrefs 420 self.assertEqual(weakref.getweakrefs(1), [], 421 "list of refs does not match for int") 422 423 def test_newstyle_number_ops(self): 424 class F(float): 425 pass 426 f = F(2.0) 427 p = weakref.proxy(f) 428 self.assertEqual(p + 1.0, 3.0) 429 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 430 431 def test_callbacks_protected(self): 432 # Callbacks protected from already-set exceptions? 433 # Regression test for SF bug #478534. 434 class BogusError(Exception): 435 pass 436 data = {} 437 def remove(k): 438 del data[k] 439 def encapsulate(): 440 f = lambda : () 441 data[weakref.ref(f, remove)] = None 442 raise BogusError 443 try: 444 encapsulate() 445 except BogusError: 446 pass 447 else: 448 self.fail("exception not properly restored") 449 try: 450 encapsulate() 451 except BogusError: 452 pass 453 else: 454 self.fail("exception not properly restored") 455 456 def test_sf_bug_840829(self): 457 # "weakref callbacks and gc corrupt memory" 458 # subtype_dealloc erroneously exposed a new-style instance 459 # already in the process of getting deallocated to gc, 460 # causing double-deallocation if the instance had a weakref 461 # callback that triggered gc. 462 # If the bug exists, there probably won't be an obvious symptom 463 # in a release build. In a debug build, a segfault will occur 464 # when the second attempt to remove the instance from the "list 465 # of all objects" occurs. 466 467 import gc 468 469 class C(object): 470 pass 471 472 c = C() 473 wr = weakref.ref(c, lambda ignore: gc.collect()) 474 del c 475 476 # There endeth the first part. It gets worse. 477 del wr 478 479 c1 = C() 480 c1.i = C() 481 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 482 483 c2 = C() 484 c2.c1 = c1 485 del c1 # still alive because c2 points to it 486 487 # Now when subtype_dealloc gets called on c2, it's not enough just 488 # that c2 is immune from gc while the weakref callbacks associated 489 # with c2 execute (there are none in this 2nd half of the test, btw). 490 # subtype_dealloc goes on to call the base classes' deallocs too, 491 # so any gc triggered by weakref callbacks associated with anything 492 # torn down by a base class dealloc can also trigger double 493 # deallocation of c2. 494 del c2 495 496 def test_callback_in_cycle_1(self): 497 import gc 498 499 class J(object): 500 pass 501 502 class II(object): 503 def acallback(self, ignore): 504 self.J 505 506 I = II() 507 I.J = J 508 I.wr = weakref.ref(J, I.acallback) 509 510 # Now J and II are each in a self-cycle (as all new-style class 511 # objects are, since their __mro__ points back to them). I holds 512 # both a weak reference (I.wr) and a strong reference (I.J) to class 513 # J. I is also in a cycle (I.wr points to a weakref that references 514 # I.acallback). When we del these three, they all become trash, but 515 # the cycles prevent any of them from getting cleaned up immediately. 516 # Instead they have to wait for cyclic gc to deduce that they're 517 # trash. 518 # 519 # gc used to call tp_clear on all of them, and the order in which 520 # it does that is pretty accidental. The exact order in which we 521 # built up these things manages to provoke gc into running tp_clear 522 # in just the right order (I last). Calling tp_clear on II leaves 523 # behind an insane class object (its __mro__ becomes NULL). Calling 524 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 525 # just then because of the strong reference from I.J. Calling 526 # tp_clear on I starts to clear I's __dict__, and just happens to 527 # clear I.J first -- I.wr is still intact. That removes the last 528 # reference to J, which triggers the weakref callback. The callback 529 # tries to do "self.J", and instances of new-style classes look up 530 # attributes ("J") in the class dict first. The class (II) wants to 531 # search II.__mro__, but that's NULL. The result was a segfault in 532 # a release build, and an assert failure in a debug build. 533 del I, J, II 534 gc.collect() 535 536 def test_callback_in_cycle_2(self): 537 import gc 538 539 # This is just like test_callback_in_cycle_1, except that II is an 540 # old-style class. The symptom is different then: an instance of an 541 # old-style class looks in its own __dict__ first. 'J' happens to 542 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 543 # __dict__, so the attribute isn't found. The difference is that 544 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 545 # __mro__), so no segfault occurs. Instead it got: 546 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 547 # Exception exceptions.AttributeError: 548 # "II instance has no attribute 'J'" in <bound method II.acallback 549 # of <?.II instance at 0x00B9B4B8>> ignored 550 551 class J(object): 552 pass 553 554 class II: 555 def acallback(self, ignore): 556 self.J 557 558 I = II() 559 I.J = J 560 I.wr = weakref.ref(J, I.acallback) 561 562 del I, J, II 563 gc.collect() 564 565 def test_callback_in_cycle_3(self): 566 import gc 567 568 # This one broke the first patch that fixed the last two. In this 569 # case, the objects reachable from the callback aren't also reachable 570 # from the object (c1) *triggering* the callback: you can get to 571 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 572 # got tp_clear'ed by the time the c2.cb callback got invoked. 573 574 class C: 575 def cb(self, ignore): 576 self.me 577 self.c1 578 self.wr 579 580 c1, c2 = C(), C() 581 582 c2.me = c2 583 c2.c1 = c1 584 c2.wr = weakref.ref(c1, c2.cb) 585 586 del c1, c2 587 gc.collect() 588 589 def test_callback_in_cycle_4(self): 590 import gc 591 592 # Like test_callback_in_cycle_3, except c2 and c1 have different 593 # classes. c2's class (C) isn't reachable from c1 then, so protecting 594 # objects reachable from the dying object (c1) isn't enough to stop 595 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 596 # The result was a segfault (C.__mro__ was NULL when the callback 597 # tried to look up self.me). 598 599 class C(object): 600 def cb(self, ignore): 601 self.me 602 self.c1 603 self.wr 604 605 class D: 606 pass 607 608 c1, c2 = D(), C() 609 610 c2.me = c2 611 c2.c1 = c1 612 c2.wr = weakref.ref(c1, c2.cb) 613 614 del c1, c2, C, D 615 gc.collect() 616 617 @support.requires_type_collecting 618 def test_callback_in_cycle_resurrection(self): 619 import gc 620 621 # Do something nasty in a weakref callback: resurrect objects 622 # from dead cycles. For this to be attempted, the weakref and 623 # its callback must also be part of the cyclic trash (else the 624 # objects reachable via the callback couldn't be in cyclic trash 625 # to begin with -- the callback would act like an external root). 626 # But gc clears trash weakrefs with callbacks early now, which 627 # disables the callbacks, so the callbacks shouldn't get called 628 # at all (and so nothing actually gets resurrected). 629 630 alist = [] 631 class C(object): 632 def __init__(self, value): 633 self.attribute = value 634 635 def acallback(self, ignore): 636 alist.append(self.c) 637 638 c1, c2 = C(1), C(2) 639 c1.c = c2 640 c2.c = c1 641 c1.wr = weakref.ref(c2, c1.acallback) 642 c2.wr = weakref.ref(c1, c2.acallback) 643 644 def C_went_away(ignore): 645 alist.append("C went away") 646 wr = weakref.ref(C, C_went_away) 647 648 del c1, c2, C # make them all trash 649 self.assertEqual(alist, []) # del isn't enough to reclaim anything 650 651 gc.collect() 652 # c1.wr and c2.wr were part of the cyclic trash, so should have 653 # been cleared without their callbacks executing. OTOH, the weakref 654 # to C is bound to a function local (wr), and wasn't trash, so that 655 # callback should have been invoked when C went away. 656 self.assertEqual(alist, ["C went away"]) 657 # The remaining weakref should be dead now (its callback ran). 658 self.assertEqual(wr(), None) 659 660 del alist[:] 661 gc.collect() 662 self.assertEqual(alist, []) 663 664 def test_callbacks_on_callback(self): 665 import gc 666 667 # Set up weakref callbacks *on* weakref callbacks. 668 alist = [] 669 def safe_callback(ignore): 670 alist.append("safe_callback called") 671 672 class C(object): 673 def cb(self, ignore): 674 alist.append("cb called") 675 676 c, d = C(), C() 677 c.other = d 678 d.other = c 679 callback = c.cb 680 c.wr = weakref.ref(d, callback) # this won't trigger 681 d.wr = weakref.ref(callback, d.cb) # ditto 682 external_wr = weakref.ref(callback, safe_callback) # but this will 683 self.assertIs(external_wr(), callback) 684 685 # The weakrefs attached to c and d should get cleared, so that 686 # C.cb is never called. But external_wr isn't part of the cyclic 687 # trash, and no cyclic trash is reachable from it, so safe_callback 688 # should get invoked when the bound method object callback (c.cb) 689 # -- which is itself a callback, and also part of the cyclic trash -- 690 # gets reclaimed at the end of gc. 691 692 del callback, c, d, C 693 self.assertEqual(alist, []) # del isn't enough to clean up cycles 694 gc.collect() 695 self.assertEqual(alist, ["safe_callback called"]) 696 self.assertEqual(external_wr(), None) 697 698 del alist[:] 699 gc.collect() 700 self.assertEqual(alist, []) 701 702 def test_gc_during_ref_creation(self): 703 self.check_gc_during_creation(weakref.ref) 704 705 def test_gc_during_proxy_creation(self): 706 self.check_gc_during_creation(weakref.proxy) 707 708 def check_gc_during_creation(self, makeref): 709 thresholds = gc.get_threshold() 710 gc.set_threshold(1, 1, 1) 711 gc.collect() 712 class A: 713 pass 714 715 def callback(*args): 716 pass 717 718 referenced = A() 719 720 a = A() 721 a.a = a 722 a.wr = makeref(referenced) 723 724 try: 725 # now make sure the object and the ref get labeled as 726 # cyclic trash: 727 a = A() 728 weakref.ref(referenced, callback) 729 730 finally: 731 gc.set_threshold(*thresholds) 732 733 def test_ref_created_during_del(self): 734 # Bug #1377858 735 # A weakref created in an object's __del__() would crash the 736 # interpreter when the weakref was cleaned up since it would refer to 737 # non-existent memory. This test should not segfault the interpreter. 738 class Target(object): 739 def __del__(self): 740 global ref_from_del 741 ref_from_del = weakref.ref(self) 742 743 w = Target() 744 745 def test_init(self): 746 # Issue 3634 747 # <weakref to class>.__init__() doesn't check errors correctly 748 r = weakref.ref(Exception) 749 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 750 # No exception should be raised here 751 gc.collect() 752 753 def test_classes(self): 754 # Check that classes are weakrefable. 755 class A(object): 756 pass 757 l = [] 758 weakref.ref(int) 759 a = weakref.ref(A, l.append) 760 A = None 761 gc.collect() 762 self.assertEqual(a(), None) 763 self.assertEqual(l, [a]) 764 765 def test_equality(self): 766 # Alive weakrefs defer equality testing to their underlying object. 767 x = Object(1) 768 y = Object(1) 769 z = Object(2) 770 a = weakref.ref(x) 771 b = weakref.ref(y) 772 c = weakref.ref(z) 773 d = weakref.ref(x) 774 # Note how we directly test the operators here, to stress both 775 # __eq__ and __ne__. 776 self.assertTrue(a == b) 777 self.assertFalse(a != b) 778 self.assertFalse(a == c) 779 self.assertTrue(a != c) 780 self.assertTrue(a == d) 781 self.assertFalse(a != d) 782 del x, y, z 783 gc.collect() 784 for r in a, b, c: 785 # Sanity check 786 self.assertIs(r(), None) 787 # Dead weakrefs compare by identity: whether `a` and `d` are the 788 # same weakref object is an implementation detail, since they pointed 789 # to the same original object and didn't have a callback. 790 # (see issue #16453). 791 self.assertFalse(a == b) 792 self.assertTrue(a != b) 793 self.assertFalse(a == c) 794 self.assertTrue(a != c) 795 self.assertEqual(a == d, a is d) 796 self.assertEqual(a != d, a is not d) 797 798 def test_ordering(self): 799 # weakrefs cannot be ordered, even if the underlying objects can. 800 ops = [operator.lt, operator.gt, operator.le, operator.ge] 801 x = Object(1) 802 y = Object(1) 803 a = weakref.ref(x) 804 b = weakref.ref(y) 805 for op in ops: 806 self.assertRaises(TypeError, op, a, b) 807 # Same when dead. 808 del x, y 809 gc.collect() 810 for op in ops: 811 self.assertRaises(TypeError, op, a, b) 812 813 def test_hashing(self): 814 # Alive weakrefs hash the same as the underlying object 815 x = Object(42) 816 y = Object(42) 817 a = weakref.ref(x) 818 b = weakref.ref(y) 819 self.assertEqual(hash(a), hash(42)) 820 del x, y 821 gc.collect() 822 # Dead weakrefs: 823 # - retain their hash is they were hashed when alive; 824 # - otherwise, cannot be hashed. 825 self.assertEqual(hash(a), hash(42)) 826 self.assertRaises(TypeError, hash, b) 827 828 def test_trashcan_16602(self): 829 # Issue #16602: when a weakref's target was part of a long 830 # deallocation chain, the trashcan mechanism could delay clearing 831 # of the weakref and make the target object visible from outside 832 # code even though its refcount had dropped to 0. A crash ensued. 833 class C: 834 def __init__(self, parent): 835 if not parent: 836 return 837 wself = weakref.ref(self) 838 def cb(wparent): 839 o = wself() 840 self.wparent = weakref.ref(parent, cb) 841 842 d = weakref.WeakKeyDictionary() 843 root = c = C(None) 844 for n in range(100): 845 d[c] = c = C(c) 846 del root 847 gc.collect() 848 849 def test_callback_attribute(self): 850 x = Object(1) 851 callback = lambda ref: None 852 ref1 = weakref.ref(x, callback) 853 self.assertIs(ref1.__callback__, callback) 854 855 ref2 = weakref.ref(x) 856 self.assertIsNone(ref2.__callback__) 857 858 def test_callback_attribute_after_deletion(self): 859 x = Object(1) 860 ref = weakref.ref(x, self.callback) 861 self.assertIsNotNone(ref.__callback__) 862 del x 863 support.gc_collect() 864 self.assertIsNone(ref.__callback__) 865 866 def test_set_callback_attribute(self): 867 x = Object(1) 868 callback = lambda ref: None 869 ref1 = weakref.ref(x, callback) 870 with self.assertRaises(AttributeError): 871 ref1.__callback__ = lambda ref: None 872 873 def test_callback_gcs(self): 874 class ObjectWithDel(Object): 875 def __del__(self): pass 876 x = ObjectWithDel(1) 877 ref1 = weakref.ref(x, lambda ref: support.gc_collect()) 878 del x 879 support.gc_collect() 880 881 882class SubclassableWeakrefTestCase(TestBase): 883 884 def test_subclass_refs(self): 885 class MyRef(weakref.ref): 886 def __init__(self, ob, callback=None, value=42): 887 self.value = value 888 super().__init__(ob, callback) 889 def __call__(self): 890 self.called = True 891 return super().__call__() 892 o = Object("foo") 893 mr = MyRef(o, value=24) 894 self.assertIs(mr(), o) 895 self.assertTrue(mr.called) 896 self.assertEqual(mr.value, 24) 897 del o 898 self.assertIsNone(mr()) 899 self.assertTrue(mr.called) 900 901 def test_subclass_refs_dont_replace_standard_refs(self): 902 class MyRef(weakref.ref): 903 pass 904 o = Object(42) 905 r1 = MyRef(o) 906 r2 = weakref.ref(o) 907 self.assertIsNot(r1, r2) 908 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 909 self.assertEqual(weakref.getweakrefcount(o), 2) 910 r3 = MyRef(o) 911 self.assertEqual(weakref.getweakrefcount(o), 3) 912 refs = weakref.getweakrefs(o) 913 self.assertEqual(len(refs), 3) 914 self.assertIs(r2, refs[0]) 915 self.assertIn(r1, refs[1:]) 916 self.assertIn(r3, refs[1:]) 917 918 def test_subclass_refs_dont_conflate_callbacks(self): 919 class MyRef(weakref.ref): 920 pass 921 o = Object(42) 922 r1 = MyRef(o, id) 923 r2 = MyRef(o, str) 924 self.assertIsNot(r1, r2) 925 refs = weakref.getweakrefs(o) 926 self.assertIn(r1, refs) 927 self.assertIn(r2, refs) 928 929 def test_subclass_refs_with_slots(self): 930 class MyRef(weakref.ref): 931 __slots__ = "slot1", "slot2" 932 def __new__(type, ob, callback, slot1, slot2): 933 return weakref.ref.__new__(type, ob, callback) 934 def __init__(self, ob, callback, slot1, slot2): 935 self.slot1 = slot1 936 self.slot2 = slot2 937 def meth(self): 938 return self.slot1 + self.slot2 939 o = Object(42) 940 r = MyRef(o, None, "abc", "def") 941 self.assertEqual(r.slot1, "abc") 942 self.assertEqual(r.slot2, "def") 943 self.assertEqual(r.meth(), "abcdef") 944 self.assertFalse(hasattr(r, "__dict__")) 945 946 def test_subclass_refs_with_cycle(self): 947 """Confirm https://bugs.python.org/issue3100 is fixed.""" 948 # An instance of a weakref subclass can have attributes. 949 # If such a weakref holds the only strong reference to the object, 950 # deleting the weakref will delete the object. In this case, 951 # the callback must not be called, because the ref object is 952 # being deleted. 953 class MyRef(weakref.ref): 954 pass 955 956 # Use a local callback, for "regrtest -R::" 957 # to detect refcounting problems 958 def callback(w): 959 self.cbcalled += 1 960 961 o = C() 962 r1 = MyRef(o, callback) 963 r1.o = o 964 del o 965 966 del r1 # Used to crash here 967 968 self.assertEqual(self.cbcalled, 0) 969 970 # Same test, with two weakrefs to the same object 971 # (since code paths are different) 972 o = C() 973 r1 = MyRef(o, callback) 974 r2 = MyRef(o, callback) 975 r1.r = r2 976 r2.o = o 977 del o 978 del r2 979 980 del r1 # Used to crash here 981 982 self.assertEqual(self.cbcalled, 0) 983 984 985class WeakMethodTestCase(unittest.TestCase): 986 987 def _subclass(self): 988 """Return an Object subclass overriding `some_method`.""" 989 class C(Object): 990 def some_method(self): 991 return 6 992 return C 993 994 def test_alive(self): 995 o = Object(1) 996 r = weakref.WeakMethod(o.some_method) 997 self.assertIsInstance(r, weakref.ReferenceType) 998 self.assertIsInstance(r(), type(o.some_method)) 999 self.assertIs(r().__self__, o) 1000 self.assertIs(r().__func__, o.some_method.__func__) 1001 self.assertEqual(r()(), 4) 1002 1003 def test_object_dead(self): 1004 o = Object(1) 1005 r = weakref.WeakMethod(o.some_method) 1006 del o 1007 gc.collect() 1008 self.assertIs(r(), None) 1009 1010 def test_method_dead(self): 1011 C = self._subclass() 1012 o = C(1) 1013 r = weakref.WeakMethod(o.some_method) 1014 del C.some_method 1015 gc.collect() 1016 self.assertIs(r(), None) 1017 1018 def test_callback_when_object_dead(self): 1019 # Test callback behaviour when object dies first. 1020 C = self._subclass() 1021 calls = [] 1022 def cb(arg): 1023 calls.append(arg) 1024 o = C(1) 1025 r = weakref.WeakMethod(o.some_method, cb) 1026 del o 1027 gc.collect() 1028 self.assertEqual(calls, [r]) 1029 # Callback is only called once. 1030 C.some_method = Object.some_method 1031 gc.collect() 1032 self.assertEqual(calls, [r]) 1033 1034 def test_callback_when_method_dead(self): 1035 # Test callback behaviour when method dies first. 1036 C = self._subclass() 1037 calls = [] 1038 def cb(arg): 1039 calls.append(arg) 1040 o = C(1) 1041 r = weakref.WeakMethod(o.some_method, cb) 1042 del C.some_method 1043 gc.collect() 1044 self.assertEqual(calls, [r]) 1045 # Callback is only called once. 1046 del o 1047 gc.collect() 1048 self.assertEqual(calls, [r]) 1049 1050 @support.cpython_only 1051 def test_no_cycles(self): 1052 # A WeakMethod doesn't create any reference cycle to itself. 1053 o = Object(1) 1054 def cb(_): 1055 pass 1056 r = weakref.WeakMethod(o.some_method, cb) 1057 wr = weakref.ref(r) 1058 del r 1059 self.assertIs(wr(), None) 1060 1061 def test_equality(self): 1062 def _eq(a, b): 1063 self.assertTrue(a == b) 1064 self.assertFalse(a != b) 1065 def _ne(a, b): 1066 self.assertTrue(a != b) 1067 self.assertFalse(a == b) 1068 x = Object(1) 1069 y = Object(1) 1070 a = weakref.WeakMethod(x.some_method) 1071 b = weakref.WeakMethod(y.some_method) 1072 c = weakref.WeakMethod(x.other_method) 1073 d = weakref.WeakMethod(y.other_method) 1074 # Objects equal, same method 1075 _eq(a, b) 1076 _eq(c, d) 1077 # Objects equal, different method 1078 _ne(a, c) 1079 _ne(a, d) 1080 _ne(b, c) 1081 _ne(b, d) 1082 # Objects unequal, same or different method 1083 z = Object(2) 1084 e = weakref.WeakMethod(z.some_method) 1085 f = weakref.WeakMethod(z.other_method) 1086 _ne(a, e) 1087 _ne(a, f) 1088 _ne(b, e) 1089 _ne(b, f) 1090 del x, y, z 1091 gc.collect() 1092 # Dead WeakMethods compare by identity 1093 refs = a, b, c, d, e, f 1094 for q in refs: 1095 for r in refs: 1096 self.assertEqual(q == r, q is r) 1097 self.assertEqual(q != r, q is not r) 1098 1099 def test_hashing(self): 1100 # Alive WeakMethods are hashable if the underlying object is 1101 # hashable. 1102 x = Object(1) 1103 y = Object(1) 1104 a = weakref.WeakMethod(x.some_method) 1105 b = weakref.WeakMethod(y.some_method) 1106 c = weakref.WeakMethod(y.other_method) 1107 # Since WeakMethod objects are equal, the hashes should be equal. 1108 self.assertEqual(hash(a), hash(b)) 1109 ha = hash(a) 1110 # Dead WeakMethods retain their old hash value 1111 del x, y 1112 gc.collect() 1113 self.assertEqual(hash(a), ha) 1114 self.assertEqual(hash(b), ha) 1115 # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. 1116 self.assertRaises(TypeError, hash, c) 1117 1118 1119class MappingTestCase(TestBase): 1120 1121 COUNT = 10 1122 1123 def check_len_cycles(self, dict_type, cons): 1124 N = 20 1125 items = [RefCycle() for i in range(N)] 1126 dct = dict_type(cons(o) for o in items) 1127 # Keep an iterator alive 1128 it = dct.items() 1129 try: 1130 next(it) 1131 except StopIteration: 1132 pass 1133 del items 1134 gc.collect() 1135 n1 = len(dct) 1136 del it 1137 gc.collect() 1138 n2 = len(dct) 1139 # one item may be kept alive inside the iterator 1140 self.assertIn(n1, (0, 1)) 1141 self.assertEqual(n2, 0) 1142 1143 def test_weak_keyed_len_cycles(self): 1144 self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1145 1146 def test_weak_valued_len_cycles(self): 1147 self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) 1148 1149 def check_len_race(self, dict_type, cons): 1150 # Extended sanity checks for len() in the face of cyclic collection 1151 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 1152 for th in range(1, 100): 1153 N = 20 1154 gc.collect(0) 1155 gc.set_threshold(th, th, th) 1156 items = [RefCycle() for i in range(N)] 1157 dct = dict_type(cons(o) for o in items) 1158 del items 1159 # All items will be collected at next garbage collection pass 1160 it = dct.items() 1161 try: 1162 next(it) 1163 except StopIteration: 1164 pass 1165 n1 = len(dct) 1166 del it 1167 n2 = len(dct) 1168 self.assertGreaterEqual(n1, 0) 1169 self.assertLessEqual(n1, N) 1170 self.assertGreaterEqual(n2, 0) 1171 self.assertLessEqual(n2, n1) 1172 1173 def test_weak_keyed_len_race(self): 1174 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1175 1176 def test_weak_valued_len_race(self): 1177 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 1178 1179 def test_weak_values(self): 1180 # 1181 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1182 # 1183 dict, objects = self.make_weak_valued_dict() 1184 for o in objects: 1185 self.assertEqual(weakref.getweakrefcount(o), 1) 1186 self.assertIs(o, dict[o.arg], 1187 "wrong object returned by weak dict!") 1188 items1 = list(dict.items()) 1189 items2 = list(dict.copy().items()) 1190 items1.sort() 1191 items2.sort() 1192 self.assertEqual(items1, items2, 1193 "cloning of weak-valued dictionary did not work!") 1194 del items1, items2 1195 self.assertEqual(len(dict), self.COUNT) 1196 del objects[0] 1197 self.assertEqual(len(dict), self.COUNT - 1, 1198 "deleting object did not cause dictionary update") 1199 del objects, o 1200 self.assertEqual(len(dict), 0, 1201 "deleting the values did not clear the dictionary") 1202 # regression on SF bug #447152: 1203 dict = weakref.WeakValueDictionary() 1204 self.assertRaises(KeyError, dict.__getitem__, 1) 1205 dict[2] = C() 1206 self.assertRaises(KeyError, dict.__getitem__, 2) 1207 1208 def test_weak_keys(self): 1209 # 1210 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1211 # len(d), k in d. 1212 # 1213 dict, objects = self.make_weak_keyed_dict() 1214 for o in objects: 1215 self.assertEqual(weakref.getweakrefcount(o), 1, 1216 "wrong number of weak references to %r!" % o) 1217 self.assertIs(o.arg, dict[o], 1218 "wrong object returned by weak dict!") 1219 items1 = dict.items() 1220 items2 = dict.copy().items() 1221 self.assertEqual(set(items1), set(items2), 1222 "cloning of weak-keyed dictionary did not work!") 1223 del items1, items2 1224 self.assertEqual(len(dict), self.COUNT) 1225 del objects[0] 1226 self.assertEqual(len(dict), (self.COUNT - 1), 1227 "deleting object did not cause dictionary update") 1228 del objects, o 1229 self.assertEqual(len(dict), 0, 1230 "deleting the keys did not clear the dictionary") 1231 o = Object(42) 1232 dict[o] = "What is the meaning of the universe?" 1233 self.assertIn(o, dict) 1234 self.assertNotIn(34, dict) 1235 1236 def test_weak_keyed_iters(self): 1237 dict, objects = self.make_weak_keyed_dict() 1238 self.check_iters(dict) 1239 1240 # Test keyrefs() 1241 refs = dict.keyrefs() 1242 self.assertEqual(len(refs), len(objects)) 1243 objects2 = list(objects) 1244 for wr in refs: 1245 ob = wr() 1246 self.assertIn(ob, dict) 1247 self.assertIn(ob, dict) 1248 self.assertEqual(ob.arg, dict[ob]) 1249 objects2.remove(ob) 1250 self.assertEqual(len(objects2), 0) 1251 1252 # Test iterkeyrefs() 1253 objects2 = list(objects) 1254 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1255 for wr in dict.keyrefs(): 1256 ob = wr() 1257 self.assertIn(ob, dict) 1258 self.assertIn(ob, dict) 1259 self.assertEqual(ob.arg, dict[ob]) 1260 objects2.remove(ob) 1261 self.assertEqual(len(objects2), 0) 1262 1263 def test_weak_valued_iters(self): 1264 dict, objects = self.make_weak_valued_dict() 1265 self.check_iters(dict) 1266 1267 # Test valuerefs() 1268 refs = dict.valuerefs() 1269 self.assertEqual(len(refs), len(objects)) 1270 objects2 = list(objects) 1271 for wr in refs: 1272 ob = wr() 1273 self.assertEqual(ob, dict[ob.arg]) 1274 self.assertEqual(ob.arg, dict[ob.arg].arg) 1275 objects2.remove(ob) 1276 self.assertEqual(len(objects2), 0) 1277 1278 # Test itervaluerefs() 1279 objects2 = list(objects) 1280 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1281 for wr in dict.itervaluerefs(): 1282 ob = wr() 1283 self.assertEqual(ob, dict[ob.arg]) 1284 self.assertEqual(ob.arg, dict[ob.arg].arg) 1285 objects2.remove(ob) 1286 self.assertEqual(len(objects2), 0) 1287 1288 def check_iters(self, dict): 1289 # item iterator: 1290 items = list(dict.items()) 1291 for item in dict.items(): 1292 items.remove(item) 1293 self.assertFalse(items, "items() did not touch all items") 1294 1295 # key iterator, via __iter__(): 1296 keys = list(dict.keys()) 1297 for k in dict: 1298 keys.remove(k) 1299 self.assertFalse(keys, "__iter__() did not touch all keys") 1300 1301 # key iterator, via iterkeys(): 1302 keys = list(dict.keys()) 1303 for k in dict.keys(): 1304 keys.remove(k) 1305 self.assertFalse(keys, "iterkeys() did not touch all keys") 1306 1307 # value iterator: 1308 values = list(dict.values()) 1309 for v in dict.values(): 1310 values.remove(v) 1311 self.assertFalse(values, 1312 "itervalues() did not touch all values") 1313 1314 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1315 n = len(dict) 1316 it = iter(getattr(dict, iter_name)()) 1317 next(it) # Trigger internal iteration 1318 # Destroy an object 1319 del objects[-1] 1320 gc.collect() # just in case 1321 # We have removed either the first consumed object, or another one 1322 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1323 del it 1324 # The removal has been committed 1325 self.assertEqual(len(dict), n - 1) 1326 1327 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1328 # Check that we can explicitly mutate the weak dict without 1329 # interfering with delayed removal. 1330 # `testcontext` should create an iterator, destroy one of the 1331 # weakref'ed objects and then return a new key/value pair corresponding 1332 # to the destroyed object. 1333 with testcontext() as (k, v): 1334 self.assertNotIn(k, dict) 1335 with testcontext() as (k, v): 1336 self.assertRaises(KeyError, dict.__delitem__, k) 1337 self.assertNotIn(k, dict) 1338 with testcontext() as (k, v): 1339 self.assertRaises(KeyError, dict.pop, k) 1340 self.assertNotIn(k, dict) 1341 with testcontext() as (k, v): 1342 dict[k] = v 1343 self.assertEqual(dict[k], v) 1344 ddict = copy.copy(dict) 1345 with testcontext() as (k, v): 1346 dict.update(ddict) 1347 self.assertEqual(dict, ddict) 1348 with testcontext() as (k, v): 1349 dict.clear() 1350 self.assertEqual(len(dict), 0) 1351 1352 def check_weak_del_and_len_while_iterating(self, dict, testcontext): 1353 # Check that len() works when both iterating and removing keys 1354 # explicitly through various means (.pop(), .clear()...), while 1355 # implicit mutation is deferred because an iterator is alive. 1356 # (each call to testcontext() should schedule one item for removal 1357 # for this test to work properly) 1358 o = Object(123456) 1359 with testcontext(): 1360 n = len(dict) 1361 # Since underlaying dict is ordered, first item is popped 1362 dict.pop(next(dict.keys())) 1363 self.assertEqual(len(dict), n - 1) 1364 dict[o] = o 1365 self.assertEqual(len(dict), n) 1366 # last item in objects is removed from dict in context shutdown 1367 with testcontext(): 1368 self.assertEqual(len(dict), n - 1) 1369 # Then, (o, o) is popped 1370 dict.popitem() 1371 self.assertEqual(len(dict), n - 2) 1372 with testcontext(): 1373 self.assertEqual(len(dict), n - 3) 1374 del dict[next(dict.keys())] 1375 self.assertEqual(len(dict), n - 4) 1376 with testcontext(): 1377 self.assertEqual(len(dict), n - 5) 1378 dict.popitem() 1379 self.assertEqual(len(dict), n - 6) 1380 with testcontext(): 1381 dict.clear() 1382 self.assertEqual(len(dict), 0) 1383 self.assertEqual(len(dict), 0) 1384 1385 def test_weak_keys_destroy_while_iterating(self): 1386 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1387 dict, objects = self.make_weak_keyed_dict() 1388 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1389 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1390 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1391 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') 1392 dict, objects = self.make_weak_keyed_dict() 1393 @contextlib.contextmanager 1394 def testcontext(): 1395 try: 1396 it = iter(dict.items()) 1397 next(it) 1398 # Schedule a key/value for removal and recreate it 1399 v = objects.pop().arg 1400 gc.collect() # just in case 1401 yield Object(v), v 1402 finally: 1403 it = None # should commit all removals 1404 gc.collect() 1405 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1406 # Issue #21173: len() fragile when keys are both implicitly and 1407 # explicitly removed. 1408 dict, objects = self.make_weak_keyed_dict() 1409 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1410 1411 def test_weak_values_destroy_while_iterating(self): 1412 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1413 dict, objects = self.make_weak_valued_dict() 1414 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1415 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1416 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1417 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1418 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') 1419 dict, objects = self.make_weak_valued_dict() 1420 @contextlib.contextmanager 1421 def testcontext(): 1422 try: 1423 it = iter(dict.items()) 1424 next(it) 1425 # Schedule a key/value for removal and recreate it 1426 k = objects.pop().arg 1427 gc.collect() # just in case 1428 yield k, Object(k) 1429 finally: 1430 it = None # should commit all removals 1431 gc.collect() 1432 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1433 dict, objects = self.make_weak_valued_dict() 1434 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1435 1436 def test_make_weak_keyed_dict_from_dict(self): 1437 o = Object(3) 1438 dict = weakref.WeakKeyDictionary({o:364}) 1439 self.assertEqual(dict[o], 364) 1440 1441 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1442 o = Object(3) 1443 dict = weakref.WeakKeyDictionary({o:364}) 1444 dict2 = weakref.WeakKeyDictionary(dict) 1445 self.assertEqual(dict[o], 364) 1446 1447 def make_weak_keyed_dict(self): 1448 dict = weakref.WeakKeyDictionary() 1449 objects = list(map(Object, range(self.COUNT))) 1450 for o in objects: 1451 dict[o] = o.arg 1452 return dict, objects 1453 1454 def test_make_weak_valued_dict_from_dict(self): 1455 o = Object(3) 1456 dict = weakref.WeakValueDictionary({364:o}) 1457 self.assertEqual(dict[364], o) 1458 1459 def test_make_weak_valued_dict_from_weak_valued_dict(self): 1460 o = Object(3) 1461 dict = weakref.WeakValueDictionary({364:o}) 1462 dict2 = weakref.WeakValueDictionary(dict) 1463 self.assertEqual(dict[364], o) 1464 1465 def test_make_weak_valued_dict_misc(self): 1466 # errors 1467 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1468 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1469 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1470 # special keyword arguments 1471 o = Object(3) 1472 for kw in 'self', 'dict', 'other', 'iterable': 1473 d = weakref.WeakValueDictionary(**{kw: o}) 1474 self.assertEqual(list(d.keys()), [kw]) 1475 self.assertEqual(d[kw], o) 1476 1477 def make_weak_valued_dict(self): 1478 dict = weakref.WeakValueDictionary() 1479 objects = list(map(Object, range(self.COUNT))) 1480 for o in objects: 1481 dict[o.arg] = o 1482 return dict, objects 1483 1484 def check_popitem(self, klass, key1, value1, key2, value2): 1485 weakdict = klass() 1486 weakdict[key1] = value1 1487 weakdict[key2] = value2 1488 self.assertEqual(len(weakdict), 2) 1489 k, v = weakdict.popitem() 1490 self.assertEqual(len(weakdict), 1) 1491 if k is key1: 1492 self.assertIs(v, value1) 1493 else: 1494 self.assertIs(v, value2) 1495 k, v = weakdict.popitem() 1496 self.assertEqual(len(weakdict), 0) 1497 if k is key1: 1498 self.assertIs(v, value1) 1499 else: 1500 self.assertIs(v, value2) 1501 1502 def test_weak_valued_dict_popitem(self): 1503 self.check_popitem(weakref.WeakValueDictionary, 1504 "key1", C(), "key2", C()) 1505 1506 def test_weak_keyed_dict_popitem(self): 1507 self.check_popitem(weakref.WeakKeyDictionary, 1508 C(), "value 1", C(), "value 2") 1509 1510 def check_setdefault(self, klass, key, value1, value2): 1511 self.assertIsNot(value1, value2, 1512 "invalid test" 1513 " -- value parameters must be distinct objects") 1514 weakdict = klass() 1515 o = weakdict.setdefault(key, value1) 1516 self.assertIs(o, value1) 1517 self.assertIn(key, weakdict) 1518 self.assertIs(weakdict.get(key), value1) 1519 self.assertIs(weakdict[key], value1) 1520 1521 o = weakdict.setdefault(key, value2) 1522 self.assertIs(o, value1) 1523 self.assertIn(key, weakdict) 1524 self.assertIs(weakdict.get(key), value1) 1525 self.assertIs(weakdict[key], value1) 1526 1527 def test_weak_valued_dict_setdefault(self): 1528 self.check_setdefault(weakref.WeakValueDictionary, 1529 "key", C(), C()) 1530 1531 def test_weak_keyed_dict_setdefault(self): 1532 self.check_setdefault(weakref.WeakKeyDictionary, 1533 C(), "value 1", "value 2") 1534 1535 def check_update(self, klass, dict): 1536 # 1537 # This exercises d.update(), len(d), d.keys(), k in d, 1538 # d.get(), d[]. 1539 # 1540 weakdict = klass() 1541 weakdict.update(dict) 1542 self.assertEqual(len(weakdict), len(dict)) 1543 for k in weakdict.keys(): 1544 self.assertIn(k, dict, "mysterious new key appeared in weak dict") 1545 v = dict.get(k) 1546 self.assertIs(v, weakdict[k]) 1547 self.assertIs(v, weakdict.get(k)) 1548 for k in dict.keys(): 1549 self.assertIn(k, weakdict, "original key disappeared in weak dict") 1550 v = dict[k] 1551 self.assertIs(v, weakdict[k]) 1552 self.assertIs(v, weakdict.get(k)) 1553 1554 def test_weak_valued_dict_update(self): 1555 self.check_update(weakref.WeakValueDictionary, 1556 {1: C(), 'a': C(), C(): C()}) 1557 # errors 1558 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1559 d = weakref.WeakValueDictionary() 1560 self.assertRaises(TypeError, d.update, {}, {}) 1561 self.assertRaises(TypeError, d.update, (), ()) 1562 self.assertEqual(list(d.keys()), []) 1563 # special keyword arguments 1564 o = Object(3) 1565 for kw in 'self', 'dict', 'other', 'iterable': 1566 d = weakref.WeakValueDictionary() 1567 d.update(**{kw: o}) 1568 self.assertEqual(list(d.keys()), [kw]) 1569 self.assertEqual(d[kw], o) 1570 1571 def test_weak_keyed_dict_update(self): 1572 self.check_update(weakref.WeakKeyDictionary, 1573 {C(): 1, C(): 2, C(): 3}) 1574 1575 def test_weak_keyed_delitem(self): 1576 d = weakref.WeakKeyDictionary() 1577 o1 = Object('1') 1578 o2 = Object('2') 1579 d[o1] = 'something' 1580 d[o2] = 'something' 1581 self.assertEqual(len(d), 2) 1582 del d[o1] 1583 self.assertEqual(len(d), 1) 1584 self.assertEqual(list(d.keys()), [o2]) 1585 1586 def test_weak_valued_delitem(self): 1587 d = weakref.WeakValueDictionary() 1588 o1 = Object('1') 1589 o2 = Object('2') 1590 d['something'] = o1 1591 d['something else'] = o2 1592 self.assertEqual(len(d), 2) 1593 del d['something'] 1594 self.assertEqual(len(d), 1) 1595 self.assertEqual(list(d.items()), [('something else', o2)]) 1596 1597 def test_weak_keyed_bad_delitem(self): 1598 d = weakref.WeakKeyDictionary() 1599 o = Object('1') 1600 # An attempt to delete an object that isn't there should raise 1601 # KeyError. It didn't before 2.3. 1602 self.assertRaises(KeyError, d.__delitem__, o) 1603 self.assertRaises(KeyError, d.__getitem__, o) 1604 1605 # If a key isn't of a weakly referencable type, __getitem__ and 1606 # __setitem__ raise TypeError. __delitem__ should too. 1607 self.assertRaises(TypeError, d.__delitem__, 13) 1608 self.assertRaises(TypeError, d.__getitem__, 13) 1609 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1610 1611 def test_weak_keyed_cascading_deletes(self): 1612 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1613 # over the keys via self.data.iterkeys(). If things vanished from 1614 # the dict during this (or got added), that caused a RuntimeError. 1615 1616 d = weakref.WeakKeyDictionary() 1617 mutate = False 1618 1619 class C(object): 1620 def __init__(self, i): 1621 self.value = i 1622 def __hash__(self): 1623 return hash(self.value) 1624 def __eq__(self, other): 1625 if mutate: 1626 # Side effect that mutates the dict, by removing the 1627 # last strong reference to a key. 1628 del objs[-1] 1629 return self.value == other.value 1630 1631 objs = [C(i) for i in range(4)] 1632 for o in objs: 1633 d[o] = o.value 1634 del o # now the only strong references to keys are in objs 1635 # Find the order in which iterkeys sees the keys. 1636 objs = list(d.keys()) 1637 # Reverse it, so that the iteration implementation of __delitem__ 1638 # has to keep looping to find the first object we delete. 1639 objs.reverse() 1640 1641 # Turn on mutation in C.__eq__. The first time through the loop, 1642 # under the iterkeys() business the first comparison will delete 1643 # the last item iterkeys() would see, and that causes a 1644 # RuntimeError: dictionary changed size during iteration 1645 # when the iterkeys() loop goes around to try comparing the next 1646 # key. After this was fixed, it just deletes the last object *our* 1647 # "for o in obj" loop would have gotten to. 1648 mutate = True 1649 count = 0 1650 for o in objs: 1651 count += 1 1652 del d[o] 1653 self.assertEqual(len(d), 0) 1654 self.assertEqual(count, 2) 1655 1656 def test_make_weak_valued_dict_repr(self): 1657 dict = weakref.WeakValueDictionary() 1658 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1659 1660 def test_make_weak_keyed_dict_repr(self): 1661 dict = weakref.WeakKeyDictionary() 1662 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1663 1664 def test_threaded_weak_valued_setdefault(self): 1665 d = weakref.WeakValueDictionary() 1666 with collect_in_thread(): 1667 for i in range(100000): 1668 x = d.setdefault(10, RefCycle()) 1669 self.assertIsNot(x, None) # we never put None in there! 1670 del x 1671 1672 def test_threaded_weak_valued_pop(self): 1673 d = weakref.WeakValueDictionary() 1674 with collect_in_thread(): 1675 for i in range(100000): 1676 d[10] = RefCycle() 1677 x = d.pop(10, 10) 1678 self.assertIsNot(x, None) # we never put None in there! 1679 1680 def test_threaded_weak_valued_consistency(self): 1681 # Issue #28427: old keys should not remove new values from 1682 # WeakValueDictionary when collecting from another thread. 1683 d = weakref.WeakValueDictionary() 1684 with collect_in_thread(): 1685 for i in range(200000): 1686 o = RefCycle() 1687 d[10] = o 1688 # o is still alive, so the dict can't be empty 1689 self.assertEqual(len(d), 1) 1690 o = None # lose ref 1691 1692 def check_threaded_weak_dict_copy(self, type_, deepcopy): 1693 # `type_` should be either WeakKeyDictionary or WeakValueDictionary. 1694 # `deepcopy` should be either True or False. 1695 exc = [] 1696 1697 class DummyKey: 1698 def __init__(self, ctr): 1699 self.ctr = ctr 1700 1701 class DummyValue: 1702 def __init__(self, ctr): 1703 self.ctr = ctr 1704 1705 def dict_copy(d, exc): 1706 try: 1707 if deepcopy is True: 1708 _ = copy.deepcopy(d) 1709 else: 1710 _ = d.copy() 1711 except Exception as ex: 1712 exc.append(ex) 1713 1714 def pop_and_collect(lst): 1715 gc_ctr = 0 1716 while lst: 1717 i = random.randint(0, len(lst) - 1) 1718 gc_ctr += 1 1719 lst.pop(i) 1720 if gc_ctr % 10000 == 0: 1721 gc.collect() # just in case 1722 1723 self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) 1724 1725 d = type_() 1726 keys = [] 1727 values = [] 1728 # Initialize d with many entries 1729 for i in range(70000): 1730 k, v = DummyKey(i), DummyValue(i) 1731 keys.append(k) 1732 values.append(v) 1733 d[k] = v 1734 del k 1735 del v 1736 1737 t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) 1738 if type_ is weakref.WeakKeyDictionary: 1739 t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) 1740 else: # weakref.WeakValueDictionary 1741 t_collect = threading.Thread(target=pop_and_collect, args=(values,)) 1742 1743 t_copy.start() 1744 t_collect.start() 1745 1746 t_copy.join() 1747 t_collect.join() 1748 1749 # Test exceptions 1750 if exc: 1751 raise exc[0] 1752 1753 def test_threaded_weak_key_dict_copy(self): 1754 # Issue #35615: Weakref keys or values getting GC'ed during dict 1755 # copying should not result in a crash. 1756 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) 1757 1758 def test_threaded_weak_key_dict_deepcopy(self): 1759 # Issue #35615: Weakref keys or values getting GC'ed during dict 1760 # copying should not result in a crash. 1761 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) 1762 1763 def test_threaded_weak_value_dict_copy(self): 1764 # Issue #35615: Weakref keys or values getting GC'ed during dict 1765 # copying should not result in a crash. 1766 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) 1767 1768 def test_threaded_weak_value_dict_deepcopy(self): 1769 # Issue #35615: Weakref keys or values getting GC'ed during dict 1770 # copying should not result in a crash. 1771 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) 1772 1773 1774from test import mapping_tests 1775 1776class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1777 """Check that WeakValueDictionary conforms to the mapping protocol""" 1778 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1779 type2test = weakref.WeakValueDictionary 1780 def _reference(self): 1781 return self.__ref.copy() 1782 1783class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1784 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1785 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1786 type2test = weakref.WeakKeyDictionary 1787 def _reference(self): 1788 return self.__ref.copy() 1789 1790 1791class FinalizeTestCase(unittest.TestCase): 1792 1793 class A: 1794 pass 1795 1796 def _collect_if_necessary(self): 1797 # we create no ref-cycles so in CPython no gc should be needed 1798 if sys.implementation.name != 'cpython': 1799 support.gc_collect() 1800 1801 def test_finalize(self): 1802 def add(x,y,z): 1803 res.append(x + y + z) 1804 return x + y + z 1805 1806 a = self.A() 1807 1808 res = [] 1809 f = weakref.finalize(a, add, 67, 43, z=89) 1810 self.assertEqual(f.alive, True) 1811 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) 1812 self.assertEqual(f(), 199) 1813 self.assertEqual(f(), None) 1814 self.assertEqual(f(), None) 1815 self.assertEqual(f.peek(), None) 1816 self.assertEqual(f.detach(), None) 1817 self.assertEqual(f.alive, False) 1818 self.assertEqual(res, [199]) 1819 1820 res = [] 1821 f = weakref.finalize(a, add, 67, 43, 89) 1822 self.assertEqual(f.peek(), (a, add, (67,43,89), {})) 1823 self.assertEqual(f.detach(), (a, add, (67,43,89), {})) 1824 self.assertEqual(f(), None) 1825 self.assertEqual(f(), None) 1826 self.assertEqual(f.peek(), None) 1827 self.assertEqual(f.detach(), None) 1828 self.assertEqual(f.alive, False) 1829 self.assertEqual(res, []) 1830 1831 res = [] 1832 f = weakref.finalize(a, add, x=67, y=43, z=89) 1833 del a 1834 self._collect_if_necessary() 1835 self.assertEqual(f(), None) 1836 self.assertEqual(f(), None) 1837 self.assertEqual(f.peek(), None) 1838 self.assertEqual(f.detach(), None) 1839 self.assertEqual(f.alive, False) 1840 self.assertEqual(res, [199]) 1841 1842 def test_order(self): 1843 a = self.A() 1844 res = [] 1845 1846 f1 = weakref.finalize(a, res.append, 'f1') 1847 f2 = weakref.finalize(a, res.append, 'f2') 1848 f3 = weakref.finalize(a, res.append, 'f3') 1849 f4 = weakref.finalize(a, res.append, 'f4') 1850 f5 = weakref.finalize(a, res.append, 'f5') 1851 1852 # make sure finalizers can keep themselves alive 1853 del f1, f4 1854 1855 self.assertTrue(f2.alive) 1856 self.assertTrue(f3.alive) 1857 self.assertTrue(f5.alive) 1858 1859 self.assertTrue(f5.detach()) 1860 self.assertFalse(f5.alive) 1861 1862 f5() # nothing because previously unregistered 1863 res.append('A') 1864 f3() # => res.append('f3') 1865 self.assertFalse(f3.alive) 1866 res.append('B') 1867 f3() # nothing because previously called 1868 res.append('C') 1869 del a 1870 self._collect_if_necessary() 1871 # => res.append('f4') 1872 # => res.append('f2') 1873 # => res.append('f1') 1874 self.assertFalse(f2.alive) 1875 res.append('D') 1876 f2() # nothing because previously called by gc 1877 1878 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] 1879 self.assertEqual(res, expected) 1880 1881 def test_all_freed(self): 1882 # we want a weakrefable subclass of weakref.finalize 1883 class MyFinalizer(weakref.finalize): 1884 pass 1885 1886 a = self.A() 1887 res = [] 1888 def callback(): 1889 res.append(123) 1890 f = MyFinalizer(a, callback) 1891 1892 wr_callback = weakref.ref(callback) 1893 wr_f = weakref.ref(f) 1894 del callback, f 1895 1896 self.assertIsNotNone(wr_callback()) 1897 self.assertIsNotNone(wr_f()) 1898 1899 del a 1900 self._collect_if_necessary() 1901 1902 self.assertIsNone(wr_callback()) 1903 self.assertIsNone(wr_f()) 1904 self.assertEqual(res, [123]) 1905 1906 @classmethod 1907 def run_in_child(cls): 1908 def error(): 1909 # Create an atexit finalizer from inside a finalizer called 1910 # at exit. This should be the next to be run. 1911 g1 = weakref.finalize(cls, print, 'g1') 1912 print('f3 error') 1913 1/0 1914 1915 # cls should stay alive till atexit callbacks run 1916 f1 = weakref.finalize(cls, print, 'f1', _global_var) 1917 f2 = weakref.finalize(cls, print, 'f2', _global_var) 1918 f3 = weakref.finalize(cls, error) 1919 f4 = weakref.finalize(cls, print, 'f4', _global_var) 1920 1921 assert f1.atexit == True 1922 f2.atexit = False 1923 assert f3.atexit == True 1924 assert f4.atexit == True 1925 1926 def test_atexit(self): 1927 prog = ('from test.test_weakref import FinalizeTestCase;'+ 1928 'FinalizeTestCase.run_in_child()') 1929 rc, out, err = script_helper.assert_python_ok('-c', prog) 1930 out = out.decode('ascii').splitlines() 1931 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 1932 self.assertTrue(b'ZeroDivisionError' in err) 1933 1934 1935libreftest = """ Doctest for examples in the library reference: weakref.rst 1936 1937>>> import weakref 1938>>> class Dict(dict): 1939... pass 1940... 1941>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 1942>>> r = weakref.ref(obj) 1943>>> print(r() is obj) 1944True 1945 1946>>> import weakref 1947>>> class Object: 1948... pass 1949... 1950>>> o = Object() 1951>>> r = weakref.ref(o) 1952>>> o2 = r() 1953>>> o is o2 1954True 1955>>> del o, o2 1956>>> print(r()) 1957None 1958 1959>>> import weakref 1960>>> class ExtendedRef(weakref.ref): 1961... def __init__(self, ob, callback=None, **annotations): 1962... super().__init__(ob, callback) 1963... self.__counter = 0 1964... for k, v in annotations.items(): 1965... setattr(self, k, v) 1966... def __call__(self): 1967... '''Return a pair containing the referent and the number of 1968... times the reference has been called. 1969... ''' 1970... ob = super().__call__() 1971... if ob is not None: 1972... self.__counter += 1 1973... ob = (ob, self.__counter) 1974... return ob 1975... 1976>>> class A: # not in docs from here, just testing the ExtendedRef 1977... pass 1978... 1979>>> a = A() 1980>>> r = ExtendedRef(a, foo=1, bar="baz") 1981>>> r.foo 19821 1983>>> r.bar 1984'baz' 1985>>> r()[1] 19861 1987>>> r()[1] 19882 1989>>> r()[0] is a 1990True 1991 1992 1993>>> import weakref 1994>>> _id2obj_dict = weakref.WeakValueDictionary() 1995>>> def remember(obj): 1996... oid = id(obj) 1997... _id2obj_dict[oid] = obj 1998... return oid 1999... 2000>>> def id2obj(oid): 2001... return _id2obj_dict[oid] 2002... 2003>>> a = A() # from here, just testing 2004>>> a_id = remember(a) 2005>>> id2obj(a_id) is a 2006True 2007>>> del a 2008>>> try: 2009... id2obj(a_id) 2010... except KeyError: 2011... print('OK') 2012... else: 2013... print('WeakValueDictionary error') 2014OK 2015 2016""" 2017 2018__test__ = {'libreftest' : libreftest} 2019 2020def test_main(): 2021 support.run_unittest( 2022 ReferencesTestCase, 2023 WeakMethodTestCase, 2024 MappingTestCase, 2025 WeakValueDictionaryTestCase, 2026 WeakKeyDictionaryTestCase, 2027 SubclassableWeakrefTestCase, 2028 FinalizeTestCase, 2029 ) 2030 support.run_doctest(sys.modules[__name__]) 2031 2032 2033if __name__ == "__main__": 2034 test_main() 2035