• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import unittest
4import collections
5import weakref
6import operator
7import contextlib
8import copy
9import threading
10import time
11import random
12
13from test import support
14from test.support import script_helper, ALWAYS_EQ
15from test.support import gc_collect
16
17# Used in ReferencesTestCase.test_ref_created_during_del() .
18ref_from_del = None
19
20# Used by FinalizeTestCase as a global that may be replaced by None
21# when the interpreter shuts down.
22_global_var = 'foobar'
23
24class C:
25    def method(self):
26        pass
27
28
29class Callable:
30    bar = None
31
32    def __call__(self, x):
33        self.bar = x
34
35
36def create_function():
37    def f(): pass
38    return f
39
40def create_bound_method():
41    return C().method
42
43
44class Object:
45    def __init__(self, arg):
46        self.arg = arg
47    def __repr__(self):
48        return "<Object %r>" % self.arg
49    def __eq__(self, other):
50        if isinstance(other, Object):
51            return self.arg == other.arg
52        return NotImplemented
53    def __lt__(self, other):
54        if isinstance(other, Object):
55            return self.arg < other.arg
56        return NotImplemented
57    def __hash__(self):
58        return hash(self.arg)
59    def some_method(self):
60        return 4
61    def other_method(self):
62        return 5
63
64
65class RefCycle:
66    def __init__(self):
67        self.cycle = self
68
69
70class TestBase(unittest.TestCase):
71
72    def setUp(self):
73        self.cbcalled = 0
74
75    def callback(self, ref):
76        self.cbcalled += 1
77
78
79@contextlib.contextmanager
80def collect_in_thread(period=0.0001):
81    """
82    Ensure GC collections happen in a different thread, at a high frequency.
83    """
84    please_stop = False
85
86    def collect():
87        while not please_stop:
88            time.sleep(period)
89            gc.collect()
90
91    with support.disable_gc():
92        t = threading.Thread(target=collect)
93        t.start()
94        try:
95            yield
96        finally:
97            please_stop = True
98            t.join()
99
100
101class ReferencesTestCase(TestBase):
102
103    def test_basic_ref(self):
104        self.check_basic_ref(C)
105        self.check_basic_ref(create_function)
106        self.check_basic_ref(create_bound_method)
107
108        # Just make sure the tp_repr handler doesn't raise an exception.
109        # Live reference:
110        o = C()
111        wr = weakref.ref(o)
112        repr(wr)
113        # Dead reference:
114        del o
115        repr(wr)
116
117    def test_basic_callback(self):
118        self.check_basic_callback(C)
119        self.check_basic_callback(create_function)
120        self.check_basic_callback(create_bound_method)
121
122    @support.cpython_only
123    def test_cfunction(self):
124        import _testcapi
125        create_cfunction = _testcapi.create_cfunction
126        f = create_cfunction()
127        wr = weakref.ref(f)
128        self.assertIs(wr(), f)
129        del f
130        self.assertIsNone(wr())
131        self.check_basic_ref(create_cfunction)
132        self.check_basic_callback(create_cfunction)
133
134    def test_multiple_callbacks(self):
135        o = C()
136        ref1 = weakref.ref(o, self.callback)
137        ref2 = weakref.ref(o, self.callback)
138        del o
139        gc_collect()  # For PyPy or other GCs.
140        self.assertIsNone(ref1(), "expected reference to be invalidated")
141        self.assertIsNone(ref2(), "expected reference to be invalidated")
142        self.assertEqual(self.cbcalled, 2,
143                     "callback not called the right number of times")
144
145    def test_multiple_selfref_callbacks(self):
146        # Make sure all references are invalidated before callbacks are called
147        #
148        # What's important here is that we're using the first
149        # reference in the callback invoked on the second reference
150        # (the most recently created ref is cleaned up first).  This
151        # tests that all references to the object are invalidated
152        # before any of the callbacks are invoked, so that we only
153        # have one invocation of _weakref.c:cleanup_helper() active
154        # for a particular object at a time.
155        #
156        def callback(object, self=self):
157            self.ref()
158        c = C()
159        self.ref = weakref.ref(c, callback)
160        ref1 = weakref.ref(c, callback)
161        del c
162
163    def test_constructor_kwargs(self):
164        c = C()
165        self.assertRaises(TypeError, weakref.ref, c, callback=None)
166
167    def test_proxy_ref(self):
168        o = C()
169        o.bar = 1
170        ref1 = weakref.proxy(o, self.callback)
171        ref2 = weakref.proxy(o, self.callback)
172        del o
173        gc_collect()  # For PyPy or other GCs.
174
175        def check(proxy):
176            proxy.bar
177
178        self.assertRaises(ReferenceError, check, ref1)
179        self.assertRaises(ReferenceError, check, ref2)
180        ref3 = weakref.proxy(C())
181        gc_collect()  # For PyPy or other GCs.
182        self.assertRaises(ReferenceError, bool, ref3)
183        self.assertEqual(self.cbcalled, 2)
184
185    def check_basic_ref(self, factory):
186        o = factory()
187        ref = weakref.ref(o)
188        self.assertIsNotNone(ref(),
189                     "weak reference to live object should be live")
190        o2 = ref()
191        self.assertIs(o, o2,
192                     "<ref>() should return original object if live")
193
194    def check_basic_callback(self, factory):
195        self.cbcalled = 0
196        o = factory()
197        ref = weakref.ref(o, self.callback)
198        del o
199        gc_collect()  # For PyPy or other GCs.
200        self.assertEqual(self.cbcalled, 1,
201                     "callback did not properly set 'cbcalled'")
202        self.assertIsNone(ref(),
203                     "ref2 should be dead after deleting object reference")
204
205    def test_ref_reuse(self):
206        o = C()
207        ref1 = weakref.ref(o)
208        # create a proxy to make sure that there's an intervening creation
209        # between these two; it should make no difference
210        proxy = weakref.proxy(o)
211        ref2 = weakref.ref(o)
212        self.assertIs(ref1, ref2,
213                     "reference object w/out callback should be re-used")
214
215        o = C()
216        proxy = weakref.proxy(o)
217        ref1 = weakref.ref(o)
218        ref2 = weakref.ref(o)
219        self.assertIs(ref1, ref2,
220                     "reference object w/out callback should be re-used")
221        self.assertEqual(weakref.getweakrefcount(o), 2,
222                     "wrong weak ref count for object")
223        del proxy
224        gc_collect()  # For PyPy or other GCs.
225        self.assertEqual(weakref.getweakrefcount(o), 1,
226                     "wrong weak ref count for object after deleting proxy")
227
228    def test_proxy_reuse(self):
229        o = C()
230        proxy1 = weakref.proxy(o)
231        ref = weakref.ref(o)
232        proxy2 = weakref.proxy(o)
233        self.assertIs(proxy1, proxy2,
234                     "proxy object w/out callback should have been re-used")
235
236    def test_basic_proxy(self):
237        o = C()
238        self.check_proxy(o, weakref.proxy(o))
239
240        L = collections.UserList()
241        p = weakref.proxy(L)
242        self.assertFalse(p, "proxy for empty UserList should be false")
243        p.append(12)
244        self.assertEqual(len(L), 1)
245        self.assertTrue(p, "proxy for non-empty UserList should be true")
246        p[:] = [2, 3]
247        self.assertEqual(len(L), 2)
248        self.assertEqual(len(p), 2)
249        self.assertIn(3, p, "proxy didn't support __contains__() properly")
250        p[1] = 5
251        self.assertEqual(L[1], 5)
252        self.assertEqual(p[1], 5)
253        L2 = collections.UserList(L)
254        p2 = weakref.proxy(L2)
255        self.assertEqual(p, p2)
256        ## self.assertEqual(repr(L2), repr(p2))
257        L3 = collections.UserList(range(10))
258        p3 = weakref.proxy(L3)
259        self.assertEqual(L3[:], p3[:])
260        self.assertEqual(L3[5:], p3[5:])
261        self.assertEqual(L3[:5], p3[:5])
262        self.assertEqual(L3[2:5], p3[2:5])
263
264    def test_proxy_unicode(self):
265        # See bug 5037
266        class C(object):
267            def __str__(self):
268                return "string"
269            def __bytes__(self):
270                return b"bytes"
271        instance = C()
272        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
273        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
274
275    def test_proxy_index(self):
276        class C:
277            def __index__(self):
278                return 10
279        o = C()
280        p = weakref.proxy(o)
281        self.assertEqual(operator.index(p), 10)
282
283    def test_proxy_div(self):
284        class C:
285            def __floordiv__(self, other):
286                return 42
287            def __ifloordiv__(self, other):
288                return 21
289        o = C()
290        p = weakref.proxy(o)
291        self.assertEqual(p // 5, 42)
292        p //= 5
293        self.assertEqual(p, 21)
294
295    def test_proxy_matmul(self):
296        class C:
297            def __matmul__(self, other):
298                return 1729
299            def __rmatmul__(self, other):
300                return -163
301            def __imatmul__(self, other):
302                return 561
303        o = C()
304        p = weakref.proxy(o)
305        self.assertEqual(p @ 5, 1729)
306        self.assertEqual(5 @ p, -163)
307        p @= 5
308        self.assertEqual(p, 561)
309
310    # The PyWeakref_* C API is documented as allowing either NULL or
311    # None as the value for the callback, where either means "no
312    # callback".  The "no callback" ref and proxy objects are supposed
313    # to be shared so long as they exist by all callers so long as
314    # they are active.  In Python 2.3.3 and earlier, this guarantee
315    # was not honored, and was broken in different ways for
316    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
317
318    def test_shared_ref_without_callback(self):
319        self.check_shared_without_callback(weakref.ref)
320
321    def test_shared_proxy_without_callback(self):
322        self.check_shared_without_callback(weakref.proxy)
323
324    def check_shared_without_callback(self, makeref):
325        o = Object(1)
326        p1 = makeref(o, None)
327        p2 = makeref(o, None)
328        self.assertIs(p1, p2, "both callbacks were None in the C API")
329        del p1, p2
330        p1 = makeref(o)
331        p2 = makeref(o, None)
332        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
333        del p1, p2
334        p1 = makeref(o)
335        p2 = makeref(o)
336        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
337        del p1, p2
338        p1 = makeref(o, None)
339        p2 = makeref(o)
340        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
341
342    def test_callable_proxy(self):
343        o = Callable()
344        ref1 = weakref.proxy(o)
345
346        self.check_proxy(o, ref1)
347
348        self.assertIs(type(ref1), weakref.CallableProxyType,
349                     "proxy is not of callable type")
350        ref1('twinkies!')
351        self.assertEqual(o.bar, 'twinkies!',
352                     "call through proxy not passed through to original")
353        ref1(x='Splat.')
354        self.assertEqual(o.bar, 'Splat.',
355                     "call through proxy not passed through to original")
356
357        # expect due to too few args
358        self.assertRaises(TypeError, ref1)
359
360        # expect due to too many args
361        self.assertRaises(TypeError, ref1, 1, 2, 3)
362
363    def check_proxy(self, o, proxy):
364        o.foo = 1
365        self.assertEqual(proxy.foo, 1,
366                     "proxy does not reflect attribute addition")
367        o.foo = 2
368        self.assertEqual(proxy.foo, 2,
369                     "proxy does not reflect attribute modification")
370        del o.foo
371        self.assertFalse(hasattr(proxy, 'foo'),
372                     "proxy does not reflect attribute removal")
373
374        proxy.foo = 1
375        self.assertEqual(o.foo, 1,
376                     "object does not reflect attribute addition via proxy")
377        proxy.foo = 2
378        self.assertEqual(o.foo, 2,
379            "object does not reflect attribute modification via proxy")
380        del proxy.foo
381        self.assertFalse(hasattr(o, 'foo'),
382                     "object does not reflect attribute removal via proxy")
383
384    def test_proxy_deletion(self):
385        # Test clearing of SF bug #762891
386        class Foo:
387            result = None
388            def __delitem__(self, accessor):
389                self.result = accessor
390        g = Foo()
391        f = weakref.proxy(g)
392        del f[0]
393        self.assertEqual(f.result, 0)
394
395    def test_proxy_bool(self):
396        # Test clearing of SF bug #1170766
397        class List(list): pass
398        lyst = List()
399        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
400
401    def test_proxy_iter(self):
402        # Test fails with a debug build of the interpreter
403        # (see bpo-38395).
404
405        obj = None
406
407        class MyObj:
408            def __iter__(self):
409                nonlocal obj
410                del obj
411                return NotImplemented
412
413        obj = MyObj()
414        p = weakref.proxy(obj)
415        with self.assertRaises(TypeError):
416            # "blech" in p calls MyObj.__iter__ through the proxy,
417            # without keeping a reference to the real object, so it
418            # can be killed in the middle of the call
419            "blech" in p
420
421    def test_proxy_next(self):
422        arr = [4, 5, 6]
423        def iterator_func():
424            yield from arr
425        it = iterator_func()
426
427        class IteratesWeakly:
428            def __iter__(self):
429                return weakref.proxy(it)
430
431        weak_it = IteratesWeakly()
432
433        # Calls proxy.__next__
434        self.assertEqual(list(weak_it), [4, 5, 6])
435
436    def test_proxy_bad_next(self):
437        # bpo-44720: PyIter_Next() shouldn't be called if the reference
438        # isn't an iterator.
439
440        not_an_iterator = lambda: 0
441
442        class A:
443            def __iter__(self):
444                return weakref.proxy(not_an_iterator)
445        a = A()
446
447        msg = "Weakref proxy referenced a non-iterator"
448        with self.assertRaisesRegex(TypeError, msg):
449            list(a)
450
451    def test_proxy_reversed(self):
452        class MyObj:
453            def __len__(self):
454                return 3
455            def __reversed__(self):
456                return iter('cba')
457
458        obj = MyObj()
459        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
460
461    def test_proxy_hash(self):
462        class MyObj:
463            def __hash__(self):
464                return 42
465
466        obj = MyObj()
467        with self.assertRaises(TypeError):
468            hash(weakref.proxy(obj))
469
470        class MyObj:
471            __hash__ = None
472
473        obj = MyObj()
474        with self.assertRaises(TypeError):
475            hash(weakref.proxy(obj))
476
477    def test_getweakrefcount(self):
478        o = C()
479        ref1 = weakref.ref(o)
480        ref2 = weakref.ref(o, self.callback)
481        self.assertEqual(weakref.getweakrefcount(o), 2,
482                     "got wrong number of weak reference objects")
483
484        proxy1 = weakref.proxy(o)
485        proxy2 = weakref.proxy(o, self.callback)
486        self.assertEqual(weakref.getweakrefcount(o), 4,
487                     "got wrong number of weak reference objects")
488
489        del ref1, ref2, proxy1, proxy2
490        gc_collect()  # For PyPy or other GCs.
491        self.assertEqual(weakref.getweakrefcount(o), 0,
492                     "weak reference objects not unlinked from"
493                     " referent when discarded.")
494
495        # assumes ints do not support weakrefs
496        self.assertEqual(weakref.getweakrefcount(1), 0,
497                     "got wrong number of weak reference objects for int")
498
499    def test_getweakrefs(self):
500        o = C()
501        ref1 = weakref.ref(o, self.callback)
502        ref2 = weakref.ref(o, self.callback)
503        del ref1
504        gc_collect()  # For PyPy or other GCs.
505        self.assertEqual(weakref.getweakrefs(o), [ref2],
506                     "list of refs does not match")
507
508        o = C()
509        ref1 = weakref.ref(o, self.callback)
510        ref2 = weakref.ref(o, self.callback)
511        del ref2
512        gc_collect()  # For PyPy or other GCs.
513        self.assertEqual(weakref.getweakrefs(o), [ref1],
514                     "list of refs does not match")
515
516        del ref1
517        gc_collect()  # For PyPy or other GCs.
518        self.assertEqual(weakref.getweakrefs(o), [],
519                     "list of refs not cleared")
520
521        # assumes ints do not support weakrefs
522        self.assertEqual(weakref.getweakrefs(1), [],
523                     "list of refs does not match for int")
524
525    def test_newstyle_number_ops(self):
526        class F(float):
527            pass
528        f = F(2.0)
529        p = weakref.proxy(f)
530        self.assertEqual(p + 1.0, 3.0)
531        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
532
533    def test_callbacks_protected(self):
534        # Callbacks protected from already-set exceptions?
535        # Regression test for SF bug #478534.
536        class BogusError(Exception):
537            pass
538        data = {}
539        def remove(k):
540            del data[k]
541        def encapsulate():
542            f = lambda : ()
543            data[weakref.ref(f, remove)] = None
544            raise BogusError
545        try:
546            encapsulate()
547        except BogusError:
548            pass
549        else:
550            self.fail("exception not properly restored")
551        try:
552            encapsulate()
553        except BogusError:
554            pass
555        else:
556            self.fail("exception not properly restored")
557
558    def test_sf_bug_840829(self):
559        # "weakref callbacks and gc corrupt memory"
560        # subtype_dealloc erroneously exposed a new-style instance
561        # already in the process of getting deallocated to gc,
562        # causing double-deallocation if the instance had a weakref
563        # callback that triggered gc.
564        # If the bug exists, there probably won't be an obvious symptom
565        # in a release build.  In a debug build, a segfault will occur
566        # when the second attempt to remove the instance from the "list
567        # of all objects" occurs.
568
569        import gc
570
571        class C(object):
572            pass
573
574        c = C()
575        wr = weakref.ref(c, lambda ignore: gc.collect())
576        del c
577
578        # There endeth the first part.  It gets worse.
579        del wr
580
581        c1 = C()
582        c1.i = C()
583        wr = weakref.ref(c1.i, lambda ignore: gc.collect())
584
585        c2 = C()
586        c2.c1 = c1
587        del c1  # still alive because c2 points to it
588
589        # Now when subtype_dealloc gets called on c2, it's not enough just
590        # that c2 is immune from gc while the weakref callbacks associated
591        # with c2 execute (there are none in this 2nd half of the test, btw).
592        # subtype_dealloc goes on to call the base classes' deallocs too,
593        # so any gc triggered by weakref callbacks associated with anything
594        # torn down by a base class dealloc can also trigger double
595        # deallocation of c2.
596        del c2
597
598    def test_callback_in_cycle_1(self):
599        import gc
600
601        class J(object):
602            pass
603
604        class II(object):
605            def acallback(self, ignore):
606                self.J
607
608        I = II()
609        I.J = J
610        I.wr = weakref.ref(J, I.acallback)
611
612        # Now J and II are each in a self-cycle (as all new-style class
613        # objects are, since their __mro__ points back to them).  I holds
614        # both a weak reference (I.wr) and a strong reference (I.J) to class
615        # J.  I is also in a cycle (I.wr points to a weakref that references
616        # I.acallback).  When we del these three, they all become trash, but
617        # the cycles prevent any of them from getting cleaned up immediately.
618        # Instead they have to wait for cyclic gc to deduce that they're
619        # trash.
620        #
621        # gc used to call tp_clear on all of them, and the order in which
622        # it does that is pretty accidental.  The exact order in which we
623        # built up these things manages to provoke gc into running tp_clear
624        # in just the right order (I last).  Calling tp_clear on II leaves
625        # behind an insane class object (its __mro__ becomes NULL).  Calling
626        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
627        # just then because of the strong reference from I.J.  Calling
628        # tp_clear on I starts to clear I's __dict__, and just happens to
629        # clear I.J first -- I.wr is still intact.  That removes the last
630        # reference to J, which triggers the weakref callback.  The callback
631        # tries to do "self.J", and instances of new-style classes look up
632        # attributes ("J") in the class dict first.  The class (II) wants to
633        # search II.__mro__, but that's NULL.   The result was a segfault in
634        # a release build, and an assert failure in a debug build.
635        del I, J, II
636        gc.collect()
637
638    def test_callback_in_cycle_2(self):
639        import gc
640
641        # This is just like test_callback_in_cycle_1, except that II is an
642        # old-style class.  The symptom is different then:  an instance of an
643        # old-style class looks in its own __dict__ first.  'J' happens to
644        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
645        # __dict__, so the attribute isn't found.  The difference is that
646        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
647        # __mro__), so no segfault occurs.  Instead it got:
648        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
649        #    Exception exceptions.AttributeError:
650        #   "II instance has no attribute 'J'" in <bound method II.acallback
651        #       of <?.II instance at 0x00B9B4B8>> ignored
652
653        class J(object):
654            pass
655
656        class II:
657            def acallback(self, ignore):
658                self.J
659
660        I = II()
661        I.J = J
662        I.wr = weakref.ref(J, I.acallback)
663
664        del I, J, II
665        gc.collect()
666
667    def test_callback_in_cycle_3(self):
668        import gc
669
670        # This one broke the first patch that fixed the last two.  In this
671        # case, the objects reachable from the callback aren't also reachable
672        # from the object (c1) *triggering* the callback:  you can get to
673        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
674        # got tp_clear'ed by the time the c2.cb callback got invoked.
675
676        class C:
677            def cb(self, ignore):
678                self.me
679                self.c1
680                self.wr
681
682        c1, c2 = C(), C()
683
684        c2.me = c2
685        c2.c1 = c1
686        c2.wr = weakref.ref(c1, c2.cb)
687
688        del c1, c2
689        gc.collect()
690
691    def test_callback_in_cycle_4(self):
692        import gc
693
694        # Like test_callback_in_cycle_3, except c2 and c1 have different
695        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
696        # objects reachable from the dying object (c1) isn't enough to stop
697        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
698        # The result was a segfault (C.__mro__ was NULL when the callback
699        # tried to look up self.me).
700
701        class C(object):
702            def cb(self, ignore):
703                self.me
704                self.c1
705                self.wr
706
707        class D:
708            pass
709
710        c1, c2 = D(), C()
711
712        c2.me = c2
713        c2.c1 = c1
714        c2.wr = weakref.ref(c1, c2.cb)
715
716        del c1, c2, C, D
717        gc.collect()
718
719    def test_callback_in_cycle_resurrection(self):
720        import gc
721
722        # Do something nasty in a weakref callback:  resurrect objects
723        # from dead cycles.  For this to be attempted, the weakref and
724        # its callback must also be part of the cyclic trash (else the
725        # objects reachable via the callback couldn't be in cyclic trash
726        # to begin with -- the callback would act like an external root).
727        # But gc clears trash weakrefs with callbacks early now, which
728        # disables the callbacks, so the callbacks shouldn't get called
729        # at all (and so nothing actually gets resurrected).
730
731        alist = []
732        class C(object):
733            def __init__(self, value):
734                self.attribute = value
735
736            def acallback(self, ignore):
737                alist.append(self.c)
738
739        c1, c2 = C(1), C(2)
740        c1.c = c2
741        c2.c = c1
742        c1.wr = weakref.ref(c2, c1.acallback)
743        c2.wr = weakref.ref(c1, c2.acallback)
744
745        def C_went_away(ignore):
746            alist.append("C went away")
747        wr = weakref.ref(C, C_went_away)
748
749        del c1, c2, C   # make them all trash
750        self.assertEqual(alist, [])  # del isn't enough to reclaim anything
751
752        gc.collect()
753        # c1.wr and c2.wr were part of the cyclic trash, so should have
754        # been cleared without their callbacks executing.  OTOH, the weakref
755        # to C is bound to a function local (wr), and wasn't trash, so that
756        # callback should have been invoked when C went away.
757        self.assertEqual(alist, ["C went away"])
758        # The remaining weakref should be dead now (its callback ran).
759        self.assertEqual(wr(), None)
760
761        del alist[:]
762        gc.collect()
763        self.assertEqual(alist, [])
764
765    def test_callbacks_on_callback(self):
766        import gc
767
768        # Set up weakref callbacks *on* weakref callbacks.
769        alist = []
770        def safe_callback(ignore):
771            alist.append("safe_callback called")
772
773        class C(object):
774            def cb(self, ignore):
775                alist.append("cb called")
776
777        c, d = C(), C()
778        c.other = d
779        d.other = c
780        callback = c.cb
781        c.wr = weakref.ref(d, callback)     # this won't trigger
782        d.wr = weakref.ref(callback, d.cb)  # ditto
783        external_wr = weakref.ref(callback, safe_callback)  # but this will
784        self.assertIs(external_wr(), callback)
785
786        # The weakrefs attached to c and d should get cleared, so that
787        # C.cb is never called.  But external_wr isn't part of the cyclic
788        # trash, and no cyclic trash is reachable from it, so safe_callback
789        # should get invoked when the bound method object callback (c.cb)
790        # -- which is itself a callback, and also part of the cyclic trash --
791        # gets reclaimed at the end of gc.
792
793        del callback, c, d, C
794        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
795        gc.collect()
796        self.assertEqual(alist, ["safe_callback called"])
797        self.assertEqual(external_wr(), None)
798
799        del alist[:]
800        gc.collect()
801        self.assertEqual(alist, [])
802
803    def test_gc_during_ref_creation(self):
804        self.check_gc_during_creation(weakref.ref)
805
806    def test_gc_during_proxy_creation(self):
807        self.check_gc_during_creation(weakref.proxy)
808
809    def check_gc_during_creation(self, makeref):
810        thresholds = gc.get_threshold()
811        gc.set_threshold(1, 1, 1)
812        gc.collect()
813        class A:
814            pass
815
816        def callback(*args):
817            pass
818
819        referenced = A()
820
821        a = A()
822        a.a = a
823        a.wr = makeref(referenced)
824
825        try:
826            # now make sure the object and the ref get labeled as
827            # cyclic trash:
828            a = A()
829            weakref.ref(referenced, callback)
830
831        finally:
832            gc.set_threshold(*thresholds)
833
834    def test_ref_created_during_del(self):
835        # Bug #1377858
836        # A weakref created in an object's __del__() would crash the
837        # interpreter when the weakref was cleaned up since it would refer to
838        # non-existent memory.  This test should not segfault the interpreter.
839        class Target(object):
840            def __del__(self):
841                global ref_from_del
842                ref_from_del = weakref.ref(self)
843
844        w = Target()
845
846    def test_init(self):
847        # Issue 3634
848        # <weakref to class>.__init__() doesn't check errors correctly
849        r = weakref.ref(Exception)
850        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
851        # No exception should be raised here
852        gc.collect()
853
854    def test_classes(self):
855        # Check that classes are weakrefable.
856        class A(object):
857            pass
858        l = []
859        weakref.ref(int)
860        a = weakref.ref(A, l.append)
861        A = None
862        gc.collect()
863        self.assertEqual(a(), None)
864        self.assertEqual(l, [a])
865
866    def test_equality(self):
867        # Alive weakrefs defer equality testing to their underlying object.
868        x = Object(1)
869        y = Object(1)
870        z = Object(2)
871        a = weakref.ref(x)
872        b = weakref.ref(y)
873        c = weakref.ref(z)
874        d = weakref.ref(x)
875        # Note how we directly test the operators here, to stress both
876        # __eq__ and __ne__.
877        self.assertTrue(a == b)
878        self.assertFalse(a != b)
879        self.assertFalse(a == c)
880        self.assertTrue(a != c)
881        self.assertTrue(a == d)
882        self.assertFalse(a != d)
883        self.assertFalse(a == x)
884        self.assertTrue(a != x)
885        self.assertTrue(a == ALWAYS_EQ)
886        self.assertFalse(a != ALWAYS_EQ)
887        del x, y, z
888        gc.collect()
889        for r in a, b, c:
890            # Sanity check
891            self.assertIs(r(), None)
892        # Dead weakrefs compare by identity: whether `a` and `d` are the
893        # same weakref object is an implementation detail, since they pointed
894        # to the same original object and didn't have a callback.
895        # (see issue #16453).
896        self.assertFalse(a == b)
897        self.assertTrue(a != b)
898        self.assertFalse(a == c)
899        self.assertTrue(a != c)
900        self.assertEqual(a == d, a is d)
901        self.assertEqual(a != d, a is not d)
902
903    def test_ordering(self):
904        # weakrefs cannot be ordered, even if the underlying objects can.
905        ops = [operator.lt, operator.gt, operator.le, operator.ge]
906        x = Object(1)
907        y = Object(1)
908        a = weakref.ref(x)
909        b = weakref.ref(y)
910        for op in ops:
911            self.assertRaises(TypeError, op, a, b)
912        # Same when dead.
913        del x, y
914        gc.collect()
915        for op in ops:
916            self.assertRaises(TypeError, op, a, b)
917
918    def test_hashing(self):
919        # Alive weakrefs hash the same as the underlying object
920        x = Object(42)
921        y = Object(42)
922        a = weakref.ref(x)
923        b = weakref.ref(y)
924        self.assertEqual(hash(a), hash(42))
925        del x, y
926        gc.collect()
927        # Dead weakrefs:
928        # - retain their hash is they were hashed when alive;
929        # - otherwise, cannot be hashed.
930        self.assertEqual(hash(a), hash(42))
931        self.assertRaises(TypeError, hash, b)
932
933    def test_trashcan_16602(self):
934        # Issue #16602: when a weakref's target was part of a long
935        # deallocation chain, the trashcan mechanism could delay clearing
936        # of the weakref and make the target object visible from outside
937        # code even though its refcount had dropped to 0.  A crash ensued.
938        class C:
939            def __init__(self, parent):
940                if not parent:
941                    return
942                wself = weakref.ref(self)
943                def cb(wparent):
944                    o = wself()
945                self.wparent = weakref.ref(parent, cb)
946
947        d = weakref.WeakKeyDictionary()
948        root = c = C(None)
949        for n in range(100):
950            d[c] = c = C(c)
951        del root
952        gc.collect()
953
954    def test_callback_attribute(self):
955        x = Object(1)
956        callback = lambda ref: None
957        ref1 = weakref.ref(x, callback)
958        self.assertIs(ref1.__callback__, callback)
959
960        ref2 = weakref.ref(x)
961        self.assertIsNone(ref2.__callback__)
962
963    def test_callback_attribute_after_deletion(self):
964        x = Object(1)
965        ref = weakref.ref(x, self.callback)
966        self.assertIsNotNone(ref.__callback__)
967        del x
968        support.gc_collect()
969        self.assertIsNone(ref.__callback__)
970
971    def test_set_callback_attribute(self):
972        x = Object(1)
973        callback = lambda ref: None
974        ref1 = weakref.ref(x, callback)
975        with self.assertRaises(AttributeError):
976            ref1.__callback__ = lambda ref: None
977
978    def test_callback_gcs(self):
979        class ObjectWithDel(Object):
980            def __del__(self): pass
981        x = ObjectWithDel(1)
982        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
983        del x
984        support.gc_collect()
985
986
987class SubclassableWeakrefTestCase(TestBase):
988
989    def test_subclass_refs(self):
990        class MyRef(weakref.ref):
991            def __init__(self, ob, callback=None, value=42):
992                self.value = value
993                super().__init__(ob, callback)
994            def __call__(self):
995                self.called = True
996                return super().__call__()
997        o = Object("foo")
998        mr = MyRef(o, value=24)
999        self.assertIs(mr(), o)
1000        self.assertTrue(mr.called)
1001        self.assertEqual(mr.value, 24)
1002        del o
1003        gc_collect()  # For PyPy or other GCs.
1004        self.assertIsNone(mr())
1005        self.assertTrue(mr.called)
1006
1007    def test_subclass_refs_dont_replace_standard_refs(self):
1008        class MyRef(weakref.ref):
1009            pass
1010        o = Object(42)
1011        r1 = MyRef(o)
1012        r2 = weakref.ref(o)
1013        self.assertIsNot(r1, r2)
1014        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
1015        self.assertEqual(weakref.getweakrefcount(o), 2)
1016        r3 = MyRef(o)
1017        self.assertEqual(weakref.getweakrefcount(o), 3)
1018        refs = weakref.getweakrefs(o)
1019        self.assertEqual(len(refs), 3)
1020        self.assertIs(r2, refs[0])
1021        self.assertIn(r1, refs[1:])
1022        self.assertIn(r3, refs[1:])
1023
1024    def test_subclass_refs_dont_conflate_callbacks(self):
1025        class MyRef(weakref.ref):
1026            pass
1027        o = Object(42)
1028        r1 = MyRef(o, id)
1029        r2 = MyRef(o, str)
1030        self.assertIsNot(r1, r2)
1031        refs = weakref.getweakrefs(o)
1032        self.assertIn(r1, refs)
1033        self.assertIn(r2, refs)
1034
1035    def test_subclass_refs_with_slots(self):
1036        class MyRef(weakref.ref):
1037            __slots__ = "slot1", "slot2"
1038            def __new__(type, ob, callback, slot1, slot2):
1039                return weakref.ref.__new__(type, ob, callback)
1040            def __init__(self, ob, callback, slot1, slot2):
1041                self.slot1 = slot1
1042                self.slot2 = slot2
1043            def meth(self):
1044                return self.slot1 + self.slot2
1045        o = Object(42)
1046        r = MyRef(o, None, "abc", "def")
1047        self.assertEqual(r.slot1, "abc")
1048        self.assertEqual(r.slot2, "def")
1049        self.assertEqual(r.meth(), "abcdef")
1050        self.assertFalse(hasattr(r, "__dict__"))
1051
1052    def test_subclass_refs_with_cycle(self):
1053        """Confirm https://bugs.python.org/issue3100 is fixed."""
1054        # An instance of a weakref subclass can have attributes.
1055        # If such a weakref holds the only strong reference to the object,
1056        # deleting the weakref will delete the object. In this case,
1057        # the callback must not be called, because the ref object is
1058        # being deleted.
1059        class MyRef(weakref.ref):
1060            pass
1061
1062        # Use a local callback, for "regrtest -R::"
1063        # to detect refcounting problems
1064        def callback(w):
1065            self.cbcalled += 1
1066
1067        o = C()
1068        r1 = MyRef(o, callback)
1069        r1.o = o
1070        del o
1071
1072        del r1 # Used to crash here
1073
1074        self.assertEqual(self.cbcalled, 0)
1075
1076        # Same test, with two weakrefs to the same object
1077        # (since code paths are different)
1078        o = C()
1079        r1 = MyRef(o, callback)
1080        r2 = MyRef(o, callback)
1081        r1.r = r2
1082        r2.o = o
1083        del o
1084        del r2
1085
1086        del r1 # Used to crash here
1087
1088        self.assertEqual(self.cbcalled, 0)
1089
1090
1091class WeakMethodTestCase(unittest.TestCase):
1092
1093    def _subclass(self):
1094        """Return an Object subclass overriding `some_method`."""
1095        class C(Object):
1096            def some_method(self):
1097                return 6
1098        return C
1099
1100    def test_alive(self):
1101        o = Object(1)
1102        r = weakref.WeakMethod(o.some_method)
1103        self.assertIsInstance(r, weakref.ReferenceType)
1104        self.assertIsInstance(r(), type(o.some_method))
1105        self.assertIs(r().__self__, o)
1106        self.assertIs(r().__func__, o.some_method.__func__)
1107        self.assertEqual(r()(), 4)
1108
1109    def test_object_dead(self):
1110        o = Object(1)
1111        r = weakref.WeakMethod(o.some_method)
1112        del o
1113        gc.collect()
1114        self.assertIs(r(), None)
1115
1116    def test_method_dead(self):
1117        C = self._subclass()
1118        o = C(1)
1119        r = weakref.WeakMethod(o.some_method)
1120        del C.some_method
1121        gc.collect()
1122        self.assertIs(r(), None)
1123
1124    def test_callback_when_object_dead(self):
1125        # Test callback behaviour when object dies first.
1126        C = self._subclass()
1127        calls = []
1128        def cb(arg):
1129            calls.append(arg)
1130        o = C(1)
1131        r = weakref.WeakMethod(o.some_method, cb)
1132        del o
1133        gc.collect()
1134        self.assertEqual(calls, [r])
1135        # Callback is only called once.
1136        C.some_method = Object.some_method
1137        gc.collect()
1138        self.assertEqual(calls, [r])
1139
1140    def test_callback_when_method_dead(self):
1141        # Test callback behaviour when method dies first.
1142        C = self._subclass()
1143        calls = []
1144        def cb(arg):
1145            calls.append(arg)
1146        o = C(1)
1147        r = weakref.WeakMethod(o.some_method, cb)
1148        del C.some_method
1149        gc.collect()
1150        self.assertEqual(calls, [r])
1151        # Callback is only called once.
1152        del o
1153        gc.collect()
1154        self.assertEqual(calls, [r])
1155
1156    @support.cpython_only
1157    def test_no_cycles(self):
1158        # A WeakMethod doesn't create any reference cycle to itself.
1159        o = Object(1)
1160        def cb(_):
1161            pass
1162        r = weakref.WeakMethod(o.some_method, cb)
1163        wr = weakref.ref(r)
1164        del r
1165        self.assertIs(wr(), None)
1166
1167    def test_equality(self):
1168        def _eq(a, b):
1169            self.assertTrue(a == b)
1170            self.assertFalse(a != b)
1171        def _ne(a, b):
1172            self.assertTrue(a != b)
1173            self.assertFalse(a == b)
1174        x = Object(1)
1175        y = Object(1)
1176        a = weakref.WeakMethod(x.some_method)
1177        b = weakref.WeakMethod(y.some_method)
1178        c = weakref.WeakMethod(x.other_method)
1179        d = weakref.WeakMethod(y.other_method)
1180        # Objects equal, same method
1181        _eq(a, b)
1182        _eq(c, d)
1183        # Objects equal, different method
1184        _ne(a, c)
1185        _ne(a, d)
1186        _ne(b, c)
1187        _ne(b, d)
1188        # Objects unequal, same or different method
1189        z = Object(2)
1190        e = weakref.WeakMethod(z.some_method)
1191        f = weakref.WeakMethod(z.other_method)
1192        _ne(a, e)
1193        _ne(a, f)
1194        _ne(b, e)
1195        _ne(b, f)
1196        # Compare with different types
1197        _ne(a, x.some_method)
1198        _eq(a, ALWAYS_EQ)
1199        del x, y, z
1200        gc.collect()
1201        # Dead WeakMethods compare by identity
1202        refs = a, b, c, d, e, f
1203        for q in refs:
1204            for r in refs:
1205                self.assertEqual(q == r, q is r)
1206                self.assertEqual(q != r, q is not r)
1207
1208    def test_hashing(self):
1209        # Alive WeakMethods are hashable if the underlying object is
1210        # hashable.
1211        x = Object(1)
1212        y = Object(1)
1213        a = weakref.WeakMethod(x.some_method)
1214        b = weakref.WeakMethod(y.some_method)
1215        c = weakref.WeakMethod(y.other_method)
1216        # Since WeakMethod objects are equal, the hashes should be equal.
1217        self.assertEqual(hash(a), hash(b))
1218        ha = hash(a)
1219        # Dead WeakMethods retain their old hash value
1220        del x, y
1221        gc.collect()
1222        self.assertEqual(hash(a), ha)
1223        self.assertEqual(hash(b), ha)
1224        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
1225        self.assertRaises(TypeError, hash, c)
1226
1227
1228class MappingTestCase(TestBase):
1229
1230    COUNT = 10
1231
1232    def check_len_cycles(self, dict_type, cons):
1233        N = 20
1234        items = [RefCycle() for i in range(N)]
1235        dct = dict_type(cons(o) for o in items)
1236        # Keep an iterator alive
1237        it = dct.items()
1238        try:
1239            next(it)
1240        except StopIteration:
1241            pass
1242        del items
1243        gc.collect()
1244        n1 = len(dct)
1245        del it
1246        gc.collect()
1247        n2 = len(dct)
1248        # one item may be kept alive inside the iterator
1249        self.assertIn(n1, (0, 1))
1250        self.assertEqual(n2, 0)
1251
1252    def test_weak_keyed_len_cycles(self):
1253        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1254
1255    def test_weak_valued_len_cycles(self):
1256        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1257
1258    def check_len_race(self, dict_type, cons):
1259        # Extended sanity checks for len() in the face of cyclic collection
1260        self.addCleanup(gc.set_threshold, *gc.get_threshold())
1261        for th in range(1, 100):
1262            N = 20
1263            gc.collect(0)
1264            gc.set_threshold(th, th, th)
1265            items = [RefCycle() for i in range(N)]
1266            dct = dict_type(cons(o) for o in items)
1267            del items
1268            # All items will be collected at next garbage collection pass
1269            it = dct.items()
1270            try:
1271                next(it)
1272            except StopIteration:
1273                pass
1274            n1 = len(dct)
1275            del it
1276            n2 = len(dct)
1277            self.assertGreaterEqual(n1, 0)
1278            self.assertLessEqual(n1, N)
1279            self.assertGreaterEqual(n2, 0)
1280            self.assertLessEqual(n2, n1)
1281
1282    def test_weak_keyed_len_race(self):
1283        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1284
1285    def test_weak_valued_len_race(self):
1286        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1287
1288    def test_weak_values(self):
1289        #
1290        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1291        #
1292        dict, objects = self.make_weak_valued_dict()
1293        for o in objects:
1294            self.assertEqual(weakref.getweakrefcount(o), 1)
1295            self.assertIs(o, dict[o.arg],
1296                         "wrong object returned by weak dict!")
1297        items1 = list(dict.items())
1298        items2 = list(dict.copy().items())
1299        items1.sort()
1300        items2.sort()
1301        self.assertEqual(items1, items2,
1302                     "cloning of weak-valued dictionary did not work!")
1303        del items1, items2
1304        self.assertEqual(len(dict), self.COUNT)
1305        del objects[0]
1306        gc_collect()  # For PyPy or other GCs.
1307        self.assertEqual(len(dict), self.COUNT - 1,
1308                     "deleting object did not cause dictionary update")
1309        del objects, o
1310        gc_collect()  # For PyPy or other GCs.
1311        self.assertEqual(len(dict), 0,
1312                     "deleting the values did not clear the dictionary")
1313        # regression on SF bug #447152:
1314        dict = weakref.WeakValueDictionary()
1315        self.assertRaises(KeyError, dict.__getitem__, 1)
1316        dict[2] = C()
1317        gc_collect()  # For PyPy or other GCs.
1318        self.assertRaises(KeyError, dict.__getitem__, 2)
1319
1320    def test_weak_keys(self):
1321        #
1322        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1323        #  len(d), k in d.
1324        #
1325        dict, objects = self.make_weak_keyed_dict()
1326        for o in objects:
1327            self.assertEqual(weakref.getweakrefcount(o), 1,
1328                         "wrong number of weak references to %r!" % o)
1329            self.assertIs(o.arg, dict[o],
1330                         "wrong object returned by weak dict!")
1331        items1 = dict.items()
1332        items2 = dict.copy().items()
1333        self.assertEqual(set(items1), set(items2),
1334                     "cloning of weak-keyed dictionary did not work!")
1335        del items1, items2
1336        self.assertEqual(len(dict), self.COUNT)
1337        del objects[0]
1338        gc_collect()  # For PyPy or other GCs.
1339        self.assertEqual(len(dict), (self.COUNT - 1),
1340                     "deleting object did not cause dictionary update")
1341        del objects, o
1342        gc_collect()  # For PyPy or other GCs.
1343        self.assertEqual(len(dict), 0,
1344                     "deleting the keys did not clear the dictionary")
1345        o = Object(42)
1346        dict[o] = "What is the meaning of the universe?"
1347        self.assertIn(o, dict)
1348        self.assertNotIn(34, dict)
1349
1350    def test_weak_keyed_iters(self):
1351        dict, objects = self.make_weak_keyed_dict()
1352        self.check_iters(dict)
1353
1354        # Test keyrefs()
1355        refs = dict.keyrefs()
1356        self.assertEqual(len(refs), len(objects))
1357        objects2 = list(objects)
1358        for wr in refs:
1359            ob = wr()
1360            self.assertIn(ob, dict)
1361            self.assertIn(ob, dict)
1362            self.assertEqual(ob.arg, dict[ob])
1363            objects2.remove(ob)
1364        self.assertEqual(len(objects2), 0)
1365
1366        # Test iterkeyrefs()
1367        objects2 = list(objects)
1368        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1369        for wr in dict.keyrefs():
1370            ob = wr()
1371            self.assertIn(ob, dict)
1372            self.assertIn(ob, dict)
1373            self.assertEqual(ob.arg, dict[ob])
1374            objects2.remove(ob)
1375        self.assertEqual(len(objects2), 0)
1376
1377    def test_weak_valued_iters(self):
1378        dict, objects = self.make_weak_valued_dict()
1379        self.check_iters(dict)
1380
1381        # Test valuerefs()
1382        refs = dict.valuerefs()
1383        self.assertEqual(len(refs), len(objects))
1384        objects2 = list(objects)
1385        for wr in refs:
1386            ob = wr()
1387            self.assertEqual(ob, dict[ob.arg])
1388            self.assertEqual(ob.arg, dict[ob.arg].arg)
1389            objects2.remove(ob)
1390        self.assertEqual(len(objects2), 0)
1391
1392        # Test itervaluerefs()
1393        objects2 = list(objects)
1394        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1395        for wr in dict.itervaluerefs():
1396            ob = wr()
1397            self.assertEqual(ob, dict[ob.arg])
1398            self.assertEqual(ob.arg, dict[ob.arg].arg)
1399            objects2.remove(ob)
1400        self.assertEqual(len(objects2), 0)
1401
1402    def check_iters(self, dict):
1403        # item iterator:
1404        items = list(dict.items())
1405        for item in dict.items():
1406            items.remove(item)
1407        self.assertFalse(items, "items() did not touch all items")
1408
1409        # key iterator, via __iter__():
1410        keys = list(dict.keys())
1411        for k in dict:
1412            keys.remove(k)
1413        self.assertFalse(keys, "__iter__() did not touch all keys")
1414
1415        # key iterator, via iterkeys():
1416        keys = list(dict.keys())
1417        for k in dict.keys():
1418            keys.remove(k)
1419        self.assertFalse(keys, "iterkeys() did not touch all keys")
1420
1421        # value iterator:
1422        values = list(dict.values())
1423        for v in dict.values():
1424            values.remove(v)
1425        self.assertFalse(values,
1426                     "itervalues() did not touch all values")
1427
1428    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1429        n = len(dict)
1430        it = iter(getattr(dict, iter_name)())
1431        next(it)             # Trigger internal iteration
1432        # Destroy an object
1433        del objects[-1]
1434        gc.collect()    # just in case
1435        # We have removed either the first consumed object, or another one
1436        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1437        del it
1438        # The removal has been committed
1439        self.assertEqual(len(dict), n - 1)
1440
1441    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1442        # Check that we can explicitly mutate the weak dict without
1443        # interfering with delayed removal.
1444        # `testcontext` should create an iterator, destroy one of the
1445        # weakref'ed objects and then return a new key/value pair corresponding
1446        # to the destroyed object.
1447        with testcontext() as (k, v):
1448            self.assertNotIn(k, dict)
1449        with testcontext() as (k, v):
1450            self.assertRaises(KeyError, dict.__delitem__, k)
1451        self.assertNotIn(k, dict)
1452        with testcontext() as (k, v):
1453            self.assertRaises(KeyError, dict.pop, k)
1454        self.assertNotIn(k, dict)
1455        with testcontext() as (k, v):
1456            dict[k] = v
1457        self.assertEqual(dict[k], v)
1458        ddict = copy.copy(dict)
1459        with testcontext() as (k, v):
1460            dict.update(ddict)
1461        self.assertEqual(dict, ddict)
1462        with testcontext() as (k, v):
1463            dict.clear()
1464        self.assertEqual(len(dict), 0)
1465
1466    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1467        # Check that len() works when both iterating and removing keys
1468        # explicitly through various means (.pop(), .clear()...), while
1469        # implicit mutation is deferred because an iterator is alive.
1470        # (each call to testcontext() should schedule one item for removal
1471        #  for this test to work properly)
1472        o = Object(123456)
1473        with testcontext():
1474            n = len(dict)
1475            # Since underlying dict is ordered, first item is popped
1476            dict.pop(next(dict.keys()))
1477            self.assertEqual(len(dict), n - 1)
1478            dict[o] = o
1479            self.assertEqual(len(dict), n)
1480        # last item in objects is removed from dict in context shutdown
1481        with testcontext():
1482            self.assertEqual(len(dict), n - 1)
1483            # Then, (o, o) is popped
1484            dict.popitem()
1485            self.assertEqual(len(dict), n - 2)
1486        with testcontext():
1487            self.assertEqual(len(dict), n - 3)
1488            del dict[next(dict.keys())]
1489            self.assertEqual(len(dict), n - 4)
1490        with testcontext():
1491            self.assertEqual(len(dict), n - 5)
1492            dict.popitem()
1493            self.assertEqual(len(dict), n - 6)
1494        with testcontext():
1495            dict.clear()
1496            self.assertEqual(len(dict), 0)
1497        self.assertEqual(len(dict), 0)
1498
1499    def test_weak_keys_destroy_while_iterating(self):
1500        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1501        dict, objects = self.make_weak_keyed_dict()
1502        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1503        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1504        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1505        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1506        dict, objects = self.make_weak_keyed_dict()
1507        @contextlib.contextmanager
1508        def testcontext():
1509            try:
1510                it = iter(dict.items())
1511                next(it)
1512                # Schedule a key/value for removal and recreate it
1513                v = objects.pop().arg
1514                gc.collect()      # just in case
1515                yield Object(v), v
1516            finally:
1517                it = None           # should commit all removals
1518                gc.collect()
1519        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1520        # Issue #21173: len() fragile when keys are both implicitly and
1521        # explicitly removed.
1522        dict, objects = self.make_weak_keyed_dict()
1523        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1524
1525    def test_weak_values_destroy_while_iterating(self):
1526        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1527        dict, objects = self.make_weak_valued_dict()
1528        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1529        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1530        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1531        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1532        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1533        dict, objects = self.make_weak_valued_dict()
1534        @contextlib.contextmanager
1535        def testcontext():
1536            try:
1537                it = iter(dict.items())
1538                next(it)
1539                # Schedule a key/value for removal and recreate it
1540                k = objects.pop().arg
1541                gc.collect()      # just in case
1542                yield k, Object(k)
1543            finally:
1544                it = None           # should commit all removals
1545                gc.collect()
1546        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1547        dict, objects = self.make_weak_valued_dict()
1548        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1549
1550    def test_make_weak_keyed_dict_from_dict(self):
1551        o = Object(3)
1552        dict = weakref.WeakKeyDictionary({o:364})
1553        self.assertEqual(dict[o], 364)
1554
1555    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1556        o = Object(3)
1557        dict = weakref.WeakKeyDictionary({o:364})
1558        dict2 = weakref.WeakKeyDictionary(dict)
1559        self.assertEqual(dict[o], 364)
1560
1561    def make_weak_keyed_dict(self):
1562        dict = weakref.WeakKeyDictionary()
1563        objects = list(map(Object, range(self.COUNT)))
1564        for o in objects:
1565            dict[o] = o.arg
1566        return dict, objects
1567
1568    def test_make_weak_valued_dict_from_dict(self):
1569        o = Object(3)
1570        dict = weakref.WeakValueDictionary({364:o})
1571        self.assertEqual(dict[364], o)
1572
1573    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1574        o = Object(3)
1575        dict = weakref.WeakValueDictionary({364:o})
1576        dict2 = weakref.WeakValueDictionary(dict)
1577        self.assertEqual(dict[364], o)
1578
1579    def test_make_weak_valued_dict_misc(self):
1580        # errors
1581        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1582        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1583        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1584        # special keyword arguments
1585        o = Object(3)
1586        for kw in 'self', 'dict', 'other', 'iterable':
1587            d = weakref.WeakValueDictionary(**{kw: o})
1588            self.assertEqual(list(d.keys()), [kw])
1589            self.assertEqual(d[kw], o)
1590
1591    def make_weak_valued_dict(self):
1592        dict = weakref.WeakValueDictionary()
1593        objects = list(map(Object, range(self.COUNT)))
1594        for o in objects:
1595            dict[o.arg] = o
1596        return dict, objects
1597
1598    def check_popitem(self, klass, key1, value1, key2, value2):
1599        weakdict = klass()
1600        weakdict[key1] = value1
1601        weakdict[key2] = value2
1602        self.assertEqual(len(weakdict), 2)
1603        k, v = weakdict.popitem()
1604        self.assertEqual(len(weakdict), 1)
1605        if k is key1:
1606            self.assertIs(v, value1)
1607        else:
1608            self.assertIs(v, value2)
1609        k, v = weakdict.popitem()
1610        self.assertEqual(len(weakdict), 0)
1611        if k is key1:
1612            self.assertIs(v, value1)
1613        else:
1614            self.assertIs(v, value2)
1615
1616    def test_weak_valued_dict_popitem(self):
1617        self.check_popitem(weakref.WeakValueDictionary,
1618                           "key1", C(), "key2", C())
1619
1620    def test_weak_keyed_dict_popitem(self):
1621        self.check_popitem(weakref.WeakKeyDictionary,
1622                           C(), "value 1", C(), "value 2")
1623
1624    def check_setdefault(self, klass, key, value1, value2):
1625        self.assertIsNot(value1, value2,
1626                     "invalid test"
1627                     " -- value parameters must be distinct objects")
1628        weakdict = klass()
1629        o = weakdict.setdefault(key, value1)
1630        self.assertIs(o, value1)
1631        self.assertIn(key, weakdict)
1632        self.assertIs(weakdict.get(key), value1)
1633        self.assertIs(weakdict[key], value1)
1634
1635        o = weakdict.setdefault(key, value2)
1636        self.assertIs(o, value1)
1637        self.assertIn(key, weakdict)
1638        self.assertIs(weakdict.get(key), value1)
1639        self.assertIs(weakdict[key], value1)
1640
1641    def test_weak_valued_dict_setdefault(self):
1642        self.check_setdefault(weakref.WeakValueDictionary,
1643                              "key", C(), C())
1644
1645    def test_weak_keyed_dict_setdefault(self):
1646        self.check_setdefault(weakref.WeakKeyDictionary,
1647                              C(), "value 1", "value 2")
1648
1649    def check_update(self, klass, dict):
1650        #
1651        #  This exercises d.update(), len(d), d.keys(), k in d,
1652        #  d.get(), d[].
1653        #
1654        weakdict = klass()
1655        weakdict.update(dict)
1656        self.assertEqual(len(weakdict), len(dict))
1657        for k in weakdict.keys():
1658            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1659            v = dict.get(k)
1660            self.assertIs(v, weakdict[k])
1661            self.assertIs(v, weakdict.get(k))
1662        for k in dict.keys():
1663            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1664            v = dict[k]
1665            self.assertIs(v, weakdict[k])
1666            self.assertIs(v, weakdict.get(k))
1667
1668    def test_weak_valued_dict_update(self):
1669        self.check_update(weakref.WeakValueDictionary,
1670                          {1: C(), 'a': C(), C(): C()})
1671        # errors
1672        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1673        d = weakref.WeakValueDictionary()
1674        self.assertRaises(TypeError, d.update, {}, {})
1675        self.assertRaises(TypeError, d.update, (), ())
1676        self.assertEqual(list(d.keys()), [])
1677        # special keyword arguments
1678        o = Object(3)
1679        for kw in 'self', 'dict', 'other', 'iterable':
1680            d = weakref.WeakValueDictionary()
1681            d.update(**{kw: o})
1682            self.assertEqual(list(d.keys()), [kw])
1683            self.assertEqual(d[kw], o)
1684
1685    def test_weak_valued_union_operators(self):
1686        a = C()
1687        b = C()
1688        c = C()
1689        wvd1 = weakref.WeakValueDictionary({1: a})
1690        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
1691        wvd3 = wvd1.copy()
1692        d1 = {1: c, 3: b}
1693        pairs = [(5, c), (6, b)]
1694
1695        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
1696        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
1697        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
1698        wvd1 |= wvd2
1699        self.assertEqual(wvd1, tmp1)
1700
1701        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
1702        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
1703        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
1704        wvd2 |= d1
1705        self.assertEqual(wvd2, tmp2)
1706
1707        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
1708        tmp3 |= pairs
1709        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
1710        self.assertIs(type(tmp3), weakref.WeakValueDictionary)
1711
1712        tmp4 = d1 | wvd3 # Testing .__ror__
1713        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
1714        self.assertIs(type(tmp4), weakref.WeakValueDictionary)
1715
1716        del a
1717        self.assertNotIn(2, tmp1)
1718        self.assertNotIn(2, tmp2)
1719        self.assertNotIn(1, tmp3)
1720        self.assertNotIn(1, tmp4)
1721
1722    def test_weak_keyed_dict_update(self):
1723        self.check_update(weakref.WeakKeyDictionary,
1724                          {C(): 1, C(): 2, C(): 3})
1725
1726    def test_weak_keyed_delitem(self):
1727        d = weakref.WeakKeyDictionary()
1728        o1 = Object('1')
1729        o2 = Object('2')
1730        d[o1] = 'something'
1731        d[o2] = 'something'
1732        self.assertEqual(len(d), 2)
1733        del d[o1]
1734        self.assertEqual(len(d), 1)
1735        self.assertEqual(list(d.keys()), [o2])
1736
1737    def test_weak_keyed_union_operators(self):
1738        o1 = C()
1739        o2 = C()
1740        o3 = C()
1741        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
1742        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
1743        wkd3 = wkd1.copy()
1744        d1 = {o2: '5', o3: '6'}
1745        pairs = [(o2, 7), (o3, 8)]
1746
1747        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
1748        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
1749        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
1750        wkd1 |= wkd2
1751        self.assertEqual(wkd1, tmp1)
1752
1753        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
1754        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
1755        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
1756        wkd2 |= d1
1757        self.assertEqual(wkd2, tmp2)
1758
1759        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
1760        tmp3 |= pairs
1761        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
1762        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)
1763
1764        tmp4 = d1 | wkd3 # Testing .__ror__
1765        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
1766        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)
1767
1768        del o1
1769        self.assertNotIn(4, tmp1.values())
1770        self.assertNotIn(4, tmp2.values())
1771        self.assertNotIn(1, tmp3.values())
1772        self.assertNotIn(1, tmp4.values())
1773
1774    def test_weak_valued_delitem(self):
1775        d = weakref.WeakValueDictionary()
1776        o1 = Object('1')
1777        o2 = Object('2')
1778        d['something'] = o1
1779        d['something else'] = o2
1780        self.assertEqual(len(d), 2)
1781        del d['something']
1782        self.assertEqual(len(d), 1)
1783        self.assertEqual(list(d.items()), [('something else', o2)])
1784
1785    def test_weak_keyed_bad_delitem(self):
1786        d = weakref.WeakKeyDictionary()
1787        o = Object('1')
1788        # An attempt to delete an object that isn't there should raise
1789        # KeyError.  It didn't before 2.3.
1790        self.assertRaises(KeyError, d.__delitem__, o)
1791        self.assertRaises(KeyError, d.__getitem__, o)
1792
1793        # If a key isn't of a weakly referencable type, __getitem__ and
1794        # __setitem__ raise TypeError.  __delitem__ should too.
1795        self.assertRaises(TypeError, d.__delitem__,  13)
1796        self.assertRaises(TypeError, d.__getitem__,  13)
1797        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1798
1799    def test_weak_keyed_cascading_deletes(self):
1800        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1801        # over the keys via self.data.iterkeys().  If things vanished from
1802        # the dict during this (or got added), that caused a RuntimeError.
1803
1804        d = weakref.WeakKeyDictionary()
1805        mutate = False
1806
1807        class C(object):
1808            def __init__(self, i):
1809                self.value = i
1810            def __hash__(self):
1811                return hash(self.value)
1812            def __eq__(self, other):
1813                if mutate:
1814                    # Side effect that mutates the dict, by removing the
1815                    # last strong reference to a key.
1816                    del objs[-1]
1817                return self.value == other.value
1818
1819        objs = [C(i) for i in range(4)]
1820        for o in objs:
1821            d[o] = o.value
1822        del o   # now the only strong references to keys are in objs
1823        # Find the order in which iterkeys sees the keys.
1824        objs = list(d.keys())
1825        # Reverse it, so that the iteration implementation of __delitem__
1826        # has to keep looping to find the first object we delete.
1827        objs.reverse()
1828
1829        # Turn on mutation in C.__eq__.  The first time through the loop,
1830        # under the iterkeys() business the first comparison will delete
1831        # the last item iterkeys() would see, and that causes a
1832        #     RuntimeError: dictionary changed size during iteration
1833        # when the iterkeys() loop goes around to try comparing the next
1834        # key.  After this was fixed, it just deletes the last object *our*
1835        # "for o in obj" loop would have gotten to.
1836        mutate = True
1837        count = 0
1838        for o in objs:
1839            count += 1
1840            del d[o]
1841        gc_collect()  # For PyPy or other GCs.
1842        self.assertEqual(len(d), 0)
1843        self.assertEqual(count, 2)
1844
1845    def test_make_weak_valued_dict_repr(self):
1846        dict = weakref.WeakValueDictionary()
1847        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1848
1849    def test_make_weak_keyed_dict_repr(self):
1850        dict = weakref.WeakKeyDictionary()
1851        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1852
1853    def test_threaded_weak_valued_setdefault(self):
1854        d = weakref.WeakValueDictionary()
1855        with collect_in_thread():
1856            for i in range(100000):
1857                x = d.setdefault(10, RefCycle())
1858                self.assertIsNot(x, None)  # we never put None in there!
1859                del x
1860
1861    def test_threaded_weak_valued_pop(self):
1862        d = weakref.WeakValueDictionary()
1863        with collect_in_thread():
1864            for i in range(100000):
1865                d[10] = RefCycle()
1866                x = d.pop(10, 10)
1867                self.assertIsNot(x, None)  # we never put None in there!
1868
1869    def test_threaded_weak_valued_consistency(self):
1870        # Issue #28427: old keys should not remove new values from
1871        # WeakValueDictionary when collecting from another thread.
1872        d = weakref.WeakValueDictionary()
1873        with collect_in_thread():
1874            for i in range(200000):
1875                o = RefCycle()
1876                d[10] = o
1877                # o is still alive, so the dict can't be empty
1878                self.assertEqual(len(d), 1)
1879                o = None  # lose ref
1880
1881    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1882        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1883        # `deepcopy` should be either True or False.
1884        exc = []
1885
1886        class DummyKey:
1887            def __init__(self, ctr):
1888                self.ctr = ctr
1889
1890        class DummyValue:
1891            def __init__(self, ctr):
1892                self.ctr = ctr
1893
1894        def dict_copy(d, exc):
1895            try:
1896                if deepcopy is True:
1897                    _ = copy.deepcopy(d)
1898                else:
1899                    _ = d.copy()
1900            except Exception as ex:
1901                exc.append(ex)
1902
1903        def pop_and_collect(lst):
1904            gc_ctr = 0
1905            while lst:
1906                i = random.randint(0, len(lst) - 1)
1907                gc_ctr += 1
1908                lst.pop(i)
1909                if gc_ctr % 10000 == 0:
1910                    gc.collect()  # just in case
1911
1912        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1913
1914        d = type_()
1915        keys = []
1916        values = []
1917        # Initialize d with many entries
1918        for i in range(70000):
1919            k, v = DummyKey(i), DummyValue(i)
1920            keys.append(k)
1921            values.append(v)
1922            d[k] = v
1923            del k
1924            del v
1925
1926        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1927        if type_ is weakref.WeakKeyDictionary:
1928            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1929        else:  # weakref.WeakValueDictionary
1930            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1931
1932        t_copy.start()
1933        t_collect.start()
1934
1935        t_copy.join()
1936        t_collect.join()
1937
1938        # Test exceptions
1939        if exc:
1940            raise exc[0]
1941
1942    def test_threaded_weak_key_dict_copy(self):
1943        # Issue #35615: Weakref keys or values getting GC'ed during dict
1944        # copying should not result in a crash.
1945        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1946
1947    def test_threaded_weak_key_dict_deepcopy(self):
1948        # Issue #35615: Weakref keys or values getting GC'ed during dict
1949        # copying should not result in a crash.
1950        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1951
1952    def test_threaded_weak_value_dict_copy(self):
1953        # Issue #35615: Weakref keys or values getting GC'ed during dict
1954        # copying should not result in a crash.
1955        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1956
1957    def test_threaded_weak_value_dict_deepcopy(self):
1958        # Issue #35615: Weakref keys or values getting GC'ed during dict
1959        # copying should not result in a crash.
1960        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1961
1962    @support.cpython_only
1963    def test_remove_closure(self):
1964        d = weakref.WeakValueDictionary()
1965        self.assertIsNone(d._remove.__closure__)
1966
1967
1968from test import mapping_tests
1969
1970class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1971    """Check that WeakValueDictionary conforms to the mapping protocol"""
1972    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1973    type2test = weakref.WeakValueDictionary
1974    def _reference(self):
1975        return self.__ref.copy()
1976
1977class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1978    """Check that WeakKeyDictionary conforms to the mapping protocol"""
1979    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1980    type2test = weakref.WeakKeyDictionary
1981    def _reference(self):
1982        return self.__ref.copy()
1983
1984
1985class FinalizeTestCase(unittest.TestCase):
1986
1987    class A:
1988        pass
1989
1990    def _collect_if_necessary(self):
1991        # we create no ref-cycles so in CPython no gc should be needed
1992        if sys.implementation.name != 'cpython':
1993            support.gc_collect()
1994
1995    def test_finalize(self):
1996        def add(x,y,z):
1997            res.append(x + y + z)
1998            return x + y + z
1999
2000        a = self.A()
2001
2002        res = []
2003        f = weakref.finalize(a, add, 67, 43, z=89)
2004        self.assertEqual(f.alive, True)
2005        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
2006        self.assertEqual(f(), 199)
2007        self.assertEqual(f(), None)
2008        self.assertEqual(f(), None)
2009        self.assertEqual(f.peek(), None)
2010        self.assertEqual(f.detach(), None)
2011        self.assertEqual(f.alive, False)
2012        self.assertEqual(res, [199])
2013
2014        res = []
2015        f = weakref.finalize(a, add, 67, 43, 89)
2016        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
2017        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
2018        self.assertEqual(f(), None)
2019        self.assertEqual(f(), None)
2020        self.assertEqual(f.peek(), None)
2021        self.assertEqual(f.detach(), None)
2022        self.assertEqual(f.alive, False)
2023        self.assertEqual(res, [])
2024
2025        res = []
2026        f = weakref.finalize(a, add, x=67, y=43, z=89)
2027        del a
2028        self._collect_if_necessary()
2029        self.assertEqual(f(), None)
2030        self.assertEqual(f(), None)
2031        self.assertEqual(f.peek(), None)
2032        self.assertEqual(f.detach(), None)
2033        self.assertEqual(f.alive, False)
2034        self.assertEqual(res, [199])
2035
2036    def test_arg_errors(self):
2037        def fin(*args, **kwargs):
2038            res.append((args, kwargs))
2039
2040        a = self.A()
2041
2042        res = []
2043        f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
2044        self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
2045        f()
2046        self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
2047
2048        with self.assertRaises(TypeError):
2049            weakref.finalize(a, func=fin, arg=1)
2050        with self.assertRaises(TypeError):
2051            weakref.finalize(obj=a, func=fin, arg=1)
2052        self.assertRaises(TypeError, weakref.finalize, a)
2053        self.assertRaises(TypeError, weakref.finalize)
2054
2055    def test_order(self):
2056        a = self.A()
2057        res = []
2058
2059        f1 = weakref.finalize(a, res.append, 'f1')
2060        f2 = weakref.finalize(a, res.append, 'f2')
2061        f3 = weakref.finalize(a, res.append, 'f3')
2062        f4 = weakref.finalize(a, res.append, 'f4')
2063        f5 = weakref.finalize(a, res.append, 'f5')
2064
2065        # make sure finalizers can keep themselves alive
2066        del f1, f4
2067
2068        self.assertTrue(f2.alive)
2069        self.assertTrue(f3.alive)
2070        self.assertTrue(f5.alive)
2071
2072        self.assertTrue(f5.detach())
2073        self.assertFalse(f5.alive)
2074
2075        f5()                       # nothing because previously unregistered
2076        res.append('A')
2077        f3()                       # => res.append('f3')
2078        self.assertFalse(f3.alive)
2079        res.append('B')
2080        f3()                       # nothing because previously called
2081        res.append('C')
2082        del a
2083        self._collect_if_necessary()
2084                                   # => res.append('f4')
2085                                   # => res.append('f2')
2086                                   # => res.append('f1')
2087        self.assertFalse(f2.alive)
2088        res.append('D')
2089        f2()                       # nothing because previously called by gc
2090
2091        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
2092        self.assertEqual(res, expected)
2093
2094    def test_all_freed(self):
2095        # we want a weakrefable subclass of weakref.finalize
2096        class MyFinalizer(weakref.finalize):
2097            pass
2098
2099        a = self.A()
2100        res = []
2101        def callback():
2102            res.append(123)
2103        f = MyFinalizer(a, callback)
2104
2105        wr_callback = weakref.ref(callback)
2106        wr_f = weakref.ref(f)
2107        del callback, f
2108
2109        self.assertIsNotNone(wr_callback())
2110        self.assertIsNotNone(wr_f())
2111
2112        del a
2113        self._collect_if_necessary()
2114
2115        self.assertIsNone(wr_callback())
2116        self.assertIsNone(wr_f())
2117        self.assertEqual(res, [123])
2118
2119    @classmethod
2120    def run_in_child(cls):
2121        def error():
2122            # Create an atexit finalizer from inside a finalizer called
2123            # at exit.  This should be the next to be run.
2124            g1 = weakref.finalize(cls, print, 'g1')
2125            print('f3 error')
2126            1/0
2127
2128        # cls should stay alive till atexit callbacks run
2129        f1 = weakref.finalize(cls, print, 'f1', _global_var)
2130        f2 = weakref.finalize(cls, print, 'f2', _global_var)
2131        f3 = weakref.finalize(cls, error)
2132        f4 = weakref.finalize(cls, print, 'f4', _global_var)
2133
2134        assert f1.atexit == True
2135        f2.atexit = False
2136        assert f3.atexit == True
2137        assert f4.atexit == True
2138
2139    def test_atexit(self):
2140        prog = ('from test.test_weakref import FinalizeTestCase;'+
2141                'FinalizeTestCase.run_in_child()')
2142        rc, out, err = script_helper.assert_python_ok('-c', prog)
2143        out = out.decode('ascii').splitlines()
2144        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
2145        self.assertTrue(b'ZeroDivisionError' in err)
2146
2147
2148libreftest = """ Doctest for examples in the library reference: weakref.rst
2149
2150>>> from test.support import gc_collect
2151>>> import weakref
2152>>> class Dict(dict):
2153...     pass
2154...
2155>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
2156>>> r = weakref.ref(obj)
2157>>> print(r() is obj)
2158True
2159
2160>>> import weakref
2161>>> class Object:
2162...     pass
2163...
2164>>> o = Object()
2165>>> r = weakref.ref(o)
2166>>> o2 = r()
2167>>> o is o2
2168True
2169>>> del o, o2
2170>>> gc_collect()  # For PyPy or other GCs.
2171>>> print(r())
2172None
2173
2174>>> import weakref
2175>>> class ExtendedRef(weakref.ref):
2176...     def __init__(self, ob, callback=None, **annotations):
2177...         super().__init__(ob, callback)
2178...         self.__counter = 0
2179...         for k, v in annotations.items():
2180...             setattr(self, k, v)
2181...     def __call__(self):
2182...         '''Return a pair containing the referent and the number of
2183...         times the reference has been called.
2184...         '''
2185...         ob = super().__call__()
2186...         if ob is not None:
2187...             self.__counter += 1
2188...             ob = (ob, self.__counter)
2189...         return ob
2190...
2191>>> class A:   # not in docs from here, just testing the ExtendedRef
2192...     pass
2193...
2194>>> a = A()
2195>>> r = ExtendedRef(a, foo=1, bar="baz")
2196>>> r.foo
21971
2198>>> r.bar
2199'baz'
2200>>> r()[1]
22011
2202>>> r()[1]
22032
2204>>> r()[0] is a
2205True
2206
2207
2208>>> import weakref
2209>>> _id2obj_dict = weakref.WeakValueDictionary()
2210>>> def remember(obj):
2211...     oid = id(obj)
2212...     _id2obj_dict[oid] = obj
2213...     return oid
2214...
2215>>> def id2obj(oid):
2216...     return _id2obj_dict[oid]
2217...
2218>>> a = A()             # from here, just testing
2219>>> a_id = remember(a)
2220>>> id2obj(a_id) is a
2221True
2222>>> del a
2223>>> gc_collect()  # For PyPy or other GCs.
2224>>> try:
2225...     id2obj(a_id)
2226... except KeyError:
2227...     print('OK')
2228... else:
2229...     print('WeakValueDictionary error')
2230OK
2231
2232"""
2233
2234__test__ = {'libreftest' : libreftest}
2235
2236def test_main():
2237    support.run_unittest(
2238        ReferencesTestCase,
2239        WeakMethodTestCase,
2240        MappingTestCase,
2241        WeakValueDictionaryTestCase,
2242        WeakKeyDictionaryTestCase,
2243        SubclassableWeakrefTestCase,
2244        FinalizeTestCase,
2245        )
2246    support.run_doctest(sys.modules[__name__])
2247
2248
2249if __name__ == "__main__":
2250    test_main()
2251