1import gc 2import sys 3import unittest 4import UserList 5import weakref 6import operator 7import contextlib 8import copy 9 10from test import test_support 11 12# Used in ReferencesTestCase.test_ref_created_during_del() . 13ref_from_del = None 14 15class C: 16 def method(self): 17 pass 18 19 20class Callable: 21 bar = None 22 23 def __call__(self, x): 24 self.bar = x 25 26 27def create_function(): 28 def f(): pass 29 return f 30 31def create_bound_method(): 32 return C().method 33 34def create_unbound_method(): 35 return C.method 36 37 38class Object: 39 def __init__(self, arg): 40 self.arg = arg 41 def __repr__(self): 42 return "<Object %r>" % self.arg 43 def __eq__(self, other): 44 if isinstance(other, Object): 45 return self.arg == other.arg 46 return NotImplemented 47 def __ne__(self, other): 48 if isinstance(other, Object): 49 return self.arg != other.arg 50 return NotImplemented 51 def __hash__(self): 52 return hash(self.arg) 53 54class RefCycle: 55 def __init__(self): 56 self.cycle = self 57 58 59class TestBase(unittest.TestCase): 60 61 def setUp(self): 62 self.cbcalled = 0 63 64 def callback(self, ref): 65 self.cbcalled += 1 66 67 68class ReferencesTestCase(TestBase): 69 70 def test_basic_ref(self): 71 self.check_basic_ref(C) 72 self.check_basic_ref(create_function) 73 self.check_basic_ref(create_bound_method) 74 self.check_basic_ref(create_unbound_method) 75 76 # Just make sure the tp_repr handler doesn't raise an exception. 77 # Live reference: 78 o = C() 79 wr = weakref.ref(o) 80 repr(wr) 81 # Dead reference: 82 del o 83 repr(wr) 84 85 def test_basic_callback(self): 86 self.check_basic_callback(C) 87 self.check_basic_callback(create_function) 88 self.check_basic_callback(create_bound_method) 89 self.check_basic_callback(create_unbound_method) 90 91 def test_multiple_callbacks(self): 92 o = C() 93 ref1 = weakref.ref(o, self.callback) 94 ref2 = weakref.ref(o, self.callback) 95 del o 96 self.assertIsNone(ref1(), "expected reference to be invalidated") 97 self.assertIsNone(ref2(), "expected reference to be invalidated") 98 self.assertEqual(self.cbcalled, 2, 99 "callback not called the right number of times") 100 101 def test_multiple_selfref_callbacks(self): 102 # Make sure all references are invalidated before callbacks are called 103 # 104 # What's important here is that we're using the first 105 # reference in the callback invoked on the second reference 106 # (the most recently created ref is cleaned up first). This 107 # tests that all references to the object are invalidated 108 # before any of the callbacks are invoked, so that we only 109 # have one invocation of _weakref.c:cleanup_helper() active 110 # for a particular object at a time. 111 # 112 def callback(object, self=self): 113 self.ref() 114 c = C() 115 self.ref = weakref.ref(c, callback) 116 ref1 = weakref.ref(c, callback) 117 del c 118 119 def test_constructor_kwargs(self): 120 c = C() 121 self.assertRaises(TypeError, weakref.ref, c, callback=None) 122 123 def test_proxy_ref(self): 124 o = C() 125 o.bar = 1 126 ref1 = weakref.proxy(o, self.callback) 127 ref2 = weakref.proxy(o, self.callback) 128 del o 129 130 def check(proxy): 131 proxy.bar 132 133 self.assertRaises(weakref.ReferenceError, check, ref1) 134 self.assertRaises(weakref.ReferenceError, check, ref2) 135 self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C())) 136 self.assertEqual(self.cbcalled, 2) 137 138 def check_basic_ref(self, factory): 139 o = factory() 140 ref = weakref.ref(o) 141 self.assertIsNotNone(ref(), 142 "weak reference to live object should be live") 143 o2 = ref() 144 self.assertIs(o, o2, 145 "<ref>() should return original object if live") 146 147 def check_basic_callback(self, factory): 148 self.cbcalled = 0 149 o = factory() 150 ref = weakref.ref(o, self.callback) 151 del o 152 self.assertEqual(self.cbcalled, 1, 153 "callback did not properly set 'cbcalled'") 154 self.assertIsNone(ref(), 155 "ref2 should be dead after deleting object reference") 156 157 def test_ref_reuse(self): 158 o = C() 159 ref1 = weakref.ref(o) 160 # create a proxy to make sure that there's an intervening creation 161 # between these two; it should make no difference 162 proxy = weakref.proxy(o) 163 ref2 = weakref.ref(o) 164 self.assertIs(ref1, ref2, 165 "reference object w/out callback should be re-used") 166 167 o = C() 168 proxy = weakref.proxy(o) 169 ref1 = weakref.ref(o) 170 ref2 = weakref.ref(o) 171 self.assertIs(ref1, ref2, 172 "reference object w/out callback should be re-used") 173 self.assertEqual(weakref.getweakrefcount(o), 2, 174 "wrong weak ref count for object") 175 del proxy 176 self.assertEqual(weakref.getweakrefcount(o), 1, 177 "wrong weak ref count for object after deleting proxy") 178 179 def test_proxy_reuse(self): 180 o = C() 181 proxy1 = weakref.proxy(o) 182 ref = weakref.ref(o) 183 proxy2 = weakref.proxy(o) 184 self.assertIs(proxy1, proxy2, 185 "proxy object w/out callback should have been re-used") 186 187 def test_basic_proxy(self): 188 o = C() 189 self.check_proxy(o, weakref.proxy(o)) 190 191 L = UserList.UserList() 192 p = weakref.proxy(L) 193 self.assertFalse(p, "proxy for empty UserList should be false") 194 p.append(12) 195 self.assertEqual(len(L), 1) 196 self.assertTrue(p, "proxy for non-empty UserList should be true") 197 with test_support.check_py3k_warnings(): 198 p[:] = [2, 3] 199 self.assertEqual(len(L), 2) 200 self.assertEqual(len(p), 2) 201 self.assertIn(3, p, "proxy didn't support __contains__() properly") 202 p[1] = 5 203 self.assertEqual(L[1], 5) 204 self.assertEqual(p[1], 5) 205 L2 = UserList.UserList(L) 206 p2 = weakref.proxy(L2) 207 self.assertEqual(p, p2) 208 ## self.assertEqual(repr(L2), repr(p2)) 209 L3 = UserList.UserList(range(10)) 210 p3 = weakref.proxy(L3) 211 with test_support.check_py3k_warnings(): 212 self.assertEqual(L3[:], p3[:]) 213 self.assertEqual(L3[5:], p3[5:]) 214 self.assertEqual(L3[:5], p3[:5]) 215 self.assertEqual(L3[2:5], p3[2:5]) 216 217 def test_proxy_unicode(self): 218 # See bug 5037 219 class C(object): 220 def __str__(self): 221 return "string" 222 def __unicode__(self): 223 return u"unicode" 224 instance = C() 225 self.assertIn("__unicode__", dir(weakref.proxy(instance))) 226 self.assertEqual(unicode(weakref.proxy(instance)), u"unicode") 227 228 def test_proxy_index(self): 229 class C: 230 def __index__(self): 231 return 10 232 o = C() 233 p = weakref.proxy(o) 234 self.assertEqual(operator.index(p), 10) 235 236 def test_proxy_div(self): 237 class C: 238 def __floordiv__(self, other): 239 return 42 240 def __ifloordiv__(self, other): 241 return 21 242 o = C() 243 p = weakref.proxy(o) 244 self.assertEqual(p // 5, 42) 245 p //= 5 246 self.assertEqual(p, 21) 247 248 # The PyWeakref_* C API is documented as allowing either NULL or 249 # None as the value for the callback, where either means "no 250 # callback". The "no callback" ref and proxy objects are supposed 251 # to be shared so long as they exist by all callers so long as 252 # they are active. In Python 2.3.3 and earlier, this guarantee 253 # was not honored, and was broken in different ways for 254 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 255 256 def test_shared_ref_without_callback(self): 257 self.check_shared_without_callback(weakref.ref) 258 259 def test_shared_proxy_without_callback(self): 260 self.check_shared_without_callback(weakref.proxy) 261 262 def check_shared_without_callback(self, makeref): 263 o = Object(1) 264 p1 = makeref(o, None) 265 p2 = makeref(o, None) 266 self.assertIs(p1, p2, "both callbacks were None in the C API") 267 del p1, p2 268 p1 = makeref(o) 269 p2 = makeref(o, None) 270 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 271 del p1, p2 272 p1 = makeref(o) 273 p2 = makeref(o) 274 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 275 del p1, p2 276 p1 = makeref(o, None) 277 p2 = makeref(o) 278 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 279 280 def test_callable_proxy(self): 281 o = Callable() 282 ref1 = weakref.proxy(o) 283 284 self.check_proxy(o, ref1) 285 286 self.assertIs(type(ref1), weakref.CallableProxyType, 287 "proxy is not of callable type") 288 ref1('twinkies!') 289 self.assertEqual(o.bar, 'twinkies!', 290 "call through proxy not passed through to original") 291 ref1(x='Splat.') 292 self.assertEqual(o.bar, 'Splat.', 293 "call through proxy not passed through to original") 294 295 # expect due to too few args 296 self.assertRaises(TypeError, ref1) 297 298 # expect due to too many args 299 self.assertRaises(TypeError, ref1, 1, 2, 3) 300 301 def check_proxy(self, o, proxy): 302 o.foo = 1 303 self.assertEqual(proxy.foo, 1, 304 "proxy does not reflect attribute addition") 305 o.foo = 2 306 self.assertEqual(proxy.foo, 2, 307 "proxy does not reflect attribute modification") 308 del o.foo 309 self.assertFalse(hasattr(proxy, 'foo'), 310 "proxy does not reflect attribute removal") 311 312 proxy.foo = 1 313 self.assertEqual(o.foo, 1, 314 "object does not reflect attribute addition via proxy") 315 proxy.foo = 2 316 self.assertEqual(o.foo, 2, 317 "object does not reflect attribute modification via proxy") 318 del proxy.foo 319 self.assertFalse(hasattr(o, 'foo'), 320 "object does not reflect attribute removal via proxy") 321 322 def test_proxy_deletion(self): 323 # Test clearing of SF bug #762891 324 class Foo: 325 result = None 326 def __delitem__(self, accessor): 327 self.result = accessor 328 g = Foo() 329 f = weakref.proxy(g) 330 del f[0] 331 self.assertEqual(f.result, 0) 332 333 def test_proxy_bool(self): 334 # Test clearing of SF bug #1170766 335 class List(list): pass 336 lyst = List() 337 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 338 339 def test_getweakrefcount(self): 340 o = C() 341 ref1 = weakref.ref(o) 342 ref2 = weakref.ref(o, self.callback) 343 self.assertEqual(weakref.getweakrefcount(o), 2, 344 "got wrong number of weak reference objects") 345 346 proxy1 = weakref.proxy(o) 347 proxy2 = weakref.proxy(o, self.callback) 348 self.assertEqual(weakref.getweakrefcount(o), 4, 349 "got wrong number of weak reference objects") 350 351 del ref1, ref2, proxy1, proxy2 352 self.assertEqual(weakref.getweakrefcount(o), 0, 353 "weak reference objects not unlinked from" 354 " referent when discarded.") 355 356 # assumes ints do not support weakrefs 357 self.assertEqual(weakref.getweakrefcount(1), 0, 358 "got wrong number of weak reference objects for int") 359 360 def test_getweakrefs(self): 361 o = C() 362 ref1 = weakref.ref(o, self.callback) 363 ref2 = weakref.ref(o, self.callback) 364 del ref1 365 self.assertEqual(weakref.getweakrefs(o), [ref2], 366 "list of refs does not match") 367 368 o = C() 369 ref1 = weakref.ref(o, self.callback) 370 ref2 = weakref.ref(o, self.callback) 371 del ref2 372 self.assertEqual(weakref.getweakrefs(o), [ref1], 373 "list of refs does not match") 374 375 del ref1 376 self.assertEqual(weakref.getweakrefs(o), [], 377 "list of refs not cleared") 378 379 # assumes ints do not support weakrefs 380 self.assertEqual(weakref.getweakrefs(1), [], 381 "list of refs does not match for int") 382 383 def test_newstyle_number_ops(self): 384 class F(float): 385 pass 386 f = F(2.0) 387 p = weakref.proxy(f) 388 self.assertEqual(p + 1.0, 3.0) 389 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 390 391 def test_callbacks_protected(self): 392 # Callbacks protected from already-set exceptions? 393 # Regression test for SF bug #478534. 394 class BogusError(Exception): 395 pass 396 data = {} 397 def remove(k): 398 del data[k] 399 def encapsulate(): 400 f = lambda : () 401 data[weakref.ref(f, remove)] = None 402 raise BogusError 403 try: 404 encapsulate() 405 except BogusError: 406 pass 407 else: 408 self.fail("exception not properly restored") 409 try: 410 encapsulate() 411 except BogusError: 412 pass 413 else: 414 self.fail("exception not properly restored") 415 416 def test_sf_bug_840829(self): 417 # "weakref callbacks and gc corrupt memory" 418 # subtype_dealloc erroneously exposed a new-style instance 419 # already in the process of getting deallocated to gc, 420 # causing double-deallocation if the instance had a weakref 421 # callback that triggered gc. 422 # If the bug exists, there probably won't be an obvious symptom 423 # in a release build. In a debug build, a segfault will occur 424 # when the second attempt to remove the instance from the "list 425 # of all objects" occurs. 426 427 import gc 428 429 class C(object): 430 pass 431 432 c = C() 433 wr = weakref.ref(c, lambda ignore: gc.collect()) 434 del c 435 436 # There endeth the first part. It gets worse. 437 del wr 438 439 c1 = C() 440 c1.i = C() 441 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 442 443 c2 = C() 444 c2.c1 = c1 445 del c1 # still alive because c2 points to it 446 447 # Now when subtype_dealloc gets called on c2, it's not enough just 448 # that c2 is immune from gc while the weakref callbacks associated 449 # with c2 execute (there are none in this 2nd half of the test, btw). 450 # subtype_dealloc goes on to call the base classes' deallocs too, 451 # so any gc triggered by weakref callbacks associated with anything 452 # torn down by a base class dealloc can also trigger double 453 # deallocation of c2. 454 del c2 455 456 def test_callback_in_cycle_1(self): 457 import gc 458 459 class J(object): 460 pass 461 462 class II(object): 463 def acallback(self, ignore): 464 self.J 465 466 I = II() 467 I.J = J 468 I.wr = weakref.ref(J, I.acallback) 469 470 # Now J and II are each in a self-cycle (as all new-style class 471 # objects are, since their __mro__ points back to them). I holds 472 # both a weak reference (I.wr) and a strong reference (I.J) to class 473 # J. I is also in a cycle (I.wr points to a weakref that references 474 # I.acallback). When we del these three, they all become trash, but 475 # the cycles prevent any of them from getting cleaned up immediately. 476 # Instead they have to wait for cyclic gc to deduce that they're 477 # trash. 478 # 479 # gc used to call tp_clear on all of them, and the order in which 480 # it does that is pretty accidental. The exact order in which we 481 # built up these things manages to provoke gc into running tp_clear 482 # in just the right order (I last). Calling tp_clear on II leaves 483 # behind an insane class object (its __mro__ becomes NULL). Calling 484 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 485 # just then because of the strong reference from I.J. Calling 486 # tp_clear on I starts to clear I's __dict__, and just happens to 487 # clear I.J first -- I.wr is still intact. That removes the last 488 # reference to J, which triggers the weakref callback. The callback 489 # tries to do "self.J", and instances of new-style classes look up 490 # attributes ("J") in the class dict first. The class (II) wants to 491 # search II.__mro__, but that's NULL. The result was a segfault in 492 # a release build, and an assert failure in a debug build. 493 del I, J, II 494 gc.collect() 495 496 def test_callback_in_cycle_2(self): 497 import gc 498 499 # This is just like test_callback_in_cycle_1, except that II is an 500 # old-style class. The symptom is different then: an instance of an 501 # old-style class looks in its own __dict__ first. 'J' happens to 502 # get cleared from I.__dict__ before 'wr', and 'J' was never in II's 503 # __dict__, so the attribute isn't found. The difference is that 504 # the old-style II doesn't have a NULL __mro__ (it doesn't have any 505 # __mro__), so no segfault occurs. Instead it got: 506 # test_callback_in_cycle_2 (__main__.ReferencesTestCase) ... 507 # Exception exceptions.AttributeError: 508 # "II instance has no attribute 'J'" in <bound method II.acallback 509 # of <?.II instance at 0x00B9B4B8>> ignored 510 511 class J(object): 512 pass 513 514 class II: 515 def acallback(self, ignore): 516 self.J 517 518 I = II() 519 I.J = J 520 I.wr = weakref.ref(J, I.acallback) 521 522 del I, J, II 523 gc.collect() 524 525 def test_callback_in_cycle_3(self): 526 import gc 527 528 # This one broke the first patch that fixed the last two. In this 529 # case, the objects reachable from the callback aren't also reachable 530 # from the object (c1) *triggering* the callback: you can get to 531 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 532 # got tp_clear'ed by the time the c2.cb callback got invoked. 533 534 class C: 535 def cb(self, ignore): 536 self.me 537 self.c1 538 self.wr 539 540 c1, c2 = C(), C() 541 542 c2.me = c2 543 c2.c1 = c1 544 c2.wr = weakref.ref(c1, c2.cb) 545 546 del c1, c2 547 gc.collect() 548 549 def test_callback_in_cycle_4(self): 550 import gc 551 552 # Like test_callback_in_cycle_3, except c2 and c1 have different 553 # classes. c2's class (C) isn't reachable from c1 then, so protecting 554 # objects reachable from the dying object (c1) isn't enough to stop 555 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 556 # The result was a segfault (C.__mro__ was NULL when the callback 557 # tried to look up self.me). 558 559 class C(object): 560 def cb(self, ignore): 561 self.me 562 self.c1 563 self.wr 564 565 class D: 566 pass 567 568 c1, c2 = D(), C() 569 570 c2.me = c2 571 c2.c1 = c1 572 c2.wr = weakref.ref(c1, c2.cb) 573 574 del c1, c2, C, D 575 gc.collect() 576 577 def test_callback_in_cycle_resurrection(self): 578 import gc 579 580 # Do something nasty in a weakref callback: resurrect objects 581 # from dead cycles. For this to be attempted, the weakref and 582 # its callback must also be part of the cyclic trash (else the 583 # objects reachable via the callback couldn't be in cyclic trash 584 # to begin with -- the callback would act like an external root). 585 # But gc clears trash weakrefs with callbacks early now, which 586 # disables the callbacks, so the callbacks shouldn't get called 587 # at all (and so nothing actually gets resurrected). 588 589 alist = [] 590 class C(object): 591 def __init__(self, value): 592 self.attribute = value 593 594 def acallback(self, ignore): 595 alist.append(self.c) 596 597 c1, c2 = C(1), C(2) 598 c1.c = c2 599 c2.c = c1 600 c1.wr = weakref.ref(c2, c1.acallback) 601 c2.wr = weakref.ref(c1, c2.acallback) 602 603 def C_went_away(ignore): 604 alist.append("C went away") 605 wr = weakref.ref(C, C_went_away) 606 607 del c1, c2, C # make them all trash 608 self.assertEqual(alist, []) # del isn't enough to reclaim anything 609 610 gc.collect() 611 # c1.wr and c2.wr were part of the cyclic trash, so should have 612 # been cleared without their callbacks executing. OTOH, the weakref 613 # to C is bound to a function local (wr), and wasn't trash, so that 614 # callback should have been invoked when C went away. 615 self.assertEqual(alist, ["C went away"]) 616 # The remaining weakref should be dead now (its callback ran). 617 self.assertEqual(wr(), None) 618 619 del alist[:] 620 gc.collect() 621 self.assertEqual(alist, []) 622 623 def test_callbacks_on_callback(self): 624 import gc 625 626 # Set up weakref callbacks *on* weakref callbacks. 627 alist = [] 628 def safe_callback(ignore): 629 alist.append("safe_callback called") 630 631 class C(object): 632 def cb(self, ignore): 633 alist.append("cb called") 634 635 c, d = C(), C() 636 c.other = d 637 d.other = c 638 callback = c.cb 639 c.wr = weakref.ref(d, callback) # this won't trigger 640 d.wr = weakref.ref(callback, d.cb) # ditto 641 external_wr = weakref.ref(callback, safe_callback) # but this will 642 self.assertIs(external_wr(), callback) 643 644 # The weakrefs attached to c and d should get cleared, so that 645 # C.cb is never called. But external_wr isn't part of the cyclic 646 # trash, and no cyclic trash is reachable from it, so safe_callback 647 # should get invoked when the bound method object callback (c.cb) 648 # -- which is itself a callback, and also part of the cyclic trash -- 649 # gets reclaimed at the end of gc. 650 651 del callback, c, d, C 652 self.assertEqual(alist, []) # del isn't enough to clean up cycles 653 gc.collect() 654 self.assertEqual(alist, ["safe_callback called"]) 655 self.assertEqual(external_wr(), None) 656 657 del alist[:] 658 gc.collect() 659 self.assertEqual(alist, []) 660 661 def test_gc_during_ref_creation(self): 662 self.check_gc_during_creation(weakref.ref) 663 664 def test_gc_during_proxy_creation(self): 665 self.check_gc_during_creation(weakref.proxy) 666 667 def check_gc_during_creation(self, makeref): 668 thresholds = gc.get_threshold() 669 gc.set_threshold(1, 1, 1) 670 gc.collect() 671 class A: 672 pass 673 674 def callback(*args): 675 pass 676 677 referenced = A() 678 679 a = A() 680 a.a = a 681 a.wr = makeref(referenced) 682 683 try: 684 # now make sure the object and the ref get labeled as 685 # cyclic trash: 686 a = A() 687 weakref.ref(referenced, callback) 688 689 finally: 690 gc.set_threshold(*thresholds) 691 692 def test_ref_created_during_del(self): 693 # Bug #1377858 694 # A weakref created in an object's __del__() would crash the 695 # interpreter when the weakref was cleaned up since it would refer to 696 # non-existent memory. This test should not segfault the interpreter. 697 class Target(object): 698 def __del__(self): 699 global ref_from_del 700 ref_from_del = weakref.ref(self) 701 702 w = Target() 703 704 def test_init(self): 705 # Issue 3634 706 # <weakref to class>.__init__() doesn't check errors correctly 707 r = weakref.ref(Exception) 708 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 709 # No exception should be raised here 710 gc.collect() 711 712 def test_classes(self): 713 # Check that both old-style classes and new-style classes 714 # are weakrefable. 715 class A(object): 716 pass 717 class B: 718 pass 719 l = [] 720 weakref.ref(int) 721 a = weakref.ref(A, l.append) 722 A = None 723 gc.collect() 724 self.assertEqual(a(), None) 725 self.assertEqual(l, [a]) 726 b = weakref.ref(B, l.append) 727 B = None 728 gc.collect() 729 self.assertEqual(b(), None) 730 self.assertEqual(l, [a, b]) 731 732 def test_equality(self): 733 # Alive weakrefs defer equality testing to their underlying object. 734 x = Object(1) 735 y = Object(1) 736 z = Object(2) 737 a = weakref.ref(x) 738 b = weakref.ref(y) 739 c = weakref.ref(z) 740 d = weakref.ref(x) 741 # Note how we directly test the operators here, to stress both 742 # __eq__ and __ne__. 743 self.assertTrue(a == b) 744 self.assertFalse(a != b) 745 self.assertFalse(a == c) 746 self.assertTrue(a != c) 747 self.assertTrue(a == d) 748 self.assertFalse(a != d) 749 del x, y, z 750 gc.collect() 751 for r in a, b, c: 752 # Sanity check 753 self.assertIs(r(), None) 754 # Dead weakrefs compare by identity: whether `a` and `d` are the 755 # same weakref object is an implementation detail, since they pointed 756 # to the same original object and didn't have a callback. 757 # (see issue #16453). 758 self.assertFalse(a == b) 759 self.assertTrue(a != b) 760 self.assertFalse(a == c) 761 self.assertTrue(a != c) 762 self.assertEqual(a == d, a is d) 763 self.assertEqual(a != d, a is not d) 764 765 def test_hashing(self): 766 # Alive weakrefs hash the same as the underlying object 767 x = Object(42) 768 y = Object(42) 769 a = weakref.ref(x) 770 b = weakref.ref(y) 771 self.assertEqual(hash(a), hash(42)) 772 del x, y 773 gc.collect() 774 # Dead weakrefs: 775 # - retain their hash is they were hashed when alive; 776 # - otherwise, cannot be hashed. 777 self.assertEqual(hash(a), hash(42)) 778 self.assertRaises(TypeError, hash, b) 779 780 def test_trashcan_16602(self): 781 # Issue #16602: when a weakref's target was part of a long 782 # deallocation chain, the trashcan mechanism could delay clearing 783 # of the weakref and make the target object visible from outside 784 # code even though its refcount had dropped to 0. A crash ensued. 785 class C(object): 786 def __init__(self, parent): 787 if not parent: 788 return 789 wself = weakref.ref(self) 790 def cb(wparent): 791 o = wself() 792 self.wparent = weakref.ref(parent, cb) 793 794 d = weakref.WeakKeyDictionary() 795 root = c = C(None) 796 for n in range(100): 797 d[c] = c = C(c) 798 del root 799 gc.collect() 800 801 802class SubclassableWeakrefTestCase(TestBase): 803 804 def test_subclass_refs(self): 805 class MyRef(weakref.ref): 806 def __init__(self, ob, callback=None, value=42): 807 self.value = value 808 super(MyRef, self).__init__(ob, callback) 809 def __call__(self): 810 self.called = True 811 return super(MyRef, self).__call__() 812 o = Object("foo") 813 mr = MyRef(o, value=24) 814 self.assertIs(mr(), o) 815 self.assertTrue(mr.called) 816 self.assertEqual(mr.value, 24) 817 del o 818 self.assertIsNone(mr()) 819 self.assertTrue(mr.called) 820 821 def test_subclass_refs_dont_replace_standard_refs(self): 822 class MyRef(weakref.ref): 823 pass 824 o = Object(42) 825 r1 = MyRef(o) 826 r2 = weakref.ref(o) 827 self.assertIsNot(r1, r2) 828 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 829 self.assertEqual(weakref.getweakrefcount(o), 2) 830 r3 = MyRef(o) 831 self.assertEqual(weakref.getweakrefcount(o), 3) 832 refs = weakref.getweakrefs(o) 833 self.assertEqual(len(refs), 3) 834 self.assertIs(r2, refs[0]) 835 self.assertIn(r1, refs[1:]) 836 self.assertIn(r3, refs[1:]) 837 838 def test_subclass_refs_dont_conflate_callbacks(self): 839 class MyRef(weakref.ref): 840 pass 841 o = Object(42) 842 r1 = MyRef(o, id) 843 r2 = MyRef(o, str) 844 self.assertIsNot(r1, r2) 845 refs = weakref.getweakrefs(o) 846 self.assertIn(r1, refs) 847 self.assertIn(r2, refs) 848 849 def test_subclass_refs_with_slots(self): 850 class MyRef(weakref.ref): 851 __slots__ = "slot1", "slot2" 852 def __new__(type, ob, callback, slot1, slot2): 853 return weakref.ref.__new__(type, ob, callback) 854 def __init__(self, ob, callback, slot1, slot2): 855 self.slot1 = slot1 856 self.slot2 = slot2 857 def meth(self): 858 return self.slot1 + self.slot2 859 o = Object(42) 860 r = MyRef(o, None, "abc", "def") 861 self.assertEqual(r.slot1, "abc") 862 self.assertEqual(r.slot2, "def") 863 self.assertEqual(r.meth(), "abcdef") 864 self.assertFalse(hasattr(r, "__dict__")) 865 866 def test_subclass_refs_with_cycle(self): 867 # Bug #3110 868 # An instance of a weakref subclass can have attributes. 869 # If such a weakref holds the only strong reference to the object, 870 # deleting the weakref will delete the object. In this case, 871 # the callback must not be called, because the ref object is 872 # being deleted. 873 class MyRef(weakref.ref): 874 pass 875 876 # Use a local callback, for "regrtest -R::" 877 # to detect refcounting problems 878 def callback(w): 879 self.cbcalled += 1 880 881 o = C() 882 r1 = MyRef(o, callback) 883 r1.o = o 884 del o 885 886 del r1 # Used to crash here 887 888 self.assertEqual(self.cbcalled, 0) 889 890 # Same test, with two weakrefs to the same object 891 # (since code paths are different) 892 o = C() 893 r1 = MyRef(o, callback) 894 r2 = MyRef(o, callback) 895 r1.r = r2 896 r2.o = o 897 del o 898 del r2 899 900 del r1 # Used to crash here 901 902 self.assertEqual(self.cbcalled, 0) 903 904 905class MappingTestCase(TestBase): 906 907 COUNT = 10 908 909 def check_len_cycles(self, dict_type, cons): 910 N = 20 911 items = [RefCycle() for i in range(N)] 912 dct = dict_type(cons(i, o) for i, o in enumerate(items)) 913 # Keep an iterator alive 914 it = dct.iteritems() 915 try: 916 next(it) 917 except StopIteration: 918 pass 919 del items 920 gc.collect() 921 n1 = len(dct) 922 list(it) 923 del it 924 gc.collect() 925 n2 = len(dct) 926 # iteration should prevent garbage collection here 927 # Note that this is a test on an implementation detail. The requirement 928 # is only to provide stable iteration, not that the size of the container 929 # stay fixed. 930 self.assertEqual(n1, 20) 931 #self.assertIn(n1, (0, 1)) 932 self.assertEqual(n2, 0) 933 934 def test_weak_keyed_len_cycles(self): 935 self.check_len_cycles(weakref.WeakKeyDictionary, lambda n, k: (k, n)) 936 937 def test_weak_valued_len_cycles(self): 938 self.check_len_cycles(weakref.WeakValueDictionary, lambda n, k: (n, k)) 939 940 def check_len_race(self, dict_type, cons): 941 # Extended sanity checks for len() in the face of cyclic collection 942 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 943 for th in range(1, 100): 944 N = 20 945 gc.collect(0) 946 gc.set_threshold(th, th, th) 947 items = [RefCycle() for i in range(N)] 948 dct = dict_type(cons(o) for o in items) 949 del items 950 # All items will be collected at next garbage collection pass 951 it = dct.iteritems() 952 try: 953 next(it) 954 except StopIteration: 955 pass 956 n1 = len(dct) 957 del it 958 n2 = len(dct) 959 self.assertGreaterEqual(n1, 0) 960 self.assertLessEqual(n1, N) 961 self.assertGreaterEqual(n2, 0) 962 self.assertLessEqual(n2, n1) 963 964 def test_weak_keyed_len_race(self): 965 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 966 967 def test_weak_valued_len_race(self): 968 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 969 970 def test_weak_values(self): 971 # 972 # This exercises d.copy(), d.items(), d[], del d[], len(d). 973 # 974 dict, objects = self.make_weak_valued_dict() 975 for o in objects: 976 self.assertEqual(weakref.getweakrefcount(o), 1, 977 "wrong number of weak references to %r!" % o) 978 self.assertIs(o, dict[o.arg], 979 "wrong object returned by weak dict!") 980 items1 = dict.items() 981 items2 = dict.copy().items() 982 items1.sort() 983 items2.sort() 984 self.assertEqual(items1, items2, 985 "cloning of weak-valued dictionary did not work!") 986 del items1, items2 987 self.assertEqual(len(dict), self.COUNT) 988 del objects[0] 989 self.assertEqual(len(dict), (self.COUNT - 1), 990 "deleting object did not cause dictionary update") 991 del objects, o 992 self.assertEqual(len(dict), 0, 993 "deleting the values did not clear the dictionary") 994 # regression on SF bug #447152: 995 dict = weakref.WeakValueDictionary() 996 self.assertRaises(KeyError, dict.__getitem__, 1) 997 dict[2] = C() 998 self.assertRaises(KeyError, dict.__getitem__, 2) 999 1000 def test_weak_keys(self): 1001 # 1002 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1003 # len(d), in d. 1004 # 1005 dict, objects = self.make_weak_keyed_dict() 1006 for o in objects: 1007 self.assertEqual(weakref.getweakrefcount(o), 1, 1008 "wrong number of weak references to %r!" % o) 1009 self.assertIs(o.arg, dict[o], 1010 "wrong object returned by weak dict!") 1011 items1 = dict.items() 1012 items2 = dict.copy().items() 1013 self.assertEqual(set(items1), set(items2), 1014 "cloning of weak-keyed dictionary did not work!") 1015 del items1, items2 1016 self.assertEqual(len(dict), self.COUNT) 1017 del objects[0] 1018 self.assertEqual(len(dict), (self.COUNT - 1), 1019 "deleting object did not cause dictionary update") 1020 del objects, o 1021 self.assertEqual(len(dict), 0, 1022 "deleting the keys did not clear the dictionary") 1023 o = Object(42) 1024 dict[o] = "What is the meaning of the universe?" 1025 self.assertIn(o, dict) 1026 self.assertNotIn(34, dict) 1027 1028 def test_weak_keyed_iters(self): 1029 dict, objects = self.make_weak_keyed_dict() 1030 self.check_iters(dict) 1031 1032 # Test keyrefs() 1033 refs = dict.keyrefs() 1034 self.assertEqual(len(refs), len(objects)) 1035 objects2 = list(objects) 1036 for wr in refs: 1037 ob = wr() 1038 self.assertIn(ob, dict) 1039 self.assertEqual(ob.arg, dict[ob]) 1040 objects2.remove(ob) 1041 self.assertEqual(len(objects2), 0) 1042 1043 # Test iterkeyrefs() 1044 objects2 = list(objects) 1045 self.assertEqual(len(list(dict.iterkeyrefs())), len(objects)) 1046 for wr in dict.iterkeyrefs(): 1047 ob = wr() 1048 self.assertIn(ob, dict) 1049 self.assertEqual(ob.arg, dict[ob]) 1050 objects2.remove(ob) 1051 self.assertEqual(len(objects2), 0) 1052 1053 def test_weak_valued_iters(self): 1054 dict, objects = self.make_weak_valued_dict() 1055 self.check_iters(dict) 1056 1057 # Test valuerefs() 1058 refs = dict.valuerefs() 1059 self.assertEqual(len(refs), len(objects)) 1060 objects2 = list(objects) 1061 for wr in refs: 1062 ob = wr() 1063 self.assertEqual(ob, dict[ob.arg]) 1064 self.assertEqual(ob.arg, dict[ob.arg].arg) 1065 objects2.remove(ob) 1066 self.assertEqual(len(objects2), 0) 1067 1068 # Test itervaluerefs() 1069 objects2 = list(objects) 1070 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1071 for wr in dict.itervaluerefs(): 1072 ob = wr() 1073 self.assertEqual(ob, dict[ob.arg]) 1074 self.assertEqual(ob.arg, dict[ob.arg].arg) 1075 objects2.remove(ob) 1076 self.assertEqual(len(objects2), 0) 1077 1078 def check_iters(self, dict): 1079 # item iterator: 1080 items = dict.items() 1081 for item in dict.iteritems(): 1082 items.remove(item) 1083 self.assertEqual(len(items), 0, "iteritems() did not touch all items") 1084 1085 # key iterator, via __iter__(): 1086 keys = dict.keys() 1087 for k in dict: 1088 keys.remove(k) 1089 self.assertEqual(len(keys), 0, "__iter__() did not touch all keys") 1090 1091 # key iterator, via iterkeys(): 1092 keys = dict.keys() 1093 for k in dict.iterkeys(): 1094 keys.remove(k) 1095 self.assertEqual(len(keys), 0, "iterkeys() did not touch all keys") 1096 1097 # value iterator: 1098 values = dict.values() 1099 for v in dict.itervalues(): 1100 values.remove(v) 1101 self.assertEqual(len(values), 0, 1102 "itervalues() did not touch all values") 1103 1104 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1105 n = len(dict) 1106 it = iter(getattr(dict, iter_name)()) 1107 next(it) # Trigger internal iteration 1108 # Destroy an object 1109 del objects[-1] 1110 gc.collect() # just in case 1111 # We have removed either the first consumed object, or another one 1112 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1113 del it 1114 # The removal has been committed 1115 self.assertEqual(len(dict), n - 1) 1116 1117 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1118 # Check that we can explicitly mutate the weak dict without 1119 # interfering with delayed removal. 1120 # `testcontext` should create an iterator, destroy one of the 1121 # weakref'ed objects and then return a new key/value pair corresponding 1122 # to the destroyed object. 1123 with testcontext() as (k, v): 1124 self.assertFalse(k in dict) 1125 with testcontext() as (k, v): 1126 self.assertRaises(KeyError, dict.__delitem__, k) 1127 self.assertFalse(k in dict) 1128 with testcontext() as (k, v): 1129 self.assertRaises(KeyError, dict.pop, k) 1130 self.assertFalse(k in dict) 1131 with testcontext() as (k, v): 1132 dict[k] = v 1133 self.assertEqual(dict[k], v) 1134 ddict = copy.copy(dict) 1135 with testcontext() as (k, v): 1136 dict.update(ddict) 1137 self.assertEqual(dict, ddict) 1138 with testcontext() as (k, v): 1139 dict.clear() 1140 self.assertEqual(len(dict), 0) 1141 1142 def test_weak_keys_destroy_while_iterating(self): 1143 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1144 dict, objects = self.make_weak_keyed_dict() 1145 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') 1146 self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') 1147 self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') 1148 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs') 1149 dict, objects = self.make_weak_keyed_dict() 1150 @contextlib.contextmanager 1151 def testcontext(): 1152 try: 1153 it = iter(dict.iteritems()) 1154 next(it) 1155 # Schedule a key/value for removal and recreate it 1156 v = objects.pop().arg 1157 gc.collect() # just in case 1158 yield Object(v), v 1159 finally: 1160 it = None # should commit all removals 1161 gc.collect() 1162 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1163 1164 def test_weak_values_destroy_while_iterating(self): 1165 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1166 dict, objects = self.make_weak_valued_dict() 1167 self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys') 1168 self.check_weak_destroy_while_iterating(dict, objects, 'iteritems') 1169 self.check_weak_destroy_while_iterating(dict, objects, 'itervalues') 1170 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1171 dict, objects = self.make_weak_valued_dict() 1172 @contextlib.contextmanager 1173 def testcontext(): 1174 try: 1175 it = iter(dict.iteritems()) 1176 next(it) 1177 # Schedule a key/value for removal and recreate it 1178 k = objects.pop().arg 1179 gc.collect() # just in case 1180 yield k, Object(k) 1181 finally: 1182 it = None # should commit all removals 1183 gc.collect() 1184 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1185 1186 def test_make_weak_keyed_dict_from_dict(self): 1187 o = Object(3) 1188 dict = weakref.WeakKeyDictionary({o:364}) 1189 self.assertEqual(dict[o], 364) 1190 1191 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1192 o = Object(3) 1193 dict = weakref.WeakKeyDictionary({o:364}) 1194 dict2 = weakref.WeakKeyDictionary(dict) 1195 self.assertEqual(dict[o], 364) 1196 1197 def make_weak_keyed_dict(self): 1198 dict = weakref.WeakKeyDictionary() 1199 objects = map(Object, range(self.COUNT)) 1200 for o in objects: 1201 dict[o] = o.arg 1202 return dict, objects 1203 1204 def test_make_weak_valued_dict_misc(self): 1205 # errors 1206 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1207 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1208 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1209 # special keyword arguments 1210 o = Object(3) 1211 for kw in 'self', 'other', 'iterable': 1212 d = weakref.WeakValueDictionary(**{kw: o}) 1213 self.assertEqual(list(d.keys()), [kw]) 1214 self.assertEqual(d[kw], o) 1215 1216 def make_weak_valued_dict(self): 1217 dict = weakref.WeakValueDictionary() 1218 objects = map(Object, range(self.COUNT)) 1219 for o in objects: 1220 dict[o.arg] = o 1221 return dict, objects 1222 1223 def check_popitem(self, klass, key1, value1, key2, value2): 1224 weakdict = klass() 1225 weakdict[key1] = value1 1226 weakdict[key2] = value2 1227 self.assertEqual(len(weakdict), 2) 1228 k, v = weakdict.popitem() 1229 self.assertEqual(len(weakdict), 1) 1230 if k is key1: 1231 self.assertIs(v, value1) 1232 else: 1233 self.assertIs(v, value2) 1234 k, v = weakdict.popitem() 1235 self.assertEqual(len(weakdict), 0) 1236 if k is key1: 1237 self.assertIs(v, value1) 1238 else: 1239 self.assertIs(v, value2) 1240 1241 def test_weak_valued_dict_popitem(self): 1242 self.check_popitem(weakref.WeakValueDictionary, 1243 "key1", C(), "key2", C()) 1244 1245 def test_weak_keyed_dict_popitem(self): 1246 self.check_popitem(weakref.WeakKeyDictionary, 1247 C(), "value 1", C(), "value 2") 1248 1249 def check_setdefault(self, klass, key, value1, value2): 1250 self.assertIsNot(value1, value2, 1251 "invalid test" 1252 " -- value parameters must be distinct objects") 1253 weakdict = klass() 1254 o = weakdict.setdefault(key, value1) 1255 self.assertIs(o, value1) 1256 self.assertIn(key, weakdict) 1257 self.assertIs(weakdict.get(key), value1) 1258 self.assertIs(weakdict[key], value1) 1259 1260 o = weakdict.setdefault(key, value2) 1261 self.assertIs(o, value1) 1262 self.assertIn(key, weakdict) 1263 self.assertIs(weakdict.get(key), value1) 1264 self.assertIs(weakdict[key], value1) 1265 1266 def test_weak_valued_dict_setdefault(self): 1267 self.check_setdefault(weakref.WeakValueDictionary, 1268 "key", C(), C()) 1269 1270 def test_weak_keyed_dict_setdefault(self): 1271 self.check_setdefault(weakref.WeakKeyDictionary, 1272 C(), "value 1", "value 2") 1273 1274 def check_update(self, klass, dict): 1275 # 1276 # This exercises d.update(), len(d), d.keys(), in d, 1277 # d.get(), d[]. 1278 # 1279 weakdict = klass() 1280 weakdict.update(dict) 1281 self.assertEqual(len(weakdict), len(dict)) 1282 for k in weakdict.keys(): 1283 self.assertIn(k, dict, 1284 "mysterious new key appeared in weak dict") 1285 v = dict.get(k) 1286 self.assertIs(v, weakdict[k]) 1287 self.assertIs(v, weakdict.get(k)) 1288 for k in dict.keys(): 1289 self.assertIn(k, weakdict, 1290 "original key disappeared in weak dict") 1291 v = dict[k] 1292 self.assertIs(v, weakdict[k]) 1293 self.assertIs(v, weakdict.get(k)) 1294 1295 def test_weak_valued_dict_update(self): 1296 self.check_update(weakref.WeakValueDictionary, 1297 {1: C(), 'a': C(), C(): C()}) 1298 # errors 1299 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1300 d = weakref.WeakValueDictionary() 1301 self.assertRaises(TypeError, d.update, {}, {}) 1302 self.assertRaises(TypeError, d.update, (), ()) 1303 self.assertEqual(list(d.keys()), []) 1304 # special keyword arguments 1305 o = Object(3) 1306 for kw in 'self', 'dict', 'other', 'iterable': 1307 d = weakref.WeakValueDictionary() 1308 d.update(**{kw: o}) 1309 self.assertEqual(list(d.keys()), [kw]) 1310 self.assertEqual(d[kw], o) 1311 1312 def test_weak_keyed_dict_update(self): 1313 self.check_update(weakref.WeakKeyDictionary, 1314 {C(): 1, C(): 2, C(): 3}) 1315 1316 def test_weak_keyed_delitem(self): 1317 d = weakref.WeakKeyDictionary() 1318 o1 = Object('1') 1319 o2 = Object('2') 1320 d[o1] = 'something' 1321 d[o2] = 'something' 1322 self.assertEqual(len(d), 2) 1323 del d[o1] 1324 self.assertEqual(len(d), 1) 1325 self.assertEqual(d.keys(), [o2]) 1326 1327 def test_weak_valued_delitem(self): 1328 d = weakref.WeakValueDictionary() 1329 o1 = Object('1') 1330 o2 = Object('2') 1331 d['something'] = o1 1332 d['something else'] = o2 1333 self.assertEqual(len(d), 2) 1334 del d['something'] 1335 self.assertEqual(len(d), 1) 1336 self.assertEqual(d.items(), [('something else', o2)]) 1337 1338 def test_weak_keyed_bad_delitem(self): 1339 d = weakref.WeakKeyDictionary() 1340 o = Object('1') 1341 # An attempt to delete an object that isn't there should raise 1342 # KeyError. It didn't before 2.3. 1343 self.assertRaises(KeyError, d.__delitem__, o) 1344 self.assertRaises(KeyError, d.__getitem__, o) 1345 1346 # If a key isn't of a weakly referencable type, __getitem__ and 1347 # __setitem__ raise TypeError. __delitem__ should too. 1348 self.assertRaises(TypeError, d.__delitem__, 13) 1349 self.assertRaises(TypeError, d.__getitem__, 13) 1350 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1351 1352 def test_weak_keyed_cascading_deletes(self): 1353 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1354 # over the keys via self.data.iterkeys(). If things vanished from 1355 # the dict during this (or got added), that caused a RuntimeError. 1356 1357 d = weakref.WeakKeyDictionary() 1358 mutate = False 1359 1360 class C(object): 1361 def __init__(self, i): 1362 self.value = i 1363 def __hash__(self): 1364 return hash(self.value) 1365 def __eq__(self, other): 1366 if mutate: 1367 # Side effect that mutates the dict, by removing the 1368 # last strong reference to a key. 1369 del objs[-1] 1370 return self.value == other.value 1371 1372 objs = [C(i) for i in range(4)] 1373 for o in objs: 1374 d[o] = o.value 1375 del o # now the only strong references to keys are in objs 1376 # Find the order in which iterkeys sees the keys. 1377 objs = d.keys() 1378 # Reverse it, so that the iteration implementation of __delitem__ 1379 # has to keep looping to find the first object we delete. 1380 objs.reverse() 1381 1382 # Turn on mutation in C.__eq__. The first time thru the loop, 1383 # under the iterkeys() business the first comparison will delete 1384 # the last item iterkeys() would see, and that causes a 1385 # RuntimeError: dictionary changed size during iteration 1386 # when the iterkeys() loop goes around to try comparing the next 1387 # key. After this was fixed, it just deletes the last object *our* 1388 # "for o in obj" loop would have gotten to. 1389 mutate = True 1390 count = 0 1391 for o in objs: 1392 count += 1 1393 del d[o] 1394 self.assertEqual(len(d), 0) 1395 self.assertEqual(count, 2) 1396 1397from test import mapping_tests 1398 1399class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1400 """Check that WeakValueDictionary conforms to the mapping protocol""" 1401 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 1402 type2test = weakref.WeakValueDictionary 1403 def _reference(self): 1404 return self.__ref.copy() 1405 1406class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 1407 """Check that WeakKeyDictionary conforms to the mapping protocol""" 1408 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 1409 type2test = weakref.WeakKeyDictionary 1410 def _reference(self): 1411 return self.__ref.copy() 1412 1413libreftest = """ Doctest for examples in the library reference: weakref.rst 1414 1415>>> import weakref 1416>>> class Dict(dict): 1417... pass 1418... 1419>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 1420>>> r = weakref.ref(obj) 1421>>> print r() is obj 1422True 1423 1424>>> import weakref 1425>>> class Object: 1426... pass 1427... 1428>>> o = Object() 1429>>> r = weakref.ref(o) 1430>>> o2 = r() 1431>>> o is o2 1432True 1433>>> del o, o2 1434>>> print r() 1435None 1436 1437>>> import weakref 1438>>> class ExtendedRef(weakref.ref): 1439... def __init__(self, ob, callback=None, **annotations): 1440... super(ExtendedRef, self).__init__(ob, callback) 1441... self.__counter = 0 1442... for k, v in annotations.iteritems(): 1443... setattr(self, k, v) 1444... def __call__(self): 1445... '''Return a pair containing the referent and the number of 1446... times the reference has been called. 1447... ''' 1448... ob = super(ExtendedRef, self).__call__() 1449... if ob is not None: 1450... self.__counter += 1 1451... ob = (ob, self.__counter) 1452... return ob 1453... 1454>>> class A: # not in docs from here, just testing the ExtendedRef 1455... pass 1456... 1457>>> a = A() 1458>>> r = ExtendedRef(a, foo=1, bar="baz") 1459>>> r.foo 14601 1461>>> r.bar 1462'baz' 1463>>> r()[1] 14641 1465>>> r()[1] 14662 1467>>> r()[0] is a 1468True 1469 1470 1471>>> import weakref 1472>>> _id2obj_dict = weakref.WeakValueDictionary() 1473>>> def remember(obj): 1474... oid = id(obj) 1475... _id2obj_dict[oid] = obj 1476... return oid 1477... 1478>>> def id2obj(oid): 1479... return _id2obj_dict[oid] 1480... 1481>>> a = A() # from here, just testing 1482>>> a_id = remember(a) 1483>>> id2obj(a_id) is a 1484True 1485>>> del a 1486>>> try: 1487... id2obj(a_id) 1488... except KeyError: 1489... print 'OK' 1490... else: 1491... print 'WeakValueDictionary error' 1492OK 1493 1494""" 1495 1496__test__ = {'libreftest' : libreftest} 1497 1498def test_main(): 1499 test_support.run_unittest( 1500 ReferencesTestCase, 1501 MappingTestCase, 1502 WeakValueDictionaryTestCase, 1503 WeakKeyDictionaryTestCase, 1504 SubclassableWeakrefTestCase, 1505 ) 1506 test_support.run_doctest(sys.modules[__name__]) 1507 1508 1509if __name__ == "__main__": 1510 test_main() 1511