• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import unittest
4import collections
5import weakref
6import operator
7import contextlib
8import copy
9import threading
10import time
11import random
12
13from test import support
14from test.support import script_helper, ALWAYS_EQ
15
16# Used in ReferencesTestCase.test_ref_created_during_del() .
17ref_from_del = None
18
19# Used by FinalizeTestCase as a global that may be replaced by None
20# when the interpreter shuts down.
21_global_var = 'foobar'
22
class C:
    """Minimal class whose instances expose a single do-nothing method."""

    def method(self):
        """Do nothing and return None; callers use it to get a bound method."""
        return None
26
27
class Callable:
    """Callable object that records the argument of its most recent call."""

    # Class-level default; an instance attribute shadows it after a call.
    bar = None

    def __call__(self, x):
        """Store *x* on the instance as ``bar``."""
        self.bar = x
33
34
def create_function():
    """Return a freshly created function object that does nothing."""
    def f():
        pass
    return f
38
def create_bound_method():
    """Return ``method`` bound to a brand-new ``C`` instance."""
    instance = C()
    return instance.method
41
42
class Object:
    """Hashable, comparable wrapper around a single value ``arg``.

    Equality, ordering (``<`` only) and hashing are delegated to ``arg``.
    Comparisons against anything that is not an ``Object`` return
    ``NotImplemented`` so Python can try the reflected operation.
    """

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        # An f-string is used instead of ``"%r" % self.arg`` because the
        # %-operator unpacks a tuple operand: a tuple ``arg`` would raise
        # TypeError (or lose its parentheses when it has one element).
        return f"<Object {self.arg!r}>"

    def __eq__(self, other):
        if isinstance(other, Object):
            return self.arg == other.arg
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, Object):
            return self.arg < other.arg
        return NotImplemented

    def __hash__(self):
        # Same hash as the wrapped value, consistent with __eq__.
        return hash(self.arg)

    def some_method(self):
        """Return the constant 4."""
        return 4

    def other_method(self):
        """Return the constant 5."""
        return 5
62
63
class RefCycle:
    """Object that holds a reference to itself."""

    def __init__(self):
        # The self-reference puts every instance in a reference cycle.
        self.cycle = self
67
68
class TestBase(unittest.TestCase):
    """Shared fixture that counts invocations of ``callback``."""

    def setUp(self):
        """Start each test with a zeroed callback counter."""
        self.cbcalled = 0

    def callback(self, ref):
        """Increment ``cbcalled``; the *ref* argument is not used."""
        self.cbcalled = self.cbcalled + 1
76
77
@contextlib.contextmanager
def collect_in_thread(period=0.0001):
    """
    Call gc.collect() over and over from a helper thread while the
    ``with`` body runs.

    The whole thing executes inside ``support.disable_gc()``.  The helper
    sleeps *period* seconds before each collection and is joined when the
    ``with`` block exits, normally or through an exception.
    """
    stop_requested = False

    def gc_loop():
        # The flag lives in the enclosing scope and is flipped in the
        # ``finally`` clause below.
        while True:
            if stop_requested:
                break
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        worker = threading.Thread(target=gc_loop)
        worker.start()
        try:
            yield
        finally:
            stop_requested = True
            worker.join()
98
99
100class ReferencesTestCase(TestBase):
101
102    def test_basic_ref(self):
103        self.check_basic_ref(C)
104        self.check_basic_ref(create_function)
105        self.check_basic_ref(create_bound_method)
106
107        # Just make sure the tp_repr handler doesn't raise an exception.
108        # Live reference:
109        o = C()
110        wr = weakref.ref(o)
111        repr(wr)
112        # Dead reference:
113        del o
114        repr(wr)
115
116    def test_basic_callback(self):
117        self.check_basic_callback(C)
118        self.check_basic_callback(create_function)
119        self.check_basic_callback(create_bound_method)
120
121    @support.cpython_only
122    def test_cfunction(self):
123        import _testcapi
124        create_cfunction = _testcapi.create_cfunction
125        f = create_cfunction()
126        wr = weakref.ref(f)
127        self.assertIs(wr(), f)
128        del f
129        self.assertIsNone(wr())
130        self.check_basic_ref(create_cfunction)
131        self.check_basic_callback(create_cfunction)
132
133    def test_multiple_callbacks(self):
134        o = C()
135        ref1 = weakref.ref(o, self.callback)
136        ref2 = weakref.ref(o, self.callback)
137        del o
138        self.assertIsNone(ref1(), "expected reference to be invalidated")
139        self.assertIsNone(ref2(), "expected reference to be invalidated")
140        self.assertEqual(self.cbcalled, 2,
141                     "callback not called the right number of times")
142
    def test_multiple_selfref_callbacks(self):
        # Make sure all references are invalidated before callbacks are called
        #
        # What's important here is that we're using the first
        # reference in the callback invoked on the second reference
        # (the most recently created ref is cleaned up first).  This
        # tests that all references to the object are invalidated
        # before any of the callbacks are invoked, so that we only
        # have one invocation of _weakref.c:cleanup_helper() active
        # for a particular object at a time.
        #
        def callback(object, self=self):
            # Dereference the first weakref (stored on the test instance
            # as self.ref) from inside a callback.
            self.ref()
        c = C()
        self.ref = weakref.ref(c, callback)
        # ref1 is never read; binding it keeps the second weakref object
        # alive until c is deleted.
        ref1 = weakref.ref(c, callback)
        # Drop the only strong reference this test holds to c.  The test
        # has no assertions: it passes if nothing raises or crashes.
        del c
160
161    def test_constructor_kwargs(self):
162        c = C()
163        self.assertRaises(TypeError, weakref.ref, c, callback=None)
164
165    def test_proxy_ref(self):
166        o = C()
167        o.bar = 1
168        ref1 = weakref.proxy(o, self.callback)
169        ref2 = weakref.proxy(o, self.callback)
170        del o
171
172        def check(proxy):
173            proxy.bar
174
175        self.assertRaises(ReferenceError, check, ref1)
176        self.assertRaises(ReferenceError, check, ref2)
177        self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
178        self.assertEqual(self.cbcalled, 2)
179
180    def check_basic_ref(self, factory):
181        o = factory()
182        ref = weakref.ref(o)
183        self.assertIsNotNone(ref(),
184                     "weak reference to live object should be live")
185        o2 = ref()
186        self.assertIs(o, o2,
187                     "<ref>() should return original object if live")
188
189    def check_basic_callback(self, factory):
190        self.cbcalled = 0
191        o = factory()
192        ref = weakref.ref(o, self.callback)
193        del o
194        self.assertEqual(self.cbcalled, 1,
195                     "callback did not properly set 'cbcalled'")
196        self.assertIsNone(ref(),
197                     "ref2 should be dead after deleting object reference")
198
199    def test_ref_reuse(self):
200        o = C()
201        ref1 = weakref.ref(o)
202        # create a proxy to make sure that there's an intervening creation
203        # between these two; it should make no difference
204        proxy = weakref.proxy(o)
205        ref2 = weakref.ref(o)
206        self.assertIs(ref1, ref2,
207                     "reference object w/out callback should be re-used")
208
209        o = C()
210        proxy = weakref.proxy(o)
211        ref1 = weakref.ref(o)
212        ref2 = weakref.ref(o)
213        self.assertIs(ref1, ref2,
214                     "reference object w/out callback should be re-used")
215        self.assertEqual(weakref.getweakrefcount(o), 2,
216                     "wrong weak ref count for object")
217        del proxy
218        self.assertEqual(weakref.getweakrefcount(o), 1,
219                     "wrong weak ref count for object after deleting proxy")
220
221    def test_proxy_reuse(self):
222        o = C()
223        proxy1 = weakref.proxy(o)
224        ref = weakref.ref(o)
225        proxy2 = weakref.proxy(o)
226        self.assertIs(proxy1, proxy2,
227                     "proxy object w/out callback should have been re-used")
228
229    def test_basic_proxy(self):
230        o = C()
231        self.check_proxy(o, weakref.proxy(o))
232
233        L = collections.UserList()
234        p = weakref.proxy(L)
235        self.assertFalse(p, "proxy for empty UserList should be false")
236        p.append(12)
237        self.assertEqual(len(L), 1)
238        self.assertTrue(p, "proxy for non-empty UserList should be true")
239        p[:] = [2, 3]
240        self.assertEqual(len(L), 2)
241        self.assertEqual(len(p), 2)
242        self.assertIn(3, p, "proxy didn't support __contains__() properly")
243        p[1] = 5
244        self.assertEqual(L[1], 5)
245        self.assertEqual(p[1], 5)
246        L2 = collections.UserList(L)
247        p2 = weakref.proxy(L2)
248        self.assertEqual(p, p2)
249        ## self.assertEqual(repr(L2), repr(p2))
250        L3 = collections.UserList(range(10))
251        p3 = weakref.proxy(L3)
252        self.assertEqual(L3[:], p3[:])
253        self.assertEqual(L3[5:], p3[5:])
254        self.assertEqual(L3[:5], p3[:5])
255        self.assertEqual(L3[2:5], p3[2:5])
256
257    def test_proxy_unicode(self):
258        # See bug 5037
259        class C(object):
260            def __str__(self):
261                return "string"
262            def __bytes__(self):
263                return b"bytes"
264        instance = C()
265        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
266        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
267
268    def test_proxy_index(self):
269        class C:
270            def __index__(self):
271                return 10
272        o = C()
273        p = weakref.proxy(o)
274        self.assertEqual(operator.index(p), 10)
275
276    def test_proxy_div(self):
277        class C:
278            def __floordiv__(self, other):
279                return 42
280            def __ifloordiv__(self, other):
281                return 21
282        o = C()
283        p = weakref.proxy(o)
284        self.assertEqual(p // 5, 42)
285        p //= 5
286        self.assertEqual(p, 21)
287
288    def test_proxy_matmul(self):
289        class C:
290            def __matmul__(self, other):
291                return 1729
292            def __rmatmul__(self, other):
293                return -163
294            def __imatmul__(self, other):
295                return 561
296        o = C()
297        p = weakref.proxy(o)
298        self.assertEqual(p @ 5, 1729)
299        self.assertEqual(5 @ p, -163)
300        p @= 5
301        self.assertEqual(p, 561)
302
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as those objects exist.
    # In Python 2.3.3 and earlier, this guarantee was not honored, and
    # was broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
310
311    def test_shared_ref_without_callback(self):
312        self.check_shared_without_callback(weakref.ref)
313
314    def test_shared_proxy_without_callback(self):
315        self.check_shared_without_callback(weakref.proxy)
316
317    def check_shared_without_callback(self, makeref):
318        o = Object(1)
319        p1 = makeref(o, None)
320        p2 = makeref(o, None)
321        self.assertIs(p1, p2, "both callbacks were None in the C API")
322        del p1, p2
323        p1 = makeref(o)
324        p2 = makeref(o, None)
325        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
326        del p1, p2
327        p1 = makeref(o)
328        p2 = makeref(o)
329        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
330        del p1, p2
331        p1 = makeref(o, None)
332        p2 = makeref(o)
333        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
334
335    def test_callable_proxy(self):
336        o = Callable()
337        ref1 = weakref.proxy(o)
338
339        self.check_proxy(o, ref1)
340
341        self.assertIs(type(ref1), weakref.CallableProxyType,
342                     "proxy is not of callable type")
343        ref1('twinkies!')
344        self.assertEqual(o.bar, 'twinkies!',
345                     "call through proxy not passed through to original")
346        ref1(x='Splat.')
347        self.assertEqual(o.bar, 'Splat.',
348                     "call through proxy not passed through to original")
349
350        # expect due to too few args
351        self.assertRaises(TypeError, ref1)
352
353        # expect due to too many args
354        self.assertRaises(TypeError, ref1, 1, 2, 3)
355
356    def check_proxy(self, o, proxy):
357        o.foo = 1
358        self.assertEqual(proxy.foo, 1,
359                     "proxy does not reflect attribute addition")
360        o.foo = 2
361        self.assertEqual(proxy.foo, 2,
362                     "proxy does not reflect attribute modification")
363        del o.foo
364        self.assertFalse(hasattr(proxy, 'foo'),
365                     "proxy does not reflect attribute removal")
366
367        proxy.foo = 1
368        self.assertEqual(o.foo, 1,
369                     "object does not reflect attribute addition via proxy")
370        proxy.foo = 2
371        self.assertEqual(o.foo, 2,
372            "object does not reflect attribute modification via proxy")
373        del proxy.foo
374        self.assertFalse(hasattr(o, 'foo'),
375                     "object does not reflect attribute removal via proxy")
376
377    def test_proxy_deletion(self):
378        # Test clearing of SF bug #762891
379        class Foo:
380            result = None
381            def __delitem__(self, accessor):
382                self.result = accessor
383        g = Foo()
384        f = weakref.proxy(g)
385        del f[0]
386        self.assertEqual(f.result, 0)
387
388    def test_proxy_bool(self):
389        # Test clearing of SF bug #1170766
390        class List(list): pass
391        lyst = List()
392        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
393
    def test_proxy_iter(self):
        # Test fails with a debug build of the interpreter
        # (see bpo-38395).

        # Placeholder binding so that MyObj.__iter__ can name ``obj``
        # with ``nonlocal`` before the instance exists.
        obj = None

        class MyObj:
            def __iter__(self):
                nonlocal obj
                # Remove this test's only named reference to the
                # instance while __iter__ is still running on it.
                del obj
                # Not an iterator; the ``in`` test below is expected to
                # fail with TypeError.
                return NotImplemented

        obj = MyObj()
        p = weakref.proxy(obj)
        with self.assertRaises(TypeError):
            # "blech" in p calls MyObj.__iter__ through the proxy,
            # without keeping a reference to the real object, so it
            # can be killed in the middle of the call
            "blech" in p
413
414    def test_proxy_reversed(self):
415        class MyObj:
416            def __len__(self):
417                return 3
418            def __reversed__(self):
419                return iter('cba')
420
421        obj = MyObj()
422        self.assertEqual("".join(reversed(weakref.proxy(obj))), "cba")
423
424    def test_proxy_hash(self):
425        cool_hash = 299_792_458
426
427        class MyObj:
428            def __hash__(self):
429                return cool_hash
430
431        obj = MyObj()
432        self.assertEqual(hash(weakref.proxy(obj)), cool_hash)
433
434    def test_getweakrefcount(self):
435        o = C()
436        ref1 = weakref.ref(o)
437        ref2 = weakref.ref(o, self.callback)
438        self.assertEqual(weakref.getweakrefcount(o), 2,
439                     "got wrong number of weak reference objects")
440
441        proxy1 = weakref.proxy(o)
442        proxy2 = weakref.proxy(o, self.callback)
443        self.assertEqual(weakref.getweakrefcount(o), 4,
444                     "got wrong number of weak reference objects")
445
446        del ref1, ref2, proxy1, proxy2
447        self.assertEqual(weakref.getweakrefcount(o), 0,
448                     "weak reference objects not unlinked from"
449                     " referent when discarded.")
450
451        # assumes ints do not support weakrefs
452        self.assertEqual(weakref.getweakrefcount(1), 0,
453                     "got wrong number of weak reference objects for int")
454
455    def test_getweakrefs(self):
456        o = C()
457        ref1 = weakref.ref(o, self.callback)
458        ref2 = weakref.ref(o, self.callback)
459        del ref1
460        self.assertEqual(weakref.getweakrefs(o), [ref2],
461                     "list of refs does not match")
462
463        o = C()
464        ref1 = weakref.ref(o, self.callback)
465        ref2 = weakref.ref(o, self.callback)
466        del ref2
467        self.assertEqual(weakref.getweakrefs(o), [ref1],
468                     "list of refs does not match")
469
470        del ref1
471        self.assertEqual(weakref.getweakrefs(o), [],
472                     "list of refs not cleared")
473
474        # assumes ints do not support weakrefs
475        self.assertEqual(weakref.getweakrefs(1), [],
476                     "list of refs does not match for int")
477
478    def test_newstyle_number_ops(self):
479        class F(float):
480            pass
481        f = F(2.0)
482        p = weakref.proxy(f)
483        self.assertEqual(p + 1.0, 3.0)
484        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
485
486    def test_callbacks_protected(self):
487        # Callbacks protected from already-set exceptions?
488        # Regression test for SF bug #478534.
489        class BogusError(Exception):
490            pass
491        data = {}
492        def remove(k):
493            del data[k]
494        def encapsulate():
495            f = lambda : ()
496            data[weakref.ref(f, remove)] = None
497            raise BogusError
498        try:
499            encapsulate()
500        except BogusError:
501            pass
502        else:
503            self.fail("exception not properly restored")
504        try:
505            encapsulate()
506        except BogusError:
507            pass
508        else:
509            self.fail("exception not properly restored")
510
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.
        #
        # The test makes no assertions; it passes when nothing crashes.

        # Function-local import (gc is also imported at module level).
        import gc

        class C(object):
            pass

        # Part 1: the callback attached to c runs a full collection.
        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        # Part 2: the collecting callback is attached to c1.i, an object
        # held only through c1, which in turn is held only through c2.
        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
550
    def test_callback_in_cycle_1(self):
        # Builds the arrangement of cyclic trash described in the long
        # comment below and then runs a collection.  There are no
        # assertions: the test passes when gc.collect() returns normally.

        # Function-local import (gc is also imported at module level).
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                # Attribute lookup performed from inside the weakref
                # callback; see the explanation below.
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
590
    def test_callback_in_cycle_2(self):
        # Function-local import (gc is also imported at module level).
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored

        class J(object):
            pass

        # Declared without an explicit base class, unlike II in
        # test_callback_in_cycle_1.
        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Make all three trash, then collect.  No assertions follow: the
        # test passes when gc.collect() returns normally.
        del I, J, II
        gc.collect()
619
    def test_callback_in_cycle_3(self):
        # Function-local import (gc is also imported at module level).
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                # Read every attribute that is stored on c2 below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        # c2 refers to itself, to c1, and to a weakref on c1 whose
        # callback is c2's own bound method; c1 refers to nothing.
        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # No assertions follow: the test passes when gc.collect()
        # returns normally.
        del c1, c2
        gc.collect()
643
    def test_callback_in_cycle_4(self):
        # Function-local import (gc is also imported at module level).
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                # Read every attribute that is stored on c2 below.
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # The classes are deleted too, so that they become trash along
        # with the instances.  No assertions follow: the test passes when
        # gc.collect() returns normally.
        del c1, c2, C, D
        gc.collect()
671
    def test_callback_in_cycle_resurrection(self):
        # Function-local import (gc is also imported at module level).
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        # Records everything any callback does, in order.
        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would resurrect the partner object by storing a strong
                # reference to it in alist.
                alist.append(self.c)

        # c1 and c2 refer to each other, and each holds a weakref to the
        # other whose callback is its own bound method.
        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
717
    def test_callbacks_on_callback(self):
        # Function-local import (gc is also imported at module level).
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        # alist records which callbacks actually ran, in order.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        # c and d refer to each other, forming a cycle.
        c, d = C(), C()
        c.other = d
        d.other = c
        # The bound method c.cb is used both as a weakref callback and as
        # the referent of two further weakrefs.
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
755
756    def test_gc_during_ref_creation(self):
757        self.check_gc_during_creation(weakref.ref)
758
759    def test_gc_during_proxy_creation(self):
760        self.check_gc_during_creation(weakref.proxy)
761
762    def check_gc_during_creation(self, makeref):
763        thresholds = gc.get_threshold()
764        gc.set_threshold(1, 1, 1)
765        gc.collect()
766        class A:
767            pass
768
769        def callback(*args):
770            pass
771
772        referenced = A()
773
774        a = A()
775        a.a = a
776        a.wr = makeref(referenced)
777
778        try:
779            # now make sure the object and the ref get labeled as
780            # cyclic trash:
781            a = A()
782            weakref.ref(referenced, callback)
783
784        finally:
785            gc.set_threshold(*thresholds)
786
787    def test_ref_created_during_del(self):
788        # Bug #1377858
789        # A weakref created in an object's __del__() would crash the
790        # interpreter when the weakref was cleaned up since it would refer to
791        # non-existent memory.  This test should not segfault the interpreter.
792        class Target(object):
793            def __del__(self):
794                global ref_from_del
795                ref_from_del = weakref.ref(self)
796
797        w = Target()
798
799    def test_init(self):
800        # Issue 3634
801        # <weakref to class>.__init__() doesn't check errors correctly
802        r = weakref.ref(Exception)
803        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
804        # No exception should be raised here
805        gc.collect()
806
807    def test_classes(self):
808        # Check that classes are weakrefable.
809        class A(object):
810            pass
811        l = []
812        weakref.ref(int)
813        a = weakref.ref(A, l.append)
814        A = None
815        gc.collect()
816        self.assertEqual(a(), None)
817        self.assertEqual(l, [a])
818
819    def test_equality(self):
820        # Alive weakrefs defer equality testing to their underlying object.
821        x = Object(1)
822        y = Object(1)
823        z = Object(2)
824        a = weakref.ref(x)
825        b = weakref.ref(y)
826        c = weakref.ref(z)
827        d = weakref.ref(x)
828        # Note how we directly test the operators here, to stress both
829        # __eq__ and __ne__.
830        self.assertTrue(a == b)
831        self.assertFalse(a != b)
832        self.assertFalse(a == c)
833        self.assertTrue(a != c)
834        self.assertTrue(a == d)
835        self.assertFalse(a != d)
836        self.assertFalse(a == x)
837        self.assertTrue(a != x)
838        self.assertTrue(a == ALWAYS_EQ)
839        self.assertFalse(a != ALWAYS_EQ)
840        del x, y, z
841        gc.collect()
842        for r in a, b, c:
843            # Sanity check
844            self.assertIs(r(), None)
845        # Dead weakrefs compare by identity: whether `a` and `d` are the
846        # same weakref object is an implementation detail, since they pointed
847        # to the same original object and didn't have a callback.
848        # (see issue #16453).
849        self.assertFalse(a == b)
850        self.assertTrue(a != b)
851        self.assertFalse(a == c)
852        self.assertTrue(a != c)
853        self.assertEqual(a == d, a is d)
854        self.assertEqual(a != d, a is not d)
855
856    def test_ordering(self):
857        # weakrefs cannot be ordered, even if the underlying objects can.
858        ops = [operator.lt, operator.gt, operator.le, operator.ge]
859        x = Object(1)
860        y = Object(1)
861        a = weakref.ref(x)
862        b = weakref.ref(y)
863        for op in ops:
864            self.assertRaises(TypeError, op, a, b)
865        # Same when dead.
866        del x, y
867        gc.collect()
868        for op in ops:
869            self.assertRaises(TypeError, op, a, b)
870
871    def test_hashing(self):
872        # Alive weakrefs hash the same as the underlying object
873        x = Object(42)
874        y = Object(42)
875        a = weakref.ref(x)
876        b = weakref.ref(y)
877        self.assertEqual(hash(a), hash(42))
878        del x, y
879        gc.collect()
880        # Dead weakrefs:
881        # - retain their hash is they were hashed when alive;
882        # - otherwise, cannot be hashed.
883        self.assertEqual(hash(a), hash(42))
884        self.assertRaises(TypeError, hash, b)
885
    def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        #
        # The test makes no assertions; it passes when nothing crashes.
        class C:
            def __init__(self, parent):
                # The root of the chain is built with a false parent and
                # gets no weakrefs at all.
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    # Dereference the weakref to this object from the
                    # callback that fires for the parent.
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        # Build a chain of 100 links: each pass stores a new child (made
        # with the current object as its parent) as the value for some
        # key in d and rebinds c to that child.
        for n in range(100):
            d[c] = c = C(c)
        del root
        gc.collect()
906
907    def test_callback_attribute(self):
908        x = Object(1)
909        callback = lambda ref: None
910        ref1 = weakref.ref(x, callback)
911        self.assertIs(ref1.__callback__, callback)
912
913        ref2 = weakref.ref(x)
914        self.assertIsNone(ref2.__callback__)
915
916    def test_callback_attribute_after_deletion(self):
917        x = Object(1)
918        ref = weakref.ref(x, self.callback)
919        self.assertIsNotNone(ref.__callback__)
920        del x
921        support.gc_collect()
922        self.assertIsNone(ref.__callback__)
923
924    def test_set_callback_attribute(self):
925        x = Object(1)
926        callback = lambda ref: None
927        ref1 = weakref.ref(x, callback)
928        with self.assertRaises(AttributeError):
929            ref1.__callback__ = lambda ref: None
930
    def test_callback_gcs(self):
        # A weakref callback that itself triggers a garbage collection.
        # There are no assertions: the test passes if deleting the object and
        # collecting completes without error.
        class ObjectWithDel(Object):
            # Object plus a no-op finalizer.
            def __del__(self): pass
        x = ObjectWithDel(1)
        # The callback runs a collection from inside the weakref callback.
        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
        del x
        support.gc_collect()
938
939
class SubclassableWeakrefTestCase(TestBase):
    """Tests for user-defined subclasses of weakref.ref."""

    def test_subclass_refs(self):
        # A subclass may accept extra constructor arguments, store
        # attributes and override __call__ while still acting as a weakref.
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super().__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assertIs(mr(), o)
        self.assertTrue(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        # Do not rely on reference counting alone to kill the referent.
        support.gc_collect()
        # The flag is still True from the call above; reset it so that the
        # final assertion really checks the overridden __call__ ran again.
        mr.called = False
        self.assertIsNone(mr())
        self.assertTrue(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        # Subclass refs and plain refs to the same object are distinct
        # objects, and each new subclass ref adds to the weakref count.
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assertIsNot(r1, r2)
        # The plain ref is listed first even though it was created second.
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assertIs(r2, refs[0])
        self.assertIn(r1, refs[1:])
        self.assertIn(r3, refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        # Two subclass refs with different callbacks stay separate objects
        # and are both reported by getweakrefs().
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assertIsNot(r1, r2)
        refs = weakref.getweakrefs(o)
        self.assertIn(r1, refs)
        self.assertIn(r2, refs)

    def test_subclass_refs_with_slots(self):
        # A subclass using __slots__ stores its extra state in the slots and
        # gets no instance __dict__.
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            def __new__(type, ob, callback, slot1, slot2):
                # Only the referent and the callback go to weakref.ref.
                return weakref.ref.__new__(type, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.assertFalse(hasattr(r, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # An instance of a weakref subclass can have attributes.
        # If such a weakref holds the only strong reference to the object,
        # deleting the weakref will delete the object. In this case,
        # the callback must not be called, because the ref object is
        # being deleted.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        r1 = MyRef(o, callback)
        r1.o = o
        del o

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same test, with two weakrefs to the same object
        # (since code paths are different)
        o = C()
        r1 = MyRef(o, callback)
        r2 = MyRef(o, callback)
        r1.r = r2
        r2.o = o
        del o
        del r2

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)
1041
1042
class WeakMethodTestCase(unittest.TestCase):
    """Tests for weakref.WeakMethod, using bound methods of Object instances."""

    def _subclass(self):
        """Return an Object subclass overriding `some_method`."""
        class C(Object):
            def some_method(self):
                return 6
        return C

    def test_alive(self):
        # While the instance is alive, calling the WeakMethod yields a bound
        # method with the same __self__ and __func__ as the original.
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        self.assertIsInstance(r, weakref.ReferenceType)
        self.assertIsInstance(r(), type(o.some_method))
        self.assertIs(r().__self__, o)
        self.assertIs(r().__func__, o.some_method.__func__)
        self.assertEqual(r()(), 4)

    def test_object_dead(self):
        # Once the instance is gone, the WeakMethod returns None.
        o = Object(1)
        r = weakref.WeakMethod(o.some_method)
        del o
        gc.collect()
        self.assertIs(r(), None)

    def test_method_dead(self):
        # The WeakMethod also dies when the function is removed from the
        # class, even though the instance `o` is still alive.
        C = self._subclass()
        o = C(1)
        r = weakref.WeakMethod(o.some_method)
        del C.some_method
        gc.collect()
        self.assertIs(r(), None)

    def test_callback_when_object_dead(self):
        # Test callback behaviour when object dies first.
        C = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = C(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del o
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once.
        # Rebinding the class attribute replaces the overriding function;
        # that must not trigger the callback a second time.
        C.some_method = Object.some_method
        gc.collect()
        self.assertEqual(calls, [r])

    def test_callback_when_method_dead(self):
        # Test callback behaviour when method dies first.
        C = self._subclass()
        calls = []
        def cb(arg):
            calls.append(arg)
        o = C(1)
        r = weakref.WeakMethod(o.some_method, cb)
        del C.some_method
        gc.collect()
        self.assertEqual(calls, [r])
        # Callback is only called once.
        del o
        gc.collect()
        self.assertEqual(calls, [r])

    @support.cpython_only
    def test_no_cycles(self):
        # A WeakMethod doesn't create any reference cycle to itself.
        o = Object(1)
        def cb(_):
            pass
        r = weakref.WeakMethod(o.some_method, cb)
        wr = weakref.ref(r)
        # No gc.collect() here: the WeakMethod must disappear as soon as its
        # last reference is deleted.
        del r
        self.assertIs(wr(), None)

    def test_equality(self):
        # Helpers asserting both operators, to exercise __eq__ and __ne__.
        def _eq(a, b):
            self.assertTrue(a == b)
            self.assertFalse(a != b)
        def _ne(a, b):
            self.assertTrue(a != b)
            self.assertFalse(a == b)
        # x and y compare equal (same arg) but are distinct instances.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        c = weakref.WeakMethod(x.other_method)
        d = weakref.WeakMethod(y.other_method)
        # Objects equal, same method
        _eq(a, b)
        _eq(c, d)
        # Objects equal, different method
        _ne(a, c)
        _ne(a, d)
        _ne(b, c)
        _ne(b, d)
        # Objects unequal, same or different method
        z = Object(2)
        e = weakref.WeakMethod(z.some_method)
        f = weakref.WeakMethod(z.other_method)
        _ne(a, e)
        _ne(a, f)
        _ne(b, e)
        _ne(b, f)
        # Compare with different types
        _ne(a, x.some_method)
        _eq(a, ALWAYS_EQ)
        del x, y, z
        gc.collect()
        # Dead WeakMethods compare by identity
        refs = a, b, c, d, e, f
        for q in refs:
            for r in refs:
                self.assertEqual(q == r, q is r)
                self.assertEqual(q != r, q is not r)

    def test_hashing(self):
        # Alive WeakMethods are hashable if the underlying object is
        # hashable.
        x = Object(1)
        y = Object(1)
        a = weakref.WeakMethod(x.some_method)
        b = weakref.WeakMethod(y.some_method)
        # `c` is deliberately never hashed while its object is alive.
        c = weakref.WeakMethod(y.other_method)
        # Since WeakMethod objects are equal, the hashes should be equal.
        self.assertEqual(hash(a), hash(b))
        ha = hash(a)
        # Dead WeakMethods retain their old hash value
        del x, y
        gc.collect()
        self.assertEqual(hash(a), ha)
        self.assertEqual(hash(b), ha)
        # If it wasn't hashed when alive, a dead WeakMethod cannot be hashed.
        self.assertRaises(TypeError, hash, c)
1178
1179
1180class MappingTestCase(TestBase):
1181
1182    COUNT = 10
1183
1184    def check_len_cycles(self, dict_type, cons):
1185        N = 20
1186        items = [RefCycle() for i in range(N)]
1187        dct = dict_type(cons(o) for o in items)
1188        # Keep an iterator alive
1189        it = dct.items()
1190        try:
1191            next(it)
1192        except StopIteration:
1193            pass
1194        del items
1195        gc.collect()
1196        n1 = len(dct)
1197        del it
1198        gc.collect()
1199        n2 = len(dct)
1200        # one item may be kept alive inside the iterator
1201        self.assertIn(n1, (0, 1))
1202        self.assertEqual(n2, 0)
1203
1204    def test_weak_keyed_len_cycles(self):
1205        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1206
1207    def test_weak_valued_len_cycles(self):
1208        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1209
    def check_len_race(self, dict_type, cons):
        """Sanity-check len() of a weak dict under many GC thresholds.

        dict_type is the weak dictionary class under test; cons turns one
        RefCycle instance into the (key, value) pair stored for it.
        """
        # Extended sanity checks for len() in the face of cyclic collection
        # Restore the thresholds that were in effect on entry when the test
        # finishes.
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            # Use the same value for all three generation thresholds.
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            # Exact values depend on when a collection happens to run, so
            # only bounds are checked: 0 <= n2 <= n1 <= N.
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
1233
1234    def test_weak_keyed_len_race(self):
1235        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1236
1237    def test_weak_valued_len_race(self):
1238        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1239
    def test_weak_values(self):
        #
        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
        #
        dict, objects = self.make_weak_valued_dict()
        for o in objects:
            # The dictionary holds exactly one weak reference per value.
            self.assertEqual(weakref.getweakrefcount(o), 1)
            self.assertIs(o, dict[o.arg],
                         "wrong object returned by weak dict!")
        items1 = list(dict.items())
        items2 = list(dict.copy().items())
        items1.sort()
        items2.sort()
        self.assertEqual(items1, items2,
                     "cloning of weak-valued dictionary did not work!")
        # The item lists hold strong references to the values; drop them
        # before checking that entries disappear.
        del items1, items2
        self.assertEqual(len(dict), self.COUNT)
        del objects[0]
        self.assertEqual(len(dict), self.COUNT - 1,
                     "deleting object did not cause dictionary update")
        # `o` still refers to the last object of the loop above, so it has to
        # be deleted together with the list.
        del objects, o
        self.assertEqual(len(dict), 0,
                     "deleting the values did not clear the dictionary")
        # regression on SF bug #447152:
        dict = weakref.WeakValueDictionary()
        self.assertRaises(KeyError, dict.__getitem__, 1)
        # The value is a temporary with no other reference, so the lookup
        # below is expected to fail as well.
        dict[2] = C()
        self.assertRaises(KeyError, dict.__getitem__, 2)
1268
    def test_weak_keys(self):
        #
        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
        #  len(d), k in d.
        #
        dict, objects = self.make_weak_keyed_dict()
        for o in objects:
            # The dictionary holds exactly one weak reference per key.
            self.assertEqual(weakref.getweakrefcount(o), 1,
                         "wrong number of weak references to %r!" % o)
            self.assertIs(o.arg, dict[o],
                         "wrong object returned by weak dict!")
        items1 = dict.items()
        items2 = dict.copy().items()
        self.assertEqual(set(items1), set(items2),
                     "cloning of weak-keyed dictionary did not work!")
        del items1, items2
        self.assertEqual(len(dict), self.COUNT)
        del objects[0]
        self.assertEqual(len(dict), (self.COUNT - 1),
                     "deleting object did not cause dictionary update")
        # `o` still refers to the last object of the loop above, so it has to
        # be deleted together with the list.
        del objects, o
        self.assertEqual(len(dict), 0,
                     "deleting the keys did not clear the dictionary")
        # Membership tests: a stored key is found, an absent value is not.
        o = Object(42)
        dict[o] = "What is the meaning of the universe?"
        self.assertIn(o, dict)
        self.assertNotIn(34, dict)
1296
1297    def test_weak_keyed_iters(self):
1298        dict, objects = self.make_weak_keyed_dict()
1299        self.check_iters(dict)
1300
1301        # Test keyrefs()
1302        refs = dict.keyrefs()
1303        self.assertEqual(len(refs), len(objects))
1304        objects2 = list(objects)
1305        for wr in refs:
1306            ob = wr()
1307            self.assertIn(ob, dict)
1308            self.assertIn(ob, dict)
1309            self.assertEqual(ob.arg, dict[ob])
1310            objects2.remove(ob)
1311        self.assertEqual(len(objects2), 0)
1312
1313        # Test iterkeyrefs()
1314        objects2 = list(objects)
1315        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1316        for wr in dict.keyrefs():
1317            ob = wr()
1318            self.assertIn(ob, dict)
1319            self.assertIn(ob, dict)
1320            self.assertEqual(ob.arg, dict[ob])
1321            objects2.remove(ob)
1322        self.assertEqual(len(objects2), 0)
1323
1324    def test_weak_valued_iters(self):
1325        dict, objects = self.make_weak_valued_dict()
1326        self.check_iters(dict)
1327
1328        # Test valuerefs()
1329        refs = dict.valuerefs()
1330        self.assertEqual(len(refs), len(objects))
1331        objects2 = list(objects)
1332        for wr in refs:
1333            ob = wr()
1334            self.assertEqual(ob, dict[ob.arg])
1335            self.assertEqual(ob.arg, dict[ob.arg].arg)
1336            objects2.remove(ob)
1337        self.assertEqual(len(objects2), 0)
1338
1339        # Test itervaluerefs()
1340        objects2 = list(objects)
1341        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1342        for wr in dict.itervaluerefs():
1343            ob = wr()
1344            self.assertEqual(ob, dict[ob.arg])
1345            self.assertEqual(ob.arg, dict[ob.arg].arg)
1346            objects2.remove(ob)
1347        self.assertEqual(len(objects2), 0)
1348
1349    def check_iters(self, dict):
1350        # item iterator:
1351        items = list(dict.items())
1352        for item in dict.items():
1353            items.remove(item)
1354        self.assertFalse(items, "items() did not touch all items")
1355
1356        # key iterator, via __iter__():
1357        keys = list(dict.keys())
1358        for k in dict:
1359            keys.remove(k)
1360        self.assertFalse(keys, "__iter__() did not touch all keys")
1361
1362        # key iterator, via iterkeys():
1363        keys = list(dict.keys())
1364        for k in dict.keys():
1365            keys.remove(k)
1366        self.assertFalse(keys, "iterkeys() did not touch all keys")
1367
1368        # value iterator:
1369        values = list(dict.values())
1370        for v in dict.values():
1371            values.remove(v)
1372        self.assertFalse(values,
1373                     "itervalues() did not touch all values")
1374
    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
        """Destroy one referent while an iterator over `dict` is active.

        dict is the weak dictionary under test, objects the list of objects
        it refers to, and iter_name the name of the dict method whose result
        is iterated (e.g. 'keys').  The last element of `objects` is removed
        by this call.
        """
        n = len(dict)
        it = iter(getattr(dict, iter_name)())
        next(it)             # Trigger internal iteration
        # Destroy an object
        del objects[-1]
        gc.collect()    # just in case
        # We have removed either the first consumed object, or another one
        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
        del it
        # The removal has been committed
        self.assertEqual(len(dict), n - 1)
1387
    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
        """Mutate `dict` explicitly while an implicit removal is pending.

        Each `with testcontext()` block is entered anew, so every block
        destroys one more of the weakly referenced objects.
        """
        # Check that we can explicitly mutate the weak dict without
        # interfering with delayed removal.
        # `testcontext` should create an iterator, destroy one of the
        # weakref'ed objects and then return a new key/value pair corresponding
        # to the destroyed object.
        with testcontext() as (k, v):
            self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.__delitem__, k)
        self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            self.assertRaises(KeyError, dict.pop, k)
        self.assertNotIn(k, dict)
        with testcontext() as (k, v):
            # Re-insert a pair equal to the one that was just destroyed.
            dict[k] = v
        self.assertEqual(dict[k], v)
        # Take a copy first, then update the dict from it while another
        # removal is pending.
        ddict = copy.copy(dict)
        with testcontext() as (k, v):
            dict.update(ddict)
        self.assertEqual(dict, ddict)
        with testcontext() as (k, v):
            dict.clear()
        self.assertEqual(len(dict), 0)
1412
    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
        """Check len() while keys are removed both explicitly and implicitly.

        dict is the weak dictionary under test; testcontext is a context
        manager factory as described in the comment below.
        """
        # Check that len() works when both iterating and removing keys
        # explicitly through various means (.pop(), .clear()...), while
        # implicit mutation is deferred because an iterator is alive.
        # (each call to testcontext() should schedule one item for removal
        #  for this test to work properly)
        o = Object(123456)
        with testcontext():
            n = len(dict)
            # Since underlying dict is ordered, first item is popped
            dict.pop(next(dict.keys()))
            self.assertEqual(len(dict), n - 1)
            # `o` is used as both key and value, so it can be stored in
            # either kind of weak dictionary.
            dict[o] = o
            self.assertEqual(len(dict), n)
        # last item in objects is removed from dict in context shutdown
        with testcontext():
            self.assertEqual(len(dict), n - 1)
            # Then, (o, o) is popped
            dict.popitem()
            self.assertEqual(len(dict), n - 2)
        # From here on, each block starts one entry lower than the previous
        # block ended, because leaving a block commits one more removal.
        with testcontext():
            self.assertEqual(len(dict), n - 3)
            del dict[next(dict.keys())]
            self.assertEqual(len(dict), n - 4)
        with testcontext():
            self.assertEqual(len(dict), n - 5)
            dict.popitem()
            self.assertEqual(len(dict), n - 6)
        with testcontext():
            dict.clear()
            self.assertEqual(len(dict), 0)
        self.assertEqual(len(dict), 0)
1445
    def test_weak_keys_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
        dict, objects = self.make_weak_keyed_dict()
        # testcontext looks up `dict` and `objects` each time it runs, so it
        # always works on whichever pair is bound to those names at that
        # moment (they are rebound once more further down).
        @contextlib.contextmanager
        def testcontext():
            try:
                # Keep an iterator alive for the duration of the block.
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                v = objects.pop().arg
                gc.collect()      # just in case
                yield Object(v), v
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        # Issue #21173: len() fragile when keys are both implicitly and
        # explicitly removed.
        dict, objects = self.make_weak_keyed_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1471
    def test_weak_values_destroy_while_iterating(self):
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
        self.check_weak_destroy_while_iterating(dict, objects, 'items')
        self.check_weak_destroy_while_iterating(dict, objects, 'values')
        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
        dict, objects = self.make_weak_valued_dict()
        # testcontext looks up `dict` and `objects` each time it runs, so it
        # always works on whichever pair is bound to those names at that
        # moment (they are rebound once more further down).
        @contextlib.contextmanager
        def testcontext():
            try:
                # Keep an iterator alive for the duration of the block.
                it = iter(dict.items())
                next(it)
                # Schedule a key/value for removal and recreate it
                k = objects.pop().arg
                gc.collect()      # just in case
                yield k, Object(k)
            finally:
                it = None           # should commit all removals
                gc.collect()
        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
        dict, objects = self.make_weak_valued_dict()
        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1496
1497    def test_make_weak_keyed_dict_from_dict(self):
1498        o = Object(3)
1499        dict = weakref.WeakKeyDictionary({o:364})
1500        self.assertEqual(dict[o], 364)
1501
1502    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1503        o = Object(3)
1504        dict = weakref.WeakKeyDictionary({o:364})
1505        dict2 = weakref.WeakKeyDictionary(dict)
1506        self.assertEqual(dict[o], 364)
1507
1508    def make_weak_keyed_dict(self):
1509        dict = weakref.WeakKeyDictionary()
1510        objects = list(map(Object, range(self.COUNT)))
1511        for o in objects:
1512            dict[o] = o.arg
1513        return dict, objects
1514
1515    def test_make_weak_valued_dict_from_dict(self):
1516        o = Object(3)
1517        dict = weakref.WeakValueDictionary({364:o})
1518        self.assertEqual(dict[364], o)
1519
1520    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1521        o = Object(3)
1522        dict = weakref.WeakValueDictionary({364:o})
1523        dict2 = weakref.WeakValueDictionary(dict)
1524        self.assertEqual(dict[364], o)
1525
1526    def test_make_weak_valued_dict_misc(self):
1527        # errors
1528        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1529        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1530        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1531        # special keyword arguments
1532        o = Object(3)
1533        for kw in 'self', 'dict', 'other', 'iterable':
1534            d = weakref.WeakValueDictionary(**{kw: o})
1535            self.assertEqual(list(d.keys()), [kw])
1536            self.assertEqual(d[kw], o)
1537
1538    def make_weak_valued_dict(self):
1539        dict = weakref.WeakValueDictionary()
1540        objects = list(map(Object, range(self.COUNT)))
1541        for o in objects:
1542            dict[o.arg] = o
1543        return dict, objects
1544
1545    def check_popitem(self, klass, key1, value1, key2, value2):
1546        weakdict = klass()
1547        weakdict[key1] = value1
1548        weakdict[key2] = value2
1549        self.assertEqual(len(weakdict), 2)
1550        k, v = weakdict.popitem()
1551        self.assertEqual(len(weakdict), 1)
1552        if k is key1:
1553            self.assertIs(v, value1)
1554        else:
1555            self.assertIs(v, value2)
1556        k, v = weakdict.popitem()
1557        self.assertEqual(len(weakdict), 0)
1558        if k is key1:
1559            self.assertIs(v, value1)
1560        else:
1561            self.assertIs(v, value2)
1562
1563    def test_weak_valued_dict_popitem(self):
1564        self.check_popitem(weakref.WeakValueDictionary,
1565                           "key1", C(), "key2", C())
1566
1567    def test_weak_keyed_dict_popitem(self):
1568        self.check_popitem(weakref.WeakKeyDictionary,
1569                           C(), "value 1", C(), "value 2")
1570
1571    def check_setdefault(self, klass, key, value1, value2):
1572        self.assertIsNot(value1, value2,
1573                     "invalid test"
1574                     " -- value parameters must be distinct objects")
1575        weakdict = klass()
1576        o = weakdict.setdefault(key, value1)
1577        self.assertIs(o, value1)
1578        self.assertIn(key, weakdict)
1579        self.assertIs(weakdict.get(key), value1)
1580        self.assertIs(weakdict[key], value1)
1581
1582        o = weakdict.setdefault(key, value2)
1583        self.assertIs(o, value1)
1584        self.assertIn(key, weakdict)
1585        self.assertIs(weakdict.get(key), value1)
1586        self.assertIs(weakdict[key], value1)
1587
1588    def test_weak_valued_dict_setdefault(self):
1589        self.check_setdefault(weakref.WeakValueDictionary,
1590                              "key", C(), C())
1591
1592    def test_weak_keyed_dict_setdefault(self):
1593        self.check_setdefault(weakref.WeakKeyDictionary,
1594                              C(), "value 1", "value 2")
1595
1596    def check_update(self, klass, dict):
1597        #
1598        #  This exercises d.update(), len(d), d.keys(), k in d,
1599        #  d.get(), d[].
1600        #
1601        weakdict = klass()
1602        weakdict.update(dict)
1603        self.assertEqual(len(weakdict), len(dict))
1604        for k in weakdict.keys():
1605            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1606            v = dict.get(k)
1607            self.assertIs(v, weakdict[k])
1608            self.assertIs(v, weakdict.get(k))
1609        for k in dict.keys():
1610            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1611            v = dict[k]
1612            self.assertIs(v, weakdict[k])
1613            self.assertIs(v, weakdict.get(k))
1614
1615    def test_weak_valued_dict_update(self):
1616        self.check_update(weakref.WeakValueDictionary,
1617                          {1: C(), 'a': C(), C(): C()})
1618        # errors
1619        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1620        d = weakref.WeakValueDictionary()
1621        self.assertRaises(TypeError, d.update, {}, {})
1622        self.assertRaises(TypeError, d.update, (), ())
1623        self.assertEqual(list(d.keys()), [])
1624        # special keyword arguments
1625        o = Object(3)
1626        for kw in 'self', 'dict', 'other', 'iterable':
1627            d = weakref.WeakValueDictionary()
1628            d.update(**{kw: o})
1629            self.assertEqual(list(d.keys()), [kw])
1630            self.assertEqual(d[kw], o)
1631
    def test_weak_valued_union_operators(self):
        # Check `|`, `|=` and reflected `|` on WeakValueDictionary against
        # the same operations on plain dicts.
        a = C()
        b = C()
        c = C()
        wvd1 = weakref.WeakValueDictionary({1: a})
        wvd2 = weakref.WeakValueDictionary({1: b, 2: a})
        # Copy taken before wvd1 is modified by `|=` below.
        wvd3 = wvd1.copy()
        d1 = {1: c, 3: b}
        pairs = [(5, c), (6, b)]

        tmp1 = wvd1 | wvd2 # Between two WeakValueDictionaries
        self.assertEqual(dict(tmp1), dict(wvd1) | dict(wvd2))
        self.assertIs(type(tmp1), weakref.WeakValueDictionary)
        wvd1 |= wvd2
        self.assertEqual(wvd1, tmp1)

        tmp2 = wvd2 | d1 # Between WeakValueDictionary and mapping
        self.assertEqual(dict(tmp2), dict(wvd2) | d1)
        self.assertIs(type(tmp2), weakref.WeakValueDictionary)
        wvd2 |= d1
        self.assertEqual(wvd2, tmp2)

        tmp3 = wvd3.copy() # Between WeakValueDictionary and iterable key, value
        tmp3 |= pairs
        self.assertEqual(dict(tmp3), dict(wvd3) | dict(pairs))
        self.assertIs(type(tmp3), weakref.WeakValueDictionary)

        tmp4 = d1 | wvd3 # Testing .__ror__
        self.assertEqual(dict(tmp4), d1 | dict(wvd3))
        self.assertIs(type(tmp4), weakref.WeakValueDictionary)

        # The results must hold their values weakly: `a` was stored under
        # key 2 in tmp1/tmp2 and under key 1 in tmp3/tmp4, so dropping it
        # must make those entries disappear.
        del a
        self.assertNotIn(2, tmp1)
        self.assertNotIn(2, tmp2)
        self.assertNotIn(1, tmp3)
        self.assertNotIn(1, tmp4)
1668
1669    def test_weak_keyed_dict_update(self):
1670        self.check_update(weakref.WeakKeyDictionary,
1671                          {C(): 1, C(): 2, C(): 3})
1672
1673    def test_weak_keyed_delitem(self):
1674        d = weakref.WeakKeyDictionary()
1675        o1 = Object('1')
1676        o2 = Object('2')
1677        d[o1] = 'something'
1678        d[o2] = 'something'
1679        self.assertEqual(len(d), 2)
1680        del d[o1]
1681        self.assertEqual(len(d), 1)
1682        self.assertEqual(list(d.keys()), [o2])
1683
    def test_weak_keyed_union_operators(self):
        # Check `|`, `|=` and reflected `|` on WeakKeyDictionary against the
        # same operations on plain dicts.
        o1 = C()
        o2 = C()
        o3 = C()
        wkd1 = weakref.WeakKeyDictionary({o1: 1, o2: 2})
        wkd2 = weakref.WeakKeyDictionary({o3: 3, o1: 4})
        # Copy taken before wkd1 is modified by `|=` below.
        wkd3 = wkd1.copy()
        d1 = {o2: '5', o3: '6'}
        pairs = [(o2, 7), (o3, 8)]

        tmp1 = wkd1 | wkd2 # Between two WeakKeyDictionaries
        self.assertEqual(dict(tmp1), dict(wkd1) | dict(wkd2))
        self.assertIs(type(tmp1), weakref.WeakKeyDictionary)
        wkd1 |= wkd2
        self.assertEqual(wkd1, tmp1)

        tmp2 = wkd2 | d1 # Between WeakKeyDictionary and mapping
        self.assertEqual(dict(tmp2), dict(wkd2) | d1)
        self.assertIs(type(tmp2), weakref.WeakKeyDictionary)
        wkd2 |= d1
        self.assertEqual(wkd2, tmp2)

        tmp3 = wkd3.copy() # Between WeakKeyDictionary and iterable key, value
        tmp3 |= pairs
        self.assertEqual(dict(tmp3), dict(wkd3) | dict(pairs))
        self.assertIs(type(tmp3), weakref.WeakKeyDictionary)

        tmp4 = d1 | wkd3 # Testing .__ror__
        self.assertEqual(dict(tmp4), d1 | dict(wkd3))
        self.assertIs(type(tmp4), weakref.WeakKeyDictionary)

        # The results must hold their keys weakly: `o1` was the key for
        # value 4 in tmp1/tmp2 and for value 1 in tmp3/tmp4, so dropping it
        # must make those values disappear.
        del o1
        self.assertNotIn(4, tmp1.values())
        self.assertNotIn(4, tmp2.values())
        self.assertNotIn(1, tmp3.values())
        self.assertNotIn(1, tmp4.values())
1720
1721    def test_weak_valued_delitem(self):
1722        d = weakref.WeakValueDictionary()
1723        o1 = Object('1')
1724        o2 = Object('2')
1725        d['something'] = o1
1726        d['something else'] = o2
1727        self.assertEqual(len(d), 2)
1728        del d['something']
1729        self.assertEqual(len(d), 1)
1730        self.assertEqual(list(d.items()), [('something else', o2)])
1731
1732    def test_weak_keyed_bad_delitem(self):
1733        d = weakref.WeakKeyDictionary()
1734        o = Object('1')
1735        # An attempt to delete an object that isn't there should raise
1736        # KeyError.  It didn't before 2.3.
1737        self.assertRaises(KeyError, d.__delitem__, o)
1738        self.assertRaises(KeyError, d.__getitem__, o)
1739
1740        # If a key isn't of a weakly referencable type, __getitem__ and
1741        # __setitem__ raise TypeError.  __delitem__ should too.
1742        self.assertRaises(TypeError, d.__delitem__,  13)
1743        self.assertRaises(TypeError, d.__getitem__,  13)
1744        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1745
    def test_weak_keyed_cascading_deletes(self):
        # Regression test: deleting a key must keep working when the key
        # comparison performed during the deletion makes another key of the
        # same dictionary die.
        #
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        # Flag read by C.__eq__ below; while False, comparisons have no
        # side effects.
        mutate = False

        # Local key class (shadows the module-level C inside this test)
        # whose __eq__ can drop the last strong reference to another key.
        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = list(d.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time through the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        # All four entries must be gone after only two loop iterations:
        # presumably each `del d[o]` also loses one more key through the
        # __eq__ side effect, which shortens objs while it is being iterated.
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
1790
1791    def test_make_weak_valued_dict_repr(self):
1792        dict = weakref.WeakValueDictionary()
1793        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1794
1795    def test_make_weak_keyed_dict_repr(self):
1796        dict = weakref.WeakKeyDictionary()
1797        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1798
1799    def test_threaded_weak_valued_setdefault(self):
1800        d = weakref.WeakValueDictionary()
1801        with collect_in_thread():
1802            for i in range(100000):
1803                x = d.setdefault(10, RefCycle())
1804                self.assertIsNot(x, None)  # we never put None in there!
1805                del x
1806
1807    def test_threaded_weak_valued_pop(self):
1808        d = weakref.WeakValueDictionary()
1809        with collect_in_thread():
1810            for i in range(100000):
1811                d[10] = RefCycle()
1812                x = d.pop(10, 10)
1813                self.assertIsNot(x, None)  # we never put None in there!
1814
1815    def test_threaded_weak_valued_consistency(self):
1816        # Issue #28427: old keys should not remove new values from
1817        # WeakValueDictionary when collecting from another thread.
1818        d = weakref.WeakValueDictionary()
1819        with collect_in_thread():
1820            for i in range(200000):
1821                o = RefCycle()
1822                d[10] = o
1823                # o is still alive, so the dict can't be empty
1824                self.assertEqual(len(d), 1)
1825                o = None  # lose ref
1826
1827    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1828        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1829        # `deepcopy` should be either True or False.
1830        exc = []
1831
1832        class DummyKey:
1833            def __init__(self, ctr):
1834                self.ctr = ctr
1835
1836        class DummyValue:
1837            def __init__(self, ctr):
1838                self.ctr = ctr
1839
1840        def dict_copy(d, exc):
1841            try:
1842                if deepcopy is True:
1843                    _ = copy.deepcopy(d)
1844                else:
1845                    _ = d.copy()
1846            except Exception as ex:
1847                exc.append(ex)
1848
1849        def pop_and_collect(lst):
1850            gc_ctr = 0
1851            while lst:
1852                i = random.randint(0, len(lst) - 1)
1853                gc_ctr += 1
1854                lst.pop(i)
1855                if gc_ctr % 10000 == 0:
1856                    gc.collect()  # just in case
1857
1858        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1859
1860        d = type_()
1861        keys = []
1862        values = []
1863        # Initialize d with many entries
1864        for i in range(70000):
1865            k, v = DummyKey(i), DummyValue(i)
1866            keys.append(k)
1867            values.append(v)
1868            d[k] = v
1869            del k
1870            del v
1871
1872        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1873        if type_ is weakref.WeakKeyDictionary:
1874            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1875        else:  # weakref.WeakValueDictionary
1876            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1877
1878        t_copy.start()
1879        t_collect.start()
1880
1881        t_copy.join()
1882        t_collect.join()
1883
1884        # Test exceptions
1885        if exc:
1886            raise exc[0]
1887
1888    def test_threaded_weak_key_dict_copy(self):
1889        # Issue #35615: Weakref keys or values getting GC'ed during dict
1890        # copying should not result in a crash.
1891        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1892
1893    def test_threaded_weak_key_dict_deepcopy(self):
1894        # Issue #35615: Weakref keys or values getting GC'ed during dict
1895        # copying should not result in a crash.
1896        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1897
1898    def test_threaded_weak_value_dict_copy(self):
1899        # Issue #35615: Weakref keys or values getting GC'ed during dict
1900        # copying should not result in a crash.
1901        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1902
1903    def test_threaded_weak_value_dict_deepcopy(self):
1904        # Issue #35615: Weakref keys or values getting GC'ed during dict
1905        # copying should not result in a crash.
1906        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1907
1908    @support.cpython_only
1909    def test_remove_closure(self):
1910        d = weakref.WeakValueDictionary()
1911        self.assertIsNone(d._remove.__closure__)
1912
1913
1914from test import mapping_tests
1915
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against WeakValueDictionary."""
    type2test = weakref.WeakValueDictionary
    # Class-level dict holding strong references to the Object values that
    # _reference() hands out.
    __ref = {f"key{n}": Object(n) for n in (1, 2, 3)}

    def _reference(self):
        # Return a fresh shallow copy so callers cannot alter the shared dict.
        return dict(self.__ref)
1922
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against WeakKeyDictionary."""
    type2test = weakref.WeakKeyDictionary
    # Class-level dict holding strong references to the Object keys that
    # _reference() hands out.
    __ref = {Object(f"key{n}"): n for n in (1, 2, 3)}

    def _reference(self):
        # Return a fresh shallow copy so callers cannot alter the shared dict.
        return dict(self.__ref)
1929
1930
1931class FinalizeTestCase(unittest.TestCase):
1932
1933    class A:
1934        pass
1935
1936    def _collect_if_necessary(self):
1937        # we create no ref-cycles so in CPython no gc should be needed
1938        if sys.implementation.name != 'cpython':
1939            support.gc_collect()
1940
1941    def test_finalize(self):
1942        def add(x,y,z):
1943            res.append(x + y + z)
1944            return x + y + z
1945
1946        a = self.A()
1947
1948        res = []
1949        f = weakref.finalize(a, add, 67, 43, z=89)
1950        self.assertEqual(f.alive, True)
1951        self.assertEqual(f.peek(), (a, add, (67,43), {'z':89}))
1952        self.assertEqual(f(), 199)
1953        self.assertEqual(f(), None)
1954        self.assertEqual(f(), None)
1955        self.assertEqual(f.peek(), None)
1956        self.assertEqual(f.detach(), None)
1957        self.assertEqual(f.alive, False)
1958        self.assertEqual(res, [199])
1959
1960        res = []
1961        f = weakref.finalize(a, add, 67, 43, 89)
1962        self.assertEqual(f.peek(), (a, add, (67,43,89), {}))
1963        self.assertEqual(f.detach(), (a, add, (67,43,89), {}))
1964        self.assertEqual(f(), None)
1965        self.assertEqual(f(), None)
1966        self.assertEqual(f.peek(), None)
1967        self.assertEqual(f.detach(), None)
1968        self.assertEqual(f.alive, False)
1969        self.assertEqual(res, [])
1970
1971        res = []
1972        f = weakref.finalize(a, add, x=67, y=43, z=89)
1973        del a
1974        self._collect_if_necessary()
1975        self.assertEqual(f(), None)
1976        self.assertEqual(f(), None)
1977        self.assertEqual(f.peek(), None)
1978        self.assertEqual(f.detach(), None)
1979        self.assertEqual(f.alive, False)
1980        self.assertEqual(res, [199])
1981
1982    def test_arg_errors(self):
1983        def fin(*args, **kwargs):
1984            res.append((args, kwargs))
1985
1986        a = self.A()
1987
1988        res = []
1989        f = weakref.finalize(a, fin, 1, 2, func=3, obj=4)
1990        self.assertEqual(f.peek(), (a, fin, (1, 2), {'func': 3, 'obj': 4}))
1991        f()
1992        self.assertEqual(res, [((1, 2), {'func': 3, 'obj': 4})])
1993
1994        with self.assertRaises(TypeError):
1995            weakref.finalize(a, func=fin, arg=1)
1996        with self.assertRaises(TypeError):
1997            weakref.finalize(obj=a, func=fin, arg=1)
1998        self.assertRaises(TypeError, weakref.finalize, a)
1999        self.assertRaises(TypeError, weakref.finalize)
2000
2001    def test_order(self):
2002        a = self.A()
2003        res = []
2004
2005        f1 = weakref.finalize(a, res.append, 'f1')
2006        f2 = weakref.finalize(a, res.append, 'f2')
2007        f3 = weakref.finalize(a, res.append, 'f3')
2008        f4 = weakref.finalize(a, res.append, 'f4')
2009        f5 = weakref.finalize(a, res.append, 'f5')
2010
2011        # make sure finalizers can keep themselves alive
2012        del f1, f4
2013
2014        self.assertTrue(f2.alive)
2015        self.assertTrue(f3.alive)
2016        self.assertTrue(f5.alive)
2017
2018        self.assertTrue(f5.detach())
2019        self.assertFalse(f5.alive)
2020
2021        f5()                       # nothing because previously unregistered
2022        res.append('A')
2023        f3()                       # => res.append('f3')
2024        self.assertFalse(f3.alive)
2025        res.append('B')
2026        f3()                       # nothing because previously called
2027        res.append('C')
2028        del a
2029        self._collect_if_necessary()
2030                                   # => res.append('f4')
2031                                   # => res.append('f2')
2032                                   # => res.append('f1')
2033        self.assertFalse(f2.alive)
2034        res.append('D')
2035        f2()                       # nothing because previously called by gc
2036
2037        expected = ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D']
2038        self.assertEqual(res, expected)
2039
2040    def test_all_freed(self):
2041        # we want a weakrefable subclass of weakref.finalize
2042        class MyFinalizer(weakref.finalize):
2043            pass
2044
2045        a = self.A()
2046        res = []
2047        def callback():
2048            res.append(123)
2049        f = MyFinalizer(a, callback)
2050
2051        wr_callback = weakref.ref(callback)
2052        wr_f = weakref.ref(f)
2053        del callback, f
2054
2055        self.assertIsNotNone(wr_callback())
2056        self.assertIsNotNone(wr_f())
2057
2058        del a
2059        self._collect_if_necessary()
2060
2061        self.assertIsNone(wr_callback())
2062        self.assertIsNone(wr_f())
2063        self.assertEqual(res, [123])
2064
2065    @classmethod
2066    def run_in_child(cls):
2067        def error():
2068            # Create an atexit finalizer from inside a finalizer called
2069            # at exit.  This should be the next to be run.
2070            g1 = weakref.finalize(cls, print, 'g1')
2071            print('f3 error')
2072            1/0
2073
2074        # cls should stay alive till atexit callbacks run
2075        f1 = weakref.finalize(cls, print, 'f1', _global_var)
2076        f2 = weakref.finalize(cls, print, 'f2', _global_var)
2077        f3 = weakref.finalize(cls, error)
2078        f4 = weakref.finalize(cls, print, 'f4', _global_var)
2079
2080        assert f1.atexit == True
2081        f2.atexit = False
2082        assert f3.atexit == True
2083        assert f4.atexit == True
2084
2085    def test_atexit(self):
2086        prog = ('from test.test_weakref import FinalizeTestCase;'+
2087                'FinalizeTestCase.run_in_child()')
2088        rc, out, err = script_helper.assert_python_ok('-c', prog)
2089        out = out.decode('ascii').splitlines()
2090        self.assertEqual(out, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
2091        self.assertTrue(b'ZeroDivisionError' in err)
2092
2093
# Doctest text mirroring the examples in the weakref library reference.
# It is a runtime string, run as a doctest through the __test__ mapping
# below, so its contents must stay exactly as the interpreter would print
# them.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

# doctest looks up a module-level __test__ mapping for extra test strings.
__test__ = {'libreftest' : libreftest}
2178
def test_main():
    # Run every unittest case class of this module, then the module's
    # doctests.
    case_classes = (
        ReferencesTestCase,
        WeakMethodTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        SubclassableWeakrefTestCase,
        FinalizeTestCase,
    )
    support.run_unittest(*case_classes)
    this_module = sys.modules[__name__]
    support.run_doctest(this_module)
2190
2191
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
2194