• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import doctest
4import unittest
5import collections
6import weakref
7import operator
8import contextlib
9import copy
10import threading
11import time
12import random
13import textwrap
14
15from test import support
16from test.support import script_helper, ALWAYS_EQ, suppress_immortalization
17from test.support import gc_collect
18from test.support import import_helper
19from test.support import threading_helper
20from test.support import is_wasi, Py_DEBUG
21
# Used in ReferencesTestCase.test_ref_created_during_del(): the __del__()
# method defined there stores a weak reference in this global.
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'
28
class C:
    """Minimal class with one do-nothing method, used as a weakref target."""

    def method(self):
        return None
32
33
class Callable:
    """Callable object that stores the argument of its latest call in ``bar``."""

    # Class-level default; an instance attribute shadows it after a call.
    bar = None

    def __call__(self, x):
        setattr(self, "bar", x)
39
40
def create_function():
    """Return a new do-nothing function object on every call."""
    def f():
        return None
    return f
44
def create_bound_method():
    """Return ``method`` bound to a freshly created ``C`` instance."""
    owner = C()
    return owner.method
47
48
class Object:
    """Value wrapper whose equality, ordering and hash follow ``arg``.

    Comparisons against anything that is not an ``Object`` return
    ``NotImplemented`` so Python can try the reflected operation.
    """

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        # An f-string with !r is used instead of "%r" % self.arg: the
        # %-operator treats a tuple operand as the argument list, so a
        # tuple-valued arg would raise TypeError (or lose its parentheses).
        return f"<Object {self.arg!r}>"

    def __eq__(self, other):
        if isinstance(other, Object):
            return self.arg == other.arg
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, Object):
            return self.arg < other.arg
        return NotImplemented

    def __hash__(self):
        # Kept consistent with __eq__: equal args give equal hashes.
        return hash(self.arg)

    def some_method(self):
        return 4

    def other_method(self):
        return 5
68
69
class RefCycle:
    """Object that references itself through its ``cycle`` attribute."""

    def __init__(self):
        setattr(self, "cycle", self)
73
74
class TestBase(unittest.TestCase):
    # Common fixture: keeps a count of how often callback() has run.

    def setUp(self):
        # Every test starts with a zeroed counter.
        self.cbcalled = 0

    def callback(self, ref):
        # Usable as a weakref callback; the argument itself is ignored.
        self.cbcalled = self.cbcalled + 1
82
83
@contextlib.contextmanager
def collect_in_thread(period=0.005):
    """
    Call gc.collect() over and over from a helper thread while the
    ``with`` body runs, pausing ``period`` seconds between collections.
    The whole thing executes inside a support.disable_gc() block.
    """
    please_stop = False

    def collect():
        # Sleep, collect, repeat -- until the context manager exits.
        while True:
            if please_stop:
                return
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        worker = threading.Thread(target=collect)
        worker.start()
        try:
            yield
        finally:
            # Raise the flag polled by collect(), then wait for the thread.
            please_stop = True
            worker.join()
104
105
106class ReferencesTestCase(TestBase):
107
108    def test_basic_ref(self):
109        self.check_basic_ref(C)
110        self.check_basic_ref(create_function)
111        self.check_basic_ref(create_bound_method)
112
113        # Just make sure the tp_repr handler doesn't raise an exception.
114        # Live reference:
115        o = C()
116        wr = weakref.ref(o)
117        repr(wr)
118        # Dead reference:
119        del o
120        repr(wr)
121
122    @support.cpython_only
123    def test_ref_repr(self):
124        obj = C()
125        ref = weakref.ref(obj)
126        regex = (
127            rf"<weakref at 0x[0-9a-fA-F]+; "
128            rf"to '{'' if __name__ == '__main__' else C.__module__ + '.'}{C.__qualname__}' "
129            rf"at 0x[0-9a-fA-F]+>"
130        )
131        self.assertRegex(repr(ref), regex)
132
133        obj = None
134        gc_collect()
135        self.assertRegex(repr(ref),
136                         rf'<weakref at 0x[0-9a-fA-F]+; dead>')
137
138        # test type with __name__
139        class WithName:
140            @property
141            def __name__(self):
142                return "custom_name"
143
144        obj2 = WithName()
145        ref2 = weakref.ref(obj2)
146        regex = (
147            rf"<weakref at 0x[0-9a-fA-F]+; "
148            rf"to '{'' if __name__ == '__main__' else WithName.__module__ + '.'}"
149            rf"{WithName.__qualname__}' "
150            rf"at 0x[0-9a-fA-F]+ +\(custom_name\)>"
151        )
152        self.assertRegex(repr(ref2), regex)
153
154    def test_repr_failure_gh99184(self):
155        class MyConfig(dict):
156            def __getattr__(self, x):
157                return self[x]
158
159        obj = MyConfig(offset=5)
160        obj_weakref = weakref.ref(obj)
161
162        self.assertIn('MyConfig', repr(obj_weakref))
163        self.assertIn('MyConfig', str(obj_weakref))
164
165    def test_basic_callback(self):
166        self.check_basic_callback(C)
167        self.check_basic_callback(create_function)
168        self.check_basic_callback(create_bound_method)
169
170    @support.cpython_only
171    def test_cfunction(self):
172        _testcapi = import_helper.import_module("_testcapi")
173        create_cfunction = _testcapi.create_cfunction
174        f = create_cfunction()
175        wr = weakref.ref(f)
176        self.assertIs(wr(), f)
177        del f
178        self.assertIsNone(wr())
179        self.check_basic_ref(create_cfunction)
180        self.check_basic_callback(create_cfunction)
181
182    def test_multiple_callbacks(self):
183        o = C()
184        ref1 = weakref.ref(o, self.callback)
185        ref2 = weakref.ref(o, self.callback)
186        del o
187        gc_collect()  # For PyPy or other GCs.
188        self.assertIsNone(ref1(), "expected reference to be invalidated")
189        self.assertIsNone(ref2(), "expected reference to be invalidated")
190        self.assertEqual(self.cbcalled, 2,
191                     "callback not called the right number of times")
192
    def test_multiple_selfref_callbacks(self):
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            # Dereference the *first* weakref from inside a callback.
            self.ref()
        c = C()
        # Creation order matters: self.ref first, ref1 second (see above).
        self.ref = weakref.ref(c, callback)
        ref1 = weakref.ref(c, callback)
        # Drop the only strong reference.  There are no explicit asserts;
        # the test passes if nothing goes wrong while the callbacks run.
        del c
210
211    def test_constructor_kwargs(self):
212        c = C()
213        self.assertRaises(TypeError, weakref.ref, c, callback=None)
214
215    def test_proxy_ref(self):
216        o = C()
217        o.bar = 1
218        ref1 = weakref.proxy(o, self.callback)
219        ref2 = weakref.proxy(o, self.callback)
220        del o
221        gc_collect()  # For PyPy or other GCs.
222
223        def check(proxy):
224            proxy.bar
225
226        self.assertRaises(ReferenceError, check, ref1)
227        self.assertRaises(ReferenceError, check, ref2)
228        ref3 = weakref.proxy(C())
229        gc_collect()  # For PyPy or other GCs.
230        self.assertRaises(ReferenceError, bool, ref3)
231        self.assertEqual(self.cbcalled, 2)
232
233    @support.cpython_only
234    def test_proxy_repr(self):
235        obj = C()
236        ref = weakref.proxy(obj, self.callback)
237        regex = (
238            rf"<weakproxy at 0x[0-9a-fA-F]+; "
239            rf"to '{'' if __name__ == '__main__' else C.__module__ + '.'}{C.__qualname__}' "
240            rf"at 0x[0-9a-fA-F]+>"
241        )
242        self.assertRegex(repr(ref), regex)
243
244        obj = None
245        gc_collect()
246        self.assertRegex(repr(ref),
247                         rf'<weakproxy at 0x[0-9a-fA-F]+; dead>')
248
249    def check_basic_ref(self, factory):
250        o = factory()
251        ref = weakref.ref(o)
252        self.assertIsNotNone(ref(),
253                     "weak reference to live object should be live")
254        o2 = ref()
255        self.assertIs(o, o2,
256                     "<ref>() should return original object if live")
257
258    def check_basic_callback(self, factory):
259        self.cbcalled = 0
260        o = factory()
261        ref = weakref.ref(o, self.callback)
262        del o
263        gc_collect()  # For PyPy or other GCs.
264        self.assertEqual(self.cbcalled, 1,
265                     "callback did not properly set 'cbcalled'")
266        self.assertIsNone(ref(),
267                     "ref2 should be dead after deleting object reference")
268
    def test_ref_reuse(self):
        # Callback-less references to the same object are shared, whether
        # the proxy is created between them or before both of them.
        o = C()
        ref1 = weakref.ref(o)
        # create a proxy to make sure that there's an intervening creation
        # between these two; it should make no difference
        proxy = weakref.proxy(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                     "reference object w/out callback should be re-used")

        # Second round on a fresh object, this time with the proxy first.
        o = C()
        proxy = weakref.proxy(o)
        ref1 = weakref.ref(o)
        ref2 = weakref.ref(o)
        self.assertIs(ref1, ref2,
                     "reference object w/out callback should be re-used")
        # One shared ref plus one proxy.
        self.assertEqual(weakref.getweakrefcount(o), 2,
                     "wrong weak ref count for object")
        del proxy
        gc_collect()  # For PyPy or other GCs.
        # Only the shared ref is left.
        self.assertEqual(weakref.getweakrefcount(o), 1,
                     "wrong weak ref count for object after deleting proxy")
291
292    def test_proxy_reuse(self):
293        o = C()
294        proxy1 = weakref.proxy(o)
295        ref = weakref.ref(o)
296        proxy2 = weakref.proxy(o)
297        self.assertIs(proxy1, proxy2,
298                     "proxy object w/out callback should have been re-used")
299
300    def test_basic_proxy(self):
301        o = C()
302        self.check_proxy(o, weakref.proxy(o))
303
304        L = collections.UserList()
305        p = weakref.proxy(L)
306        self.assertFalse(p, "proxy for empty UserList should be false")
307        p.append(12)
308        self.assertEqual(len(L), 1)
309        self.assertTrue(p, "proxy for non-empty UserList should be true")
310        p[:] = [2, 3]
311        self.assertEqual(len(L), 2)
312        self.assertEqual(len(p), 2)
313        self.assertIn(3, p, "proxy didn't support __contains__() properly")
314        p[1] = 5
315        self.assertEqual(L[1], 5)
316        self.assertEqual(p[1], 5)
317        L2 = collections.UserList(L)
318        p2 = weakref.proxy(L2)
319        self.assertEqual(p, p2)
320        ## self.assertEqual(repr(L2), repr(p2))
321        L3 = collections.UserList(range(10))
322        p3 = weakref.proxy(L3)
323        self.assertEqual(L3[:], p3[:])
324        self.assertEqual(L3[5:], p3[5:])
325        self.assertEqual(L3[:5], p3[:5])
326        self.assertEqual(L3[2:5], p3[2:5])
327
328    def test_proxy_unicode(self):
329        # See bug 5037
330        class C(object):
331            def __str__(self):
332                return "string"
333            def __bytes__(self):
334                return b"bytes"
335        instance = C()
336        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
337        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
338
339    def test_proxy_index(self):
340        class C:
341            def __index__(self):
342                return 10
343        o = C()
344        p = weakref.proxy(o)
345        self.assertEqual(operator.index(p), 10)
346
347    def test_proxy_div(self):
348        class C:
349            def __floordiv__(self, other):
350                return 42
351            def __ifloordiv__(self, other):
352                return 21
353        o = C()
354        p = weakref.proxy(o)
355        self.assertEqual(p // 5, 42)
356        p //= 5
357        self.assertEqual(p, 21)
358
359    def test_proxy_matmul(self):
360        class C:
361            def __matmul__(self, other):
362                return 1729
363            def __rmatmul__(self, other):
364                return -163
365            def __imatmul__(self, other):
366                return 561
367        o = C()
368        p = weakref.proxy(o)
369        self.assertEqual(p @ 5, 1729)
370        self.assertEqual(5 @ p, -163)
371        p @= 5
372        self.assertEqual(p, 561)
373
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as they are alive.  In
    # Python 2.3.3 and earlier, this guarantee was not honored, and was
    # broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
381
382    def test_shared_ref_without_callback(self):
383        self.check_shared_without_callback(weakref.ref)
384
385    def test_shared_proxy_without_callback(self):
386        self.check_shared_without_callback(weakref.proxy)
387
388    def check_shared_without_callback(self, makeref):
389        o = Object(1)
390        p1 = makeref(o, None)
391        p2 = makeref(o, None)
392        self.assertIs(p1, p2, "both callbacks were None in the C API")
393        del p1, p2
394        p1 = makeref(o)
395        p2 = makeref(o, None)
396        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
397        del p1, p2
398        p1 = makeref(o)
399        p2 = makeref(o)
400        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
401        del p1, p2
402        p1 = makeref(o, None)
403        p2 = makeref(o)
404        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
405
406    def test_callable_proxy(self):
407        o = Callable()
408        ref1 = weakref.proxy(o)
409
410        self.check_proxy(o, ref1)
411
412        self.assertIs(type(ref1), weakref.CallableProxyType,
413                     "proxy is not of callable type")
414        ref1('twinkies!')
415        self.assertEqual(o.bar, 'twinkies!',
416                     "call through proxy not passed through to original")
417        ref1(x='Splat.')
418        self.assertEqual(o.bar, 'Splat.',
419                     "call through proxy not passed through to original")
420
421        # expect due to too few args
422        self.assertRaises(TypeError, ref1)
423
424        # expect due to too many args
425        self.assertRaises(TypeError, ref1, 1, 2, 3)
426
427    def check_proxy(self, o, proxy):
428        o.foo = 1
429        self.assertEqual(proxy.foo, 1,
430                     "proxy does not reflect attribute addition")
431        o.foo = 2
432        self.assertEqual(proxy.foo, 2,
433                     "proxy does not reflect attribute modification")
434        del o.foo
435        self.assertFalse(hasattr(proxy, 'foo'),
436                     "proxy does not reflect attribute removal")
437
438        proxy.foo = 1
439        self.assertEqual(o.foo, 1,
440                     "object does not reflect attribute addition via proxy")
441        proxy.foo = 2
442        self.assertEqual(o.foo, 2,
443            "object does not reflect attribute modification via proxy")
444        del proxy.foo
445        self.assertFalse(hasattr(o, 'foo'),
446                     "object does not reflect attribute removal via proxy")
447
448    def test_proxy_deletion(self):
449        # Test clearing of SF bug #762891
450        class Foo:
451            result = None
452            def __delitem__(self, accessor):
453                self.result = accessor
454        g = Foo()
455        f = weakref.proxy(g)
456        del f[0]
457        self.assertEqual(f.result, 0)
458
459    def test_proxy_bool(self):
460        # Test clearing of SF bug #1170766
461        class List(list): pass
462        lyst = List()
463        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
464
    def test_proxy_iter(self):
        # Test fails with a debug build of the interpreter
        # (see bpo-38395).

        # Bound up front so that MyObj.__iter__ can name it via "nonlocal".
        obj = None

        class MyObj:
            def __iter__(self):
                nonlocal obj
                # Remove the enclosing function's binding to this object
                # while __iter__ is still running.
                del obj
                return NotImplemented

        obj = MyObj()
        p = weakref.proxy(obj)
        with self.assertRaises(TypeError):
            # "blech" in p calls MyObj.__iter__ through the proxy,
            # without keeping a reference to the real object, so it
            # can be killed in the middle of the call
            "blech" in p
484
485    def test_proxy_next(self):
486        arr = [4, 5, 6]
487        def iterator_func():
488            yield from arr
489        it = iterator_func()
490
491        class IteratesWeakly:
492            def __iter__(self):
493                return weakref.proxy(it)
494
495        weak_it = IteratesWeakly()
496
497        # Calls proxy.__next__
498        self.assertEqual(list(weak_it), [4, 5, 6])
499
500    def test_proxy_bad_next(self):
501        # bpo-44720: PyIter_Next() shouldn't be called if the reference
502        # isn't an iterator.
503
504        not_an_iterator = lambda: 0
505
506        class A:
507            def __iter__(self):
508                return weakref.proxy(not_an_iterator)
509        a = A()
510
511        msg = "Weakref proxy referenced a non-iterator"
512        with self.assertRaisesRegex(TypeError, msg):
513            list(a)
514
515    def test_proxy_reversed(self):
516        class MyObj:
517            def __len__(self):
518                return 3
519            def __reversed__(self):
520                return iter('cba')
521
522        obj = MyObj()
523        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
524
525    def test_proxy_hash(self):
526        class MyObj:
527            def __hash__(self):
528                return 42
529
530        obj = MyObj()
531        with self.assertRaises(TypeError):
532            hash(weakref.proxy(obj))
533
534        class MyObj:
535            __hash__ = None
536
537        obj = MyObj()
538        with self.assertRaises(TypeError):
539            hash(weakref.proxy(obj))
540
541    def test_getweakrefcount(self):
542        o = C()
543        ref1 = weakref.ref(o)
544        ref2 = weakref.ref(o, self.callback)
545        self.assertEqual(weakref.getweakrefcount(o), 2,
546                     "got wrong number of weak reference objects")
547
548        proxy1 = weakref.proxy(o)
549        proxy2 = weakref.proxy(o, self.callback)
550        self.assertEqual(weakref.getweakrefcount(o), 4,
551                     "got wrong number of weak reference objects")
552
553        del ref1, ref2, proxy1, proxy2
554        gc_collect()  # For PyPy or other GCs.
555        self.assertEqual(weakref.getweakrefcount(o), 0,
556                     "weak reference objects not unlinked from"
557                     " referent when discarded.")
558
559        # assumes ints do not support weakrefs
560        self.assertEqual(weakref.getweakrefcount(1), 0,
561                     "got wrong number of weak reference objects for int")
562
563    def test_getweakrefs(self):
564        o = C()
565        ref1 = weakref.ref(o, self.callback)
566        ref2 = weakref.ref(o, self.callback)
567        del ref1
568        gc_collect()  # For PyPy or other GCs.
569        self.assertEqual(weakref.getweakrefs(o), [ref2],
570                     "list of refs does not match")
571
572        o = C()
573        ref1 = weakref.ref(o, self.callback)
574        ref2 = weakref.ref(o, self.callback)
575        del ref2
576        gc_collect()  # For PyPy or other GCs.
577        self.assertEqual(weakref.getweakrefs(o), [ref1],
578                     "list of refs does not match")
579
580        del ref1
581        gc_collect()  # For PyPy or other GCs.
582        self.assertEqual(weakref.getweakrefs(o), [],
583                     "list of refs not cleared")
584
585        # assumes ints do not support weakrefs
586        self.assertEqual(weakref.getweakrefs(1), [],
587                     "list of refs does not match for int")
588
589    def test_newstyle_number_ops(self):
590        class F(float):
591            pass
592        f = F(2.0)
593        p = weakref.proxy(f)
594        self.assertEqual(p + 1.0, 3.0)
595        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
596
597    def test_callbacks_protected(self):
598        # Callbacks protected from already-set exceptions?
599        # Regression test for SF bug #478534.
600        class BogusError(Exception):
601            pass
602        data = {}
603        def remove(k):
604            del data[k]
605        def encapsulate():
606            f = lambda : ()
607            data[weakref.ref(f, remove)] = None
608            raise BogusError
609        try:
610            encapsulate()
611        except BogusError:
612            pass
613        else:
614            self.fail("exception not properly restored")
615        try:
616            encapsulate()
617        except BogusError:
618            pass
619        else:
620            self.fail("exception not properly restored")
621
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.
        #
        # There are no explicit asserts: the test passes if it runs to
        # completion.

        import gc

        class C(object):
            pass

        c = C()
        # The callback forces a full collection.
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        c1 = C()
        c1.i = C()
        # This time the weakref is attached to an object held by c1.
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
661
    @suppress_immortalization()
    def test_callback_in_cycle(self):
        # Crash regression test: it has no explicit asserts and passes if
        # the final gc.collect() completes.
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                # Attribute lookup on the instance from inside the callback.
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
702
    def test_callback_reachable_one_way(self):
        import gc

        # This one broke the first patch that fixed the previous test. In this case,
        # the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.
        #
        # No explicit asserts: the test passes if gc.collect() completes.

        class C:
            def cb(self, ignore):
                # Read every attribute that is stored on c2 below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        c2.me = c2                      # self-cycle on c2
        c2.c1 = c1                      # c2 -> c1, but not the other way
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
726
    def test_callback_different_classes(self):
        import gc

        # Like test_callback_reachable_one_way, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).
        #
        # No explicit asserts: the test passes if gc.collect() completes.

        class C(object):
            def cb(self, ignore):
                # Read every attribute that is stored on c2 below.
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The two classes are dropped along with the instances.
        del c1, c2, C, D
        gc.collect()
754
    @suppress_immortalization()
    def test_callback_in_cycle_resurrection(self):
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        # Log of callback activity; both callbacks below append to it.
        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would resurrect the partner object if it ever ran.
                alist.append(self.c)

        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        # Each instance holds a weakref to the other, with its own bound
        # method as the callback.
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
801
    def test_callbacks_on_callback(self):
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        # Log of callback activity; both callbacks below append to it.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        c, d = C(), C()
        # c and d reference each other, forming a cycle.
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
839
840    def test_gc_during_ref_creation(self):
841        self.check_gc_during_creation(weakref.ref)
842
843    def test_gc_during_proxy_creation(self):
844        self.check_gc_during_creation(weakref.proxy)
845
846    def check_gc_during_creation(self, makeref):
847        thresholds = gc.get_threshold()
848        gc.set_threshold(1, 1, 1)
849        gc.collect()
850        class A:
851            pass
852
853        def callback(*args):
854            pass
855
856        referenced = A()
857
858        a = A()
859        a.a = a
860        a.wr = makeref(referenced)
861
862        try:
863            # now make sure the object and the ref get labeled as
864            # cyclic trash:
865            a = A()
866            weakref.ref(referenced, callback)
867
868        finally:
869            gc.set_threshold(*thresholds)
870
    def test_ref_created_during_del(self):
        # Bug #1377858
        # A weakref created in an object's __del__() would crash the
        # interpreter when the weakref was cleaned up since it would refer to
        # non-existent memory.  This test should not segfault the interpreter.
        class Target(object):
            def __del__(self):
                # Store a weak reference to the dying object in the
                # module-level global.
                global ref_from_del
                ref_from_del = weakref.ref(self)

        # The instance is never deleted explicitly; its only reference is
        # this local, which goes away when the method returns.
        w = Target()
882
883    def test_init(self):
884        # Issue 3634
885        # <weakref to class>.__init__() doesn't check errors correctly
886        r = weakref.ref(Exception)
887        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
888        # No exception should be raised here
889        gc.collect()
890
891    @suppress_immortalization()
892    def test_classes(self):
893        # Check that classes are weakrefable.
894        class A(object):
895            pass
896        l = []
897        weakref.ref(int)
898        a = weakref.ref(A, l.append)
899        A = None
900        gc.collect()
901        self.assertEqual(a(), None)
902        self.assertEqual(l, [a])
903
    def test_equality(self):
        # Alive weakrefs defer equality testing to their underlying object.
        x = Object(1)
        y = Object(1)
        z = Object(2)
        a = weakref.ref(x)
        b = weakref.ref(y)   # different referent, equal to x
        c = weakref.ref(z)   # different referent, not equal to x
        d = weakref.ref(x)   # same referent as a
        # Note how we directly test the operators here, to stress both
        # __eq__ and __ne__.
        self.assertTrue(a == b)
        self.assertFalse(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertTrue(a == d)
        self.assertFalse(a != d)
        # A weakref is not equal to its own referent.
        self.assertFalse(a == x)
        self.assertTrue(a != x)
        self.assertTrue(a == ALWAYS_EQ)
        self.assertFalse(a != ALWAYS_EQ)
        del x, y, z
        gc.collect()
        for r in a, b, c:
            # Sanity check
            self.assertIs(r(), None)
        # Dead weakrefs compare by identity: whether `a` and `d` are the
        # same weakref object is an implementation detail, since they pointed
        # to the same original object and didn't have a callback.
        # (see issue #16453).
        self.assertFalse(a == b)
        self.assertTrue(a != b)
        self.assertFalse(a == c)
        self.assertTrue(a != c)
        self.assertEqual(a == d, a is d)
        self.assertEqual(a != d, a is not d)
940
941    def test_ordering(self):
942        # weakrefs cannot be ordered, even if the underlying objects can.
943        ops = [operator.lt, operator.gt, operator.le, operator.ge]
944        x = Object(1)
945        y = Object(1)
946        a = weakref.ref(x)
947        b = weakref.ref(y)
948        for op in ops:
949            self.assertRaises(TypeError, op, a, b)
950        # Same when dead.
951        del x, y
952        gc.collect()
953        for op in ops:
954            self.assertRaises(TypeError, op, a, b)
955
956    def test_hashing(self):
957        # Alive weakrefs hash the same as the underlying object
958        x = Object(42)
959        y = Object(42)
960        a = weakref.ref(x)
961        b = weakref.ref(y)
962        self.assertEqual(hash(a), hash(42))
963        del x, y
964        gc.collect()
965        # Dead weakrefs:
966        # - retain their hash is they were hashed when alive;
967        # - otherwise, cannot be hashed.
968        self.assertEqual(hash(a), hash(42))
969        self.assertRaises(TypeError, hash, b)
970
971    @unittest.skipIf(is_wasi and Py_DEBUG, "requires deep stack")
972    def test_trashcan_16602(self):
973        # Issue #16602: when a weakref's target was part of a long
974        # deallocation chain, the trashcan mechanism could delay clearing
975        # of the weakref and make the target object visible from outside
976        # code even though its refcount had dropped to 0.  A crash ensued.
977        class C:
978            def __init__(self, parent):
979                if not parent:
980                    return
981                wself = weakref.ref(self)
982                def cb(wparent):
983                    o = wself()
984                self.wparent = weakref.ref(parent, cb)
985
986        d = weakref.WeakKeyDictionary()
987        root = c = C(None)
988        for n in range(100):
989            d[c] = c = C(c)
990        del root
991        gc.collect()
992
993    def test_callback_attribute(self):
994        x = Object(1)
995        callback = lambda ref: None
996        ref1 = weakref.ref(x, callback)
997        self.assertIs(ref1.__callback__, callback)
998
999        ref2 = weakref.ref(x)
1000        self.assertIsNone(ref2.__callback__)
1001
1002    def test_callback_attribute_after_deletion(self):
1003        x = Object(1)
1004        ref = weakref.ref(x, self.callback)
1005        self.assertIsNotNone(ref.__callback__)
1006        del x
1007        support.gc_collect()
1008        self.assertIsNone(ref.__callback__)
1009
1010    def test_set_callback_attribute(self):
1011        x = Object(1)
1012        callback = lambda ref: None
1013        ref1 = weakref.ref(x, callback)
1014        with self.assertRaises(AttributeError):
1015            ref1.__callback__ = lambda ref: None
1016
1017    def test_callback_gcs(self):
1018        class ObjectWithDel(Object):
1019            def __del__(self): pass
1020        x = ObjectWithDel(1)
1021        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
1022        del x
1023        support.gc_collect()
1024
1025    @support.cpython_only
1026    def test_no_memory_when_clearing(self):
1027        # gh-118331: Make sure we do not raise an exception from the destructor
1028        # when clearing weakrefs if allocating the intermediate tuple fails.
1029        code = textwrap.dedent("""
1030        import _testcapi
1031        import weakref
1032
1033        class TestObj:
1034            pass
1035
1036        def callback(obj):
1037            pass
1038
1039        obj = TestObj()
1040        # The choice of 50 is arbitrary, but must be large enough to ensure
1041        # the allocation won't be serviced by the free list.
1042        wrs = [weakref.ref(obj, callback) for _ in range(50)]
1043        _testcapi.set_nomemory(0)
1044        del obj
1045        """).strip()
1046        res, _ = script_helper.run_python_until_end("-c", code)
1047        stderr = res.err.decode("ascii", "backslashreplace")
1048        self.assertNotRegex(stderr, "_Py_Dealloc: Deallocator of type 'TestObj'")
1049
1050
class SubclassableWeakrefTestCase(TestBase):
    """Tests for user-defined subclasses of ``weakref.ref``."""

    def test_subclass_refs(self):
        # A subclass can take extra constructor arguments and override
        # __call__ while still behaving like a weak reference.
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)

            def __call__(self):
                self.called = True
                return super().__call__()

        target = Object("foo")
        ref = MyRef(target, value=24)
        self.assertIs(ref(), target)
        self.assertTrue(ref.called)
        self.assertEqual(ref.value, 24)
        del target
        gc_collect()  # For PyPy or other GCs.
        self.assertIsNone(ref())
        self.assertTrue(ref.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        # Subclass instances and plain refs to one object coexist.
        class MyRef(weakref.ref):
            pass

        o = Object(42)
        sub1 = MyRef(o)
        plain = weakref.ref(o)
        self.assertIsNot(sub1, plain)
        self.assertEqual(weakref.getweakrefs(o), [plain, sub1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        sub2 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        # The plain ref is listed first, the subclass refs after it.
        self.assertIs(plain, refs[0])
        tail = refs[1:]
        self.assertIn(sub1, tail)
        self.assertIn(sub2, tail)

    def test_subclass_refs_dont_conflate_callbacks(self):
        # Two subclass refs with different callbacks are distinct objects.
        class MyRef(weakref.ref):
            pass

        o = Object(42)
        with_id = MyRef(o, id)
        with_str = MyRef(o, str)
        self.assertIsNot(with_id, with_str)
        known = weakref.getweakrefs(o)
        self.assertIn(with_id, known)
        self.assertIn(with_str, known)

    def test_subclass_refs_with_slots(self):
        # A subclass may declare __slots__ and then has no __dict__.
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"

            def __new__(cls, ob, callback, slot1, slot2):
                return weakref.ref.__new__(cls, ob, callback)

            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2

            def meth(self):
                return self.slot1 + self.slot2

        o = Object(42)
        slotted = MyRef(o, None, "abc", "def")
        self.assertEqual(slotted.slot1, "abc")
        self.assertEqual(slotted.slot2, "def")
        self.assertEqual(slotted.meth(), "abcdef")
        self.assertFalse(hasattr(slotted, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # Instances of a weakref subclass can carry attributes.  When such
        # a weakref holds the only strong reference to its own referent,
        # deleting the weakref deletes the referent too.  The callback must
        # not run in that case, because the ref object is itself going away.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        owner = MyRef(o, callback)
        owner.o = o
        del o

        del owner # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same scenario with two weakrefs to one object
        # (the code paths differ).
        o = C()
        outer = MyRef(o, callback)
        inner = MyRef(o, callback)
        outer.r = inner
        inner.o = o
        del o
        del inner

        del outer # Used to crash here

        self.assertEqual(self.cbcalled, 0)
1153
1154
class WeakMethodTestCase(unittest.TestCase):
    """Tests for ``weakref.WeakMethod``."""

    def _subclass(self):
        """Return an Object subclass overriding `some_method`."""
        class C(Object):
            def some_method(self):
                return 6
        return C

    def test_alive(self):
        # With the object alive, calling the WeakMethod rebuilds the bound method.
        o = Object(1)
        wm = weakref.WeakMethod(o.some_method)
        self.assertIsInstance(wm, weakref.ReferenceType)
        self.assertIsInstance(wm(), type(o.some_method))
        self.assertIs(wm().__self__, o)
        self.assertIs(wm().__func__, o.some_method.__func__)
        self.assertEqual(wm()(), 4)

    def test_object_dead(self):
        # Once the instance is gone, the WeakMethod yields None.
        o = Object(1)
        wm = weakref.WeakMethod(o.some_method)
        del o
        gc.collect()
        self.assertIs(wm(), None)

    def test_method_dead(self):
        # Once the function is removed from the class, the WeakMethod yields None.
        klass = self._subclass()
        o = klass(1)
        wm = weakref.WeakMethod(o.some_method)
        del klass.some_method
        gc.collect()
        self.assertIs(wm(), None)

    def test_callback_when_object_dead(self):
        # Test callback behaviour when object dies first.
        klass = self._subclass()
        received = []

        def on_dead(arg):
            received.append(arg)

        o = klass(1)
        wm = weakref.WeakMethod(o.some_method, on_dead)
        del o
        gc.collect()
        self.assertEqual(received, [wm])
        # Callback is only called once.
        klass.some_method = Object.some_method
        gc.collect()
        self.assertEqual(received, [wm])

    def test_callback_when_method_dead(self):
        # Test callback behaviour when method dies first.
        klass = self._subclass()
        received = []

        def on_dead(arg):
            received.append(arg)

        o = klass(1)
        wm = weakref.WeakMethod(o.some_method, on_dead)
        del klass.some_method
        gc.collect()
        self.assertEqual(received, [wm])
        # Callback is only called once.
        del o
        gc.collect()
        self.assertEqual(received, [wm])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod doesn't create any reference cycle to itself.
        o = Object(1)

        def ignore(_):
            pass

        wm = weakref.WeakMethod(o.some_method, ignore)
        watcher = weakref.ref(wm)
        del wm
        # No collection is requested: the watcher must already be dead.
        self.assertIs(watcher(), None)

    def test_equality(self):
        def expect_equal(left, right):
            self.assertTrue(left == right)
            self.assertFalse(left != right)

        def expect_unequal(left, right):
            self.assertTrue(left != right)
            self.assertFalse(left == right)

        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(x.other_method)
        d = weakref.WeakMethod(y.other_method)
        # Objects equal, same method
        expect_equal(a, b)
        expect_equal(c, d)
        # Objects equal, different method
        expect_unequal(a, c)
        expect_unequal(a, d)
        expect_unequal(b, c)
        expect_unequal(b, d)
        # Objects unequal, same or different method
        z = Object(2)
        e = weakref.WeakMethod(z.some_method)
        f = weakref.WeakMethod(z.other_method)
        expect_unequal(a, e)
        expect_unequal(a, f)
        expect_unequal(b, e)
        expect_unequal(b, f)
        # Compare with different types
        expect_unequal(a, x.some_method)
        expect_equal(a, ALWAYS_EQ)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity
        dead_refs = a, b, c, d, e, f
        for first in dead_refs:
            for second in dead_refs:
                self.assertEqual(first == second, first is second)
                self.assertEqual(first != second, first is not second)

    def test_hashing(self):
        # Alive WeakMethods are hashable if the underlying object is
        # hashable.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(y.other_method)
        # Since WeakMethod objects are equal, the hashes should be equal.
        self.assertEqual(hash(a), hash(b))
        hash_when_alive = hash(a)
        # Dead WeakMethods retain their old hash value
        del x, y
        gc.collect()
        self.assertEqual(hash(a), hash_when_alive)
        self.assertEqual(hash(b), hash_when_alive)
        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
        with self.assertRaises(TypeError):
            hash(c)
1290
1291
1292class MappingTestCase(TestBase):
1293
1294    COUNT = 10
1295
1296    if support.check_sanitizer(thread=True) and support.Py_GIL_DISABLED:
1297        # Reduce iteration count to get acceptable latency
1298        NUM_THREADED_ITERATIONS = 1000
1299    else:
1300        NUM_THREADED_ITERATIONS = 100000
1301
1302    def check_len_cycles(self, dict_type, cons):
1303        N = 20
1304        items = [RefCycle() for i in range(N)]
1305        dct = dict_type(cons(o) for o in items)
1306        # Keep an iterator alive
1307        it = dct.items()
1308        try:
1309            next(it)
1310        except StopIteration:
1311            pass
1312        del items
1313        gc.collect()
1314        n1 = len(dct)
1315        del it
1316        gc.collect()
1317        n2 = len(dct)
1318        # one item may be kept alive inside the iterator
1319        self.assertIn(n1, (0, 1))
1320        self.assertEqual(n2, 0)
1321
1322    def test_weak_keyed_len_cycles(self):
1323        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1324
1325    def test_weak_valued_len_cycles(self):
1326        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1327
1328    def check_len_race(self, dict_type, cons):
1329        # Extended sanity checks for len() in the face of cyclic collection
1330        self.addCleanup(gc.set_threshold, *gc.get_threshold())
1331        for th in range(1, 100):
1332            N = 20
1333            gc.collect(0)
1334            gc.set_threshold(th, th, th)
1335            items = [RefCycle() for i in range(N)]
1336            dct = dict_type(cons(o) for o in items)
1337            del items
1338            # All items will be collected at next garbage collection pass
1339            it = dct.items()
1340            try:
1341                next(it)
1342            except StopIteration:
1343                pass
1344            n1 = len(dct)
1345            del it
1346            n2 = len(dct)
1347            self.assertGreaterEqual(n1, 0)
1348            self.assertLessEqual(n1, N)
1349            self.assertGreaterEqual(n2, 0)
1350            self.assertLessEqual(n2, n1)
1351
1352    def test_weak_keyed_len_race(self):
1353        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1354
1355    def test_weak_valued_len_race(self):
1356        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1357
1358    def test_weak_values(self):
1359        #
1360        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1361        #
1362        dict, objects = self.make_weak_valued_dict()
1363        for o in objects:
1364            self.assertEqual(weakref.getweakrefcount(o), 1)
1365            self.assertIs(o, dict[o.arg],
1366                         "wrong object returned by weak dict!")
1367        items1 = list(dict.items())
1368        items2 = list(dict.copy().items())
1369        items1.sort()
1370        items2.sort()
1371        self.assertEqual(items1, items2,
1372                     "cloning of weak-valued dictionary did not work!")
1373        del items1, items2
1374        self.assertEqual(len(dict), self.COUNT)
1375        del objects[0]
1376        gc_collect()  # For PyPy or other GCs.
1377        self.assertEqual(len(dict), self.COUNT - 1,
1378                     "deleting object did not cause dictionary update")
1379        del objects, o
1380        gc_collect()  # For PyPy or other GCs.
1381        self.assertEqual(len(dict), 0,
1382                     "deleting the values did not clear the dictionary")
1383        # regression on SF bug #447152:
1384        dict = weakref.WeakValueDictionary()
1385        self.assertRaises(KeyError, dict.__getitem__, 1)
1386        dict[2] = C()
1387        gc_collect()  # For PyPy or other GCs.
1388        self.assertRaises(KeyError, dict.__getitem__, 2)
1389
1390    def test_weak_keys(self):
1391        #
1392        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1393        #  len(d), k in d.
1394        #
1395        dict, objects = self.make_weak_keyed_dict()
1396        for o in objects:
1397            self.assertEqual(weakref.getweakrefcount(o), 1,
1398                         "wrong number of weak references to %r!" % o)
1399            self.assertIs(o.arg, dict[o],
1400                         "wrong object returned by weak dict!")
1401        items1 = dict.items()
1402        items2 = dict.copy().items()
1403        self.assertEqual(set(items1), set(items2),
1404                     "cloning of weak-keyed dictionary did not work!")
1405        del items1, items2
1406        self.assertEqual(len(dict), self.COUNT)
1407        del objects[0]
1408        gc_collect()  # For PyPy or other GCs.
1409        self.assertEqual(len(dict), (self.COUNT - 1),
1410                     "deleting object did not cause dictionary update")
1411        del objects, o
1412        gc_collect()  # For PyPy or other GCs.
1413        self.assertEqual(len(dict), 0,
1414                     "deleting the keys did not clear the dictionary")
1415        o = Object(42)
1416        dict[o] = "What is the meaning of the universe?"
1417        self.assertIn(o, dict)
1418        self.assertNotIn(34, dict)
1419
1420    def test_weak_keyed_iters(self):
1421        dict, objects = self.make_weak_keyed_dict()
1422        self.check_iters(dict)
1423
1424        # Test keyrefs()
1425        refs = dict.keyrefs()
1426        self.assertEqual(len(refs), len(objects))
1427        objects2 = list(objects)
1428        for wr in refs:
1429            ob = wr()
1430            self.assertIn(ob, dict)
1431            self.assertIn(ob, dict)
1432            self.assertEqual(ob.arg, dict[ob])
1433            objects2.remove(ob)
1434        self.assertEqual(len(objects2), 0)
1435
1436        # Test iterkeyrefs()
1437        objects2 = list(objects)
1438        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1439        for wr in dict.keyrefs():
1440            ob = wr()
1441            self.assertIn(ob, dict)
1442            self.assertIn(ob, dict)
1443            self.assertEqual(ob.arg, dict[ob])
1444            objects2.remove(ob)
1445        self.assertEqual(len(objects2), 0)
1446
1447    def test_weak_valued_iters(self):
1448        dict, objects = self.make_weak_valued_dict()
1449        self.check_iters(dict)
1450
1451        # Test valuerefs()
1452        refs = dict.valuerefs()
1453        self.assertEqual(len(refs), len(objects))
1454        objects2 = list(objects)
1455        for wr in refs:
1456            ob = wr()
1457            self.assertEqual(ob, dict[ob.arg])
1458            self.assertEqual(ob.arg, dict[ob.arg].arg)
1459            objects2.remove(ob)
1460        self.assertEqual(len(objects2), 0)
1461
1462        # Test itervaluerefs()
1463        objects2 = list(objects)
1464        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1465        for wr in dict.itervaluerefs():
1466            ob = wr()
1467            self.assertEqual(ob, dict[ob.arg])
1468            self.assertEqual(ob.arg, dict[ob.arg].arg)
1469            objects2.remove(ob)
1470        self.assertEqual(len(objects2), 0)
1471
1472    def check_iters(self, dict):
1473        # item iterator:
1474        items = list(dict.items())
1475        for item in dict.items():
1476            items.remove(item)
1477        self.assertFalse(items, "items() did not touch all items")
1478
1479        # key iterator, via __iter__():
1480        keys = list(dict.keys())
1481        for k in dict:
1482            keys.remove(k)
1483        self.assertFalse(keys, "__iter__() did not touch all keys")
1484
1485        # key iterator, via iterkeys():
1486        keys = list(dict.keys())
1487        for k in dict.keys():
1488            keys.remove(k)
1489        self.assertFalse(keys, "iterkeys() did not touch all keys")
1490
1491        # value iterator:
1492        values = list(dict.values())
1493        for v in dict.values():
1494            values.remove(v)
1495        self.assertFalse(values,
1496                     "itervalues() did not touch all values")
1497
1498    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1499        n = len(dict)
1500        it = iter(getattr(dict, iter_name)())
1501        next(it)             # Trigger internal iteration
1502        # Destroy an object
1503        del objects[-1]
1504        gc.collect()    # just in case
1505        # We have removed either the first consumed object, or another one
1506        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1507        del it
1508        # The removal has been committed
1509        self.assertEqual(len(dict), n - 1)
1510
1511    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1512        # Check that we can explicitly mutate the weak dict without
1513        # interfering with delayed removal.
1514        # `testcontext` should create an iterator, destroy one of the
1515        # weakref'ed objects and then return a new key/value pair corresponding
1516        # to the destroyed object.
1517        with testcontext() as (k, v):
1518            self.assertNotIn(k, dict)
1519        with testcontext() as (k, v):
1520            self.assertRaises(KeyError, dict.__delitem__, k)
1521        self.assertNotIn(k, dict)
1522        with testcontext() as (k, v):
1523            self.assertRaises(KeyError, dict.pop, k)
1524        self.assertNotIn(k, dict)
1525        with testcontext() as (k, v):
1526            dict[k] = v
1527        self.assertEqual(dict[k], v)
1528        ddict = copy.copy(dict)
1529        with testcontext() as (k, v):
1530            dict.update(ddict)
1531        self.assertEqual(dict, ddict)
1532        with testcontext() as (k, v):
1533            dict.clear()
1534        self.assertEqual(len(dict), 0)
1535
1536    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1537        # Check that len() works when both iterating and removing keys
1538        # explicitly through various means (.pop(), .clear()...), while
1539        # implicit mutation is deferred because an iterator is alive.
1540        # (each call to testcontext() should schedule one item for removal
1541        #  for this test to work properly)
1542        o = Object(123456)
1543        with testcontext():
1544            n = len(dict)
1545            # Since underlying dict is ordered, first item is popped
1546            dict.pop(next(dict.keys()))
1547            self.assertEqual(len(dict), n - 1)
1548            dict[o] = o
1549            self.assertEqual(len(dict), n)
1550        # last item in objects is removed from dict in context shutdown
1551        with testcontext():
1552            self.assertEqual(len(dict), n - 1)
1553            # Then, (o, o) is popped
1554            dict.popitem()
1555            self.assertEqual(len(dict), n - 2)
1556        with testcontext():
1557            self.assertEqual(len(dict), n - 3)
1558            del dict[next(dict.keys())]
1559            self.assertEqual(len(dict), n - 4)
1560        with testcontext():
1561            self.assertEqual(len(dict), n - 5)
1562            dict.popitem()
1563            self.assertEqual(len(dict), n - 6)
1564        with testcontext():
1565            dict.clear()
1566            self.assertEqual(len(dict), 0)
1567        self.assertEqual(len(dict), 0)
1568
1569    def test_weak_keys_destroy_while_iterating(self):
1570        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1571        dict, objects = self.make_weak_keyed_dict()
1572        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1573        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1574        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1575        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1576        dict, objects = self.make_weak_keyed_dict()
1577        @contextlib.contextmanager
1578        def testcontext():
1579            try:
1580                it = iter(dict.items())
1581                next(it)
1582                # Schedule a key/value for removal and recreate it
1583                v = objects.pop().arg
1584                gc.collect()      # just in case
1585                yield Object(v), v
1586            finally:
1587                it = None           # should commit all removals
1588                gc.collect()
1589        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1590        # Issue #21173: len() fragile when keys are both implicitly and
1591        # explicitly removed.
1592        dict, objects = self.make_weak_keyed_dict()
1593        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1594
1595    def test_weak_values_destroy_while_iterating(self):
1596        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1597        dict, objects = self.make_weak_valued_dict()
1598        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1599        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1600        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1601        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1602        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1603        dict, objects = self.make_weak_valued_dict()
1604        @contextlib.contextmanager
1605        def testcontext():
1606            try:
1607                it = iter(dict.items())
1608                next(it)
1609                # Schedule a key/value for removal and recreate it
1610                k = objects.pop().arg
1611                gc.collect()      # just in case
1612                yield k, Object(k)
1613            finally:
1614                it = None           # should commit all removals
1615                gc.collect()
1616        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1617        dict, objects = self.make_weak_valued_dict()
1618        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1619
1620    def test_make_weak_keyed_dict_from_dict(self):
1621        o = Object(3)
1622        dict = weakref.WeakKeyDictionary({o:364})
1623        self.assertEqual(dict[o], 364)
1624
1625    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1626        o = Object(3)
1627        dict = weakref.WeakKeyDictionary({o:364})
1628        dict2 = weakref.WeakKeyDictionary(dict)
1629        self.assertEqual(dict[o], 364)
1630
1631    def make_weak_keyed_dict(self):
1632        dict = weakref.WeakKeyDictionary()
1633        objects = list(map(Object, range(self.COUNT)))
1634        for o in objects:
1635            dict[o] = o.arg
1636        return dict, objects
1637
1638    def test_make_weak_valued_dict_from_dict(self):
1639        o = Object(3)
1640        dict = weakref.WeakValueDictionary({364:o})
1641        self.assertEqual(dict[364], o)
1642
1643    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1644        o = Object(3)
1645        dict = weakref.WeakValueDictionary({364:o})
1646        dict2 = weakref.WeakValueDictionary(dict)
1647        self.assertEqual(dict[364], o)
1648
1649    def test_make_weak_valued_dict_misc(self):
1650        # errors
1651        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1652        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1653        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1654        # special keyword arguments
1655        o = Object(3)
1656        for kw in 'self', 'dict', 'other', 'iterable':
1657            d = weakref.WeakValueDictionary(**{kw: o})
1658            self.assertEqual(list(d.keys()), [kw])
1659            self.assertEqual(d[kw], o)
1660
1661    def make_weak_valued_dict(self):
1662        dict = weakref.WeakValueDictionary()
1663        objects = list(map(Object, range(self.COUNT)))
1664        for o in objects:
1665            dict[o.arg] = o
1666        return dict, objects
1667
1668    def check_popitem(self, klass, key1, value1, key2, value2):
1669        weakdict = klass()
1670        weakdict[key1] = value1
1671        weakdict[key2] = value2
1672        self.assertEqual(len(weakdict), 2)
1673        k, v = weakdict.popitem()
1674        self.assertEqual(len(weakdict), 1)
1675        if k is key1:
1676            self.assertIs(v, value1)
1677        else:
1678            self.assertIs(v, value2)
1679        k, v = weakdict.popitem()
1680        self.assertEqual(len(weakdict), 0)
1681        if k is key1:
1682            self.assertIs(v, value1)
1683        else:
1684            self.assertIs(v, value2)
1685
1686    def test_weak_valued_dict_popitem(self):
1687        self.check_popitem(weakref.WeakValueDictionary,
1688                           "key1", C(), "key2", C())
1689
1690    def test_weak_keyed_dict_popitem(self):
1691        self.check_popitem(weakref.WeakKeyDictionary,
1692                           C(), "value 1", C(), "value 2")
1693
1694    def check_setdefault(self, klass, key, value1, value2):
1695        self.assertIsNot(value1, value2,
1696                     "invalid test"
1697                     " -- value parameters must be distinct objects")
1698        weakdict = klass()
1699        o = weakdict.setdefault(key, value1)
1700        self.assertIs(o, value1)
1701        self.assertIn(key, weakdict)
1702        self.assertIs(weakdict.get(key), value1)
1703        self.assertIs(weakdict[key], value1)
1704
1705        o = weakdict.setdefault(key, value2)
1706        self.assertIs(o, value1)
1707        self.assertIn(key, weakdict)
1708        self.assertIs(weakdict.get(key), value1)
1709        self.assertIs(weakdict[key], value1)
1710
1711    def test_weak_valued_dict_setdefault(self):
1712        self.check_setdefault(weakref.WeakValueDictionary,
1713                              "key", C(), C())
1714
1715    def test_weak_keyed_dict_setdefault(self):
1716        self.check_setdefault(weakref.WeakKeyDictionary,
1717                              C(), "value 1", "value 2")
1718
1719    def check_update(self, klass, dict):
1720        #
1721        #  This exercises d.update(), len(d), d.keys(), k in d,
1722        #  d.get(), d[].
1723        #
1724        weakdict = klass()
1725        weakdict.update(dict)
1726        self.assertEqual(len(weakdict), len(dict))
1727        for k in weakdict.keys():
1728            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1729            v = dict.get(k)
1730            self.assertIs(v, weakdict[k])
1731            self.assertIs(v, weakdict.get(k))
1732        for k in dict.keys():
1733            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1734            v = dict[k]
1735            self.assertIs(v, weakdict[k])
1736            self.assertIs(v, weakdict.get(k))
1737
1738    def test_weak_valued_dict_update(self):
1739        self.check_update(weakref.WeakValueDictionary,
1740                          {1: C(), 'a': C(), C(): C()})
1741        # errors
1742        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1743        d = weakref.WeakValueDictionary()
1744        self.assertRaises(TypeError, d.update, {}, {})
1745        self.assertRaises(TypeError, d.update, (), ())
1746        self.assertEqual(list(d.keys()), [])
1747        # special keyword arguments
1748        o = Object(3)
1749        for kw in 'self', 'dict', 'other', 'iterable':
1750            d = weakref.WeakValueDictionary()
1751            d.update(**{kw: o})
1752            self.assertEqual(list(d.keys()), [kw])
1753            self.assertEqual(d[kw], o)
1754
1755    def test_weak_valued_union_operators(self):
1756        a = C()
1757        b = C()
1758        c = C()
1759        wvd1 = weakref.WeakValueDictionary({1: a})
1760        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
1761        wvd3 = wvd1.copy()
1762        d1 = {1: c, 3: b}
1763        pairs = [(5, c), (6, b)]
1764
1765        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
1766        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
1767        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
1768        wvd1 |= wvd2
1769        self.assertEqual(wvd1, tmp1)
1770
1771        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
1772        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
1773        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
1774        wvd2 |= d1
1775        self.assertEqual(wvd2, tmp2)
1776
1777        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
1778        tmp3 |= pairs
1779        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
1780        self.assertIs(type(tmp3), weakref.WeakValueDictionary)
1781
1782        tmp4 = d1 | wvd3 # Testing .__ror__
1783        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
1784        self.assertIs(type(tmp4), weakref.WeakValueDictionary)
1785
1786        del a
1787        self.assertNotIn(2, tmp1)
1788        self.assertNotIn(2, tmp2)
1789        self.assertNotIn(1, tmp3)
1790        self.assertNotIn(1, tmp4)
1791
1792    def test_weak_keyed_dict_update(self):
1793        self.check_update(weakref.WeakKeyDictionary,
1794                          {C(): 1, C(): 2, C(): 3})
1795
1796    def test_weak_keyed_delitem(self):
1797        d = weakref.WeakKeyDictionary()
1798        o1 = Object('1')
1799        o2 = Object('2')
1800        d[o1] = 'something'
1801        d[o2] = 'something'
1802        self.assertEqual(len(d), 2)
1803        del d[o1]
1804        self.assertEqual(len(d), 1)
1805        self.assertEqual(list(d.keys()), [o2])
1806
1807    def test_weak_keyed_union_operators(self):
1808        o1 = C()
1809        o2 = C()
1810        o3 = C()
1811        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
1812        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
1813        wkd3 = wkd1.copy()
1814        d1 = {o2: '5', o3: '6'}
1815        pairs = [(o2, 7), (o3, 8)]
1816
1817        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
1818        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
1819        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
1820        wkd1 |= wkd2
1821        self.assertEqual(wkd1, tmp1)
1822
1823        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
1824        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
1825        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
1826        wkd2 |= d1
1827        self.assertEqual(wkd2, tmp2)
1828
1829        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
1830        tmp3 |= pairs
1831        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
1832        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)
1833
1834        tmp4 = d1 | wkd3 # Testing .__ror__
1835        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
1836        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)
1837
1838        del o1
1839        self.assertNotIn(4, tmp1.values())
1840        self.assertNotIn(4, tmp2.values())
1841        self.assertNotIn(1, tmp3.values())
1842        self.assertNotIn(1, tmp4.values())
1843
1844    def test_weak_valued_delitem(self):
1845        d = weakref.WeakValueDictionary()
1846        o1 = Object('1')
1847        o2 = Object('2')
1848        d['something'] = o1
1849        d['something else'] = o2
1850        self.assertEqual(len(d), 2)
1851        del d['something']
1852        self.assertEqual(len(d), 1)
1853        self.assertEqual(list(d.items()), [('something else', o2)])
1854
1855    def test_weak_keyed_bad_delitem(self):
1856        d = weakref.WeakKeyDictionary()
1857        o = Object('1')
1858        # An attempt to delete an object that isn't there should raise
1859        # KeyError.  It didn't before 2.3.
1860        self.assertRaises(KeyError, d.__delitem__, o)
1861        self.assertRaises(KeyError, d.__getitem__, o)
1862
1863        # If a key isn't of a weakly referencable type, __getitem__ and
1864        # __setitem__ raise TypeError.  __delitem__ should too.
1865        self.assertRaises(TypeError, d.__delitem__,  13)
1866        self.assertRaises(TypeError, d.__getitem__,  13)
1867        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1868
    def test_weak_keyed_cascading_deletes(self):
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.
        #
        # The test deletes keys from a WeakKeyDictionary while the keys'
        # __eq__ drops the last strong reference to another key, so entries
        # vanish from the dict in the middle of a deletion.

        d = weakref.WeakKeyDictionary()
        # Read by C.__eq__ below; switched on only once the dict is filled.
        mutate = False

        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = list(d.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time through the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        gc_collect()  # For PyPy or other GCs.
        self.assertEqual(len(d), 0)
        # Only two iterations run for four keys: objs shrinks from the end
        # (via C.__eq__) while the loop walks it from the front.
        self.assertEqual(count, 2)
1914
1915    def test_make_weak_valued_dict_repr(self):
1916        dict = weakref.WeakValueDictionary()
1917        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1918
1919    def test_make_weak_keyed_dict_repr(self):
1920        dict = weakref.WeakKeyDictionary()
1921        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1922
1923    @threading_helper.requires_working_threading()
1924    def test_threaded_weak_valued_setdefault(self):
1925        d = weakref.WeakValueDictionary()
1926        with collect_in_thread():
1927            for i in range(self.NUM_THREADED_ITERATIONS):
1928                x = d.setdefault(10, RefCycle())
1929                self.assertIsNot(x, None)  # we never put None in there!
1930                del x
1931
1932    @threading_helper.requires_working_threading()
1933    def test_threaded_weak_valued_pop(self):
1934        d = weakref.WeakValueDictionary()
1935        with collect_in_thread():
1936            for i in range(self.NUM_THREADED_ITERATIONS):
1937                d[10] = RefCycle()
1938                x = d.pop(10, 10)
1939                self.assertIsNot(x, None)  # we never put None in there!
1940
1941    @threading_helper.requires_working_threading()
1942    def test_threaded_weak_valued_consistency(self):
1943        # Issue #28427: old keys should not remove new values from
1944        # WeakValueDictionary when collecting from another thread.
1945        d = weakref.WeakValueDictionary()
1946        with collect_in_thread():
1947            for i in range(2 * self.NUM_THREADED_ITERATIONS):
1948                o = RefCycle()
1949                d[10] = o
1950                # o is still alive, so the dict can't be empty
1951                self.assertEqual(len(d), 1)
1952                o = None  # lose ref
1953
1954    @support.cpython_only
1955    def test_weak_valued_consistency(self):
1956        # A single-threaded, deterministic repro for issue #28427: old keys
1957        # should not remove new values from WeakValueDictionary. This relies on
1958        # an implementation detail of CPython's WeakValueDictionary (its
1959        # underlying dictionary of KeyedRefs) to reproduce the issue.
1960        d = weakref.WeakValueDictionary()
1961        with support.disable_gc():
1962            d[10] = RefCycle()
1963            # Keep the KeyedRef alive after it's replaced so that GC will invoke
1964            # the callback.
1965            wr = d.data[10]
1966            # Replace the value with something that isn't cyclic garbage
1967            o = RefCycle()
1968            d[10] = o
1969            # Trigger GC, which will invoke the callback for `wr`
1970            gc.collect()
1971            self.assertEqual(len(d), 1)
1972
1973    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1974        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1975        # `deepcopy` should be either True or False.
1976        exc = []
1977
1978        class DummyKey:
1979            def __init__(self, ctr):
1980                self.ctr = ctr
1981
1982        class DummyValue:
1983            def __init__(self, ctr):
1984                self.ctr = ctr
1985
1986        def dict_copy(d, exc):
1987            try:
1988                if deepcopy is True:
1989                    _ = copy.deepcopy(d)
1990                else:
1991                    _ = d.copy()
1992            except Exception as ex:
1993                exc.append(ex)
1994
1995        def pop_and_collect(lst):
1996            gc_ctr = 0
1997            while lst:
1998                i = random.randint(0, len(lst) - 1)
1999                gc_ctr += 1
2000                lst.pop(i)
2001                if gc_ctr % 10000 == 0:
2002                    gc.collect()  # just in case
2003
2004        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
2005
2006        d = type_()
2007        keys = []
2008        values = []
2009        # Initialize d with many entries
2010        for i in range(70000):
2011            k, v = DummyKey(i), DummyValue(i)
2012            keys.append(k)
2013            values.append(v)
2014            d[k] = v
2015            del k
2016            del v
2017
2018        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
2019        if type_ is weakref.WeakKeyDictionary:
2020            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
2021        else:  # weakref.WeakValueDictionary
2022            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
2023
2024        t_copy.start()
2025        t_collect.start()
2026
2027        t_copy.join()
2028        t_collect.join()
2029
2030        # Test exceptions
2031        if exc:
2032            raise exc[0]
2033
2034    @threading_helper.requires_working_threading()
2035    def test_threaded_weak_key_dict_copy(self):
2036        # Issue #35615: Weakref keys or values getting GC'ed during dict
2037        # copying should not result in a crash.
2038        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
2039
2040    @threading_helper.requires_working_threading()
2041    @support.requires_resource('cpu')
2042    def test_threaded_weak_key_dict_deepcopy(self):
2043        # Issue #35615: Weakref keys or values getting GC'ed during dict
2044        # copying should not result in a crash.
2045        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
2046
2047    @threading_helper.requires_working_threading()
2048    def test_threaded_weak_value_dict_copy(self):
2049        # Issue #35615: Weakref keys or values getting GC'ed during dict
2050        # copying should not result in a crash.
2051        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
2052
2053    @threading_helper.requires_working_threading()
2054    @support.requires_resource('cpu')
2055    def test_threaded_weak_value_dict_deepcopy(self):
2056        # Issue #35615: Weakref keys or values getting GC'ed during dict
2057        # copying should not result in a crash.
2058        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
2059
2060    @support.cpython_only
2061    def test_remove_closure(self):
2062        d = weakref.WeakValueDictionary()
2063        self.assertIsNone(d._remove.__closure__)
2064
2065
2066from test import mapping_tests
2067
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakValueDictionary conforms to the mapping protocol"""
    type2test = weakref.WeakValueDictionary
    # Sample data; this class attribute holds strong references to the
    # values.
    __ref = {"key1": Object(1), "key2": Object(2), "key3": Object(3)}

    def _reference(self):
        """Return a new plain dict holding the sample key/value pairs."""
        return dict(self.__ref)
2074
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Check that WeakKeyDictionary conforms to the mapping protocol"""
    type2test = weakref.WeakKeyDictionary
    # Sample data; this class attribute holds strong references to the keys.
    __ref = {Object("key1"): 1, Object("key2"): 2, Object("key3"): 3}

    def _reference(self):
        """Return a new plain dict holding the sample key/value pairs."""
        return dict(self.__ref)
2081
2082
class FinalizeTestCase(unittest.TestCase):
    """Tests for weakref.finalize: calling, detaching, ordering and atexit."""

    class A:
        pass

    def _collect_if_necessary(self):
        # we create no ref-cycles so in CPython no gc should be needed
        if sys.implementation.name != 'cpython':
            support.gc_collect()

    def test_finalize(self):
        def add(x,y,z):
            res.append(x + y + z)
            return x + y + z

        a = self.A()

        # Calling the finalizer explicitly runs the callback exactly once
        # and returns its result; later calls do nothing.
        res = []
        f = weakref.finalize(a, add, 67, 43, z=89)
        self.assertTrue(f.alive)
        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
        self.assertEqual(f(), 199)
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [199])

        # detach() hands back the registration and disarms the finalizer,
        # so the callback never runs.
        res = []
        f = weakref.finalize(a, add, 67, 43, 89)
        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [])

        # Letting the referent die runs the callback once; calling the
        # finalizer afterwards does nothing.
        res = []
        f = weakref.finalize(a, add, x=67, y=43, z=89)
        del a
        self._collect_if_necessary()
        self.assertIsNone(f())
        self.assertIsNone(f())
        self.assertIsNone(f.peek())
        self.assertIsNone(f.detach())
        self.assertFalse(f.alive)
        self.assertEqual(res, [199])

    def test_arg_errors(self):
        def fin(*args, **kwargs):
            res.append((args, kwargs))

        a = self.A()

        # 'func' and 'obj' are accepted as ordinary keyword arguments for
        # the callback.
        res = []
        f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
        self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
        f()
        self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])

        # The object and the callback themselves cannot be passed by
        # keyword, and both are required.
        with self.assertRaises(TypeError):
            weakref.finalize(a, func=fin, arg=1)
        with self.assertRaises(TypeError):
            weakref.finalize(obj=a, func=fin, arg=1)
        with self.assertRaises(TypeError):
            weakref.finalize(a)
        with self.assertRaises(TypeError):
            weakref.finalize()

    def test_order(self):
        a = self.A()
        res = []

        f1 = weakref.finalize(a, res.append, 'f1')
        f2 = weakref.finalize(a, res.append, 'f2')
        f3 = weakref.finalize(a, res.append, 'f3')
        f4 = weakref.finalize(a, res.append, 'f4')
        f5 = weakref.finalize(a, res.append, 'f5')

        # make sure finalizers can keep themselves alive
        del f1, f4

        self.assertTrue(f2.alive)
        self.assertTrue(f3.alive)
        self.assertTrue(f5.alive)

        self.assertTrue(f5.detach())
        self.assertFalse(f5.alive)

        f5()                       # nothing because previously unregistered
        res.append('A')
        f3()                       # => res.append('f3')
        self.assertFalse(f3.alive)
        res.append('B')
        f3()                       # nothing because previously called
        res.append('C')
        del a
        self._collect_if_necessary()
                                   # => res.append('f4')
                                   # => res.append('f2')
                                   # => res.append('f1')
        self.assertFalse(f2.alive)
        res.append('D')
        f2()                       # nothing because previously called by gc

        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
        self.assertEqual(res, expected)

    def test_all_freed(self):
        # we want a weakrefable subclass of weakref.finalize
        class MyFinalizer(weakref.finalize):
            pass

        a = self.A()
        res = []
        def callback():
            res.append(123)
        f = MyFinalizer(a, callback)

        wr_callback = weakref.ref(callback)
        wr_f = weakref.ref(f)
        del callback, f

        # With the local names gone, the callback and the finalizer are
        # still alive as long as `a` is.
        self.assertIsNotNone(wr_callback())
        self.assertIsNotNone(wr_f())

        del a
        self._collect_if_necessary()

        # Once `a` is gone the callback has run and both objects are freed.
        self.assertIsNone(wr_callback())
        self.assertIsNone(wr_f())
        self.assertEqual(res, [123])

    @classmethod
    def run_in_child(cls):
        # Executed in a subprocess by test_atexit(), which checks what the
        # finalizers registered here print at interpreter exit.
        def error():
            # Create an atexit finalizer from inside a finalizer called
            # at exit.  This should be the next to be run.
            g1 = weakref.finalize(cls, print, 'g1')
            print('f3 error')
            1/0

        # cls should stay alive till atexit callbacks run
        f1 = weakref.finalize(cls, print, 'f1', _global_var)
        f2 = weakref.finalize(cls, print, 'f2', _global_var)
        f3 = weakref.finalize(cls, error)
        f4 = weakref.finalize(cls, print, 'f4', _global_var)

        # All finalizers except f2 keep their atexit flag set.
        assert f1.atexit
        f2.atexit = False
        assert f3.atexit
        assert f4.atexit

    def test_atexit(self):
        prog = ('from test.test_weakref import FinalizeTestCase;'
                'FinalizeTestCase.run_in_child()')
        rc, out, err = script_helper.assert_python_ok('-c', prog)
        out = out.decode('ascii').splitlines()
        # f2 is absent because its atexit flag was cleared; g1, created
        # inside f3, runs right after it.
        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
        # The 1/0 inside f3 is reported on stderr, while the child still
        # exits successfully (assert_python_ok).
        self.assertIn(b'ZeroDivisionError', err)
2244
2245
2246class ModuleTestCase(unittest.TestCase):
2247    def test_names(self):
2248        for name in ('ReferenceType', 'ProxyType', 'CallableProxyType',
2249                     'WeakMethod', 'WeakSet', 'WeakKeyDictionary', 'WeakValueDictionary'):
2250            obj = getattr(weakref, name)
2251            if name != 'WeakSet':
2252                self.assertEqual(obj.__module__, 'weakref')
2253            self.assertEqual(obj.__name__, name)
2254            self.assertEqual(obj.__qualname__, name)
2255
2256
# Doctest text mirroring the examples in the library reference
# (weakref.rst), followed by a few extra checks marked "just testing".
# The string is not the docstring of any object; it is registered in the
# module-level ``__test__`` mapping defined right after it.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> from test.support import gc_collect
>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> gc_collect()  # For PyPy or other GCs.
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> gc_collect()  # For PyPy or other GCs.
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

__test__ = {'libreftest' : libreftest}
2344
def load_tests(loader, tests, pattern):
    """Add this module's doctests to *tests* and return the same suite.

    *loader* and *pattern* are accepted but not used.
    """
    doctest_suite = doctest.DocTestSuite()
    tests.addTest(doctest_suite)
    return tests
2348
2349
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
2352