import gc
import sys
import unittest
import collections
import weakref
import operator
import contextlib
import copy
import threading
import time
import random

from test import support
from test.support import script_helper, ALWAYS_EQ

# Used in ReferencesTestCase.test_ref_created_during_del() .
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'

class C:
    """Minimal class used as a weak-reference target throughout the tests."""
    def method(self):
        pass


class Callable:
    """Callable helper that stores the argument of its last call in `bar`."""
    bar = None

    def __call__(self, x):
        self.bar = x


def create_function():
    """Return a freshly created (and therefore otherwise unreferenced) function."""
    def f(): pass
    return f

def create_bound_method():
    """Return a bound method of a temporary C instance."""
    return C().method


class Object:
    """Value wrapper that compares, orders and hashes by its `arg` attribute."""
    def __init__(self, arg):
        self.arg = arg
    def __repr__(self):
        return "<Object %r>" % self.arg
    def __eq__(self, other):
        if isinstance(other, Object):
            return self.arg == other.arg
        return NotImplemented
    def __lt__(self, other):
        if isinstance(other, Object):
            return self.arg < other.arg
        return NotImplemented
    def __hash__(self):
        return hash(self.arg)
    def some_method(self):
        return 4
    def other_method(self):
        return 5


class RefCycle:
    """Object that references itself, so only the cyclic gc can reclaim it."""
    def __init__(self):
        self.cycle = self


class TestBase(unittest.TestCase):
    """Base test case offering `callback`, which counts calls in `cbcalled`."""

    def setUp(self):
        self.cbcalled = 0

    def callback(self, ref):
        self.cbcalled += 1


@contextlib.contextmanager
def collect_in_thread(period=0.0001):
    """
    Ensure GC collections happen in a different thread, at a high frequency.

    While the context is active, a background thread repeatedly sleeps for
    *period* seconds and then calls gc.collect().  The whole context runs
    inside support.disable_gc(); the thread is stopped and joined on exit.
    """
    please_stop = False

    def collect():
        while not please_stop:
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        t = threading.Thread(target=collect)
        t.start()
        try:
            yield
        finally:
            # Rebinds the variable read by collect(), so its loop terminates.
            please_stop = True
            t.join()


# Tests for weakref.ref and weakref.proxy objects: creation, sharing,
# callbacks, comparison/hashing and interaction with the cyclic gc.
class ReferencesTestCase(TestBase):

    def test_basic_ref(self):
        self.check_basic_ref(C)
        self.check_basic_ref(create_function)
        self.check_basic_ref(create_bound_method)

        # Just make sure the tp_repr handler doesn't raise an exception.
        # Live reference:
        o = C()
        wr = weakref.ref(o)
        repr(wr)
        # Dead reference:
        del o
        repr(wr)

    def test_basic_callback(self):
        self.check_basic_callback(C)
        self.check_basic_callback(create_function)
        self.check_basic_callback(create_bound_method)

    @support.cpython_only
    def test_cfunction(self):
        import _testcapi
        create_cfunction = _testcapi.create_cfunction
        f = create_cfunction()
        wr = weakref.ref(f)
        self.assertIs(wr(), f)
        del f
        self.assertIsNone(wr())
        self.check_basic_ref(create_cfunction)
        self.check_basic_callback(create_cfunction)

    def test_multiple_callbacks(self):
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del o
        self.assertIsNone(ref1(), "expected reference to be invalidated")
        self.assertIsNone(ref2(), "expected reference to be invalidated")
        self.assertEqual(self.cbcalled, 2,
                     "callback not called the right number of times")

    def test_multiple_selfref_callbacks(self):
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            self.ref()
        c = C()
        self.ref = weakref.ref(c, callback)
        ref1 = weakref.ref(c, callback)
        del c

    def test_constructor_kwargs(self):
        c = C()
        self.assertRaises(TypeError, weakref.ref, c, callback=None)

    def test_proxy_ref(self):
        o = C()
        o.bar = 1
        ref1 = weakref.proxy(o, self.callback)
        ref2 = weakref.proxy(o, self.callback)
        del o

        def check(proxy):
            proxy.bar

        self.assertRaises(ReferenceError, check, ref1)
        self.assertRaises(ReferenceError, check, ref2)
        self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
        self.assertEqual(self.cbcalled, 2)

    def check_basic_ref(self, factory):
        # Helper: a weakref to a live object made by factory() must return it.
        o = factory()
        ref = weakref.ref(o)
        self.assertIsNotNone(ref(),
                     "weak reference to live object should be live")
        o2 = ref()
        self.assertIs(o, o2,
                     "<ref>() should return original object if live")

    def check_basic_callback(self, factory):
        # Helper: dropping the only strong reference must fire the callback
        # exactly once and leave the weakref dead.
        self.cbcalled = 0
        o = factory()
        ref = weakref.ref(o, self.callback)
        del o
        self.assertEqual(self.cbcalled, 1,
                     "callback did not properly set 'cbcalled'")
        self.assertIsNone(ref(),
                     "ref2 should be dead after deleting object reference")

    def test_ref_reuse(self):
        o = C()
        ref1 = weakref.ref(o)
        # create a proxy to make sure that there's an intervening creation
        # between these two; it should make no difference
        proxy = weakref.proxy(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                     "reference object w/out callback should be re-used")

        o = C()
        proxy = weakref.proxy(o)
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                     "reference object w/out callback should be re-used")
        self.assertEqual(weakref.getweakrefcount(o), 2,
                     "wrong weak ref count for object")
        del proxy
        self.assertEqual(weakref.getweakrefcount(o), 1,
                     "wrong weak ref count for object after deleting proxy")

    def test_proxy_reuse(self):
        o = C()
        proxy1 = weakref.proxy(o)
        ref = weakref.ref(o)
        proxy2 = weakref.proxy(o)
        self.assertIs(proxy1, proxy2,
                     "proxy object w/out callback should have been re-used")

    def test_basic_proxy(self):
        o = C()
        self.check_proxy(o, weakref.proxy(o))

        L = collections.UserList()
        p = weakref.proxy(L)
        self.assertFalse(p, "proxy for empty UserList should be false")
        p.append(12)
        self.assertEqual(len(L), 1)
        self.assertTrue(p, "proxy for non-empty UserList should be true")
        p[:] = [2, 3]
        self.assertEqual(len(L), 2)
        self.assertEqual(len(p), 2)
        self.assertIn(3, p, "proxy didn't support __contains__() properly")
        p[1] = 5
        self.assertEqual(L[1], 5)
        self.assertEqual(p[1], 5)
        L2 = collections.UserList(L)
        p2 = weakref.proxy(L2)
        self.assertEqual(p, p2)
        ## self.assertEqual(repr(L2), repr(p2))
        L3 = collections.UserList(range(10))
        p3 = weakref.proxy(L3)
        self.assertEqual(L3[:], p3[:])
        self.assertEqual(L3[5:], p3[5:])
        self.assertEqual(L3[:5], p3[:5])
        self.assertEqual(L3[2:5], p3[2:5])

    def test_proxy_unicode(self):
        # See bug 5037
        class C(object):
            def __str__(self):
                return "string"
            def __bytes__(self):
                return b"bytes"
        instance = C()
        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")

    def test_proxy_index(self):
        class C:
            def __index__(self):
                return 10
        o = C()
        p = weakref.proxy(o)
        self.assertEqual(operator.index(p), 10)

    def test_proxy_div(self):
        class C:
            def __floordiv__(self, other):
                return 42
            def __ifloordiv__(self, other):
                return 21
        o = C()
        p = weakref.proxy(o)
        self.assertEqual(p // 5, 42)
        # The in-place operator rebinds p to the value returned by
        # C.__ifloordiv__, which is what the next assertion checks.
        p //= 5
        self.assertEqual(p, 21)

    def test_proxy_matmul(self):
        class C:
            def __matmul__(self, other):
                return 1729
            def __rmatmul__(self, other):
                return -163
            def __imatmul__(self, other):
                return 561
        o = C()
        p = weakref.proxy(o)
        self.assertEqual(p @ 5, 1729)
        self.assertEqual(5 @ p, -163)
        p @= 5
        self.assertEqual(p, 561)

    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared so long as they exist by all callers so long as
    # they are active.  In Python 2.3.3 and earlier, this guarantee
    # was not honored, and was broken in different ways for
    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)

    def test_shared_ref_without_callback(self):
        self.check_shared_without_callback(weakref.ref)

    def test_shared_proxy_without_callback(self):
        self.check_shared_without_callback(weakref.proxy)

    def check_shared_without_callback(self, makeref):
        # Helper: every combination of "callback omitted" and "callback=None"
        # must hand back the same shared object.
        o = Object(1)
        p1 = makeref(o, None)
        p2 = makeref(o, None)
        self.assertIs(p1, p2, "both callbacks were None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o, None)
        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
        del p1, p2
        p1 = makeref(o)
        p2 = makeref(o)
        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
        del p1, p2
        p1 = makeref(o, None)
        p2 = makeref(o)
        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")

    def test_callable_proxy(self):
        o = Callable()
        ref1 = weakref.proxy(o)

        self.check_proxy(o, ref1)

        self.assertIs(type(ref1), weakref.CallableProxyType,
                     "proxy is not of callable type")
        ref1('twinkies!')
        self.assertEqual(o.bar, 'twinkies!',
                     "call through proxy not passed through to original")
        ref1(x='Splat.')
        self.assertEqual(o.bar, 'Splat.',
                     "call through proxy not passed through to original")

        # expect due to too few args
        self.assertRaises(TypeError, ref1)

        # expect due to too many args
        self.assertRaises(TypeError, ref1, 1, 2, 3)

    def check_proxy(self, o, proxy):
        # Helper: attribute set/modify/delete must be visible in both
        # directions between the object and its proxy.
        o.foo = 1
        self.assertEqual(proxy.foo, 1,
                     "proxy does not reflect attribute addition")
        o.foo = 2
        self.assertEqual(proxy.foo, 2,
                     "proxy does not reflect attribute modification")
        del o.foo
        self.assertFalse(hasattr(proxy, 'foo'),
                     "proxy does not reflect attribute removal")

        proxy.foo = 1
        self.assertEqual(o.foo, 1,
                     "object does not reflect attribute addition via proxy")
        proxy.foo = 2
        self.assertEqual(o.foo, 2,
            "object does not reflect attribute modification via proxy")
        del proxy.foo
        self.assertFalse(hasattr(o, 'foo'),
                     "object does not reflect attribute removal via proxy")

    def test_proxy_deletion(self):
        # Test clearing of SF bug #762891
        class Foo:
            result = None
            def __delitem__(self, accessor):
                self.result = accessor
        g = Foo()
        f = weakref.proxy(g)
        del f[0]
        self.assertEqual(f.result, 0)

    def test_proxy_bool(self):
        # Test clearing of SF bug #1170766
        class List(list): pass
        lyst = List()
        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))

    def test_proxy_iter(self):
        # Test fails with a debug build of the interpreter
        # (see bpo-38395).

        obj = None

        class MyObj:
            def __iter__(self):
                nonlocal obj
                del obj
                return NotImplemented

        obj = MyObj()
        p = weakref.proxy(obj)
        with self.assertRaises(TypeError):
            # "blech" in p calls MyObj.__iter__ through the proxy,
            # without keeping a reference to the real object, so it
            # can be killed in the middle of the call
            "blech" in p

    def test_proxy_reversed(self):
        class MyObj:
            def __len__(self):
                return 3
            def __reversed__(self):
                return iter('cba')

        obj = MyObj()
        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")

    def test_proxy_hash(self):
        cool_hash = 299_792_458

        class MyObj:
            def __hash__(self):
                return cool_hash

        obj = MyObj()
        self.assertEqual(hash(weakref.proxy(obj)), cool_hash)

    def test_getweakrefcount(self):
        o = C()
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o, self.callback)
        self.assertEqual(weakref.getweakrefcount(o), 2,
                     "got wrong number of weak reference objects")

        proxy1 = weakref.proxy(o)
        proxy2 = weakref.proxy(o, self.callback)
        self.assertEqual(weakref.getweakrefcount(o), 4,
                     "got wrong number of weak reference objects")

        del ref1, ref2, proxy1, proxy2
        self.assertEqual(weakref.getweakrefcount(o), 0,
                     "weak reference objects not unlinked from"
                     " referent when discarded.")

        # assumes ints do not support weakrefs
        self.assertEqual(weakref.getweakrefcount(1), 0,
                     "got wrong number of weak reference objects for int")

    def test_getweakrefs(self):
        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref1
        self.assertEqual(weakref.getweakrefs(o), [ref2],
                     "list of refs does not match")

        o = C()
        ref1 = weakref.ref(o, self.callback)
        ref2 = weakref.ref(o, self.callback)
        del ref2
        self.assertEqual(weakref.getweakrefs(o), [ref1],
                     "list of refs does not match")

        del ref1
        self.assertEqual(weakref.getweakrefs(o), [],
                     "list of refs not cleared")

        # assumes ints do not support weakrefs
        self.assertEqual(weakref.getweakrefs(1), [],
                     "list of refs does not match for int")

    def test_newstyle_number_ops(self):
        class F(float):
            pass
        f = F(2.0)
        p = weakref.proxy(f)
        self.assertEqual(p + 1.0, 3.0)
        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV

    def test_callbacks_protected(self):
        # Callbacks protected from already-set exceptions?
        # Regression test for SF bug #478534.
        class BogusError(Exception):
            pass
        data = {}
        def remove(k):
            del data[k]
        def encapsulate():
            f = lambda : ()
            data[weakref.ref(f, remove)] = None
            raise BogusError
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")
        try:
            encapsulate()
        except BogusError:
            pass
        else:
            self.fail("exception not properly restored")

    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.

        import gc

        class C(object):
            pass

        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2

    def test_callback_in_cycle_1(self):
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()

    def test_callback_in_cycle_2(self):
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        del I, J, II
        gc.collect()

    def test_callback_in_cycle_3(self):
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()

    def test_callback_in_cycle_4(self):
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2, C, D
        gc.collect()

    def test_callback_in_cycle_resurrection(self):
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                alist.append(self.c)

        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_callbacks_on_callback(self):
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])

    def test_gc_during_ref_creation(self):
        self.check_gc_during_creation(weakref.ref)

    def test_gc_during_proxy_creation(self):
        self.check_gc_during_creation(weakref.proxy)

    def check_gc_during_creation(self, makeref):
        # Helper: create weak references while the gc thresholds are set to
        # (1, 1, 1); the saved thresholds are restored in the finally block.
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)

    def test_ref_created_during_del(self):
        # Bug #1377858
        # A weakref created in an object's __del__() would crash the
        # interpreter when the weakref was cleaned up since it would refer to
        # non-existent memory.  This test should not segfault the interpreter.
        class Target(object):
            def __del__(self):
                global ref_from_del
                ref_from_del = weakref.ref(self)

        w = Target()

    def test_init(self):
        # Issue 3634
        # <weakref to class>.__init__() doesn't check errors correctly
        r = weakref.ref(Exception)
        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
        # No exception should be raised here
        gc.collect()

    def test_classes(self):
        # Check that classes are weakrefable.
        class A(object):
            pass
        l = []
        weakref.ref(int)
        a = weakref.ref(A, l.append)
        A = None
        gc.collect()
        self.assertEqual(a(), None)
        self.assertEqual(l, [a])

    def test_equality(self):
        # Alive weakrefs defer equality testing to their underlying object.
        x = Object(1)
        y = Object(1)
        z = Object(2)
        a = weakref.ref(x)
        b = weakref.ref(y)
        c = weakref.ref(z)
        d = weakref.ref(x)
        # Note how we directly test the operators here, to stress both
        # __eq__ and __ne__.
        self.assertTrue(a == b)
        self.assertFalse(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertTrue(a == d)
        self.assertFalse(a != d)
        self.assertFalse(a == x)
        self.assertTrue(a != x)
        self.assertTrue(a == ALWAYS_EQ)
        self.assertFalse(a != ALWAYS_EQ)
        del x, y, z
        gc.collect()
        for r in a, b, c:
            # Sanity check
            self.assertIs(r(), None)
        # Dead weakrefs compare by identity: whether `a` and `d` are the
        # same weakref object is an implementation detail, since they pointed
        # to the same original object and didn't have a callback.
        # (see issue #16453).
        self.assertFalse(a == b)
        self.assertTrue(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertEqual(a == d, a is d)
        self.assertEqual(a != d, a is not d)

    def test_ordering(self):
        # weakrefs cannot be ordered, even if the underlying objects can.
        ops = [operator.lt, operator.gt, operator.le, operator.ge]
        x = Object(1)
        y = Object(1)
        a = weakref.ref(x)
        b = weakref.ref(y)
        for op in ops:
            self.assertRaises(TypeError, op, a, b)
        # Same when dead.
        del x, y
        gc.collect()
        for op in ops:
            self.assertRaises(TypeError, op, a, b)

    def test_hashing(self):
        # Alive weakrefs hash the same as the underlying object
        x = Object(42)
        y = Object(42)
        a = weakref.ref(x)
        b = weakref.ref(y)
        # Only `a` is hashed while its referent is alive; `b` is not.
        self.assertEqual(hash(a), hash(42))
        del x, y
        gc.collect()
        # Dead weakrefs:
        # - retain their hash if they were hashed when alive;
        # - otherwise, cannot be hashed.
        self.assertEqual(hash(a), hash(42))
        self.assertRaises(TypeError, hash, b)

    def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        class C:
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()

    def test_callback_attribute(self):
        x = Object(1)
        callback = lambda ref: None
        ref1 = weakref.ref(x, callback)
        self.assertIs(ref1.__callback__, callback)

        ref2 = weakref.ref(x)
        self.assertIsNone(ref2.__callback__)

    def test_callback_attribute_after_deletion(self):
        x = Object(1)
        ref = weakref.ref(x, self.callback)
        self.assertIsNotNone(ref.__callback__)
        del x
        support.gc_collect()
        self.assertIsNone(ref.__callback__)

    def test_set_callback_attribute(self):
        x = Object(1)
        callback = lambda ref: None
        ref1 = weakref.ref(x, callback)
        with self.assertRaises(AttributeError):
            ref1.__callback__ = lambda ref: None

    def test_callback_gcs(self):
        class ObjectWithDel(Object):
            def __del__(self): pass
        x = ObjectWithDel(1)
        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
        del x
        support.gc_collect()


# Tests for user-defined subclasses of weakref.ref.
class SubclassableWeakrefTestCase(TestBase):

    def test_subclass_refs(self):
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super().__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assertIs(mr(), o)
        self.assertTrue(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        self.assertIsNone(mr())
        self.assertTrue(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assertIsNot(r1, r2)
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assertIs(r2, refs[0])
        self.assertIn(r1, refs[1:])
        self.assertIn(r3, refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assertIsNot(r1, r2)
        refs = weakref.getweakrefs(o)
        self.assertIn(r1, refs)
        self.assertIn(r2, refs)

    def test_subclass_refs_with_slots(self):
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(type, ob, callback, slot1, slot2):
                return weakref.ref.__new__(type, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.assertFalse(hasattr(r, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # An instance of a weakref subclass can have attributes.
        # If such a weakref holds the only strong reference to the object,
        # deleting the weakref will delete the object. In this case,
        # the callback must not be called, because the ref object is
        # being deleted.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        r1 = MyRef(o, callback)
        r1.o = o
        del o

        del r1  # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same test, with two weakrefs to the same object
        # (since code paths are different)
        o = C()
        r1 = MyRef(o, callback)
        r2 = MyRef(o, callback)
        r1.r = r2
        r2.o = o
        del o
        del r2

        del r1  # Used to crash here

        self.assertEqual(self.cbcalled, 0)


# Tests for weakref.WeakMethod: liveness, callbacks, equality and hashing.
class WeakMethodTestCase(unittest.TestCase):

    def _subclass(self):
        """Return an Object subclass overriding `some_method`."""
        class C(Object):
            def some_method(self):
                return 6
        return C

    def test_alive(self):
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        self.assertIsInstance(r, weakref.ReferenceType)
        self.assertIsInstance(r(), type(o.some_method))
        self.assertIs(r().__self__, o)
        self.assertIs(r().__func__, o.some_method.__func__)
        self.assertEqual(r()(), 4)

    def test_object_dead(self):
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        del o
        gc.collect()
        self.assertIs(r(), None)

    def test_method_dead(self):
        C = self._subclass()
        o = C(1)
        r = weakref.WeakMethod(o.some_method)
        del C.some_method
        gc.collect()
        self.assertIs(r(), None)

    def test_callback_when_object_dead(self):
        # Test callback behaviour when object dies first.
        C = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = C(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del o
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once.
        C.some_method = Object.some_method
        gc.collect()
        self.assertEqual(calls, [r])

    def test_callback_when_method_dead(self):
        # Test callback behaviour when method dies first.
        C = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = C(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del C.some_method
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once.
        del o
        gc.collect()
        self.assertEqual(calls, [r])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod doesn't create any reference cycle to itself.
        o = Object(1)
        def cb(_):
            pass
        r = weakref.WeakMethod(o.some_method, cb)
        wr = weakref.ref(r)
        del r
        self.assertIs(wr(), None)

    def test_equality(self):
        def _eq(a, b):
            self.assertTrue(a == b)
            self.assertFalse(a != b)
        def _ne(a, b):
            self.assertTrue(a != b)
            self.assertFalse(a == b)
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(x.other_method)
        d = weakref.WeakMethod(y.other_method)
        # Objects equal, same method
        _eq(a, b)
        _eq(c, d)
        # Objects equal, different method
        _ne(a, c)
        _ne(a, d)
        _ne(b, c)
        _ne(b, d)
        # Objects unequal, same or different method
        z = Object(2)
        e = weakref.WeakMethod(z.some_method)
        f = weakref.WeakMethod(z.other_method)
        _ne(a, e)
        _ne(a, f)
        _ne(b, e)
        _ne(b, f)
        # Compare with different types
        _ne(a, x.some_method)
        _eq(a, ALWAYS_EQ)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity
        refs = a, b, c, d, e, f
        for q in refs:
            for r in refs:
                self.assertEqual(q == r, q is r)
                self.assertEqual(q != r, q is not r)

    def test_hashing(self):
        # Alive WeakMethods are hashable if the underlying object is
        # hashable.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(y.other_method)
        # Since WeakMethod objects are equal, the hashes should be equal.
        self.assertEqual(hash(a), hash(b))
        ha = hash(a)
        # Dead WeakMethods retain their old hash value
        del x, y
        gc.collect()
        self.assertEqual(hash(a), ha)
        self.assertEqual(hash(b), ha)
        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
        self.assertRaises(TypeError, hash, c)


# Tests for weakref.WeakKeyDictionary and weakref.WeakValueDictionary.
class MappingTestCase(TestBase):

    COUNT = 10

    def check_len_cycles(self, dict_type, cons):
        # Helper: len() of a weak dict built from RefCycle objects, measured
        # while an items() iterator is alive (n1) and after dropping it (n2).
        # cons(o) builds the (key, value) pair stored for object o.
        N = 20
        items = [RefCycle() for i in range(N)]
        dct = dict_type(cons(o) for o in items)
        # Keep an iterator alive
        it = dct.items()
        try:
            next(it)
        except StopIteration:
            pass
        del items
        gc.collect()
        n1 = len(dct)
        del it
        gc.collect()
        n2 = len(dct)
        # one item may be kept alive inside the iterator
        self.assertIn(n1, (0, 1))
        self.assertEqual(n2, 0)

    def test_weak_keyed_len_cycles(self):
        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))

    def test_weak_valued_len_cycles(self):
        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))

    def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        # The current gc thresholds are restored when the test finishes.
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
self.assertGreaterEqual(n1, 0) 1230 self.assertLessEqual(n1, N) 1231 self.assertGreaterEqual(n2, 0) 1232 self.assertLessEqual(n2, n1) 1233 1234 def test_weak_keyed_len_race(self): 1235 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1236 1237 def test_weak_valued_len_race(self): 1238 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 1239 1240 def test_weak_values(self): 1241 # 1242 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1243 # 1244 dict, objects = self.make_weak_valued_dict() 1245 for o in objects: 1246 self.assertEqual(weakref.getweakrefcount(o), 1) 1247 self.assertIs(o, dict[o.arg], 1248 "wrong object returned by weak dict!") 1249 items1 = list(dict.items()) 1250 items2 = list(dict.copy().items()) 1251 items1.sort() 1252 items2.sort() 1253 self.assertEqual(items1, items2, 1254 "cloning of weak-valued dictionary did not work!") 1255 del items1, items2 1256 self.assertEqual(len(dict), self.COUNT) 1257 del objects[0] 1258 self.assertEqual(len(dict), self.COUNT - 1, 1259 "deleting object did not cause dictionary update") 1260 del objects, o 1261 self.assertEqual(len(dict), 0, 1262 "deleting the values did not clear the dictionary") 1263 # regression on SF bug #447152: 1264 dict = weakref.WeakValueDictionary() 1265 self.assertRaises(KeyError, dict.__getitem__, 1) 1266 dict[2] = C() 1267 self.assertRaises(KeyError, dict.__getitem__, 2) 1268 1269 def test_weak_keys(self): 1270 # 1271 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1272 # len(d), k in d. 1273 # 1274 dict, objects = self.make_weak_keyed_dict() 1275 for o in objects: 1276 self.assertEqual(weakref.getweakrefcount(o), 1, 1277 "wrong number of weak references to %r!" 
% o) 1278 self.assertIs(o.arg, dict[o], 1279 "wrong object returned by weak dict!") 1280 items1 = dict.items() 1281 items2 = dict.copy().items() 1282 self.assertEqual(set(items1), set(items2), 1283 "cloning of weak-keyed dictionary did not work!") 1284 del items1, items2 1285 self.assertEqual(len(dict), self.COUNT) 1286 del objects[0] 1287 self.assertEqual(len(dict), (self.COUNT - 1), 1288 "deleting object did not cause dictionary update") 1289 del objects, o 1290 self.assertEqual(len(dict), 0, 1291 "deleting the keys did not clear the dictionary") 1292 o = Object(42) 1293 dict[o] = "What is the meaning of the universe?" 1294 self.assertIn(o, dict) 1295 self.assertNotIn(34, dict) 1296 1297 def test_weak_keyed_iters(self): 1298 dict, objects = self.make_weak_keyed_dict() 1299 self.check_iters(dict) 1300 1301 # Test keyrefs() 1302 refs = dict.keyrefs() 1303 self.assertEqual(len(refs), len(objects)) 1304 objects2 = list(objects) 1305 for wr in refs: 1306 ob = wr() 1307 self.assertIn(ob, dict) 1308 self.assertIn(ob, dict) 1309 self.assertEqual(ob.arg, dict[ob]) 1310 objects2.remove(ob) 1311 self.assertEqual(len(objects2), 0) 1312 1313 # Test iterkeyrefs() 1314 objects2 = list(objects) 1315 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1316 for wr in dict.keyrefs(): 1317 ob = wr() 1318 self.assertIn(ob, dict) 1319 self.assertIn(ob, dict) 1320 self.assertEqual(ob.arg, dict[ob]) 1321 objects2.remove(ob) 1322 self.assertEqual(len(objects2), 0) 1323 1324 def test_weak_valued_iters(self): 1325 dict, objects = self.make_weak_valued_dict() 1326 self.check_iters(dict) 1327 1328 # Test valuerefs() 1329 refs = dict.valuerefs() 1330 self.assertEqual(len(refs), len(objects)) 1331 objects2 = list(objects) 1332 for wr in refs: 1333 ob = wr() 1334 self.assertEqual(ob, dict[ob.arg]) 1335 self.assertEqual(ob.arg, dict[ob.arg].arg) 1336 objects2.remove(ob) 1337 self.assertEqual(len(objects2), 0) 1338 1339 # Test itervaluerefs() 1340 objects2 = list(objects) 1341 
self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1342 for wr in dict.itervaluerefs(): 1343 ob = wr() 1344 self.assertEqual(ob, dict[ob.arg]) 1345 self.assertEqual(ob.arg, dict[ob.arg].arg) 1346 objects2.remove(ob) 1347 self.assertEqual(len(objects2), 0) 1348 1349 def check_iters(self, dict): 1350 # item iterator: 1351 items = list(dict.items()) 1352 for item in dict.items(): 1353 items.remove(item) 1354 self.assertFalse(items, "items() did not touch all items") 1355 1356 # key iterator, via __iter__(): 1357 keys = list(dict.keys()) 1358 for k in dict: 1359 keys.remove(k) 1360 self.assertFalse(keys, "__iter__() did not touch all keys") 1361 1362 # key iterator, via iterkeys(): 1363 keys = list(dict.keys()) 1364 for k in dict.keys(): 1365 keys.remove(k) 1366 self.assertFalse(keys, "iterkeys() did not touch all keys") 1367 1368 # value iterator: 1369 values = list(dict.values()) 1370 for v in dict.values(): 1371 values.remove(v) 1372 self.assertFalse(values, 1373 "itervalues() did not touch all values") 1374 1375 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1376 n = len(dict) 1377 it = iter(getattr(dict, iter_name)()) 1378 next(it) # Trigger internal iteration 1379 # Destroy an object 1380 del objects[-1] 1381 gc.collect() # just in case 1382 # We have removed either the first consumed object, or another one 1383 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1384 del it 1385 # The removal has been committed 1386 self.assertEqual(len(dict), n - 1) 1387 1388 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1389 # Check that we can explicitly mutate the weak dict without 1390 # interfering with delayed removal. 1391 # `testcontext` should create an iterator, destroy one of the 1392 # weakref'ed objects and then return a new key/value pair corresponding 1393 # to the destroyed object. 
1394 with testcontext() as (k, v): 1395 self.assertNotIn(k, dict) 1396 with testcontext() as (k, v): 1397 self.assertRaises(KeyError, dict.__delitem__, k) 1398 self.assertNotIn(k, dict) 1399 with testcontext() as (k, v): 1400 self.assertRaises(KeyError, dict.pop, k) 1401 self.assertNotIn(k, dict) 1402 with testcontext() as (k, v): 1403 dict[k] = v 1404 self.assertEqual(dict[k], v) 1405 ddict = copy.copy(dict) 1406 with testcontext() as (k, v): 1407 dict.update(ddict) 1408 self.assertEqual(dict, ddict) 1409 with testcontext() as (k, v): 1410 dict.clear() 1411 self.assertEqual(len(dict), 0) 1412 1413 def check_weak_del_and_len_while_iterating(self, dict, testcontext): 1414 # Check that len() works when both iterating and removing keys 1415 # explicitly through various means (.pop(), .clear()...), while 1416 # implicit mutation is deferred because an iterator is alive. 1417 # (each call to testcontext() should schedule one item for removal 1418 # for this test to work properly) 1419 o = Object(123456) 1420 with testcontext(): 1421 n = len(dict) 1422 # Since underlaying dict is ordered, first item is popped 1423 dict.pop(next(dict.keys())) 1424 self.assertEqual(len(dict), n - 1) 1425 dict[o] = o 1426 self.assertEqual(len(dict), n) 1427 # last item in objects is removed from dict in context shutdown 1428 with testcontext(): 1429 self.assertEqual(len(dict), n - 1) 1430 # Then, (o, o) is popped 1431 dict.popitem() 1432 self.assertEqual(len(dict), n - 2) 1433 with testcontext(): 1434 self.assertEqual(len(dict), n - 3) 1435 del dict[next(dict.keys())] 1436 self.assertEqual(len(dict), n - 4) 1437 with testcontext(): 1438 self.assertEqual(len(dict), n - 5) 1439 dict.popitem() 1440 self.assertEqual(len(dict), n - 6) 1441 with testcontext(): 1442 dict.clear() 1443 self.assertEqual(len(dict), 0) 1444 self.assertEqual(len(dict), 0) 1445 1446 def test_weak_keys_destroy_while_iterating(self): 1447 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1448 dict, 
objects = self.make_weak_keyed_dict() 1449 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1450 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1451 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1452 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') 1453 dict, objects = self.make_weak_keyed_dict() 1454 @contextlib.contextmanager 1455 def testcontext(): 1456 try: 1457 it = iter(dict.items()) 1458 next(it) 1459 # Schedule a key/value for removal and recreate it 1460 v = objects.pop().arg 1461 gc.collect() # just in case 1462 yield Object(v), v 1463 finally: 1464 it = None # should commit all removals 1465 gc.collect() 1466 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1467 # Issue #21173: len() fragile when keys are both implicitly and 1468 # explicitly removed. 1469 dict, objects = self.make_weak_keyed_dict() 1470 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1471 1472 def test_weak_values_destroy_while_iterating(self): 1473 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1474 dict, objects = self.make_weak_valued_dict() 1475 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1476 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1477 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1478 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1479 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') 1480 dict, objects = self.make_weak_valued_dict() 1481 @contextlib.contextmanager 1482 def testcontext(): 1483 try: 1484 it = iter(dict.items()) 1485 next(it) 1486 # Schedule a key/value for removal and recreate it 1487 k = objects.pop().arg 1488 gc.collect() # just in case 1489 yield k, Object(k) 1490 finally: 1491 it = None # should commit all removals 1492 gc.collect() 1493 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1494 dict, objects = 
self.make_weak_valued_dict() 1495 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1496 1497 def test_make_weak_keyed_dict_from_dict(self): 1498 o = Object(3) 1499 dict = weakref.WeakKeyDictionary({o:364}) 1500 self.assertEqual(dict[o], 364) 1501 1502 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1503 o = Object(3) 1504 dict = weakref.WeakKeyDictionary({o:364}) 1505 dict2 = weakref.WeakKeyDictionary(dict) 1506 self.assertEqual(dict[o], 364) 1507 1508 def make_weak_keyed_dict(self): 1509 dict = weakref.WeakKeyDictionary() 1510 objects = list(map(Object, range(self.COUNT))) 1511 for o in objects: 1512 dict[o] = o.arg 1513 return dict, objects 1514 1515 def test_make_weak_valued_dict_from_dict(self): 1516 o = Object(3) 1517 dict = weakref.WeakValueDictionary({364:o}) 1518 self.assertEqual(dict[364], o) 1519 1520 def test_make_weak_valued_dict_from_weak_valued_dict(self): 1521 o = Object(3) 1522 dict = weakref.WeakValueDictionary({364:o}) 1523 dict2 = weakref.WeakValueDictionary(dict) 1524 self.assertEqual(dict[364], o) 1525 1526 def test_make_weak_valued_dict_misc(self): 1527 # errors 1528 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1529 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1530 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1531 # special keyword arguments 1532 o = Object(3) 1533 for kw in 'self', 'dict', 'other', 'iterable': 1534 d = weakref.WeakValueDictionary(**{kw: o}) 1535 self.assertEqual(list(d.keys()), [kw]) 1536 self.assertEqual(d[kw], o) 1537 1538 def make_weak_valued_dict(self): 1539 dict = weakref.WeakValueDictionary() 1540 objects = list(map(Object, range(self.COUNT))) 1541 for o in objects: 1542 dict[o.arg] = o 1543 return dict, objects 1544 1545 def check_popitem(self, klass, key1, value1, key2, value2): 1546 weakdict = klass() 1547 weakdict[key1] = value1 1548 weakdict[key2] = value2 1549 self.assertEqual(len(weakdict), 2) 1550 k, v = weakdict.popitem() 
1551 self.assertEqual(len(weakdict), 1) 1552 if k is key1: 1553 self.assertIs(v, value1) 1554 else: 1555 self.assertIs(v, value2) 1556 k, v = weakdict.popitem() 1557 self.assertEqual(len(weakdict), 0) 1558 if k is key1: 1559 self.assertIs(v, value1) 1560 else: 1561 self.assertIs(v, value2) 1562 1563 def test_weak_valued_dict_popitem(self): 1564 self.check_popitem(weakref.WeakValueDictionary, 1565 "key1", C(), "key2", C()) 1566 1567 def test_weak_keyed_dict_popitem(self): 1568 self.check_popitem(weakref.WeakKeyDictionary, 1569 C(), "value 1", C(), "value 2") 1570 1571 def check_setdefault(self, klass, key, value1, value2): 1572 self.assertIsNot(value1, value2, 1573 "invalid test" 1574 " -- value parameters must be distinct objects") 1575 weakdict = klass() 1576 o = weakdict.setdefault(key, value1) 1577 self.assertIs(o, value1) 1578 self.assertIn(key, weakdict) 1579 self.assertIs(weakdict.get(key), value1) 1580 self.assertIs(weakdict[key], value1) 1581 1582 o = weakdict.setdefault(key, value2) 1583 self.assertIs(o, value1) 1584 self.assertIn(key, weakdict) 1585 self.assertIs(weakdict.get(key), value1) 1586 self.assertIs(weakdict[key], value1) 1587 1588 def test_weak_valued_dict_setdefault(self): 1589 self.check_setdefault(weakref.WeakValueDictionary, 1590 "key", C(), C()) 1591 1592 def test_weak_keyed_dict_setdefault(self): 1593 self.check_setdefault(weakref.WeakKeyDictionary, 1594 C(), "value 1", "value 2") 1595 1596 def check_update(self, klass, dict): 1597 # 1598 # This exercises d.update(), len(d), d.keys(), k in d, 1599 # d.get(), d[]. 
1600 # 1601 weakdict = klass() 1602 weakdict.update(dict) 1603 self.assertEqual(len(weakdict), len(dict)) 1604 for k in weakdict.keys(): 1605 self.assertIn(k, dict, "mysterious new key appeared in weak dict") 1606 v = dict.get(k) 1607 self.assertIs(v, weakdict[k]) 1608 self.assertIs(v, weakdict.get(k)) 1609 for k in dict.keys(): 1610 self.assertIn(k, weakdict, "original key disappeared in weak dict") 1611 v = dict[k] 1612 self.assertIs(v, weakdict[k]) 1613 self.assertIs(v, weakdict.get(k)) 1614 1615 def test_weak_valued_dict_update(self): 1616 self.check_update(weakref.WeakValueDictionary, 1617 {1: C(), 'a': C(), C(): C()}) 1618 # errors 1619 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1620 d = weakref.WeakValueDictionary() 1621 self.assertRaises(TypeError, d.update, {}, {}) 1622 self.assertRaises(TypeError, d.update, (), ()) 1623 self.assertEqual(list(d.keys()), []) 1624 # special keyword arguments 1625 o = Object(3) 1626 for kw in 'self', 'dict', 'other', 'iterable': 1627 d = weakref.WeakValueDictionary() 1628 d.update(**{kw: o}) 1629 self.assertEqual(list(d.keys()), [kw]) 1630 self.assertEqual(d[kw], o) 1631 1632 def test_weak_valued_union_operators(self): 1633 a = C() 1634 b = C() 1635 c = C() 1636 wvd1 = weakref.WeakValueDictionary({1: a}) 1637 wvd2 = weakref.WeakValueDictionary({1: b, 2: a}) 1638 wvd3 = wvd1.copy() 1639 d1 = {1: c, 3: b} 1640 pairs = [(5, c), (6, b)] 1641 1642 tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries 1643 self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2)) 1644 self.assertIs(type(tmp1), weakref.WeakValueDictionary) 1645 wvd1 |= wvd2 1646 self.assertEqual(wvd1, tmp1) 1647 1648 tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping 1649 self.assertEqual(dict(tmp2), dict(wvd2) | d1) 1650 self.assertIs(type(tmp2), weakref.WeakValueDictionary) 1651 wvd2 |= d1 1652 self.assertEqual(wvd2, tmp2) 1653 1654 tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value 1655 tmp3 |= pairs 1656 
self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs)) 1657 self.assertIs(type(tmp3), weakref.WeakValueDictionary) 1658 1659 tmp4 = d1 | wvd3 # Testing .__ror__ 1660 self.assertEqual(dict(tmp4), d1 | dict(wvd3)) 1661 self.assertIs(type(tmp4), weakref.WeakValueDictionary) 1662 1663 del a 1664 self.assertNotIn(2, tmp1) 1665 self.assertNotIn(2, tmp2) 1666 self.assertNotIn(1, tmp3) 1667 self.assertNotIn(1, tmp4) 1668 1669 def test_weak_keyed_dict_update(self): 1670 self.check_update(weakref.WeakKeyDictionary, 1671 {C(): 1, C(): 2, C(): 3}) 1672 1673 def test_weak_keyed_delitem(self): 1674 d = weakref.WeakKeyDictionary() 1675 o1 = Object('1') 1676 o2 = Object('2') 1677 d[o1] = 'something' 1678 d[o2] = 'something' 1679 self.assertEqual(len(d), 2) 1680 del d[o1] 1681 self.assertEqual(len(d), 1) 1682 self.assertEqual(list(d.keys()), [o2]) 1683 1684 def test_weak_keyed_union_operators(self): 1685 o1 = C() 1686 o2 = C() 1687 o3 = C() 1688 wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2}) 1689 wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4}) 1690 wkd3 = wkd1.copy() 1691 d1 = {o2: '5', o3: '6'} 1692 pairs = [(o2, 7), (o3, 8)] 1693 1694 tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries 1695 self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2)) 1696 self.assertIs(type(tmp1), weakref.WeakKeyDictionary) 1697 wkd1 |= wkd2 1698 self.assertEqual(wkd1, tmp1) 1699 1700 tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping 1701 self.assertEqual(dict(tmp2), dict(wkd2) | d1) 1702 self.assertIs(type(tmp2), weakref.WeakKeyDictionary) 1703 wkd2 |= d1 1704 self.assertEqual(wkd2, tmp2) 1705 1706 tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value 1707 tmp3 |= pairs 1708 self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs)) 1709 self.assertIs(type(tmp3), weakref.WeakKeyDictionary) 1710 1711 tmp4 = d1 | wkd3 # Testing .__ror__ 1712 self.assertEqual(dict(tmp4), d1 | dict(wkd3)) 1713 self.assertIs(type(tmp4), weakref.WeakKeyDictionary) 1714 1715 del o1 1716 
self.assertNotIn(4, tmp1.values()) 1717 self.assertNotIn(4, tmp2.values()) 1718 self.assertNotIn(1, tmp3.values()) 1719 self.assertNotIn(1, tmp4.values()) 1720 1721 def test_weak_valued_delitem(self): 1722 d = weakref.WeakValueDictionary() 1723 o1 = Object('1') 1724 o2 = Object('2') 1725 d['something'] = o1 1726 d['something else'] = o2 1727 self.assertEqual(len(d), 2) 1728 del d['something'] 1729 self.assertEqual(len(d), 1) 1730 self.assertEqual(list(d.items()), [('something else', o2)]) 1731 1732 def test_weak_keyed_bad_delitem(self): 1733 d = weakref.WeakKeyDictionary() 1734 o = Object('1') 1735 # An attempt to delete an object that isn't there should raise 1736 # KeyError. It didn't before 2.3. 1737 self.assertRaises(KeyError, d.__delitem__, o) 1738 self.assertRaises(KeyError, d.__getitem__, o) 1739 1740 # If a key isn't of a weakly referencable type, __getitem__ and 1741 # __setitem__ raise TypeError. __delitem__ should too. 1742 self.assertRaises(TypeError, d.__delitem__, 13) 1743 self.assertRaises(TypeError, d.__getitem__, 13) 1744 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1745 1746 def test_weak_keyed_cascading_deletes(self): 1747 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1748 # over the keys via self.data.iterkeys(). If things vanished from 1749 # the dict during this (or got added), that caused a RuntimeError. 1750 1751 d = weakref.WeakKeyDictionary() 1752 mutate = False 1753 1754 class C(object): 1755 def __init__(self, i): 1756 self.value = i 1757 def __hash__(self): 1758 return hash(self.value) 1759 def __eq__(self, other): 1760 if mutate: 1761 # Side effect that mutates the dict, by removing the 1762 # last strong reference to a key. 1763 del objs[-1] 1764 return self.value == other.value 1765 1766 objs = [C(i) for i in range(4)] 1767 for o in objs: 1768 d[o] = o.value 1769 del o # now the only strong references to keys are in objs 1770 # Find the order in which iterkeys sees the keys. 
1771 objs = list(d.keys()) 1772 # Reverse it, so that the iteration implementation of __delitem__ 1773 # has to keep looping to find the first object we delete. 1774 objs.reverse() 1775 1776 # Turn on mutation in C.__eq__. The first time through the loop, 1777 # under the iterkeys() business the first comparison will delete 1778 # the last item iterkeys() would see, and that causes a 1779 # RuntimeError: dictionary changed size during iteration 1780 # when the iterkeys() loop goes around to try comparing the next 1781 # key. After this was fixed, it just deletes the last object *our* 1782 # "for o in obj" loop would have gotten to. 1783 mutate = True 1784 count = 0 1785 for o in objs: 1786 count += 1 1787 del d[o] 1788 self.assertEqual(len(d), 0) 1789 self.assertEqual(count, 2) 1790 1791 def test_make_weak_valued_dict_repr(self): 1792 dict = weakref.WeakValueDictionary() 1793 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1794 1795 def test_make_weak_keyed_dict_repr(self): 1796 dict = weakref.WeakKeyDictionary() 1797 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1798 1799 def test_threaded_weak_valued_setdefault(self): 1800 d = weakref.WeakValueDictionary() 1801 with collect_in_thread(): 1802 for i in range(100000): 1803 x = d.setdefault(10, RefCycle()) 1804 self.assertIsNot(x, None) # we never put None in there! 1805 del x 1806 1807 def test_threaded_weak_valued_pop(self): 1808 d = weakref.WeakValueDictionary() 1809 with collect_in_thread(): 1810 for i in range(100000): 1811 d[10] = RefCycle() 1812 x = d.pop(10, 10) 1813 self.assertIsNot(x, None) # we never put None in there! 1814 1815 def test_threaded_weak_valued_consistency(self): 1816 # Issue #28427: old keys should not remove new values from 1817 # WeakValueDictionary when collecting from another thread. 
1818 d = weakref.WeakValueDictionary() 1819 with collect_in_thread(): 1820 for i in range(200000): 1821 o = RefCycle() 1822 d[10] = o 1823 # o is still alive, so the dict can't be empty 1824 self.assertEqual(len(d), 1) 1825 o = None # lose ref 1826 1827 def check_threaded_weak_dict_copy(self, type_, deepcopy): 1828 # `type_` should be either WeakKeyDictionary or WeakValueDictionary. 1829 # `deepcopy` should be either True or False. 1830 exc = [] 1831 1832 class DummyKey: 1833 def __init__(self, ctr): 1834 self.ctr = ctr 1835 1836 class DummyValue: 1837 def __init__(self, ctr): 1838 self.ctr = ctr 1839 1840 def dict_copy(d, exc): 1841 try: 1842 if deepcopy is True: 1843 _ = copy.deepcopy(d) 1844 else: 1845 _ = d.copy() 1846 except Exception as ex: 1847 exc.append(ex) 1848 1849 def pop_and_collect(lst): 1850 gc_ctr = 0 1851 while lst: 1852 i = random.randint(0, len(lst) - 1) 1853 gc_ctr += 1 1854 lst.pop(i) 1855 if gc_ctr % 10000 == 0: 1856 gc.collect() # just in case 1857 1858 self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) 1859 1860 d = type_() 1861 keys = [] 1862 values = [] 1863 # Initialize d with many entries 1864 for i in range(70000): 1865 k, v = DummyKey(i), DummyValue(i) 1866 keys.append(k) 1867 values.append(v) 1868 d[k] = v 1869 del k 1870 del v 1871 1872 t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) 1873 if type_ is weakref.WeakKeyDictionary: 1874 t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) 1875 else: # weakref.WeakValueDictionary 1876 t_collect = threading.Thread(target=pop_and_collect, args=(values,)) 1877 1878 t_copy.start() 1879 t_collect.start() 1880 1881 t_copy.join() 1882 t_collect.join() 1883 1884 # Test exceptions 1885 if exc: 1886 raise exc[0] 1887 1888 def test_threaded_weak_key_dict_copy(self): 1889 # Issue #35615: Weakref keys or values getting GC'ed during dict 1890 # copying should not result in a crash. 
1891 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) 1892 1893 def test_threaded_weak_key_dict_deepcopy(self): 1894 # Issue #35615: Weakref keys or values getting GC'ed during dict 1895 # copying should not result in a crash. 1896 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) 1897 1898 def test_threaded_weak_value_dict_copy(self): 1899 # Issue #35615: Weakref keys or values getting GC'ed during dict 1900 # copying should not result in a crash. 1901 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) 1902 1903 def test_threaded_weak_value_dict_deepcopy(self): 1904 # Issue #35615: Weakref keys or values getting GC'ed during dict 1905 # copying should not result in a crash. 1906 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) 1907 1908 @support.cpython_only 1909 def test_remove_closure(self): 1910 d = weakref.WeakValueDictionary() 1911 self.assertIsNone(d._remove.__closure__) 1912 1913 1914from test import mapping_tests 1915 1916class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1917 """Check that WeakValueDictionary conforms to the mapping protocol""" 1918 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1919 type2test = weakref.WeakValueDictionary 1920 def _reference(self): 1921 return self.__ref.copy() 1922 1923class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1924 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1925 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1926 type2test = weakref.WeakKeyDictionary 1927 def _reference(self): 1928 return self.__ref.copy() 1929 1930 1931class FinalizeTestCase(unittest.TestCase): 1932 1933 class A: 1934 pass 1935 1936 def _collect_if_necessary(self): 1937 # we create no ref-cycles so in CPython no gc should be needed 1938 if sys.implementation.name != 'cpython': 1939 support.gc_collect() 1940 1941 def test_finalize(self): 1942 def add(x,y,z): 1943 
res.append(x + y + z) 1944 return x + y + z 1945 1946 a = self.A() 1947 1948 res = [] 1949 f = weakref.finalize(a, add, 67, 43, z=89) 1950 self.assertEqual(f.alive, True) 1951 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) 1952 self.assertEqual(f(), 199) 1953 self.assertEqual(f(), None) 1954 self.assertEqual(f(), None) 1955 self.assertEqual(f.peek(), None) 1956 self.assertEqual(f.detach(), None) 1957 self.assertEqual(f.alive, False) 1958 self.assertEqual(res, [199]) 1959 1960 res = [] 1961 f = weakref.finalize(a, add, 67, 43, 89) 1962 self.assertEqual(f.peek(), (a, add, (67,43,89), {})) 1963 self.assertEqual(f.detach(), (a, add, (67,43,89), {})) 1964 self.assertEqual(f(), None) 1965 self.assertEqual(f(), None) 1966 self.assertEqual(f.peek(), None) 1967 self.assertEqual(f.detach(), None) 1968 self.assertEqual(f.alive, False) 1969 self.assertEqual(res, []) 1970 1971 res = [] 1972 f = weakref.finalize(a, add, x=67, y=43, z=89) 1973 del a 1974 self._collect_if_necessary() 1975 self.assertEqual(f(), None) 1976 self.assertEqual(f(), None) 1977 self.assertEqual(f.peek(), None) 1978 self.assertEqual(f.detach(), None) 1979 self.assertEqual(f.alive, False) 1980 self.assertEqual(res, [199]) 1981 1982 def test_arg_errors(self): 1983 def fin(*args, **kwargs): 1984 res.append((args, kwargs)) 1985 1986 a = self.A() 1987 1988 res = [] 1989 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) 1990 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) 1991 f() 1992 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) 1993 1994 with self.assertRaises(TypeError): 1995 weakref.finalize(a, func=fin, arg=1) 1996 with self.assertRaises(TypeError): 1997 weakref.finalize(obj=a, func=fin, arg=1) 1998 self.assertRaises(TypeError, weakref.finalize, a) 1999 self.assertRaises(TypeError, weakref.finalize) 2000 2001 def test_order(self): 2002 a = self.A() 2003 res = [] 2004 2005 f1 = weakref.finalize(a, res.append, 'f1') 2006 f2 = weakref.finalize(a, res.append, 'f2') 2007 
f3 = weakref.finalize(a, res.append, 'f3') 2008 f4 = weakref.finalize(a, res.append, 'f4') 2009 f5 = weakref.finalize(a, res.append, 'f5') 2010 2011 # make sure finalizers can keep themselves alive 2012 del f1, f4 2013 2014 self.assertTrue(f2.alive) 2015 self.assertTrue(f3.alive) 2016 self.assertTrue(f5.alive) 2017 2018 self.assertTrue(f5.detach()) 2019 self.assertFalse(f5.alive) 2020 2021 f5() # nothing because previously unregistered 2022 res.append('A') 2023 f3() # => res.append('f3') 2024 self.assertFalse(f3.alive) 2025 res.append('B') 2026 f3() # nothing because previously called 2027 res.append('C') 2028 del a 2029 self._collect_if_necessary() 2030 # => res.append('f4') 2031 # => res.append('f2') 2032 # => res.append('f1') 2033 self.assertFalse(f2.alive) 2034 res.append('D') 2035 f2() # nothing because previously called by gc 2036 2037 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] 2038 self.assertEqual(res, expected) 2039 2040 def test_all_freed(self): 2041 # we want a weakrefable subclass of weakref.finalize 2042 class MyFinalizer(weakref.finalize): 2043 pass 2044 2045 a = self.A() 2046 res = [] 2047 def callback(): 2048 res.append(123) 2049 f = MyFinalizer(a, callback) 2050 2051 wr_callback = weakref.ref(callback) 2052 wr_f = weakref.ref(f) 2053 del callback, f 2054 2055 self.assertIsNotNone(wr_callback()) 2056 self.assertIsNotNone(wr_f()) 2057 2058 del a 2059 self._collect_if_necessary() 2060 2061 self.assertIsNone(wr_callback()) 2062 self.assertIsNone(wr_f()) 2063 self.assertEqual(res, [123]) 2064 2065 @classmethod 2066 def run_in_child(cls): 2067 def error(): 2068 # Create an atexit finalizer from inside a finalizer called 2069 # at exit. This should be the next to be run. 
2070 g1 = weakref.finalize(cls, print, 'g1') 2071 print('f3 error') 2072 1/0 2073 2074 # cls should stay alive till atexit callbacks run 2075 f1 = weakref.finalize(cls, print, 'f1', _global_var) 2076 f2 = weakref.finalize(cls, print, 'f2', _global_var) 2077 f3 = weakref.finalize(cls, error) 2078 f4 = weakref.finalize(cls, print, 'f4', _global_var) 2079 2080 assert f1.atexit == True 2081 f2.atexit = False 2082 assert f3.atexit == True 2083 assert f4.atexit == True 2084 2085 def test_atexit(self): 2086 prog = ('from test.test_weakref import FinalizeTestCase;'+ 2087 'FinalizeTestCase.run_in_child()') 2088 rc, out, err = script_helper.assert_python_ok('-c', prog) 2089 out = out.decode('ascii').splitlines() 2090 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 2091 self.assertTrue(b'ZeroDivisionError' in err) 2092 2093 2094libreftest = """ Doctest for examples in the library reference: weakref.rst 2095 2096>>> import weakref 2097>>> class Dict(dict): 2098... pass 2099... 2100>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 2101>>> r = weakref.ref(obj) 2102>>> print(r() is obj) 2103True 2104 2105>>> import weakref 2106>>> class Object: 2107... pass 2108... 2109>>> o = Object() 2110>>> r = weakref.ref(o) 2111>>> o2 = r() 2112>>> o is o2 2113True 2114>>> del o, o2 2115>>> print(r()) 2116None 2117 2118>>> import weakref 2119>>> class ExtendedRef(weakref.ref): 2120... def __init__(self, ob, callback=None, **annotations): 2121... super().__init__(ob, callback) 2122... self.__counter = 0 2123... for k, v in annotations.items(): 2124... setattr(self, k, v) 2125... def __call__(self): 2126... '''Return a pair containing the referent and the number of 2127... times the reference has been called. 2128... ''' 2129... ob = super().__call__() 2130... if ob is not None: 2131... self.__counter += 1 2132... ob = (ob, self.__counter) 2133... return ob 2134... 2135>>> class A: # not in docs from here, just testing the ExtendedRef 2136... 
pass 2137... 2138>>> a = A() 2139>>> r = ExtendedRef(a, foo=1, bar="baz") 2140>>> r.foo 21411 2142>>> r.bar 2143'baz' 2144>>> r()[1] 21451 2146>>> r()[1] 21472 2148>>> r()[0] is a 2149True 2150 2151 2152>>> import weakref 2153>>> _id2obj_dict = weakref.WeakValueDictionary() 2154>>> def remember(obj): 2155... oid = id(obj) 2156... _id2obj_dict[oid] = obj 2157... return oid 2158... 2159>>> def id2obj(oid): 2160... return _id2obj_dict[oid] 2161... 2162>>> a = A() # from here, just testing 2163>>> a_id = remember(a) 2164>>> id2obj(a_id) is a 2165True 2166>>> del a 2167>>> try: 2168... id2obj(a_id) 2169... except KeyError: 2170... print('OK') 2171... else: 2172... print('WeakValueDictionary error') 2173OK 2174 2175""" 2176 2177__test__ = {'libreftest' : libreftest} 2178 2179def test_main(): 2180 support.run_unittest( 2181 ReferencesTestCase, 2182 WeakMethodTestCase, 2183 MappingTestCase, 2184 WeakValueDictionaryTestCase, 2185 WeakKeyDictionaryTestCase, 2186 SubclassableWeakrefTestCase, 2187 FinalizeTestCase, 2188 ) 2189 support.run_doctest(sys.modules[__name__]) 2190 2191 2192if __name__ == "__main__": 2193 test_main() 2194