1import gc 2import sys 3import doctest 4import unittest 5import collections 6import weakref 7import operator 8import contextlib 9import copy 10import threading 11import time 12import random 13import textwrap 14 15from test import support 16from test.support import script_helper, ALWAYS_EQ, suppress_immortalization 17from test.support import gc_collect 18from test.support import import_helper 19from test.support import threading_helper 20from test.support import is_wasi, Py_DEBUG 21 22# Used in ReferencesTestCase.test_ref_created_during_del() . 23ref_from_del = None 24 25# Used by FinalizeTestCase as a global that may be replaced by None 26# when the interpreter shuts down. 27_global_var = 'foobar' 28 29class C: 30 def method(self): 31 pass 32 33 34class Callable: 35 bar = None 36 37 def __call__(self, x): 38 self.bar = x 39 40 41def create_function(): 42 def f(): pass 43 return f 44 45def create_bound_method(): 46 return C().method 47 48 49class Object: 50 def __init__(self, arg): 51 self.arg = arg 52 def __repr__(self): 53 return "<Object %r>" % self.arg 54 def __eq__(self, other): 55 if isinstance(other, Object): 56 return self.arg == other.arg 57 return NotImplemented 58 def __lt__(self, other): 59 if isinstance(other, Object): 60 return self.arg < other.arg 61 return NotImplemented 62 def __hash__(self): 63 return hash(self.arg) 64 def some_method(self): 65 return 4 66 def other_method(self): 67 return 5 68 69 70class RefCycle: 71 def __init__(self): 72 self.cycle = self 73 74 75class TestBase(unittest.TestCase): 76 77 def setUp(self): 78 self.cbcalled = 0 79 80 def callback(self, ref): 81 self.cbcalled += 1 82 83 84@contextlib.contextmanager 85def collect_in_thread(period=0.005): 86 """ 87 Ensure GC collections happen in a different thread, at a high frequency. 88 """ 89 please_stop = False 90 91 def collect(): 92 while not please_stop: 93 time.sleep(period) 94 gc.collect() 95 96 with support.disable_gc(): 97 t = threading.Thread(target=collect) 98 t.start() 99 try: 100 yield 101 finally: 102 please_stop = True 103 t.join() 104 105 106class ReferencesTestCase(TestBase): 107 108 def test_basic_ref(self): 109 self.check_basic_ref(C) 110 self.check_basic_ref(create_function) 111 self.check_basic_ref(create_bound_method) 112 113 # Just make sure the tp_repr handler doesn't raise an exception. 114 # Live reference: 115 o = C() 116 wr = weakref.ref(o) 117 repr(wr) 118 # Dead reference: 119 del o 120 repr(wr) 121 122 @support.cpython_only 123 def test_ref_repr(self): 124 obj = C() 125 ref = weakref.ref(obj) 126 regex = ( 127 rf"<weakref at 0x[0-9a-fA-F]+; " 128 rf"to '{'' if __name__ == '__main__' else C.__module__ + '.'}{C.__qualname__}' " 129 rf"at 0x[0-9a-fA-F]+>" 130 ) 131 self.assertRegex(repr(ref), regex) 132 133 obj = None 134 gc_collect() 135 self.assertRegex(repr(ref), 136 rf'<weakref at 0x[0-9a-fA-F]+; dead>') 137 138 # test type with __name__ 139 class WithName: 140 @property 141 def __name__(self): 142 return "custom_name" 143 144 obj2 = WithName() 145 ref2 = weakref.ref(obj2) 146 regex = ( 147 rf"<weakref at 0x[0-9a-fA-F]+; " 148 rf"to '{'' if __name__ == '__main__' else WithName.__module__ + '.'}" 149 rf"{WithName.__qualname__}' " 150 rf"at 0x[0-9a-fA-F]+ +\(custom_name\)>" 151 ) 152 self.assertRegex(repr(ref2), regex) 153 154 def test_repr_failure_gh99184(self): 155 class MyConfig(dict): 156 def __getattr__(self, x): 157 return self[x] 158 159 obj = MyConfig(offset=5) 160 obj_weakref = weakref.ref(obj) 161 162 self.assertIn('MyConfig', repr(obj_weakref)) 163 self.assertIn('MyConfig', str(obj_weakref)) 164 165 def test_basic_callback(self): 166 self.check_basic_callback(C) 167 self.check_basic_callback(create_function) 168 self.check_basic_callback(create_bound_method) 169 170 @support.cpython_only 171 def test_cfunction(self): 172 _testcapi = import_helper.import_module("_testcapi") 173 create_cfunction = _testcapi.create_cfunction 174 f = create_cfunction() 175 wr = weakref.ref(f) 176 self.assertIs(wr(), f) 177 del f 178 self.assertIsNone(wr()) 179 self.check_basic_ref(create_cfunction) 180 self.check_basic_callback(create_cfunction) 181 182 def test_multiple_callbacks(self): 183 o = C() 184 ref1 = weakref.ref(o, self.callback) 185 ref2 = weakref.ref(o, self.callback) 186 del o 187 gc_collect() # For PyPy or other GCs. 188 self.assertIsNone(ref1(), "expected reference to be invalidated") 189 self.assertIsNone(ref2(), "expected reference to be invalidated") 190 self.assertEqual(self.cbcalled, 2, 191 "callback not called the right number of times") 192 193 def test_multiple_selfref_callbacks(self): 194 # Make sure all references are invalidated before callbacks are called 195 # 196 # What's important here is that we're using the first 197 # reference in the callback invoked on the second reference 198 # (the most recently created ref is cleaned up first). This 199 # tests that all references to the object are invalidated 200 # before any of the callbacks are invoked, so that we only 201 # have one invocation of _weakref.c:cleanup_helper() active 202 # for a particular object at a time. 203 # 204 def callback(object, self=self): 205 self.ref() 206 c = C() 207 self.ref = weakref.ref(c, callback) 208 ref1 = weakref.ref(c, callback) 209 del c 210 211 def test_constructor_kwargs(self): 212 c = C() 213 self.assertRaises(TypeError, weakref.ref, c, callback=None) 214 215 def test_proxy_ref(self): 216 o = C() 217 o.bar = 1 218 ref1 = weakref.proxy(o, self.callback) 219 ref2 = weakref.proxy(o, self.callback) 220 del o 221 gc_collect() # For PyPy or other GCs. 222 223 def check(proxy): 224 proxy.bar 225 226 self.assertRaises(ReferenceError, check, ref1) 227 self.assertRaises(ReferenceError, check, ref2) 228 ref3 = weakref.proxy(C()) 229 gc_collect() # For PyPy or other GCs. 230 self.assertRaises(ReferenceError, bool, ref3) 231 self.assertEqual(self.cbcalled, 2) 232 233 @support.cpython_only 234 def test_proxy_repr(self): 235 obj = C() 236 ref = weakref.proxy(obj, self.callback) 237 regex = ( 238 rf"<weakproxy at 0x[0-9a-fA-F]+; " 239 rf"to '{'' if __name__ == '__main__' else C.__module__ + '.'}{C.__qualname__}' " 240 rf"at 0x[0-9a-fA-F]+>" 241 ) 242 self.assertRegex(repr(ref), regex) 243 244 obj = None 245 gc_collect() 246 self.assertRegex(repr(ref), 247 rf'<weakproxy at 0x[0-9a-fA-F]+; dead>') 248 249 def check_basic_ref(self, factory): 250 o = factory() 251 ref = weakref.ref(o) 252 self.assertIsNotNone(ref(), 253 "weak reference to live object should be live") 254 o2 = ref() 255 self.assertIs(o, o2, 256 "<ref>() should return original object if live") 257 258 def check_basic_callback(self, factory): 259 self.cbcalled = 0 260 o = factory() 261 ref = weakref.ref(o, self.callback) 262 del o 263 gc_collect() # For PyPy or other GCs. 264 self.assertEqual(self.cbcalled, 1, 265 "callback did not properly set 'cbcalled'") 266 self.assertIsNone(ref(), 267 "ref2 should be dead after deleting object reference") 268 269 def test_ref_reuse(self): 270 o = C() 271 ref1 = weakref.ref(o) 272 # create a proxy to make sure that there's an intervening creation 273 # between these two; it should make no difference 274 proxy = weakref.proxy(o) 275 ref2 = weakref.ref(o) 276 self.assertIs(ref1, ref2, 277 "reference object w/out callback should be re-used") 278 279 o = C() 280 proxy = weakref.proxy(o) 281 ref1 = weakref.ref(o) 282 ref2 = weakref.ref(o) 283 self.assertIs(ref1, ref2, 284 "reference object w/out callback should be re-used") 285 self.assertEqual(weakref.getweakrefcount(o), 2, 286 "wrong weak ref count for object") 287 del proxy 288 gc_collect() # For PyPy or other GCs. 289 self.assertEqual(weakref.getweakrefcount(o), 1, 290 "wrong weak ref count for object after deleting proxy") 291 292 def test_proxy_reuse(self): 293 o = C() 294 proxy1 = weakref.proxy(o) 295 ref = weakref.ref(o) 296 proxy2 = weakref.proxy(o) 297 self.assertIs(proxy1, proxy2, 298 "proxy object w/out callback should have been re-used") 299 300 def test_basic_proxy(self): 301 o = C() 302 self.check_proxy(o, weakref.proxy(o)) 303 304 L = collections.UserList() 305 p = weakref.proxy(L) 306 self.assertFalse(p, "proxy for empty UserList should be false") 307 p.append(12) 308 self.assertEqual(len(L), 1) 309 self.assertTrue(p, "proxy for non-empty UserList should be true") 310 p[:] = [2, 3] 311 self.assertEqual(len(L), 2) 312 self.assertEqual(len(p), 2) 313 self.assertIn(3, p, "proxy didn't support __contains__() properly") 314 p[1] = 5 315 self.assertEqual(L[1], 5) 316 self.assertEqual(p[1], 5) 317 L2 = collections.UserList(L) 318 p2 = weakref.proxy(L2) 319 self.assertEqual(p, p2) 320 ## self.assertEqual(repr(L2), repr(p2)) 321 L3 = collections.UserList(range(10)) 322 p3 = weakref.proxy(L3) 323 self.assertEqual(L3[:], p3[:]) 324 self.assertEqual(L3[5:], p3[5:]) 325 self.assertEqual(L3[:5], p3[:5]) 326 self.assertEqual(L3[2:5], p3[2:5]) 327 328 def test_proxy_unicode(self): 329 # See bug 5037 330 class C(object): 331 def __str__(self): 332 return "string" 333 def __bytes__(self): 334 return b"bytes" 335 instance = C() 336 self.assertIn("__bytes__", dir(weakref.proxy(instance))) 337 self.assertEqual(bytes(weakref.proxy(instance)), b"bytes") 338 339 def test_proxy_index(self): 340 class C: 341 def __index__(self): 342 return 10 343 o = C() 344 p = weakref.proxy(o) 345 self.assertEqual(operator.index(p), 10) 346 347 def test_proxy_div(self): 348 class C: 349 def __floordiv__(self, other): 350 return 42 351 def __ifloordiv__(self, other): 352 return 21 353 o = C() 354 p = weakref.proxy(o) 355 self.assertEqual(p // 5, 42) 356 p //= 5 357 self.assertEqual(p, 21) 358 359 def test_proxy_matmul(self): 360 class C: 361 def __matmul__(self, other): 362 return 1729 363 def __rmatmul__(self, other): 364 return -163 365 def __imatmul__(self, other): 366 return 561 367 o = C() 368 p = weakref.proxy(o) 369 self.assertEqual(p @ 5, 1729) 370 self.assertEqual(5 @ p, -163) 371 p @= 5 372 self.assertEqual(p, 561) 373 374 # The PyWeakref_* C API is documented as allowing either NULL or 375 # None as the value for the callback, where either means "no 376 # callback". The "no callback" ref and proxy objects are supposed 377 # to be shared so long as they exist by all callers so long as 378 # they are active. In Python 2.3.3 and earlier, this guarantee 379 # was not honored, and was broken in different ways for 380 # PyWeakref_NewRef() and PyWeakref_NewProxy(). (Two tests.) 381 382 def test_shared_ref_without_callback(self): 383 self.check_shared_without_callback(weakref.ref) 384 385 def test_shared_proxy_without_callback(self): 386 self.check_shared_without_callback(weakref.proxy) 387 388 def check_shared_without_callback(self, makeref): 389 o = Object(1) 390 p1 = makeref(o, None) 391 p2 = makeref(o, None) 392 self.assertIs(p1, p2, "both callbacks were None in the C API") 393 del p1, p2 394 p1 = makeref(o) 395 p2 = makeref(o, None) 396 self.assertIs(p1, p2, "callbacks were NULL, None in the C API") 397 del p1, p2 398 p1 = makeref(o) 399 p2 = makeref(o) 400 self.assertIs(p1, p2, "both callbacks were NULL in the C API") 401 del p1, p2 402 p1 = makeref(o, None) 403 p2 = makeref(o) 404 self.assertIs(p1, p2, "callbacks were None, NULL in the C API") 405 406 def test_callable_proxy(self): 407 o = Callable() 408 ref1 = weakref.proxy(o) 409 410 self.check_proxy(o, ref1) 411 412 self.assertIs(type(ref1), weakref.CallableProxyType, 413 "proxy is not of callable type") 414 ref1('twinkies!') 415 self.assertEqual(o.bar, 'twinkies!', 416 "call through proxy not passed through to original") 417 ref1(x='Splat.') 418 self.assertEqual(o.bar, 'Splat.', 419 "call through proxy not passed through to original") 420 421 # expect due to too few args 422 self.assertRaises(TypeError, ref1) 423 424 # expect due to too many args 425 self.assertRaises(TypeError, ref1, 1, 2, 3) 426 427 def check_proxy(self, o, proxy): 428 o.foo = 1 429 self.assertEqual(proxy.foo, 1, 430 "proxy does not reflect attribute addition") 431 o.foo = 2 432 self.assertEqual(proxy.foo, 2, 433 "proxy does not reflect attribute modification") 434 del o.foo 435 self.assertFalse(hasattr(proxy, 'foo'), 436 "proxy does not reflect attribute removal") 437 438 proxy.foo = 1 439 self.assertEqual(o.foo, 1, 440 "object does not reflect attribute addition via proxy") 441 proxy.foo = 2 442 self.assertEqual(o.foo, 2, 443 "object does not reflect attribute modification via proxy") 444 del proxy.foo 445 self.assertFalse(hasattr(o, 'foo'), 446 "object does not reflect attribute removal via proxy") 447 448 def test_proxy_deletion(self): 449 # Test clearing of SF bug #762891 450 class Foo: 451 result = None 452 def __delitem__(self, accessor): 453 self.result = accessor 454 g = Foo() 455 f = weakref.proxy(g) 456 del f[0] 457 self.assertEqual(f.result, 0) 458 459 def test_proxy_bool(self): 460 # Test clearing of SF bug #1170766 461 class List(list): pass 462 lyst = List() 463 self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst)) 464 465 def test_proxy_iter(self): 466 # Test fails with a debug build of the interpreter 467 # (see bpo-38395). 468 469 obj = None 470 471 class MyObj: 472 def __iter__(self): 473 nonlocal obj 474 del obj 475 return NotImplemented 476 477 obj = MyObj() 478 p = weakref.proxy(obj) 479 with self.assertRaises(TypeError): 480 # "blech" in p calls MyObj.__iter__ through the proxy, 481 # without keeping a reference to the real object, so it 482 # can be killed in the middle of the call 483 "blech" in p 484 485 def test_proxy_next(self): 486 arr = [4, 5, 6] 487 def iterator_func(): 488 yield from arr 489 it = iterator_func() 490 491 class IteratesWeakly: 492 def __iter__(self): 493 return weakref.proxy(it) 494 495 weak_it = IteratesWeakly() 496 497 # Calls proxy.__next__ 498 self.assertEqual(list(weak_it), [4, 5, 6]) 499 500 def test_proxy_bad_next(self): 501 # bpo-44720: PyIter_Next() shouldn't be called if the reference 502 # isn't an iterator. 503 504 not_an_iterator = lambda: 0 505 506 class A: 507 def __iter__(self): 508 return weakref.proxy(not_an_iterator) 509 a = A() 510 511 msg = "Weakref proxy referenced a non-iterator" 512 with self.assertRaisesRegex(TypeError, msg): 513 list(a) 514 515 def test_proxy_reversed(self): 516 class MyObj: 517 def __len__(self): 518 return 3 519 def __reversed__(self): 520 return iter('cba') 521 522 obj = MyObj() 523 self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba") 524 525 def test_proxy_hash(self): 526 class MyObj: 527 def __hash__(self): 528 return 42 529 530 obj = MyObj() 531 with self.assertRaises(TypeError): 532 hash(weakref.proxy(obj)) 533 534 class MyObj: 535 __hash__ = None 536 537 obj = MyObj() 538 with self.assertRaises(TypeError): 539 hash(weakref.proxy(obj)) 540 541 def test_getweakrefcount(self): 542 o = C() 543 ref1 = weakref.ref(o) 544 ref2 = weakref.ref(o, self.callback) 545 self.assertEqual(weakref.getweakrefcount(o), 2, 546 "got wrong number of weak reference objects") 547 548 proxy1 = weakref.proxy(o) 549 proxy2 = weakref.proxy(o, self.callback) 550 self.assertEqual(weakref.getweakrefcount(o), 4, 551 "got wrong number of weak reference objects") 552 553 del ref1, ref2, proxy1, proxy2 554 gc_collect() # For PyPy or other GCs. 555 self.assertEqual(weakref.getweakrefcount(o), 0, 556 "weak reference objects not unlinked from" 557 " referent when discarded.") 558 559 # assumes ints do not support weakrefs 560 self.assertEqual(weakref.getweakrefcount(1), 0, 561 "got wrong number of weak reference objects for int") 562 563 def test_getweakrefs(self): 564 o = C() 565 ref1 = weakref.ref(o, self.callback) 566 ref2 = weakref.ref(o, self.callback) 567 del ref1 568 gc_collect() # For PyPy or other GCs. 569 self.assertEqual(weakref.getweakrefs(o), [ref2], 570 "list of refs does not match") 571 572 o = C() 573 ref1 = weakref.ref(o, self.callback) 574 ref2 = weakref.ref(o, self.callback) 575 del ref2 576 gc_collect() # For PyPy or other GCs. 577 self.assertEqual(weakref.getweakrefs(o), [ref1], 578 "list of refs does not match") 579 580 del ref1 581 gc_collect() # For PyPy or other GCs. 582 self.assertEqual(weakref.getweakrefs(o), [], 583 "list of refs not cleared") 584 585 # assumes ints do not support weakrefs 586 self.assertEqual(weakref.getweakrefs(1), [], 587 "list of refs does not match for int") 588 589 def test_newstyle_number_ops(self): 590 class F(float): 591 pass 592 f = F(2.0) 593 p = weakref.proxy(f) 594 self.assertEqual(p + 1.0, 3.0) 595 self.assertEqual(1.0 + p, 3.0) # this used to SEGV 596 597 def test_callbacks_protected(self): 598 # Callbacks protected from already-set exceptions? 599 # Regression test for SF bug #478534. 600 class BogusError(Exception): 601 pass 602 data = {} 603 def remove(k): 604 del data[k] 605 def encapsulate(): 606 f = lambda : () 607 data[weakref.ref(f, remove)] = None 608 raise BogusError 609 try: 610 encapsulate() 611 except BogusError: 612 pass 613 else: 614 self.fail("exception not properly restored") 615 try: 616 encapsulate() 617 except BogusError: 618 pass 619 else: 620 self.fail("exception not properly restored") 621 622 def test_sf_bug_840829(self): 623 # "weakref callbacks and gc corrupt memory" 624 # subtype_dealloc erroneously exposed a new-style instance 625 # already in the process of getting deallocated to gc, 626 # causing double-deallocation if the instance had a weakref 627 # callback that triggered gc. 628 # If the bug exists, there probably won't be an obvious symptom 629 # in a release build. In a debug build, a segfault will occur 630 # when the second attempt to remove the instance from the "list 631 # of all objects" occurs. 632 633 import gc 634 635 class C(object): 636 pass 637 638 c = C() 639 wr = weakref.ref(c, lambda ignore: gc.collect()) 640 del c 641 642 # There endeth the first part. It gets worse. 643 del wr 644 645 c1 = C() 646 c1.i = C() 647 wr = weakref.ref(c1.i, lambda ignore: gc.collect()) 648 649 c2 = C() 650 c2.c1 = c1 651 del c1 # still alive because c2 points to it 652 653 # Now when subtype_dealloc gets called on c2, it's not enough just 654 # that c2 is immune from gc while the weakref callbacks associated 655 # with c2 execute (there are none in this 2nd half of the test, btw). 656 # subtype_dealloc goes on to call the base classes' deallocs too, 657 # so any gc triggered by weakref callbacks associated with anything 658 # torn down by a base class dealloc can also trigger double 659 # deallocation of c2. 660 del c2 661 662 @suppress_immortalization() 663 def test_callback_in_cycle(self): 664 import gc 665 666 class J(object): 667 pass 668 669 class II(object): 670 def acallback(self, ignore): 671 self.J 672 673 I = II() 674 I.J = J 675 I.wr = weakref.ref(J, I.acallback) 676 677 # Now J and II are each in a self-cycle (as all new-style class 678 # objects are, since their __mro__ points back to them). I holds 679 # both a weak reference (I.wr) and a strong reference (I.J) to class 680 # J. I is also in a cycle (I.wr points to a weakref that references 681 # I.acallback). When we del these three, they all become trash, but 682 # the cycles prevent any of them from getting cleaned up immediately. 683 # Instead they have to wait for cyclic gc to deduce that they're 684 # trash. 685 # 686 # gc used to call tp_clear on all of them, and the order in which 687 # it does that is pretty accidental. The exact order in which we 688 # built up these things manages to provoke gc into running tp_clear 689 # in just the right order (I last). Calling tp_clear on II leaves 690 # behind an insane class object (its __mro__ becomes NULL). Calling 691 # tp_clear on J breaks its self-cycle, but J doesn't get deleted 692 # just then because of the strong reference from I.J. Calling 693 # tp_clear on I starts to clear I's __dict__, and just happens to 694 # clear I.J first -- I.wr is still intact. That removes the last 695 # reference to J, which triggers the weakref callback. The callback 696 # tries to do "self.J", and instances of new-style classes look up 697 # attributes ("J") in the class dict first. The class (II) wants to 698 # search II.__mro__, but that's NULL. The result was a segfault in 699 # a release build, and an assert failure in a debug build. 700 del I, J, II 701 gc.collect() 702 703 def test_callback_reachable_one_way(self): 704 import gc 705 706 # This one broke the first patch that fixed the previous test. In this case, 707 # the objects reachable from the callback aren't also reachable 708 # from the object (c1) *triggering* the callback: you can get to 709 # c1 from c2, but not vice-versa. The result was that c2's __dict__ 710 # got tp_clear'ed by the time the c2.cb callback got invoked. 711 712 class C: 713 def cb(self, ignore): 714 self.me 715 self.c1 716 self.wr 717 718 c1, c2 = C(), C() 719 720 c2.me = c2 721 c2.c1 = c1 722 c2.wr = weakref.ref(c1, c2.cb) 723 724 del c1, c2 725 gc.collect() 726 727 def test_callback_different_classes(self): 728 import gc 729 730 # Like test_callback_reachable_one_way, except c2 and c1 have different 731 # classes. c2's class (C) isn't reachable from c1 then, so protecting 732 # objects reachable from the dying object (c1) isn't enough to stop 733 # c2's class (C) from getting tp_clear'ed before c2.cb is invoked. 734 # The result was a segfault (C.__mro__ was NULL when the callback 735 # tried to look up self.me). 736 737 class C(object): 738 def cb(self, ignore): 739 self.me 740 self.c1 741 self.wr 742 743 class D: 744 pass 745 746 c1, c2 = D(), C() 747 748 c2.me = c2 749 c2.c1 = c1 750 c2.wr = weakref.ref(c1, c2.cb) 751 752 del c1, c2, C, D 753 gc.collect() 754 755 @suppress_immortalization() 756 def test_callback_in_cycle_resurrection(self): 757 import gc 758 759 # Do something nasty in a weakref callback: resurrect objects 760 # from dead cycles. For this to be attempted, the weakref and 761 # its callback must also be part of the cyclic trash (else the 762 # objects reachable via the callback couldn't be in cyclic trash 763 # to begin with -- the callback would act like an external root). 764 # But gc clears trash weakrefs with callbacks early now, which 765 # disables the callbacks, so the callbacks shouldn't get called 766 # at all (and so nothing actually gets resurrected). 767 768 alist = [] 769 class C(object): 770 def __init__(self, value): 771 self.attribute = value 772 773 def acallback(self, ignore): 774 alist.append(self.c) 775 776 c1, c2 = C(1), C(2) 777 c1.c = c2 778 c2.c = c1 779 c1.wr = weakref.ref(c2, c1.acallback) 780 c2.wr = weakref.ref(c1, c2.acallback) 781 782 def C_went_away(ignore): 783 alist.append("C went away") 784 wr = weakref.ref(C, C_went_away) 785 786 del c1, c2, C # make them all trash 787 self.assertEqual(alist, []) # del isn't enough to reclaim anything 788 789 gc.collect() 790 # c1.wr and c2.wr were part of the cyclic trash, so should have 791 # been cleared without their callbacks executing. OTOH, the weakref 792 # to C is bound to a function local (wr), and wasn't trash, so that 793 # callback should have been invoked when C went away. 794 self.assertEqual(alist, ["C went away"]) 795 # The remaining weakref should be dead now (its callback ran). 796 self.assertEqual(wr(), None) 797 798 del alist[:] 799 gc.collect() 800 self.assertEqual(alist, []) 801 802 def test_callbacks_on_callback(self): 803 import gc 804 805 # Set up weakref callbacks *on* weakref callbacks. 806 alist = [] 807 def safe_callback(ignore): 808 alist.append("safe_callback called") 809 810 class C(object): 811 def cb(self, ignore): 812 alist.append("cb called") 813 814 c, d = C(), C() 815 c.other = d 816 d.other = c 817 callback = c.cb 818 c.wr = weakref.ref(d, callback) # this won't trigger 819 d.wr = weakref.ref(callback, d.cb) # ditto 820 external_wr = weakref.ref(callback, safe_callback) # but this will 821 self.assertIs(external_wr(), callback) 822 823 # The weakrefs attached to c and d should get cleared, so that 824 # C.cb is never called. But external_wr isn't part of the cyclic 825 # trash, and no cyclic trash is reachable from it, so safe_callback 826 # should get invoked when the bound method object callback (c.cb) 827 # -- which is itself a callback, and also part of the cyclic trash -- 828 # gets reclaimed at the end of gc. 829 830 del callback, c, d, C 831 self.assertEqual(alist, []) # del isn't enough to clean up cycles 832 gc.collect() 833 self.assertEqual(alist, ["safe_callback called"]) 834 self.assertEqual(external_wr(), None) 835 836 del alist[:] 837 gc.collect() 838 self.assertEqual(alist, []) 839 840 def test_gc_during_ref_creation(self): 841 self.check_gc_during_creation(weakref.ref) 842 843 def test_gc_during_proxy_creation(self): 844 self.check_gc_during_creation(weakref.proxy) 845 846 def check_gc_during_creation(self, makeref): 847 thresholds = gc.get_threshold() 848 gc.set_threshold(1, 1, 1) 849 gc.collect() 850 class A: 851 pass 852 853 def callback(*args): 854 pass 855 856 referenced = A() 857 858 a = A() 859 a.a = a 860 a.wr = makeref(referenced) 861 862 try: 863 # now make sure the object and the ref get labeled as 864 # cyclic trash: 865 a = A() 866 weakref.ref(referenced, callback) 867 868 finally: 869 gc.set_threshold(*thresholds) 870 871 def test_ref_created_during_del(self): 872 # Bug #1377858 873 # A weakref created in an object's __del__() would crash the 874 # interpreter when the weakref was cleaned up since it would refer to 875 # non-existent memory. This test should not segfault the interpreter. 876 class Target(object): 877 def __del__(self): 878 global ref_from_del 879 ref_from_del = weakref.ref(self) 880 881 w = Target() 882 883 def test_init(self): 884 # Issue 3634 885 # <weakref to class>.__init__() doesn't check errors correctly 886 r = weakref.ref(Exception) 887 self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0) 888 # No exception should be raised here 889 gc.collect() 890 891 @suppress_immortalization() 892 def test_classes(self): 893 # Check that classes are weakrefable. 894 class A(object): 895 pass 896 l = [] 897 weakref.ref(int) 898 a = weakref.ref(A, l.append) 899 A = None 900 gc.collect() 901 self.assertEqual(a(), None) 902 self.assertEqual(l, [a]) 903 904 def test_equality(self): 905 # Alive weakrefs defer equality testing to their underlying object. 906 x = Object(1) 907 y = Object(1) 908 z = Object(2) 909 a = weakref.ref(x) 910 b = weakref.ref(y) 911 c = weakref.ref(z) 912 d = weakref.ref(x) 913 # Note how we directly test the operators here, to stress both 914 # __eq__ and __ne__. 915 self.assertTrue(a == b) 916 self.assertFalse(a != b) 917 self.assertFalse(a == c) 918 self.assertTrue(a != c) 919 self.assertTrue(a == d) 920 self.assertFalse(a != d) 921 self.assertFalse(a == x) 922 self.assertTrue(a != x) 923 self.assertTrue(a == ALWAYS_EQ) 924 self.assertFalse(a != ALWAYS_EQ) 925 del x, y, z 926 gc.collect() 927 for r in a, b, c: 928 # Sanity check 929 self.assertIs(r(), None) 930 # Dead weakrefs compare by identity: whether `a` and `d` are the 931 # same weakref object is an implementation detail, since they pointed 932 # to the same original object and didn't have a callback. 933 # (see issue #16453). 934 self.assertFalse(a == b) 935 self.assertTrue(a != b) 936 self.assertFalse(a == c) 937 self.assertTrue(a != c) 938 self.assertEqual(a == d, a is d) 939 self.assertEqual(a != d, a is not d) 940 941 def test_ordering(self): 942 # weakrefs cannot be ordered, even if the underlying objects can. 943 ops = [operator.lt, operator.gt, operator.le, operator.ge] 944 x = Object(1) 945 y = Object(1) 946 a = weakref.ref(x) 947 b = weakref.ref(y) 948 for op in ops: 949 self.assertRaises(TypeError, op, a, b) 950 # Same when dead. 951 del x, y 952 gc.collect() 953 for op in ops: 954 self.assertRaises(TypeError, op, a, b) 955 956 def test_hashing(self): 957 # Alive weakrefs hash the same as the underlying object 958 x = Object(42) 959 y = Object(42) 960 a = weakref.ref(x) 961 b = weakref.ref(y) 962 self.assertEqual(hash(a), hash(42)) 963 del x, y 964 gc.collect() 965 # Dead weakrefs: 966 # - retain their hash is they were hashed when alive; 967 # - otherwise, cannot be hashed. 968 self.assertEqual(hash(a), hash(42)) 969 self.assertRaises(TypeError, hash, b) 970 971 @unittest.skipIf(is_wasi and Py_DEBUG, "requires deep stack") 972 def test_trashcan_16602(self): 973 # Issue #16602: when a weakref's target was part of a long 974 # deallocation chain, the trashcan mechanism could delay clearing 975 # of the weakref and make the target object visible from outside 976 # code even though its refcount had dropped to 0. A crash ensued. 977 class C: 978 def __init__(self, parent): 979 if not parent: 980 return 981 wself = weakref.ref(self) 982 def cb(wparent): 983 o = wself() 984 self.wparent = weakref.ref(parent, cb) 985 986 d = weakref.WeakKeyDictionary() 987 root = c = C(None) 988 for n in range(100): 989 d[c] = c = C(c) 990 del root 991 gc.collect() 992 993 def test_callback_attribute(self): 994 x = Object(1) 995 callback = lambda ref: None 996 ref1 = weakref.ref(x, callback) 997 self.assertIs(ref1.__callback__, callback) 998 999 ref2 = weakref.ref(x) 1000 self.assertIsNone(ref2.__callback__) 1001 1002 def test_callback_attribute_after_deletion(self): 1003 x = Object(1) 1004 ref = weakref.ref(x, self.callback) 1005 self.assertIsNotNone(ref.__callback__) 1006 del x 1007 support.gc_collect() 1008 self.assertIsNone(ref.__callback__) 1009 1010 def test_set_callback_attribute(self): 1011 x = Object(1) 1012 callback = lambda ref: None 1013 ref1 = weakref.ref(x, callback) 1014 with self.assertRaises(AttributeError): 1015 ref1.__callback__ = lambda ref: None 1016 1017 def test_callback_gcs(self): 1018 class ObjectWithDel(Object): 1019 def __del__(self): pass 1020 x = ObjectWithDel(1) 1021 ref1 = weakref.ref(x, lambda ref: support.gc_collect()) 1022 del x 1023 support.gc_collect() 1024 1025 @support.cpython_only 1026 def test_no_memory_when_clearing(self): 1027 # gh-118331: Make sure we do not raise an exception from the destructor 1028 # when clearing weakrefs if allocating the intermediate tuple fails. 1029 code = textwrap.dedent(""" 1030 import _testcapi 1031 import weakref 1032 1033 class TestObj: 1034 pass 1035 1036 def callback(obj): 1037 pass 1038 1039 obj = TestObj() 1040 # The choice of 50 is arbitrary, but must be large enough to ensure 1041 # the allocation won't be serviced by the free list. 1042 wrs = [weakref.ref(obj, callback) for _ in range(50)] 1043 _testcapi.set_nomemory(0) 1044 del obj 1045 """).strip() 1046 res, _ = script_helper.run_python_until_end("-c", code) 1047 stderr = res.err.decode("ascii", "backslashreplace") 1048 self.assertNotRegex(stderr, "_Py_Dealloc: Deallocator of type 'TestObj'") 1049 1050 1051class SubclassableWeakrefTestCase(TestBase): 1052 1053 def test_subclass_refs(self): 1054 class MyRef(weakref.ref): 1055 def __init__(self, ob, callback=None, value=42): 1056 self.value = value 1057 super().__init__(ob, callback) 1058 def __call__(self): 1059 self.called = True 1060 return super().__call__() 1061 o = Object("foo") 1062 mr = MyRef(o, value=24) 1063 self.assertIs(mr(), o) 1064 self.assertTrue(mr.called) 1065 self.assertEqual(mr.value, 24) 1066 del o 1067 gc_collect() # For PyPy or other GCs. 1068 self.assertIsNone(mr()) 1069 self.assertTrue(mr.called) 1070 1071 def test_subclass_refs_dont_replace_standard_refs(self): 1072 class MyRef(weakref.ref): 1073 pass 1074 o = Object(42) 1075 r1 = MyRef(o) 1076 r2 = weakref.ref(o) 1077 self.assertIsNot(r1, r2) 1078 self.assertEqual(weakref.getweakrefs(o), [r2, r1]) 1079 self.assertEqual(weakref.getweakrefcount(o), 2) 1080 r3 = MyRef(o) 1081 self.assertEqual(weakref.getweakrefcount(o), 3) 1082 refs = weakref.getweakrefs(o) 1083 self.assertEqual(len(refs), 3) 1084 self.assertIs(r2, refs[0]) 1085 self.assertIn(r1, refs[1:]) 1086 self.assertIn(r3, refs[1:]) 1087 1088 def test_subclass_refs_dont_conflate_callbacks(self): 1089 class MyRef(weakref.ref): 1090 pass 1091 o = Object(42) 1092 r1 = MyRef(o, id) 1093 r2 = MyRef(o, str) 1094 self.assertIsNot(r1, r2) 1095 refs = weakref.getweakrefs(o) 1096 self.assertIn(r1, refs) 1097 self.assertIn(r2, refs) 1098 1099 def test_subclass_refs_with_slots(self): 1100 class MyRef(weakref.ref): 1101 __slots__ = "slot1", "slot2" 1102 def __new__(type, ob, callback, slot1, slot2): 1103 return weakref.ref.__new__(type, ob, callback) 1104 def __init__(self, ob, callback, slot1, slot2): 1105 self.slot1 = slot1 1106 self.slot2 = slot2 1107 def meth(self): 1108 return self.slot1 + self.slot2 1109 o = Object(42) 1110 r = MyRef(o, None, "abc", "def") 1111 self.assertEqual(r.slot1, "abc") 1112 self.assertEqual(r.slot2, "def") 1113 self.assertEqual(r.meth(), "abcdef") 1114 self.assertFalse(hasattr(r, "__dict__")) 1115 1116 def test_subclass_refs_with_cycle(self): 1117 """Confirm https://bugs.python.org/issue3100 is fixed.""" 1118 # An instance of a weakref subclass can have attributes. 1119 # If such a weakref holds the only strong reference to the object, 1120 # deleting the weakref will delete the object. In this case, 1121 # the callback must not be called, because the ref object is 1122 # being deleted. 1123 class MyRef(weakref.ref): 1124 pass 1125 1126 # Use a local callback, for "regrtest -R::" 1127 # to detect refcounting problems 1128 def callback(w): 1129 self.cbcalled += 1 1130 1131 o = C() 1132 r1 = MyRef(o, callback) 1133 r1.o = o 1134 del o 1135 1136 del r1 # Used to crash here 1137 1138 self.assertEqual(self.cbcalled, 0) 1139 1140 # Same test, with two weakrefs to the same object 1141 # (since code paths are different) 1142 o = C() 1143 r1 = MyRef(o, callback) 1144 r2 = MyRef(o, callback) 1145 r1.r = r2 1146 r2.o = o 1147 del o 1148 del r2 1149 1150 del r1 # Used to crash here 1151 1152 self.assertEqual(self.cbcalled, 0) 1153 1154 1155class WeakMethodTestCase(unittest.TestCase): 1156 1157 def _subclass(self): 1158 """Return an Object subclass overriding `some_method`.""" 1159 class C(Object): 1160 def some_method(self): 1161 return 6 1162 return C 1163 1164 def test_alive(self): 1165 o = Object(1) 1166 r = weakref.WeakMethod(o.some_method) 1167 self.assertIsInstance(r, weakref.ReferenceType) 1168 self.assertIsInstance(r(), type(o.some_method)) 1169 self.assertIs(r().__self__, o) 1170 self.assertIs(r().__func__, o.some_method.__func__) 1171 self.assertEqual(r()(), 4) 1172 1173 def test_object_dead(self): 1174 o = Object(1) 1175 r = weakref.WeakMethod(o.some_method) 1176 del o 1177 gc.collect() 1178 self.assertIs(r(), None) 1179 1180 def test_method_dead(self): 1181 C = self._subclass() 1182 o = C(1) 1183 r = weakref.WeakMethod(o.some_method) 1184 del C.some_method 1185 gc.collect() 1186 self.assertIs(r(), None) 1187 1188 def test_callback_when_object_dead(self): 1189 # Test callback behaviour when object dies first. 1190 C = self._subclass() 1191 calls = [] 1192 def cb(arg): 1193 calls.append(arg) 1194 o = C(1) 1195 r = weakref.WeakMethod(o.some_method, cb) 1196 del o 1197 gc.collect() 1198 self.assertEqual(calls, [r]) 1199 # Callback is only called once. 1200 C.some_method = Object.some_method 1201 gc.collect() 1202 self.assertEqual(calls, [r]) 1203 1204 def test_callback_when_method_dead(self): 1205 # Test callback behaviour when method dies first. 1206 C = self._subclass() 1207 calls = [] 1208 def cb(arg): 1209 calls.append(arg) 1210 o = C(1) 1211 r = weakref.WeakMethod(o.some_method, cb) 1212 del C.some_method 1213 gc.collect() 1214 self.assertEqual(calls, [r]) 1215 # Callback is only called once. 1216 del o 1217 gc.collect() 1218 self.assertEqual(calls, [r]) 1219 1220 @support.cpython_only 1221 def test_no_cycles(self): 1222 # A WeakMethod doesn't create any reference cycle to itself. 1223 o = Object(1) 1224 def cb(_): 1225 pass 1226 r = weakref.WeakMethod(o.some_method, cb) 1227 wr = weakref.ref(r) 1228 del r 1229 self.assertIs(wr(), None) 1230 1231 def test_equality(self): 1232 def _eq(a, b): 1233 self.assertTrue(a == b) 1234 self.assertFalse(a != b) 1235 def _ne(a, b): 1236 self.assertTrue(a != b) 1237 self.assertFalse(a == b) 1238 x = Object(1) 1239 y = Object(1) 1240 a = weakref.WeakMethod(x.some_method) 1241 b = weakref.WeakMethod(y.some_method) 1242 c = weakref.WeakMethod(x.other_method) 1243 d = weakref.WeakMethod(y.other_method) 1244 # Objects equal, same method 1245 _eq(a, b) 1246 _eq(c, d) 1247 # Objects equal, different method 1248 _ne(a, c) 1249 _ne(a, d) 1250 _ne(b, c) 1251 _ne(b, d) 1252 # Objects unequal, same or different method 1253 z = Object(2) 1254 e = weakref.WeakMethod(z.some_method) 1255 f = weakref.WeakMethod(z.other_method) 1256 _ne(a, e) 1257 _ne(a, f) 1258 _ne(b, e) 1259 _ne(b, f) 1260 # Compare with different types 1261 _ne(a, x.some_method) 1262 _eq(a, ALWAYS_EQ) 1263 del x, y, z 1264 gc.collect() 1265 # Dead WeakMethods compare by identity 1266 refs = a, b, c, d, e, f 1267 for q in refs: 1268 for r in refs: 1269 self.assertEqual(q == r, q is r) 1270 self.assertEqual(q != r, q is not r) 1271 1272 def test_hashing(self): 1273 # Alive WeakMethods are hashable if the underlying object is 1274 # hashable. 1275 x = Object(1) 1276 y = Object(1) 1277 a = weakref.WeakMethod(x.some_method) 1278 b = weakref.WeakMethod(y.some_method) 1279 c = weakref.WeakMethod(y.other_method) 1280 # Since WeakMethod objects are equal, the hashes should be equal. 1281 self.assertEqual(hash(a), hash(b)) 1282 ha = hash(a) 1283 # Dead WeakMethods retain their old hash value 1284 del x, y 1285 gc.collect() 1286 self.assertEqual(hash(a), ha) 1287 self.assertEqual(hash(b), ha) 1288 # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed. 1289 self.assertRaises(TypeError, hash, c) 1290 1291 1292class MappingTestCase(TestBase): 1293 1294 COUNT = 10 1295 1296 if support.check_sanitizer(thread=True) and support.Py_GIL_DISABLED: 1297 # Reduce iteration count to get acceptable latency 1298 NUM_THREADED_ITERATIONS = 1000 1299 else: 1300 NUM_THREADED_ITERATIONS = 100000 1301 1302 def check_len_cycles(self, dict_type, cons): 1303 N = 20 1304 items = [RefCycle() for i in range(N)] 1305 dct = dict_type(cons(o) for o in items) 1306 # Keep an iterator alive 1307 it = dct.items() 1308 try: 1309 next(it) 1310 except StopIteration: 1311 pass 1312 del items 1313 gc.collect() 1314 n1 = len(dct) 1315 del it 1316 gc.collect() 1317 n2 = len(dct) 1318 # one item may be kept alive inside the iterator 1319 self.assertIn(n1, (0, 1)) 1320 self.assertEqual(n2, 0) 1321 1322 def test_weak_keyed_len_cycles(self): 1323 self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1324 1325 def test_weak_valued_len_cycles(self): 1326 self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k)) 1327 1328 def check_len_race(self, dict_type, cons): 1329 # Extended sanity checks for len() in the face of cyclic collection 1330 self.addCleanup(gc.set_threshold, *gc.get_threshold()) 1331 for th in range(1, 100): 1332 N = 20 1333 gc.collect(0) 1334 gc.set_threshold(th, th, th) 1335 items = [RefCycle() for i in range(N)] 1336 dct = dict_type(cons(o) for o in items) 1337 del items 1338 # All items will be collected at next garbage collection pass 1339 it = dct.items() 1340 try: 1341 next(it) 1342 except StopIteration: 1343 pass 1344 n1 = len(dct) 1345 del it 1346 n2 = len(dct) 1347 self.assertGreaterEqual(n1, 0) 1348 self.assertLessEqual(n1, N) 1349 self.assertGreaterEqual(n2, 0) 1350 self.assertLessEqual(n2, n1) 1351 1352 def test_weak_keyed_len_race(self): 1353 self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1)) 1354 1355 def test_weak_valued_len_race(self): 1356 self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k)) 1357 1358 def test_weak_values(self): 1359 # 1360 # This exercises d.copy(), d.items(), d[], del d[], len(d). 1361 # 1362 dict, objects = self.make_weak_valued_dict() 1363 for o in objects: 1364 self.assertEqual(weakref.getweakrefcount(o), 1) 1365 self.assertIs(o, dict[o.arg], 1366 "wrong object returned by weak dict!") 1367 items1 = list(dict.items()) 1368 items2 = list(dict.copy().items()) 1369 items1.sort() 1370 items2.sort() 1371 self.assertEqual(items1, items2, 1372 "cloning of weak-valued dictionary did not work!") 1373 del items1, items2 1374 self.assertEqual(len(dict), self.COUNT) 1375 del objects[0] 1376 gc_collect() # For PyPy or other GCs. 1377 self.assertEqual(len(dict), self.COUNT - 1, 1378 "deleting object did not cause dictionary update") 1379 del objects, o 1380 gc_collect() # For PyPy or other GCs. 1381 self.assertEqual(len(dict), 0, 1382 "deleting the values did not clear the dictionary") 1383 # regression on SF bug #447152: 1384 dict = weakref.WeakValueDictionary() 1385 self.assertRaises(KeyError, dict.__getitem__, 1) 1386 dict[2] = C() 1387 gc_collect() # For PyPy or other GCs. 1388 self.assertRaises(KeyError, dict.__getitem__, 2) 1389 1390 def test_weak_keys(self): 1391 # 1392 # This exercises d.copy(), d.items(), d[] = v, d[], del d[], 1393 # len(d), k in d. 1394 # 1395 dict, objects = self.make_weak_keyed_dict() 1396 for o in objects: 1397 self.assertEqual(weakref.getweakrefcount(o), 1, 1398 "wrong number of weak references to %r!" % o) 1399 self.assertIs(o.arg, dict[o], 1400 "wrong object returned by weak dict!") 1401 items1 = dict.items() 1402 items2 = dict.copy().items() 1403 self.assertEqual(set(items1), set(items2), 1404 "cloning of weak-keyed dictionary did not work!") 1405 del items1, items2 1406 self.assertEqual(len(dict), self.COUNT) 1407 del objects[0] 1408 gc_collect() # For PyPy or other GCs. 1409 self.assertEqual(len(dict), (self.COUNT - 1), 1410 "deleting object did not cause dictionary update") 1411 del objects, o 1412 gc_collect() # For PyPy or other GCs. 1413 self.assertEqual(len(dict), 0, 1414 "deleting the keys did not clear the dictionary") 1415 o = Object(42) 1416 dict[o] = "What is the meaning of the universe?" 1417 self.assertIn(o, dict) 1418 self.assertNotIn(34, dict) 1419 1420 def test_weak_keyed_iters(self): 1421 dict, objects = self.make_weak_keyed_dict() 1422 self.check_iters(dict) 1423 1424 # Test keyrefs() 1425 refs = dict.keyrefs() 1426 self.assertEqual(len(refs), len(objects)) 1427 objects2 = list(objects) 1428 for wr in refs: 1429 ob = wr() 1430 self.assertIn(ob, dict) 1431 self.assertIn(ob, dict) 1432 self.assertEqual(ob.arg, dict[ob]) 1433 objects2.remove(ob) 1434 self.assertEqual(len(objects2), 0) 1435 1436 # Test iterkeyrefs() 1437 objects2 = list(objects) 1438 self.assertEqual(len(list(dict.keyrefs())), len(objects)) 1439 for wr in dict.keyrefs(): 1440 ob = wr() 1441 self.assertIn(ob, dict) 1442 self.assertIn(ob, dict) 1443 self.assertEqual(ob.arg, dict[ob]) 1444 objects2.remove(ob) 1445 self.assertEqual(len(objects2), 0) 1446 1447 def test_weak_valued_iters(self): 1448 dict, objects = self.make_weak_valued_dict() 1449 self.check_iters(dict) 1450 1451 # Test valuerefs() 1452 refs = dict.valuerefs() 1453 self.assertEqual(len(refs), len(objects)) 1454 objects2 = list(objects) 1455 for wr in refs: 1456 ob = wr() 1457 self.assertEqual(ob, dict[ob.arg]) 1458 self.assertEqual(ob.arg, dict[ob.arg].arg) 1459 objects2.remove(ob) 1460 self.assertEqual(len(objects2), 0) 1461 1462 # Test itervaluerefs() 1463 objects2 = list(objects) 1464 self.assertEqual(len(list(dict.itervaluerefs())), len(objects)) 1465 for wr in dict.itervaluerefs(): 1466 ob = wr() 1467 self.assertEqual(ob, dict[ob.arg]) 1468 self.assertEqual(ob.arg, dict[ob.arg].arg) 1469 objects2.remove(ob) 1470 self.assertEqual(len(objects2), 0) 1471 1472 def check_iters(self, dict): 1473 # item iterator: 1474 items = list(dict.items()) 1475 for item in dict.items(): 1476 items.remove(item) 1477 self.assertFalse(items, "items() did not touch all items") 1478 1479 # key iterator, via __iter__(): 1480 keys = list(dict.keys()) 1481 for k in dict: 1482 keys.remove(k) 1483 self.assertFalse(keys, "__iter__() did not touch all keys") 1484 1485 # key iterator, via iterkeys(): 1486 keys = list(dict.keys()) 1487 for k in dict.keys(): 1488 keys.remove(k) 1489 self.assertFalse(keys, "iterkeys() did not touch all keys") 1490 1491 # value iterator: 1492 values = list(dict.values()) 1493 for v in dict.values(): 1494 values.remove(v) 1495 self.assertFalse(values, 1496 "itervalues() did not touch all values") 1497 1498 def check_weak_destroy_while_iterating(self, dict, objects, iter_name): 1499 n = len(dict) 1500 it = iter(getattr(dict, iter_name)()) 1501 next(it) # Trigger internal iteration 1502 # Destroy an object 1503 del objects[-1] 1504 gc.collect() # just in case 1505 # We have removed either the first consumed object, or another one 1506 self.assertIn(len(list(it)), [len(objects), len(objects) - 1]) 1507 del it 1508 # The removal has been committed 1509 self.assertEqual(len(dict), n - 1) 1510 1511 def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext): 1512 # Check that we can explicitly mutate the weak dict without 1513 # interfering with delayed removal. 1514 # `testcontext` should create an iterator, destroy one of the 1515 # weakref'ed objects and then return a new key/value pair corresponding 1516 # to the destroyed object. 1517 with testcontext() as (k, v): 1518 self.assertNotIn(k, dict) 1519 with testcontext() as (k, v): 1520 self.assertRaises(KeyError, dict.__delitem__, k) 1521 self.assertNotIn(k, dict) 1522 with testcontext() as (k, v): 1523 self.assertRaises(KeyError, dict.pop, k) 1524 self.assertNotIn(k, dict) 1525 with testcontext() as (k, v): 1526 dict[k] = v 1527 self.assertEqual(dict[k], v) 1528 ddict = copy.copy(dict) 1529 with testcontext() as (k, v): 1530 dict.update(ddict) 1531 self.assertEqual(dict, ddict) 1532 with testcontext() as (k, v): 1533 dict.clear() 1534 self.assertEqual(len(dict), 0) 1535 1536 def check_weak_del_and_len_while_iterating(self, dict, testcontext): 1537 # Check that len() works when both iterating and removing keys 1538 # explicitly through various means (.pop(), .clear()...), while 1539 # implicit mutation is deferred because an iterator is alive. 1540 # (each call to testcontext() should schedule one item for removal 1541 # for this test to work properly) 1542 o = Object(123456) 1543 with testcontext(): 1544 n = len(dict) 1545 # Since underlying dict is ordered, first item is popped 1546 dict.pop(next(dict.keys())) 1547 self.assertEqual(len(dict), n - 1) 1548 dict[o] = o 1549 self.assertEqual(len(dict), n) 1550 # last item in objects is removed from dict in context shutdown 1551 with testcontext(): 1552 self.assertEqual(len(dict), n - 1) 1553 # Then, (o, o) is popped 1554 dict.popitem() 1555 self.assertEqual(len(dict), n - 2) 1556 with testcontext(): 1557 self.assertEqual(len(dict), n - 3) 1558 del dict[next(dict.keys())] 1559 self.assertEqual(len(dict), n - 4) 1560 with testcontext(): 1561 self.assertEqual(len(dict), n - 5) 1562 dict.popitem() 1563 self.assertEqual(len(dict), n - 6) 1564 with testcontext(): 1565 dict.clear() 1566 self.assertEqual(len(dict), 0) 1567 self.assertEqual(len(dict), 0) 1568 1569 def test_weak_keys_destroy_while_iterating(self): 1570 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1571 dict, objects = self.make_weak_keyed_dict() 1572 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1573 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1574 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1575 self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs') 1576 dict, objects = self.make_weak_keyed_dict() 1577 @contextlib.contextmanager 1578 def testcontext(): 1579 try: 1580 it = iter(dict.items()) 1581 next(it) 1582 # Schedule a key/value for removal and recreate it 1583 v = objects.pop().arg 1584 gc.collect() # just in case 1585 yield Object(v), v 1586 finally: 1587 it = None # should commit all removals 1588 gc.collect() 1589 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1590 # Issue #21173: len() fragile when keys are both implicitly and 1591 # explicitly removed. 1592 dict, objects = self.make_weak_keyed_dict() 1593 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1594 1595 def test_weak_values_destroy_while_iterating(self): 1596 # Issue #7105: iterators shouldn't crash when a key is implicitly removed 1597 dict, objects = self.make_weak_valued_dict() 1598 self.check_weak_destroy_while_iterating(dict, objects, 'keys') 1599 self.check_weak_destroy_while_iterating(dict, objects, 'items') 1600 self.check_weak_destroy_while_iterating(dict, objects, 'values') 1601 self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs') 1602 self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs') 1603 dict, objects = self.make_weak_valued_dict() 1604 @contextlib.contextmanager 1605 def testcontext(): 1606 try: 1607 it = iter(dict.items()) 1608 next(it) 1609 # Schedule a key/value for removal and recreate it 1610 k = objects.pop().arg 1611 gc.collect() # just in case 1612 yield k, Object(k) 1613 finally: 1614 it = None # should commit all removals 1615 gc.collect() 1616 self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext) 1617 dict, objects = self.make_weak_valued_dict() 1618 self.check_weak_del_and_len_while_iterating(dict, testcontext) 1619 1620 def test_make_weak_keyed_dict_from_dict(self): 1621 o = Object(3) 1622 dict = weakref.WeakKeyDictionary({o:364}) 1623 self.assertEqual(dict[o], 364) 1624 1625 def test_make_weak_keyed_dict_from_weak_keyed_dict(self): 1626 o = Object(3) 1627 dict = weakref.WeakKeyDictionary({o:364}) 1628 dict2 = weakref.WeakKeyDictionary(dict) 1629 self.assertEqual(dict[o], 364) 1630 1631 def make_weak_keyed_dict(self): 1632 dict = weakref.WeakKeyDictionary() 1633 objects = list(map(Object, range(self.COUNT))) 1634 for o in objects: 1635 dict[o] = o.arg 1636 return dict, objects 1637 1638 def test_make_weak_valued_dict_from_dict(self): 1639 o = Object(3) 1640 dict = weakref.WeakValueDictionary({364:o}) 1641 self.assertEqual(dict[364], o) 1642 1643 def test_make_weak_valued_dict_from_weak_valued_dict(self): 1644 o = Object(3) 1645 dict = weakref.WeakValueDictionary({364:o}) 1646 dict2 = weakref.WeakValueDictionary(dict) 1647 self.assertEqual(dict[364], o) 1648 1649 def test_make_weak_valued_dict_misc(self): 1650 # errors 1651 self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__) 1652 self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {}) 1653 self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ()) 1654 # special keyword arguments 1655 o = Object(3) 1656 for kw in 'self', 'dict', 'other', 'iterable': 1657 d = weakref.WeakValueDictionary(**{kw: o}) 1658 self.assertEqual(list(d.keys()), [kw]) 1659 self.assertEqual(d[kw], o) 1660 1661 def make_weak_valued_dict(self): 1662 dict = weakref.WeakValueDictionary() 1663 objects = list(map(Object, range(self.COUNT))) 1664 for o in objects: 1665 dict[o.arg] = o 1666 return dict, objects 1667 1668 def check_popitem(self, klass, key1, value1, key2, value2): 1669 weakdict = klass() 1670 weakdict[key1] = value1 1671 weakdict[key2] = value2 1672 self.assertEqual(len(weakdict), 2) 1673 k, v = weakdict.popitem() 1674 self.assertEqual(len(weakdict), 1) 1675 if k is key1: 1676 self.assertIs(v, value1) 1677 else: 1678 self.assertIs(v, value2) 1679 k, v = weakdict.popitem() 1680 self.assertEqual(len(weakdict), 0) 1681 if k is key1: 1682 self.assertIs(v, value1) 1683 else: 1684 self.assertIs(v, value2) 1685 1686 def test_weak_valued_dict_popitem(self): 1687 self.check_popitem(weakref.WeakValueDictionary, 1688 "key1", C(), "key2", C()) 1689 1690 def test_weak_keyed_dict_popitem(self): 1691 self.check_popitem(weakref.WeakKeyDictionary, 1692 C(), "value 1", C(), "value 2") 1693 1694 def check_setdefault(self, klass, key, value1, value2): 1695 self.assertIsNot(value1, value2, 1696 "invalid test" 1697 " -- value parameters must be distinct objects") 1698 weakdict = klass() 1699 o = weakdict.setdefault(key, value1) 1700 self.assertIs(o, value1) 1701 self.assertIn(key, weakdict) 1702 self.assertIs(weakdict.get(key), value1) 1703 self.assertIs(weakdict[key], value1) 1704 1705 o = weakdict.setdefault(key, value2) 1706 self.assertIs(o, value1) 1707 self.assertIn(key, weakdict) 1708 self.assertIs(weakdict.get(key), value1) 1709 self.assertIs(weakdict[key], value1) 1710 1711 def test_weak_valued_dict_setdefault(self): 1712 self.check_setdefault(weakref.WeakValueDictionary, 1713 "key", C(), C()) 1714 1715 def test_weak_keyed_dict_setdefault(self): 1716 self.check_setdefault(weakref.WeakKeyDictionary, 1717 C(), "value 1", "value 2") 1718 1719 def check_update(self, klass, dict): 1720 # 1721 # This exercises d.update(), len(d), d.keys(), k in d, 1722 # d.get(), d[]. 1723 # 1724 weakdict = klass() 1725 weakdict.update(dict) 1726 self.assertEqual(len(weakdict), len(dict)) 1727 for k in weakdict.keys(): 1728 self.assertIn(k, dict, "mysterious new key appeared in weak dict") 1729 v = dict.get(k) 1730 self.assertIs(v, weakdict[k]) 1731 self.assertIs(v, weakdict.get(k)) 1732 for k in dict.keys(): 1733 self.assertIn(k, weakdict, "original key disappeared in weak dict") 1734 v = dict[k] 1735 self.assertIs(v, weakdict[k]) 1736 self.assertIs(v, weakdict.get(k)) 1737 1738 def test_weak_valued_dict_update(self): 1739 self.check_update(weakref.WeakValueDictionary, 1740 {1: C(), 'a': C(), C(): C()}) 1741 # errors 1742 self.assertRaises(TypeError, weakref.WeakValueDictionary.update) 1743 d = weakref.WeakValueDictionary() 1744 self.assertRaises(TypeError, d.update, {}, {}) 1745 self.assertRaises(TypeError, d.update, (), ()) 1746 self.assertEqual(list(d.keys()), []) 1747 # special keyword arguments 1748 o = Object(3) 1749 for kw in 'self', 'dict', 'other', 'iterable': 1750 d = weakref.WeakValueDictionary() 1751 d.update(**{kw: o}) 1752 self.assertEqual(list(d.keys()), [kw]) 1753 self.assertEqual(d[kw], o) 1754 1755 def test_weak_valued_union_operators(self): 1756 a = C() 1757 b = C() 1758 c = C() 1759 wvd1 = weakref.WeakValueDictionary({1: a}) 1760 wvd2 = weakref.WeakValueDictionary({1: b, 2: a}) 1761 wvd3 = wvd1.copy() 1762 d1 = {1: c, 3: b} 1763 pairs = [(5, c), (6, b)] 1764 1765 tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries 1766 self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2)) 1767 self.assertIs(type(tmp1), weakref.WeakValueDictionary) 1768 wvd1 |= wvd2 1769 self.assertEqual(wvd1, tmp1) 1770 1771 tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping 1772 self.assertEqual(dict(tmp2), dict(wvd2) | d1) 1773 self.assertIs(type(tmp2), weakref.WeakValueDictionary) 1774 wvd2 |= d1 1775 self.assertEqual(wvd2, tmp2) 1776 1777 tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value 1778 tmp3 |= pairs 1779 self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs)) 1780 self.assertIs(type(tmp3), weakref.WeakValueDictionary) 1781 1782 tmp4 = d1 | wvd3 # Testing .__ror__ 1783 self.assertEqual(dict(tmp4), d1 | dict(wvd3)) 1784 self.assertIs(type(tmp4), weakref.WeakValueDictionary) 1785 1786 del a 1787 self.assertNotIn(2, tmp1) 1788 self.assertNotIn(2, tmp2) 1789 self.assertNotIn(1, tmp3) 1790 self.assertNotIn(1, tmp4) 1791 1792 def test_weak_keyed_dict_update(self): 1793 self.check_update(weakref.WeakKeyDictionary, 1794 {C(): 1, C(): 2, C(): 3}) 1795 1796 def test_weak_keyed_delitem(self): 1797 d = weakref.WeakKeyDictionary() 1798 o1 = Object('1') 1799 o2 = Object('2') 1800 d[o1] = 'something' 1801 d[o2] = 'something' 1802 self.assertEqual(len(d), 2) 1803 del d[o1] 1804 self.assertEqual(len(d), 1) 1805 self.assertEqual(list(d.keys()), [o2]) 1806 1807 def test_weak_keyed_union_operators(self): 1808 o1 = C() 1809 o2 = C() 1810 o3 = C() 1811 wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2}) 1812 wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4}) 1813 wkd3 = wkd1.copy() 1814 d1 = {o2: '5', o3: '6'} 1815 pairs = [(o2, 7), (o3, 8)] 1816 1817 tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries 1818 self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2)) 1819 self.assertIs(type(tmp1), weakref.WeakKeyDictionary) 1820 wkd1 |= wkd2 1821 self.assertEqual(wkd1, tmp1) 1822 1823 tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping 1824 self.assertEqual(dict(tmp2), dict(wkd2) | d1) 1825 self.assertIs(type(tmp2), weakref.WeakKeyDictionary) 1826 wkd2 |= d1 1827 self.assertEqual(wkd2, tmp2) 1828 1829 tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value 1830 tmp3 |= pairs 1831 self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs)) 1832 self.assertIs(type(tmp3), weakref.WeakKeyDictionary) 1833 1834 tmp4 = d1 | wkd3 # Testing .__ror__ 1835 self.assertEqual(dict(tmp4), d1 | dict(wkd3)) 1836 self.assertIs(type(tmp4), weakref.WeakKeyDictionary) 1837 1838 del o1 1839 self.assertNotIn(4, tmp1.values()) 1840 self.assertNotIn(4, tmp2.values()) 1841 self.assertNotIn(1, tmp3.values()) 1842 self.assertNotIn(1, tmp4.values()) 1843 1844 def test_weak_valued_delitem(self): 1845 d = weakref.WeakValueDictionary() 1846 o1 = Object('1') 1847 o2 = Object('2') 1848 d['something'] = o1 1849 d['something else'] = o2 1850 self.assertEqual(len(d), 2) 1851 del d['something'] 1852 self.assertEqual(len(d), 1) 1853 self.assertEqual(list(d.items()), [('something else', o2)]) 1854 1855 def test_weak_keyed_bad_delitem(self): 1856 d = weakref.WeakKeyDictionary() 1857 o = Object('1') 1858 # An attempt to delete an object that isn't there should raise 1859 # KeyError. It didn't before 2.3. 1860 self.assertRaises(KeyError, d.__delitem__, o) 1861 self.assertRaises(KeyError, d.__getitem__, o) 1862 1863 # If a key isn't of a weakly referencable type, __getitem__ and 1864 # __setitem__ raise TypeError. __delitem__ should too. 1865 self.assertRaises(TypeError, d.__delitem__, 13) 1866 self.assertRaises(TypeError, d.__getitem__, 13) 1867 self.assertRaises(TypeError, d.__setitem__, 13, 13) 1868 1869 def test_weak_keyed_cascading_deletes(self): 1870 # SF bug 742860. For some reason, before 2.3 __delitem__ iterated 1871 # over the keys via self.data.iterkeys(). If things vanished from 1872 # the dict during this (or got added), that caused a RuntimeError. 1873 1874 d = weakref.WeakKeyDictionary() 1875 mutate = False 1876 1877 class C(object): 1878 def __init__(self, i): 1879 self.value = i 1880 def __hash__(self): 1881 return hash(self.value) 1882 def __eq__(self, other): 1883 if mutate: 1884 # Side effect that mutates the dict, by removing the 1885 # last strong reference to a key. 1886 del objs[-1] 1887 return self.value == other.value 1888 1889 objs = [C(i) for i in range(4)] 1890 for o in objs: 1891 d[o] = o.value 1892 del o # now the only strong references to keys are in objs 1893 # Find the order in which iterkeys sees the keys. 1894 objs = list(d.keys()) 1895 # Reverse it, so that the iteration implementation of __delitem__ 1896 # has to keep looping to find the first object we delete. 1897 objs.reverse() 1898 1899 # Turn on mutation in C.__eq__. The first time through the loop, 1900 # under the iterkeys() business the first comparison will delete 1901 # the last item iterkeys() would see, and that causes a 1902 # RuntimeError: dictionary changed size during iteration 1903 # when the iterkeys() loop goes around to try comparing the next 1904 # key. After this was fixed, it just deletes the last object *our* 1905 # "for o in obj" loop would have gotten to. 1906 mutate = True 1907 count = 0 1908 for o in objs: 1909 count += 1 1910 del d[o] 1911 gc_collect() # For PyPy or other GCs. 1912 self.assertEqual(len(d), 0) 1913 self.assertEqual(count, 2) 1914 1915 def test_make_weak_valued_dict_repr(self): 1916 dict = weakref.WeakValueDictionary() 1917 self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>') 1918 1919 def test_make_weak_keyed_dict_repr(self): 1920 dict = weakref.WeakKeyDictionary() 1921 self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>') 1922 1923 @threading_helper.requires_working_threading() 1924 def test_threaded_weak_valued_setdefault(self): 1925 d = weakref.WeakValueDictionary() 1926 with collect_in_thread(): 1927 for i in range(self.NUM_THREADED_ITERATIONS): 1928 x = d.setdefault(10, RefCycle()) 1929 self.assertIsNot(x, None) # we never put None in there! 1930 del x 1931 1932 @threading_helper.requires_working_threading() 1933 def test_threaded_weak_valued_pop(self): 1934 d = weakref.WeakValueDictionary() 1935 with collect_in_thread(): 1936 for i in range(self.NUM_THREADED_ITERATIONS): 1937 d[10] = RefCycle() 1938 x = d.pop(10, 10) 1939 self.assertIsNot(x, None) # we never put None in there! 1940 1941 @threading_helper.requires_working_threading() 1942 def test_threaded_weak_valued_consistency(self): 1943 # Issue #28427: old keys should not remove new values from 1944 # WeakValueDictionary when collecting from another thread. 1945 d = weakref.WeakValueDictionary() 1946 with collect_in_thread(): 1947 for i in range(2 * self.NUM_THREADED_ITERATIONS): 1948 o = RefCycle() 1949 d[10] = o 1950 # o is still alive, so the dict can't be empty 1951 self.assertEqual(len(d), 1) 1952 o = None # lose ref 1953 1954 @support.cpython_only 1955 def test_weak_valued_consistency(self): 1956 # A single-threaded, deterministic repro for issue #28427: old keys 1957 # should not remove new values from WeakValueDictionary. This relies on 1958 # an implementation detail of CPython's WeakValueDictionary (its 1959 # underlying dictionary of KeyedRefs) to reproduce the issue. 1960 d = weakref.WeakValueDictionary() 1961 with support.disable_gc(): 1962 d[10] = RefCycle() 1963 # Keep the KeyedRef alive after it's replaced so that GC will invoke 1964 # the callback. 1965 wr = d.data[10] 1966 # Replace the value with something that isn't cyclic garbage 1967 o = RefCycle() 1968 d[10] = o 1969 # Trigger GC, which will invoke the callback for `wr` 1970 gc.collect() 1971 self.assertEqual(len(d), 1) 1972 1973 def check_threaded_weak_dict_copy(self, type_, deepcopy): 1974 # `type_` should be either WeakKeyDictionary or WeakValueDictionary. 1975 # `deepcopy` should be either True or False. 1976 exc = [] 1977 1978 class DummyKey: 1979 def __init__(self, ctr): 1980 self.ctr = ctr 1981 1982 class DummyValue: 1983 def __init__(self, ctr): 1984 self.ctr = ctr 1985 1986 def dict_copy(d, exc): 1987 try: 1988 if deepcopy is True: 1989 _ = copy.deepcopy(d) 1990 else: 1991 _ = d.copy() 1992 except Exception as ex: 1993 exc.append(ex) 1994 1995 def pop_and_collect(lst): 1996 gc_ctr = 0 1997 while lst: 1998 i = random.randint(0, len(lst) - 1) 1999 gc_ctr += 1 2000 lst.pop(i) 2001 if gc_ctr % 10000 == 0: 2002 gc.collect() # just in case 2003 2004 self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary)) 2005 2006 d = type_() 2007 keys = [] 2008 values = [] 2009 # Initialize d with many entries 2010 for i in range(70000): 2011 k, v = DummyKey(i), DummyValue(i) 2012 keys.append(k) 2013 values.append(v) 2014 d[k] = v 2015 del k 2016 del v 2017 2018 t_copy = threading.Thread(target=dict_copy, args=(d, exc,)) 2019 if type_ is weakref.WeakKeyDictionary: 2020 t_collect = threading.Thread(target=pop_and_collect, args=(keys,)) 2021 else: # weakref.WeakValueDictionary 2022 t_collect = threading.Thread(target=pop_and_collect, args=(values,)) 2023 2024 t_copy.start() 2025 t_collect.start() 2026 2027 t_copy.join() 2028 t_collect.join() 2029 2030 # Test exceptions 2031 if exc: 2032 raise exc[0] 2033 2034 @threading_helper.requires_working_threading() 2035 def test_threaded_weak_key_dict_copy(self): 2036 # Issue #35615: Weakref keys or values getting GC'ed during dict 2037 # copying should not result in a crash. 2038 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False) 2039 2040 @threading_helper.requires_working_threading() 2041 @support.requires_resource('cpu') 2042 def test_threaded_weak_key_dict_deepcopy(self): 2043 # Issue #35615: Weakref keys or values getting GC'ed during dict 2044 # copying should not result in a crash. 2045 self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True) 2046 2047 @threading_helper.requires_working_threading() 2048 def test_threaded_weak_value_dict_copy(self): 2049 # Issue #35615: Weakref keys or values getting GC'ed during dict 2050 # copying should not result in a crash. 2051 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False) 2052 2053 @threading_helper.requires_working_threading() 2054 @support.requires_resource('cpu') 2055 def test_threaded_weak_value_dict_deepcopy(self): 2056 # Issue #35615: Weakref keys or values getting GC'ed during dict 2057 # copying should not result in a crash. 2058 self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True) 2059 2060 @support.cpython_only 2061 def test_remove_closure(self): 2062 d = weakref.WeakValueDictionary() 2063 self.assertIsNone(d._remove.__closure__) 2064 2065 2066from test import mapping_tests 2067 2068class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 2069 """Check that WeakValueDictionary conforms to the mapping protocol""" 2070 __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)} 2071 type2test = weakref.WeakValueDictionary 2072 def _reference(self): 2073 return self.__ref.copy() 2074 2075class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol): 2076 """Check that WeakKeyDictionary conforms to the mapping protocol""" 2077 __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3} 2078 type2test = weakref.WeakKeyDictionary 2079 def _reference(self): 2080 return self.__ref.copy() 2081 2082 2083class FinalizeTestCase(unittest.TestCase): 2084 2085 class A: 2086 pass 2087 2088 def _collect_if_necessary(self): 2089 # we create no ref-cycles so in CPython no gc should be needed 2090 if sys.implementation.name != 'cpython': 2091 support.gc_collect() 2092 2093 def test_finalize(self): 2094 def add(x,y,z): 2095 res.append(x + y + z) 2096 return x + y + z 2097 2098 a = self.A() 2099 2100 res = [] 2101 f = weakref.finalize(a, add, 67, 43, z=89) 2102 self.assertEqual(f.alive, True) 2103 self.assertEqual(f.peek(), (a, add, (67,43), {'z':89})) 2104 self.assertEqual(f(), 199) 2105 self.assertEqual(f(), None) 2106 self.assertEqual(f(), None) 2107 self.assertEqual(f.peek(), None) 2108 self.assertEqual(f.detach(), None) 2109 self.assertEqual(f.alive, False) 2110 self.assertEqual(res, [199]) 2111 2112 res = [] 2113 f = weakref.finalize(a, add, 67, 43, 89) 2114 self.assertEqual(f.peek(), (a, add, (67,43,89), {})) 2115 self.assertEqual(f.detach(), (a, add, (67,43,89), {})) 2116 self.assertEqual(f(), None) 2117 self.assertEqual(f(), None) 2118 self.assertEqual(f.peek(), None) 2119 self.assertEqual(f.detach(), None) 2120 self.assertEqual(f.alive, False) 2121 self.assertEqual(res, []) 2122 2123 res = [] 2124 f = weakref.finalize(a, add, x=67, y=43, z=89) 2125 del a 2126 self._collect_if_necessary() 2127 self.assertEqual(f(), None) 2128 self.assertEqual(f(), None) 2129 self.assertEqual(f.peek(), None) 2130 self.assertEqual(f.detach(), None) 2131 self.assertEqual(f.alive, False) 2132 self.assertEqual(res, [199]) 2133 2134 def test_arg_errors(self): 2135 def fin(*args, **kwargs): 2136 res.append((args, kwargs)) 2137 2138 a = self.A() 2139 2140 res = [] 2141 f = weakref.finalize(a, fin, 1, 2, func=3, obj=4) 2142 self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4})) 2143 f() 2144 self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})]) 2145 2146 with self.assertRaises(TypeError): 2147 weakref.finalize(a, func=fin, arg=1) 2148 with self.assertRaises(TypeError): 2149 weakref.finalize(obj=a, func=fin, arg=1) 2150 self.assertRaises(TypeError, weakref.finalize, a) 2151 self.assertRaises(TypeError, weakref.finalize) 2152 2153 def test_order(self): 2154 a = self.A() 2155 res = [] 2156 2157 f1 = weakref.finalize(a, res.append, 'f1') 2158 f2 = weakref.finalize(a, res.append, 'f2') 2159 f3 = weakref.finalize(a, res.append, 'f3') 2160 f4 = weakref.finalize(a, res.append, 'f4') 2161 f5 = weakref.finalize(a, res.append, 'f5') 2162 2163 # make sure finalizers can keep themselves alive 2164 del f1, f4 2165 2166 self.assertTrue(f2.alive) 2167 self.assertTrue(f3.alive) 2168 self.assertTrue(f5.alive) 2169 2170 self.assertTrue(f5.detach()) 2171 self.assertFalse(f5.alive) 2172 2173 f5() # nothing because previously unregistered 2174 res.append('A') 2175 f3() # => res.append('f3') 2176 self.assertFalse(f3.alive) 2177 res.append('B') 2178 f3() # nothing because previously called 2179 res.append('C') 2180 del a 2181 self._collect_if_necessary() 2182 # => res.append('f4') 2183 # => res.append('f2') 2184 # => res.append('f1') 2185 self.assertFalse(f2.alive) 2186 res.append('D') 2187 f2() # nothing because previously called by gc 2188 2189 expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'] 2190 self.assertEqual(res, expected) 2191 2192 def test_all_freed(self): 2193 # we want a weakrefable subclass of weakref.finalize 2194 class MyFinalizer(weakref.finalize): 2195 pass 2196 2197 a = self.A() 2198 res = [] 2199 def callback(): 2200 res.append(123) 2201 f = MyFinalizer(a, callback) 2202 2203 wr_callback = weakref.ref(callback) 2204 wr_f = weakref.ref(f) 2205 del callback, f 2206 2207 self.assertIsNotNone(wr_callback()) 2208 self.assertIsNotNone(wr_f()) 2209 2210 del a 2211 self._collect_if_necessary() 2212 2213 self.assertIsNone(wr_callback()) 2214 self.assertIsNone(wr_f()) 2215 self.assertEqual(res, [123]) 2216 2217 @classmethod 2218 def run_in_child(cls): 2219 def error(): 2220 # Create an atexit finalizer from inside a finalizer called 2221 # at exit. This should be the next to be run. 2222 g1 = weakref.finalize(cls, print, 'g1') 2223 print('f3 error') 2224 1/0 2225 2226 # cls should stay alive till atexit callbacks run 2227 f1 = weakref.finalize(cls, print, 'f1', _global_var) 2228 f2 = weakref.finalize(cls, print, 'f2', _global_var) 2229 f3 = weakref.finalize(cls, error) 2230 f4 = weakref.finalize(cls, print, 'f4', _global_var) 2231 2232 assert f1.atexit == True 2233 f2.atexit = False 2234 assert f3.atexit == True 2235 assert f4.atexit == True 2236 2237 def test_atexit(self): 2238 prog = ('from test.test_weakref import FinalizeTestCase;'+ 2239 'FinalizeTestCase.run_in_child()') 2240 rc, out, err = script_helper.assert_python_ok('-c', prog) 2241 out = out.decode('ascii').splitlines() 2242 self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar']) 2243 self.assertTrue(b'ZeroDivisionError' in err) 2244 2245 2246class ModuleTestCase(unittest.TestCase): 2247 def test_names(self): 2248 for name in ('ReferenceType', 'ProxyType', 'CallableProxyType', 2249 'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'): 2250 obj = getattr(weakref, name) 2251 if name != 'WeakSet': 2252 self.assertEqual(obj.__module__, 'weakref') 2253 self.assertEqual(obj.__name__, name) 2254 self.assertEqual(obj.__qualname__, name) 2255 2256 2257libreftest = """ Doctest for examples in the library reference: weakref.rst 2258 2259>>> from test.support import gc_collect 2260>>> import weakref 2261>>> class Dict(dict): 2262... pass 2263... 2264>>> obj = Dict(red=1, green=2, blue=3) # this object is weak referencable 2265>>> r = weakref.ref(obj) 2266>>> print(r() is obj) 2267True 2268 2269>>> import weakref 2270>>> class Object: 2271... pass 2272... 2273>>> o = Object() 2274>>> r = weakref.ref(o) 2275>>> o2 = r() 2276>>> o is o2 2277True 2278>>> del o, o2 2279>>> gc_collect() # For PyPy or other GCs. 2280>>> print(r()) 2281None 2282 2283>>> import weakref 2284>>> class ExtendedRef(weakref.ref): 2285... def __init__(self, ob, callback=None, **annotations): 2286... super().__init__(ob, callback) 2287... self.__counter = 0 2288... for k, v in annotations.items(): 2289... setattr(self, k, v) 2290... def __call__(self): 2291... '''Return a pair containing the referent and the number of 2292... times the reference has been called. 2293... ''' 2294... ob = super().__call__() 2295... if ob is not None: 2296... self.__counter += 1 2297... ob = (ob, self.__counter) 2298... return ob 2299... 2300>>> class A: # not in docs from here, just testing the ExtendedRef 2301... pass 2302... 2303>>> a = A() 2304>>> r = ExtendedRef(a, foo=1, bar="baz") 2305>>> r.foo 23061 2307>>> r.bar 2308'baz' 2309>>> r()[1] 23101 2311>>> r()[1] 23122 2313>>> r()[0] is a 2314True 2315 2316 2317>>> import weakref 2318>>> _id2obj_dict = weakref.WeakValueDictionary() 2319>>> def remember(obj): 2320... oid = id(obj) 2321... _id2obj_dict[oid] = obj 2322... return oid 2323... 2324>>> def id2obj(oid): 2325... return _id2obj_dict[oid] 2326... 2327>>> a = A() # from here, just testing 2328>>> a_id = remember(a) 2329>>> id2obj(a_id) is a 2330True 2331>>> del a 2332>>> gc_collect() # For PyPy or other GCs. 2333>>> try: 2334... id2obj(a_id) 2335... except KeyError: 2336... print('OK') 2337... else: 2338... print('WeakValueDictionary error') 2339OK 2340 2341""" 2342 2343__test__ = {'libreftest' : libreftest} 2344 2345def load_tests(loader, tests, pattern): 2346 tests.addTest(doctest.DocTestSuite()) 2347 return tests 2348 2349 2350if __name__ == "__main__": 2351 unittest.main() 2352