• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import unittest
4import collections
5import weakref
6import operator
7import contextlib
8import copy
9import threading
10import time
11import random
12
13from test import support
14from test.support import script_helper
15
# Module-level slot assigned by Target.__del__() inside
# ReferencesTestCase.test_ref_created_during_del().
ref_from_del = None

# Used by FinalizeTestCase as a global that may be replaced by None
# when the interpreter shuts down.
_global_var = 'foobar'
22
class C:
    """Minimal weak-referenceable class with a single no-op method."""

    def method(self):
        """Do nothing; exists so that bound methods can be created."""
        return None
26
27
class Callable:
    """Callable helper that remembers the last argument it was called with."""

    # Most recent argument passed to __call__(); None until the first call.
    bar = None

    def __call__(self, x):
        # Store on the instance so the class-level default stays untouched.
        setattr(self, "bar", x)
33
34
def create_function():
    """Return a brand-new no-op function object on every call."""
    def f():
        return None
    return f
38
def create_bound_method():
    """Return ``method`` bound to a freshly created ``C`` instance."""
    instance = C()
    return instance.method
41
42
class Object:
    """Value wrapper whose equality, ordering and hash follow ``arg``."""

    def __init__(self, arg):
        self.arg = arg

    def __repr__(self):
        return "<Object %r>" % self.arg

    def __eq__(self, other):
        # Only compare against other Object instances.
        if not isinstance(other, Object):
            return NotImplemented
        return self.arg == other.arg

    def __lt__(self, other):
        # Only order against other Object instances.
        if not isinstance(other, Object):
            return NotImplemented
        return self.arg < other.arg

    def __hash__(self):
        # Hash like the wrapped value.
        return hash(self.arg)

    def some_method(self):
        return 4

    def other_method(self):
        return 5
62
63
class RefCycle:
    """Object that references itself through its ``cycle`` attribute."""

    def __init__(self):
        # Self-reference: the instance takes part in a reference cycle.
        setattr(self, "cycle", self)
67
68
class TestBase(unittest.TestCase):
    """Shared fixture that counts weak reference callback invocations."""

    def setUp(self):
        # Start every test with a fresh invocation counter.
        self.cbcalled = 0

    def callback(self, ref):
        # Suitable as a weakref callback; the *ref* argument is ignored.
        self.cbcalled = self.cbcalled + 1
76
77
@contextlib.contextmanager
def collect_in_thread(period=0.0001):
    """
    Context manager running gc.collect() in a helper thread every
    *period* seconds while the managed block executes.

    The whole thing runs inside support.disable_gc(); the helper thread
    is joined before the context manager exits.
    """
    please_stop = False

    def collect():
        # Sleep first, then collect, until the owner raises the flag.
        while True:
            if please_stop:
                break
            time.sleep(period)
            gc.collect()

    with support.disable_gc():
        collector = threading.Thread(target=collect)
        collector.start()
        try:
            yield
        finally:
            # Rebinding here updates the variable read by collect().
            please_stop = True
            collector.join()
98
99
100class ReferencesTestCase(TestBase):
101
102    def test_basic_ref(self):
103        self.check_basic_ref(C)
104        self.check_basic_ref(create_function)
105        self.check_basic_ref(create_bound_method)
106
107        # Just make sure the tp_repr handler doesn't raise an exception.
108        # Live reference:
109        o = C()
110        wr = weakref.ref(o)
111        repr(wr)
112        # Dead reference:
113        del o
114        repr(wr)
115
116    def test_basic_callback(self):
117        self.check_basic_callback(C)
118        self.check_basic_callback(create_function)
119        self.check_basic_callback(create_bound_method)
120
121    @support.cpython_only
122    def test_cfunction(self):
123        import _testcapi
124        create_cfunction = _testcapi.create_cfunction
125        f = create_cfunction()
126        wr = weakref.ref(f)
127        self.assertIs(wr(), f)
128        del f
129        self.assertIsNone(wr())
130        self.check_basic_ref(create_cfunction)
131        self.check_basic_callback(create_cfunction)
132
133    def test_multiple_callbacks(self):
134        o = C()
135        ref1 = weakref.ref(o, self.callback)
136        ref2 = weakref.ref(o, self.callback)
137        del o
138        self.assertIsNone(ref1(), "expected reference to be invalidated")
139        self.assertIsNone(ref2(), "expected reference to be invalidated")
140        self.assertEqual(self.cbcalled, 2,
141                     "callback not called the right number of times")
142
143    def test_multiple_selfref_callbacks(self):
144        # Make sure all references are invalidated before callbacks are called
145        #
146        # What's important here is that we're using the first
147        # reference in the callback invoked on the second reference
148        # (the most recently created ref is cleaned up first).  This
149        # tests that all references to the object are invalidated
150        # before any of the callbacks are invoked, so that we only
151        # have one invocation of _weakref.c:cleanup_helper() active
152        # for a particular object at a time.
153        #
154        def callback(object, self=self):
155            self.ref()
156        c = C()
157        self.ref = weakref.ref(c, callback)
158        ref1 = weakref.ref(c, callback)
159        del c
160
161    def test_constructor_kwargs(self):
162        c = C()
163        self.assertRaises(TypeError, weakref.ref, c, callback=None)
164
165    def test_proxy_ref(self):
166        o = C()
167        o.bar = 1
168        ref1 = weakref.proxy(o, self.callback)
169        ref2 = weakref.proxy(o, self.callback)
170        del o
171
172        def check(proxy):
173            proxy.bar
174
175        self.assertRaises(ReferenceError, check, ref1)
176        self.assertRaises(ReferenceError, check, ref2)
177        self.assertRaises(ReferenceError, bool, weakref.proxy(C()))
178        self.assertEqual(self.cbcalled, 2)
179
180    def check_basic_ref(self, factory):
181        o = factory()
182        ref = weakref.ref(o)
183        self.assertIsNotNone(ref(),
184                     "weak reference to live object should be live")
185        o2 = ref()
186        self.assertIs(o, o2,
187                     "<ref>() should return original object if live")
188
189    def check_basic_callback(self, factory):
190        self.cbcalled = 0
191        o = factory()
192        ref = weakref.ref(o, self.callback)
193        del o
194        self.assertEqual(self.cbcalled, 1,
195                     "callback did not properly set 'cbcalled'")
196        self.assertIsNone(ref(),
197                     "ref2 should be dead after deleting object reference")
198
199    def test_ref_reuse(self):
200        o = C()
201        ref1 = weakref.ref(o)
202        # create a proxy to make sure that there's an intervening creation
203        # between these two; it should make no difference
204        proxy = weakref.proxy(o)
205        ref2 = weakref.ref(o)
206        self.assertIs(ref1, ref2,
207                     "reference object w/out callback should be re-used")
208
209        o = C()
210        proxy = weakref.proxy(o)
211        ref1 = weakref.ref(o)
212        ref2 = weakref.ref(o)
213        self.assertIs(ref1, ref2,
214                     "reference object w/out callback should be re-used")
215        self.assertEqual(weakref.getweakrefcount(o), 2,
216                     "wrong weak ref count for object")
217        del proxy
218        self.assertEqual(weakref.getweakrefcount(o), 1,
219                     "wrong weak ref count for object after deleting proxy")
220
221    def test_proxy_reuse(self):
222        o = C()
223        proxy1 = weakref.proxy(o)
224        ref = weakref.ref(o)
225        proxy2 = weakref.proxy(o)
226        self.assertIs(proxy1, proxy2,
227                     "proxy object w/out callback should have been re-used")
228
229    def test_basic_proxy(self):
230        o = C()
231        self.check_proxy(o, weakref.proxy(o))
232
233        L = collections.UserList()
234        p = weakref.proxy(L)
235        self.assertFalse(p, "proxy for empty UserList should be false")
236        p.append(12)
237        self.assertEqual(len(L), 1)
238        self.assertTrue(p, "proxy for non-empty UserList should be true")
239        p[:] = [2, 3]
240        self.assertEqual(len(L), 2)
241        self.assertEqual(len(p), 2)
242        self.assertIn(3, p, "proxy didn't support __contains__() properly")
243        p[1] = 5
244        self.assertEqual(L[1], 5)
245        self.assertEqual(p[1], 5)
246        L2 = collections.UserList(L)
247        p2 = weakref.proxy(L2)
248        self.assertEqual(p, p2)
249        ## self.assertEqual(repr(L2), repr(p2))
250        L3 = collections.UserList(range(10))
251        p3 = weakref.proxy(L3)
252        self.assertEqual(L3[:], p3[:])
253        self.assertEqual(L3[5:], p3[5:])
254        self.assertEqual(L3[:5], p3[:5])
255        self.assertEqual(L3[2:5], p3[2:5])
256
257    def test_proxy_unicode(self):
258        # See bug 5037
259        class C(object):
260            def __str__(self):
261                return "string"
262            def __bytes__(self):
263                return b"bytes"
264        instance = C()
265        self.assertIn("__bytes__", dir(weakref.proxy(instance)))
266        self.assertEqual(bytes(weakref.proxy(instance)), b"bytes")
267
268    def test_proxy_index(self):
269        class C:
270            def __index__(self):
271                return 10
272        o = C()
273        p = weakref.proxy(o)
274        self.assertEqual(operator.index(p), 10)
275
276    def test_proxy_div(self):
277        class C:
278            def __floordiv__(self, other):
279                return 42
280            def __ifloordiv__(self, other):
281                return 21
282        o = C()
283        p = weakref.proxy(o)
284        self.assertEqual(p // 5, 42)
285        p //= 5
286        self.assertEqual(p, 21)
287
    # The PyWeakref_* C API is documented as allowing either NULL or
    # None as the value for the callback, where either means "no
    # callback".  The "no callback" ref and proxy objects are supposed
    # to be shared by all callers for as long as they are alive.  In
    # Python 2.3.3 and earlier, this guarantee was not honored, and was
    # broken in different ways for PyWeakref_NewRef() and
    # PyWeakref_NewProxy().  (Two tests.)
295
296    def test_shared_ref_without_callback(self):
297        self.check_shared_without_callback(weakref.ref)
298
299    def test_shared_proxy_without_callback(self):
300        self.check_shared_without_callback(weakref.proxy)
301
302    def check_shared_without_callback(self, makeref):
303        o = Object(1)
304        p1 = makeref(o, None)
305        p2 = makeref(o, None)
306        self.assertIs(p1, p2, "both callbacks were None in the C API")
307        del p1, p2
308        p1 = makeref(o)
309        p2 = makeref(o, None)
310        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
311        del p1, p2
312        p1 = makeref(o)
313        p2 = makeref(o)
314        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
315        del p1, p2
316        p1 = makeref(o, None)
317        p2 = makeref(o)
318        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
319
320    def test_callable_proxy(self):
321        o = Callable()
322        ref1 = weakref.proxy(o)
323
324        self.check_proxy(o, ref1)
325
326        self.assertIs(type(ref1), weakref.CallableProxyType,
327                     "proxy is not of callable type")
328        ref1('twinkies!')
329        self.assertEqual(o.bar, 'twinkies!',
330                     "call through proxy not passed through to original")
331        ref1(x='Splat.')
332        self.assertEqual(o.bar, 'Splat.',
333                     "call through proxy not passed through to original")
334
335        # expect due to too few args
336        self.assertRaises(TypeError, ref1)
337
338        # expect due to too many args
339        self.assertRaises(TypeError, ref1, 1, 2, 3)
340
341    def check_proxy(self, o, proxy):
342        o.foo = 1
343        self.assertEqual(proxy.foo, 1,
344                     "proxy does not reflect attribute addition")
345        o.foo = 2
346        self.assertEqual(proxy.foo, 2,
347                     "proxy does not reflect attribute modification")
348        del o.foo
349        self.assertFalse(hasattr(proxy, 'foo'),
350                     "proxy does not reflect attribute removal")
351
352        proxy.foo = 1
353        self.assertEqual(o.foo, 1,
354                     "object does not reflect attribute addition via proxy")
355        proxy.foo = 2
356        self.assertEqual(o.foo, 2,
357            "object does not reflect attribute modification via proxy")
358        del proxy.foo
359        self.assertFalse(hasattr(o, 'foo'),
360                     "object does not reflect attribute removal via proxy")
361
362    def test_proxy_deletion(self):
363        # Test clearing of SF bug #762891
364        class Foo:
365            result = None
366            def __delitem__(self, accessor):
367                self.result = accessor
368        g = Foo()
369        f = weakref.proxy(g)
370        del f[0]
371        self.assertEqual(f.result, 0)
372
373    def test_proxy_bool(self):
374        # Test clearing of SF bug #1170766
375        class List(list): pass
376        lyst = List()
377        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
378
379    def test_getweakrefcount(self):
380        o = C()
381        ref1 = weakref.ref(o)
382        ref2 = weakref.ref(o, self.callback)
383        self.assertEqual(weakref.getweakrefcount(o), 2,
384                     "got wrong number of weak reference objects")
385
386        proxy1 = weakref.proxy(o)
387        proxy2 = weakref.proxy(o, self.callback)
388        self.assertEqual(weakref.getweakrefcount(o), 4,
389                     "got wrong number of weak reference objects")
390
391        del ref1, ref2, proxy1, proxy2
392        self.assertEqual(weakref.getweakrefcount(o), 0,
393                     "weak reference objects not unlinked from"
394                     " referent when discarded.")
395
396        # assumes ints do not support weakrefs
397        self.assertEqual(weakref.getweakrefcount(1), 0,
398                     "got wrong number of weak reference objects for int")
399
400    def test_getweakrefs(self):
401        o = C()
402        ref1 = weakref.ref(o, self.callback)
403        ref2 = weakref.ref(o, self.callback)
404        del ref1
405        self.assertEqual(weakref.getweakrefs(o), [ref2],
406                     "list of refs does not match")
407
408        o = C()
409        ref1 = weakref.ref(o, self.callback)
410        ref2 = weakref.ref(o, self.callback)
411        del ref2
412        self.assertEqual(weakref.getweakrefs(o), [ref1],
413                     "list of refs does not match")
414
415        del ref1
416        self.assertEqual(weakref.getweakrefs(o), [],
417                     "list of refs not cleared")
418
419        # assumes ints do not support weakrefs
420        self.assertEqual(weakref.getweakrefs(1), [],
421                     "list of refs does not match for int")
422
423    def test_newstyle_number_ops(self):
424        class F(float):
425            pass
426        f = F(2.0)
427        p = weakref.proxy(f)
428        self.assertEqual(p + 1.0, 3.0)
429        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
430
431    def test_callbacks_protected(self):
432        # Callbacks protected from already-set exceptions?
433        # Regression test for SF bug #478534.
434        class BogusError(Exception):
435            pass
436        data = {}
437        def remove(k):
438            del data[k]
439        def encapsulate():
440            f = lambda : ()
441            data[weakref.ref(f, remove)] = None
442            raise BogusError
443        try:
444            encapsulate()
445        except BogusError:
446            pass
447        else:
448            self.fail("exception not properly restored")
449        try:
450            encapsulate()
451        except BogusError:
452            pass
453        else:
454            self.fail("exception not properly restored")
455
    def test_sf_bug_840829(self):
        # "weakref callbacks and gc corrupt memory"
        # subtype_dealloc erroneously exposed a new-style instance
        # already in the process of getting deallocated to gc,
        # causing double-deallocation if the instance had a weakref
        # callback that triggered gc.
        # If the bug exists, there probably won't be an obvious symptom
        # in a release build.  In a debug build, a segfault will occur
        # when the second attempt to remove the instance from the "list
        # of all objects" occurs.
        #
        # The test makes no assertions: it passes if it runs to completion.

        import gc

        class C(object):
            pass

        # Part one: the callback on c's weakref runs a full collection
        # while c is being deallocated.
        c = C()
        wr = weakref.ref(c, lambda ignore: gc.collect())
        del c

        # There endeth the first part.  It gets worse.
        del wr

        # Part two: the weakref with the collecting callback is on an
        # object (c1.i) owned by c1, which is in turn owned by c2.
        c1 = C()
        c1.i = C()
        wr = weakref.ref(c1.i, lambda ignore: gc.collect())

        c2 = C()
        c2.c1 = c1
        del c1  # still alive because c2 points to it

        # Now when subtype_dealloc gets called on c2, it's not enough just
        # that c2 is immune from gc while the weakref callbacks associated
        # with c2 execute (there are none in this 2nd half of the test, btw).
        # subtype_dealloc goes on to call the base classes' deallocs too,
        # so any gc triggered by weakref callbacks associated with anything
        # torn down by a base class dealloc can also trigger double
        # deallocation of c2.
        del c2
495
    def test_callback_in_cycle_1(self):
        # A weakref callback that is part of cyclic trash touches an
        # attribute of its own instance.  No assertions: the test passes
        # if the final collection completes.
        import gc

        class J(object):
            pass

        class II(object):
            def acallback(self, ignore):
                self.J

        # NOTE: the creation order below is significant (see the comment
        # that follows); do not reorder these statements.
        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Now J and II are each in a self-cycle (as all new-style class
        # objects are, since their __mro__ points back to them).  I holds
        # both a weak reference (I.wr) and a strong reference (I.J) to class
        # J.  I is also in a cycle (I.wr points to a weakref that references
        # I.acallback).  When we del these three, they all become trash, but
        # the cycles prevent any of them from getting cleaned up immediately.
        # Instead they have to wait for cyclic gc to deduce that they're
        # trash.
        #
        # gc used to call tp_clear on all of them, and the order in which
        # it does that is pretty accidental.  The exact order in which we
        # built up these things manages to provoke gc into running tp_clear
        # in just the right order (I last).  Calling tp_clear on II leaves
        # behind an insane class object (its __mro__ becomes NULL).  Calling
        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
        # just then because of the strong reference from I.J.  Calling
        # tp_clear on I starts to clear I's __dict__, and just happens to
        # clear I.J first -- I.wr is still intact.  That removes the last
        # reference to J, which triggers the weakref callback.  The callback
        # tries to do "self.J", and instances of new-style classes look up
        # attributes ("J") in the class dict first.  The class (II) wants to
        # search II.__mro__, but that's NULL.   The result was a segfault in
        # a release build, and an assert failure in a debug build.
        del I, J, II
        gc.collect()
535
    def test_callback_in_cycle_2(self):
        import gc

        # This is just like test_callback_in_cycle_1, except that II is an
        # old-style class.  The symptom is different then:  an instance of an
        # old-style class looks in its own __dict__ first.  'J' happens to
        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
        # __dict__, so the attribute isn't found.  The difference is that
        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
        # __mro__), so no segfault occurs.  Instead it got:
        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
        #    Exception exceptions.AttributeError:
        #   "II instance has no attribute 'J'" in <bound method II.acallback
        #       of <?.II instance at 0x00B9B4B8>> ignored
        #
        # NOTE(review): the description above dates from Python 2.  Under
        # Python 3 there are no old-style classes, so "class II:" below
        # differs from test_callback_in_cycle_1 only in omitting the
        # explicit "object" base.

        class J(object):
            pass

        class II:
            def acallback(self, ignore):
                self.J

        I = II()
        I.J = J
        I.wr = weakref.ref(J, I.acallback)

        # Make everything trash, then let the cyclic collector find it.
        del I, J, II
        gc.collect()
564
    def test_callback_in_cycle_3(self):
        import gc

        # This one broke the first patch that fixed the last two.  In this
        # case, the objects reachable from the callback aren't also reachable
        # from the object (c1) *triggering* the callback:  you can get to
        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
        # got tp_clear'ed by the time the c2.cb callback got invoked.

        class C:
            def cb(self, ignore):
                # Touch every attribute stored on c2 below.
                self.me
                self.c1
                self.wr

        c1, c2 = C(), C()

        # c2 references itself, c1, and a weakref to c1 whose callback is
        # c2's own bound method; c1 references nothing.
        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        del c1, c2
        gc.collect()
588
    def test_callback_in_cycle_4(self):
        import gc

        # Like test_callback_in_cycle_3, except c2 and c1 have different
        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
        # objects reachable from the dying object (c1) isn't enough to stop
        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
        # The result was a segfault (C.__mro__ was NULL when the callback
        # tried to look up self.me).

        class C(object):
            def cb(self, ignore):
                # Touch every attribute stored on c2 below.
                self.me
                self.c1
                self.wr

        class D:
            pass

        c1, c2 = D(), C()

        c2.me = c2
        c2.c1 = c1
        c2.wr = weakref.ref(c1, c2.cb)

        # Unlike test_callback_in_cycle_3, the classes are deleted too.
        del c1, c2, C, D
        gc.collect()
616
    @support.requires_type_collecting
    def test_callback_in_cycle_resurrection(self):
        import gc

        # Do something nasty in a weakref callback:  resurrect objects
        # from dead cycles.  For this to be attempted, the weakref and
        # its callback must also be part of the cyclic trash (else the
        # objects reachable via the callback couldn't be in cyclic trash
        # to begin with -- the callback would act like an external root).
        # But gc clears trash weakrefs with callbacks early now, which
        # disables the callbacks, so the callbacks shouldn't get called
        # at all (and so nothing actually gets resurrected).

        # Records everything the callbacks do.
        alist = []
        class C(object):
            def __init__(self, value):
                self.attribute = value

            def acallback(self, ignore):
                # Would resurrect the peer object if it ever ran.
                alist.append(self.c)

        # c1 and c2 reference each other, and each holds a weakref to the
        # other whose callback is its own bound method.
        c1, c2 = C(1), C(2)
        c1.c = c2
        c2.c = c1
        c1.wr = weakref.ref(c2, c1.acallback)
        c2.wr = weakref.ref(c1, c2.acallback)

        def C_went_away(ignore):
            alist.append("C went away")
        wr = weakref.ref(C, C_went_away)

        del c1, c2, C   # make them all trash
        self.assertEqual(alist, [])  # del isn't enough to reclaim anything

        gc.collect()
        # c1.wr and c2.wr were part of the cyclic trash, so should have
        # been cleared without their callbacks executing.  OTOH, the weakref
        # to C is bound to a function local (wr), and wasn't trash, so that
        # callback should have been invoked when C went away.
        self.assertEqual(alist, ["C went away"])
        # The remaining weakref should be dead now (its callback ran).
        self.assertEqual(wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
663
    def test_callbacks_on_callback(self):
        import gc

        # Set up weakref callbacks *on* weakref callbacks.
        # Records which callbacks actually run.
        alist = []
        def safe_callback(ignore):
            alist.append("safe_callback called")

        class C(object):
            def cb(self, ignore):
                alist.append("cb called")

        # c and d reference each other, forming a cycle.
        c, d = C(), C()
        c.other = d
        d.other = c
        callback = c.cb
        c.wr = weakref.ref(d, callback)     # this won't trigger
        d.wr = weakref.ref(callback, d.cb)  # ditto
        external_wr = weakref.ref(callback, safe_callback)  # but this will
        self.assertIs(external_wr(), callback)

        # The weakrefs attached to c and d should get cleared, so that
        # C.cb is never called.  But external_wr isn't part of the cyclic
        # trash, and no cyclic trash is reachable from it, so safe_callback
        # should get invoked when the bound method object callback (c.cb)
        # -- which is itself a callback, and also part of the cyclic trash --
        # gets reclaimed at the end of gc.

        del callback, c, d, C
        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
        gc.collect()
        self.assertEqual(alist, ["safe_callback called"])
        self.assertEqual(external_wr(), None)

        # A second collection must not trigger any further callbacks.
        del alist[:]
        gc.collect()
        self.assertEqual(alist, [])
701
702    def test_gc_during_ref_creation(self):
703        self.check_gc_during_creation(weakref.ref)
704
705    def test_gc_during_proxy_creation(self):
706        self.check_gc_during_creation(weakref.proxy)
707
708    def check_gc_during_creation(self, makeref):
709        thresholds = gc.get_threshold()
710        gc.set_threshold(1, 1, 1)
711        gc.collect()
712        class A:
713            pass
714
715        def callback(*args):
716            pass
717
718        referenced = A()
719
720        a = A()
721        a.a = a
722        a.wr = makeref(referenced)
723
724        try:
725            # now make sure the object and the ref get labeled as
726            # cyclic trash:
727            a = A()
728            weakref.ref(referenced, callback)
729
730        finally:
731            gc.set_threshold(*thresholds)
732
733    def test_ref_created_during_del(self):
734        # Bug #1377858
735        # A weakref created in an object's __del__() would crash the
736        # interpreter when the weakref was cleaned up since it would refer to
737        # non-existent memory.  This test should not segfault the interpreter.
738        class Target(object):
739            def __del__(self):
740                global ref_from_del
741                ref_from_del = weakref.ref(self)
742
743        w = Target()
744
745    def test_init(self):
746        # Issue 3634
747        # <weakref to class>.__init__() doesn't check errors correctly
748        r = weakref.ref(Exception)
749        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
750        # No exception should be raised here
751        gc.collect()
752
753    def test_classes(self):
754        # Check that classes are weakrefable.
755        class A(object):
756            pass
757        l = []
758        weakref.ref(int)
759        a = weakref.ref(A, l.append)
760        A = None
761        gc.collect()
762        self.assertEqual(a(), None)
763        self.assertEqual(l, [a])
764
765    def test_equality(self):
766        # Alive weakrefs defer equality testing to their underlying object.
767        x = Object(1)
768        y = Object(1)
769        z = Object(2)
770        a = weakref.ref(x)
771        b = weakref.ref(y)
772        c = weakref.ref(z)
773        d = weakref.ref(x)
774        # Note how we directly test the operators here, to stress both
775        # __eq__ and __ne__.
776        self.assertTrue(a == b)
777        self.assertFalse(a != b)
778        self.assertFalse(a == c)
779        self.assertTrue(a != c)
780        self.assertTrue(a == d)
781        self.assertFalse(a != d)
782        del x, y, z
783        gc.collect()
784        for r in a, b, c:
785            # Sanity check
786            self.assertIs(r(), None)
787        # Dead weakrefs compare by identity: whether `a` and `d` are the
788        # same weakref object is an implementation detail, since they pointed
789        # to the same original object and didn't have a callback.
790        # (see issue #16453).
791        self.assertFalse(a == b)
792        self.assertTrue(a != b)
793        self.assertFalse(a == c)
794        self.assertTrue(a != c)
795        self.assertEqual(a == d, a is d)
796        self.assertEqual(a != d, a is not d)
797
798    def test_ordering(self):
799        # weakrefs cannot be ordered, even if the underlying objects can.
800        ops = [operator.lt, operator.gt, operator.le, operator.ge]
801        x = Object(1)
802        y = Object(1)
803        a = weakref.ref(x)
804        b = weakref.ref(y)
805        for op in ops:
806            self.assertRaises(TypeError, op, a, b)
807        # Same when dead.
808        del x, y
809        gc.collect()
810        for op in ops:
811            self.assertRaises(TypeError, op, a, b)
812
813    def test_hashing(self):
814        # Alive weakrefs hash the same as the underlying object
815        x = Object(42)
816        y = Object(42)
817        a = weakref.ref(x)
818        b = weakref.ref(y)
819        self.assertEqual(hash(a), hash(42))
820        del x, y
821        gc.collect()
822        # Dead weakrefs:
823        # - retain their hash is they were hashed when alive;
824        # - otherwise, cannot be hashed.
825        self.assertEqual(hash(a), hash(42))
826        self.assertRaises(TypeError, hash, b)
827
    def test_trashcan_16602(self):
        # Issue #16602: when a weakref's target was part of a long
        # deallocation chain, the trashcan mechanism could delay clearing
        # of the weakref and make the target object visible from outside
        # code even though its refcount had dropped to 0.  A crash ensued.
        #
        # No assertions: the test passes if it runs to completion.
        class C:
            def __init__(self, parent):
                if not parent:
                    return
                wself = weakref.ref(self)
                def cb(wparent):
                    # Dereference the weakref to this object from within
                    # the callback fired on its parent's weakref.
                    o = wself()
                self.wparent = weakref.ref(parent, cb)

        d = weakref.WeakKeyDictionary()
        root = c = C(None)
        for n in range(100):
            # Targets are assigned left to right: d[c] is stored under the
            # current c (the parent) before c is rebound to the new child.
            d[c] = c = C(c)
        del root
        gc.collect()
848
849    def test_callback_attribute(self):
850        x = Object(1)
851        callback = lambda ref: None
852        ref1 = weakref.ref(x, callback)
853        self.assertIs(ref1.__callback__, callback)
854
855        ref2 = weakref.ref(x)
856        self.assertIsNone(ref2.__callback__)
857
858    def test_callback_attribute_after_deletion(self):
859        x = Object(1)
860        ref = weakref.ref(x, self.callback)
861        self.assertIsNotNone(ref.__callback__)
862        del x
863        support.gc_collect()
864        self.assertIsNone(ref.__callback__)
865
866    def test_set_callback_attribute(self):
867        x = Object(1)
868        callback = lambda ref: None
869        ref1 = weakref.ref(x, callback)
870        with self.assertRaises(AttributeError):
871            ref1.__callback__ = lambda ref: None
872
873    def test_callback_gcs(self):
874        class ObjectWithDel(Object):
875            def __del__(self): pass
876        x = ObjectWithDel(1)
877        ref1 = weakref.ref(x, lambda ref: support.gc_collect())
878        del x
879        support.gc_collect()
880
881
class SubclassableWeakrefTestCase(TestBase):
    """Tests for user-defined subclasses of `weakref.ref`."""

    def test_subclass_refs(self):
        """A subclass can carry extra state and override `__call__`."""
        class MyRef(weakref.ref):
            def __init__(self, ob, callback=None, value=42):
                self.value = value
                super().__init__(ob, callback)
            def __call__(self):
                self.called = True
                return super().__call__()
        o = Object("foo")
        mr = MyRef(o, value=24)
        self.assertIs(mr(), o)
        self.assertTrue(mr.called)
        self.assertEqual(mr.value, 24)
        del o
        # Do not rely on reference counting to clear the referent right
        # away; collect explicitly so the check also holds on
        # implementations with a different GC strategy.
        support.gc_collect()
        self.assertIsNone(mr())
        self.assertTrue(mr.called)

    def test_subclass_refs_dont_replace_standard_refs(self):
        """Subclass refs coexist with the plain ref to the same object."""
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o)
        r2 = weakref.ref(o)
        self.assertIsNot(r1, r2)
        # The plain reference is reported first even though it was
        # created after the subclass instance.
        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
        self.assertEqual(weakref.getweakrefcount(o), 2)
        r3 = MyRef(o)
        self.assertEqual(weakref.getweakrefcount(o), 3)
        refs = weakref.getweakrefs(o)
        self.assertEqual(len(refs), 3)
        self.assertIs(r2, refs[0])
        self.assertIn(r1, refs[1:])
        self.assertIn(r3, refs[1:])

    def test_subclass_refs_dont_conflate_callbacks(self):
        """Two subclass refs with different callbacks stay distinct."""
        class MyRef(weakref.ref):
            pass
        o = Object(42)
        r1 = MyRef(o, id)
        r2 = MyRef(o, str)
        self.assertIsNot(r1, r2)
        refs = weakref.getweakrefs(o)
        self.assertIn(r1, refs)
        self.assertIn(r2, refs)

    def test_subclass_refs_with_slots(self):
        """A ref subclass may declare `__slots__` and then has no `__dict__`."""
        class MyRef(weakref.ref):
            __slots__ = "slot1", "slot2"
            # The extra constructor arguments are consumed here and in
            # __init__; only (ob, callback) are forwarded to weakref.ref.
            def __new__(cls, ob, callback, slot1, slot2):
                return weakref.ref.__new__(cls, ob, callback)
            def __init__(self, ob, callback, slot1, slot2):
                self.slot1 = slot1
                self.slot2 = slot2
            def meth(self):
                return self.slot1 + self.slot2
        o = Object(42)
        r = MyRef(o, None, "abc", "def")
        self.assertEqual(r.slot1, "abc")
        self.assertEqual(r.slot2, "def")
        self.assertEqual(r.meth(), "abcdef")
        self.assertFalse(hasattr(r, "__dict__"))

    def test_subclass_refs_with_cycle(self):
        """Confirm https://bugs.python.org/issue3100 is fixed."""
        # An instance of a weakref subclass can have attributes.
        # If such a weakref holds the only strong reference to the object,
        # deleting the weakref will delete the object. In this case,
        # the callback must not be called, because the ref object is
        # being deleted.
        class MyRef(weakref.ref):
            pass

        # Use a local callback, for "regrtest -R::"
        # to detect refcounting problems
        def callback(w):
            self.cbcalled += 1

        o = C()
        r1 = MyRef(o, callback)
        r1.o = o
        del o

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)

        # Same test, with two weakrefs to the same object
        # (since code paths are different)
        o = C()
        r1 = MyRef(o, callback)
        r2 = MyRef(o, callback)
        r1.r = r2
        r2.o = o
        del o
        del r2

        del r1 # Used to crash here

        self.assertEqual(self.cbcalled, 0)
983
984
class WeakMethodTestCase(unittest.TestCase):
    """Behavioural tests for `weakref.WeakMethod`."""

    def _subclass(self):
        """Build a fresh `Object` subclass that overrides `some_method`."""
        class C(Object):
            def some_method(self):
                return 6
        return C

    def test_alive(self):
        """While the object lives, the WeakMethod yields an equivalent bound method."""
        obj = Object(1)
        wm = weakref.WeakMethod(obj.some_method)
        self.assertIsInstance(wm, weakref.ReferenceType)
        bound = wm()
        self.assertIsInstance(bound, type(obj.some_method))
        self.assertIs(bound.__self__, obj)
        self.assertIs(bound.__func__, obj.some_method.__func__)
        self.assertEqual(bound(), 4)

    def test_object_dead(self):
        """The WeakMethod resolves to None after its object is gone."""
        obj = Object(1)
        wm = weakref.WeakMethod(obj.some_method)
        del obj
        gc.collect()
        self.assertIsNone(wm())

    def test_method_dead(self):
        """The WeakMethod resolves to None after the function is removed."""
        klass = self._subclass()
        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method)
        del klass.some_method
        gc.collect()
        self.assertIsNone(wm())

    def test_callback_when_object_dead(self):
        """The callback fires exactly once when the object dies first."""
        klass = self._subclass()
        seen = []

        def record(ref):
            seen.append(ref)

        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method, record)
        del obj
        gc.collect()
        self.assertEqual(seen, [wm])
        # Replacing the method afterwards must not trigger a second call.
        klass.some_method = Object.some_method
        gc.collect()
        self.assertEqual(seen, [wm])

    def test_callback_when_method_dead(self):
        """The callback fires exactly once when the method dies first."""
        klass = self._subclass()
        seen = []

        def record(ref):
            seen.append(ref)

        obj = klass(1)
        wm = weakref.WeakMethod(obj.some_method, record)
        del klass.some_method
        gc.collect()
        self.assertEqual(seen, [wm])
        # Losing the object afterwards must not trigger a second call.
        del obj
        gc.collect()
        self.assertEqual(seen, [wm])

    @support.cpython_only
    def test_no_cycles(self):
        """Dropping the last reference to a WeakMethod frees it immediately."""
        obj = Object(1)

        def ignore(_):
            pass

        wm = weakref.WeakMethod(obj.some_method, ignore)
        watcher = weakref.ref(wm)
        del wm
        # No collection is requested: the WeakMethod must not be part of
        # a reference cycle.
        self.assertIsNone(watcher())

    def test_equality(self):
        """Check `==` / `!=` between WeakMethods, alive and dead."""
        def check_same(lhs, rhs):
            self.assertTrue(lhs == rhs)
            self.assertFalse(lhs != rhs)

        def check_differ(lhs, rhs):
            self.assertTrue(lhs != rhs)
            self.assertFalse(lhs == rhs)

        one_a = Object(1)
        one_b = Object(1)
        two = Object(2)
        some_a = weakref.WeakMethod(one_a.some_method)
        some_b = weakref.WeakMethod(one_b.some_method)
        other_a = weakref.WeakMethod(one_a.other_method)
        other_b = weakref.WeakMethod(one_b.other_method)
        some_two = weakref.WeakMethod(two.some_method)
        other_two = weakref.WeakMethod(two.other_method)

        # Equal objects, same method.
        check_same(some_a, some_b)
        check_same(other_a, other_b)
        # Equal objects, different method.
        for lhs in (some_a, some_b):
            for rhs in (other_a, other_b):
                check_differ(lhs, rhs)
        # Unequal objects, same or different method.
        for lhs in (some_a, some_b):
            for rhs in (some_two, other_two):
                check_differ(lhs, rhs)

        del one_a, one_b, two
        gc.collect()
        # Once dead, WeakMethods compare by identity only.
        dead = (some_a, some_b, other_a, other_b, some_two, other_two)
        for left in dead:
            for right in dead:
                self.assertEqual(left == right, left is right)
                self.assertEqual(left != right, left is not right)

    def test_hashing(self):
        """Check hashing of WeakMethods before and after the object dies."""
        # Live WeakMethods are hashable when the underlying object is.
        first = Object(1)
        second = Object(1)
        wm_first = weakref.WeakMethod(first.some_method)
        wm_second = weakref.WeakMethod(second.some_method)
        never_hashed = weakref.WeakMethod(second.other_method)
        # Equal WeakMethods must hash equal.
        live_hash = hash(wm_first)
        self.assertEqual(live_hash, hash(wm_second))
        # The hash taken while alive is retained after death.
        del first, second
        gc.collect()
        self.assertEqual(hash(wm_first), live_hash)
        self.assertEqual(hash(wm_second), live_hash)
        # A WeakMethod that was never hashed while alive cannot be hashed
        # once it is dead.
        self.assertRaises(TypeError, hash, never_hashed)
1117
1118
1119class MappingTestCase(TestBase):
1120
1121    COUNT = 10
1122
1123    def check_len_cycles(self, dict_type, cons):
1124        N = 20
1125        items = [RefCycle() for i in range(N)]
1126        dct = dict_type(cons(o) for o in items)
1127        # Keep an iterator alive
1128        it = dct.items()
1129        try:
1130            next(it)
1131        except StopIteration:
1132            pass
1133        del items
1134        gc.collect()
1135        n1 = len(dct)
1136        del it
1137        gc.collect()
1138        n2 = len(dct)
1139        # one item may be kept alive inside the iterator
1140        self.assertIn(n1, (0, 1))
1141        self.assertEqual(n2, 0)
1142
1143    def test_weak_keyed_len_cycles(self):
1144        self.check_len_cycles(weakref.WeakKeyDictionary, lambda k: (k, 1))
1145
1146    def test_weak_valued_len_cycles(self):
1147        self.check_len_cycles(weakref.WeakValueDictionary, lambda k: (1, k))
1148
1149    def check_len_race(self, dict_type, cons):
1150        # Extended sanity checks for len() in the face of cyclic collection
1151        self.addCleanup(gc.set_threshold, *gc.get_threshold())
1152        for th in range(1, 100):
1153            N = 20
1154            gc.collect(0)
1155            gc.set_threshold(th, th, th)
1156            items = [RefCycle() for i in range(N)]
1157            dct = dict_type(cons(o) for o in items)
1158            del items
1159            # All items will be collected at next garbage collection pass
1160            it = dct.items()
1161            try:
1162                next(it)
1163            except StopIteration:
1164                pass
1165            n1 = len(dct)
1166            del it
1167            n2 = len(dct)
1168            self.assertGreaterEqual(n1, 0)
1169            self.assertLessEqual(n1, N)
1170            self.assertGreaterEqual(n2, 0)
1171            self.assertLessEqual(n2, n1)
1172
1173    def test_weak_keyed_len_race(self):
1174        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
1175
1176    def test_weak_valued_len_race(self):
1177        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
1178
1179    def test_weak_values(self):
1180        #
1181        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
1182        #
1183        dict, objects = self.make_weak_valued_dict()
1184        for o in objects:
1185            self.assertEqual(weakref.getweakrefcount(o), 1)
1186            self.assertIs(o, dict[o.arg],
1187                         "wrong object returned by weak dict!")
1188        items1 = list(dict.items())
1189        items2 = list(dict.copy().items())
1190        items1.sort()
1191        items2.sort()
1192        self.assertEqual(items1, items2,
1193                     "cloning of weak-valued dictionary did not work!")
1194        del items1, items2
1195        self.assertEqual(len(dict), self.COUNT)
1196        del objects[0]
1197        self.assertEqual(len(dict), self.COUNT - 1,
1198                     "deleting object did not cause dictionary update")
1199        del objects, o
1200        self.assertEqual(len(dict), 0,
1201                     "deleting the values did not clear the dictionary")
1202        # regression on SF bug #447152:
1203        dict = weakref.WeakValueDictionary()
1204        self.assertRaises(KeyError, dict.__getitem__, 1)
1205        dict[2] = C()
1206        self.assertRaises(KeyError, dict.__getitem__, 2)
1207
1208    def test_weak_keys(self):
1209        #
1210        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1211        #  len(d), k in d.
1212        #
1213        dict, objects = self.make_weak_keyed_dict()
1214        for o in objects:
1215            self.assertEqual(weakref.getweakrefcount(o), 1,
1216                         "wrong number of weak references to %r!" % o)
1217            self.assertIs(o.arg, dict[o],
1218                         "wrong object returned by weak dict!")
1219        items1 = dict.items()
1220        items2 = dict.copy().items()
1221        self.assertEqual(set(items1), set(items2),
1222                     "cloning of weak-keyed dictionary did not work!")
1223        del items1, items2
1224        self.assertEqual(len(dict), self.COUNT)
1225        del objects[0]
1226        self.assertEqual(len(dict), (self.COUNT - 1),
1227                     "deleting object did not cause dictionary update")
1228        del objects, o
1229        self.assertEqual(len(dict), 0,
1230                     "deleting the keys did not clear the dictionary")
1231        o = Object(42)
1232        dict[o] = "What is the meaning of the universe?"
1233        self.assertIn(o, dict)
1234        self.assertNotIn(34, dict)
1235
1236    def test_weak_keyed_iters(self):
1237        dict, objects = self.make_weak_keyed_dict()
1238        self.check_iters(dict)
1239
1240        # Test keyrefs()
1241        refs = dict.keyrefs()
1242        self.assertEqual(len(refs), len(objects))
1243        objects2 = list(objects)
1244        for wr in refs:
1245            ob = wr()
1246            self.assertIn(ob, dict)
1247            self.assertIn(ob, dict)
1248            self.assertEqual(ob.arg, dict[ob])
1249            objects2.remove(ob)
1250        self.assertEqual(len(objects2), 0)
1251
1252        # Test iterkeyrefs()
1253        objects2 = list(objects)
1254        self.assertEqual(len(list(dict.keyrefs())), len(objects))
1255        for wr in dict.keyrefs():
1256            ob = wr()
1257            self.assertIn(ob, dict)
1258            self.assertIn(ob, dict)
1259            self.assertEqual(ob.arg, dict[ob])
1260            objects2.remove(ob)
1261        self.assertEqual(len(objects2), 0)
1262
1263    def test_weak_valued_iters(self):
1264        dict, objects = self.make_weak_valued_dict()
1265        self.check_iters(dict)
1266
1267        # Test valuerefs()
1268        refs = dict.valuerefs()
1269        self.assertEqual(len(refs), len(objects))
1270        objects2 = list(objects)
1271        for wr in refs:
1272            ob = wr()
1273            self.assertEqual(ob, dict[ob.arg])
1274            self.assertEqual(ob.arg, dict[ob.arg].arg)
1275            objects2.remove(ob)
1276        self.assertEqual(len(objects2), 0)
1277
1278        # Test itervaluerefs()
1279        objects2 = list(objects)
1280        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1281        for wr in dict.itervaluerefs():
1282            ob = wr()
1283            self.assertEqual(ob, dict[ob.arg])
1284            self.assertEqual(ob.arg, dict[ob.arg].arg)
1285            objects2.remove(ob)
1286        self.assertEqual(len(objects2), 0)
1287
1288    def check_iters(self, dict):
1289        # item iterator:
1290        items = list(dict.items())
1291        for item in dict.items():
1292            items.remove(item)
1293        self.assertFalse(items, "items() did not touch all items")
1294
1295        # key iterator, via __iter__():
1296        keys = list(dict.keys())
1297        for k in dict:
1298            keys.remove(k)
1299        self.assertFalse(keys, "__iter__() did not touch all keys")
1300
1301        # key iterator, via iterkeys():
1302        keys = list(dict.keys())
1303        for k in dict.keys():
1304            keys.remove(k)
1305        self.assertFalse(keys, "iterkeys() did not touch all keys")
1306
1307        # value iterator:
1308        values = list(dict.values())
1309        for v in dict.values():
1310            values.remove(v)
1311        self.assertFalse(values,
1312                     "itervalues() did not touch all values")
1313
1314    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1315        n = len(dict)
1316        it = iter(getattr(dict, iter_name)())
1317        next(it)             # Trigger internal iteration
1318        # Destroy an object
1319        del objects[-1]
1320        gc.collect()    # just in case
1321        # We have removed either the first consumed object, or another one
1322        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1323        del it
1324        # The removal has been committed
1325        self.assertEqual(len(dict), n - 1)
1326
1327    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1328        # Check that we can explicitly mutate the weak dict without
1329        # interfering with delayed removal.
1330        # `testcontext` should create an iterator, destroy one of the
1331        # weakref'ed objects and then return a new key/value pair corresponding
1332        # to the destroyed object.
1333        with testcontext() as (k, v):
1334            self.assertNotIn(k, dict)
1335        with testcontext() as (k, v):
1336            self.assertRaises(KeyError, dict.__delitem__, k)
1337        self.assertNotIn(k, dict)
1338        with testcontext() as (k, v):
1339            self.assertRaises(KeyError, dict.pop, k)
1340        self.assertNotIn(k, dict)
1341        with testcontext() as (k, v):
1342            dict[k] = v
1343        self.assertEqual(dict[k], v)
1344        ddict = copy.copy(dict)
1345        with testcontext() as (k, v):
1346            dict.update(ddict)
1347        self.assertEqual(dict, ddict)
1348        with testcontext() as (k, v):
1349            dict.clear()
1350        self.assertEqual(len(dict), 0)
1351
1352    def check_weak_del_and_len_while_iterating(self, dict, testcontext):
1353        # Check that len() works when both iterating and removing keys
1354        # explicitly through various means (.pop(), .clear()...), while
1355        # implicit mutation is deferred because an iterator is alive.
1356        # (each call to testcontext() should schedule one item for removal
1357        #  for this test to work properly)
1358        o = Object(123456)
1359        with testcontext():
1360            n = len(dict)
1361            # Since underlaying dict is ordered, first item is popped
1362            dict.pop(next(dict.keys()))
1363            self.assertEqual(len(dict), n - 1)
1364            dict[o] = o
1365            self.assertEqual(len(dict), n)
1366        # last item in objects is removed from dict in context shutdown
1367        with testcontext():
1368            self.assertEqual(len(dict), n - 1)
1369            # Then, (o, o) is popped
1370            dict.popitem()
1371            self.assertEqual(len(dict), n - 2)
1372        with testcontext():
1373            self.assertEqual(len(dict), n - 3)
1374            del dict[next(dict.keys())]
1375            self.assertEqual(len(dict), n - 4)
1376        with testcontext():
1377            self.assertEqual(len(dict), n - 5)
1378            dict.popitem()
1379            self.assertEqual(len(dict), n - 6)
1380        with testcontext():
1381            dict.clear()
1382            self.assertEqual(len(dict), 0)
1383        self.assertEqual(len(dict), 0)
1384
1385    def test_weak_keys_destroy_while_iterating(self):
1386        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1387        dict, objects = self.make_weak_keyed_dict()
1388        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1389        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1390        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1391        self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
1392        dict, objects = self.make_weak_keyed_dict()
1393        @contextlib.contextmanager
1394        def testcontext():
1395            try:
1396                it = iter(dict.items())
1397                next(it)
1398                # Schedule a key/value for removal and recreate it
1399                v = objects.pop().arg
1400                gc.collect()      # just in case
1401                yield Object(v), v
1402            finally:
1403                it = None           # should commit all removals
1404                gc.collect()
1405        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1406        # Issue #21173: len() fragile when keys are both implicitly and
1407        # explicitly removed.
1408        dict, objects = self.make_weak_keyed_dict()
1409        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1410
1411    def test_weak_values_destroy_while_iterating(self):
1412        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1413        dict, objects = self.make_weak_valued_dict()
1414        self.check_weak_destroy_while_iterating(dict, objects, 'keys')
1415        self.check_weak_destroy_while_iterating(dict, objects, 'items')
1416        self.check_weak_destroy_while_iterating(dict, objects, 'values')
1417        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1418        self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
1419        dict, objects = self.make_weak_valued_dict()
1420        @contextlib.contextmanager
1421        def testcontext():
1422            try:
1423                it = iter(dict.items())
1424                next(it)
1425                # Schedule a key/value for removal and recreate it
1426                k = objects.pop().arg
1427                gc.collect()      # just in case
1428                yield k, Object(k)
1429            finally:
1430                it = None           # should commit all removals
1431                gc.collect()
1432        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1433        dict, objects = self.make_weak_valued_dict()
1434        self.check_weak_del_and_len_while_iterating(dict, testcontext)
1435
1436    def test_make_weak_keyed_dict_from_dict(self):
1437        o = Object(3)
1438        dict = weakref.WeakKeyDictionary({o:364})
1439        self.assertEqual(dict[o], 364)
1440
1441    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1442        o = Object(3)
1443        dict = weakref.WeakKeyDictionary({o:364})
1444        dict2 = weakref.WeakKeyDictionary(dict)
1445        self.assertEqual(dict[o], 364)
1446
1447    def make_weak_keyed_dict(self):
1448        dict = weakref.WeakKeyDictionary()
1449        objects = list(map(Object, range(self.COUNT)))
1450        for o in objects:
1451            dict[o] = o.arg
1452        return dict, objects
1453
1454    def test_make_weak_valued_dict_from_dict(self):
1455        o = Object(3)
1456        dict = weakref.WeakValueDictionary({364:o})
1457        self.assertEqual(dict[364], o)
1458
1459    def test_make_weak_valued_dict_from_weak_valued_dict(self):
1460        o = Object(3)
1461        dict = weakref.WeakValueDictionary({364:o})
1462        dict2 = weakref.WeakValueDictionary(dict)
1463        self.assertEqual(dict[364], o)
1464
1465    def test_make_weak_valued_dict_misc(self):
1466        # errors
1467        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1468        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1469        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1470        # special keyword arguments
1471        o = Object(3)
1472        for kw in 'self', 'dict', 'other', 'iterable':
1473            d = weakref.WeakValueDictionary(**{kw: o})
1474            self.assertEqual(list(d.keys()), [kw])
1475            self.assertEqual(d[kw], o)
1476
1477    def make_weak_valued_dict(self):
1478        dict = weakref.WeakValueDictionary()
1479        objects = list(map(Object, range(self.COUNT)))
1480        for o in objects:
1481            dict[o.arg] = o
1482        return dict, objects
1483
1484    def check_popitem(self, klass, key1, value1, key2, value2):
1485        weakdict = klass()
1486        weakdict[key1] = value1
1487        weakdict[key2] = value2
1488        self.assertEqual(len(weakdict), 2)
1489        k, v = weakdict.popitem()
1490        self.assertEqual(len(weakdict), 1)
1491        if k is key1:
1492            self.assertIs(v, value1)
1493        else:
1494            self.assertIs(v, value2)
1495        k, v = weakdict.popitem()
1496        self.assertEqual(len(weakdict), 0)
1497        if k is key1:
1498            self.assertIs(v, value1)
1499        else:
1500            self.assertIs(v, value2)
1501
1502    def test_weak_valued_dict_popitem(self):
1503        self.check_popitem(weakref.WeakValueDictionary,
1504                           "key1", C(), "key2", C())
1505
1506    def test_weak_keyed_dict_popitem(self):
1507        self.check_popitem(weakref.WeakKeyDictionary,
1508                           C(), "value 1", C(), "value 2")
1509
1510    def check_setdefault(self, klass, key, value1, value2):
1511        self.assertIsNot(value1, value2,
1512                     "invalid test"
1513                     " -- value parameters must be distinct objects")
1514        weakdict = klass()
1515        o = weakdict.setdefault(key, value1)
1516        self.assertIs(o, value1)
1517        self.assertIn(key, weakdict)
1518        self.assertIs(weakdict.get(key), value1)
1519        self.assertIs(weakdict[key], value1)
1520
1521        o = weakdict.setdefault(key, value2)
1522        self.assertIs(o, value1)
1523        self.assertIn(key, weakdict)
1524        self.assertIs(weakdict.get(key), value1)
1525        self.assertIs(weakdict[key], value1)
1526
1527    def test_weak_valued_dict_setdefault(self):
1528        self.check_setdefault(weakref.WeakValueDictionary,
1529                              "key", C(), C())
1530
1531    def test_weak_keyed_dict_setdefault(self):
1532        self.check_setdefault(weakref.WeakKeyDictionary,
1533                              C(), "value 1", "value 2")
1534
1535    def check_update(self, klass, dict):
1536        #
1537        #  This exercises d.update(), len(d), d.keys(), k in d,
1538        #  d.get(), d[].
1539        #
1540        weakdict = klass()
1541        weakdict.update(dict)
1542        self.assertEqual(len(weakdict), len(dict))
1543        for k in weakdict.keys():
1544            self.assertIn(k, dict, "mysterious new key appeared in weak dict")
1545            v = dict.get(k)
1546            self.assertIs(v, weakdict[k])
1547            self.assertIs(v, weakdict.get(k))
1548        for k in dict.keys():
1549            self.assertIn(k, weakdict, "original key disappeared in weak dict")
1550            v = dict[k]
1551            self.assertIs(v, weakdict[k])
1552            self.assertIs(v, weakdict.get(k))
1553
1554    def test_weak_valued_dict_update(self):
1555        self.check_update(weakref.WeakValueDictionary,
1556                          {1: C(), 'a': C(), C(): C()})
1557        # errors
1558        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1559        d = weakref.WeakValueDictionary()
1560        self.assertRaises(TypeError, d.update, {}, {})
1561        self.assertRaises(TypeError, d.update, (), ())
1562        self.assertEqual(list(d.keys()), [])
1563        # special keyword arguments
1564        o = Object(3)
1565        for kw in 'self', 'dict', 'other', 'iterable':
1566            d = weakref.WeakValueDictionary()
1567            d.update(**{kw: o})
1568            self.assertEqual(list(d.keys()), [kw])
1569            self.assertEqual(d[kw], o)
1570
1571    def test_weak_keyed_dict_update(self):
1572        self.check_update(weakref.WeakKeyDictionary,
1573                          {C(): 1, C(): 2, C(): 3})
1574
1575    def test_weak_keyed_delitem(self):
1576        d = weakref.WeakKeyDictionary()
1577        o1 = Object('1')
1578        o2 = Object('2')
1579        d[o1] = 'something'
1580        d[o2] = 'something'
1581        self.assertEqual(len(d), 2)
1582        del d[o1]
1583        self.assertEqual(len(d), 1)
1584        self.assertEqual(list(d.keys()), [o2])
1585
1586    def test_weak_valued_delitem(self):
1587        d = weakref.WeakValueDictionary()
1588        o1 = Object('1')
1589        o2 = Object('2')
1590        d['something'] = o1
1591        d['something else'] = o2
1592        self.assertEqual(len(d), 2)
1593        del d['something']
1594        self.assertEqual(len(d), 1)
1595        self.assertEqual(list(d.items()), [('something else', o2)])
1596
1597    def test_weak_keyed_bad_delitem(self):
1598        d = weakref.WeakKeyDictionary()
1599        o = Object('1')
1600        # An attempt to delete an object that isn't there should raise
1601        # KeyError.  It didn't before 2.3.
1602        self.assertRaises(KeyError, d.__delitem__, o)
1603        self.assertRaises(KeyError, d.__getitem__, o)
1604
1605        # If a key isn't of a weakly referencable type, __getitem__ and
1606        # __setitem__ raise TypeError.  __delitem__ should too.
1607        self.assertRaises(TypeError, d.__delitem__,  13)
1608        self.assertRaises(TypeError, d.__getitem__,  13)
1609        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1610
    def test_weak_keyed_cascading_deletes(self):
        """Delete keys whose `__eq__` drops another key as a side effect."""
        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
        # over the keys via self.data.iterkeys().  If things vanished from
        # the dict during this (or got added), that caused a RuntimeError.

        d = weakref.WeakKeyDictionary()
        # Read by C.__eq__ through the closure; switched on further down.
        mutate = False

        class C(object):
            def __init__(self, i):
                self.value = i
            def __hash__(self):
                return hash(self.value)
            def __eq__(self, other):
                if mutate:
                    # Side effect that mutates the dict, by removing the
                    # last strong reference to a key.
                    del objs[-1]
                return self.value == other.value

        objs = [C(i) for i in range(4)]
        for o in objs:
            d[o] = o.value
        del o   # now the only strong references to keys are in objs
        # Find the order in which iterkeys sees the keys.
        objs = list(d.keys())
        # Reverse it, so that the iteration implementation of __delitem__
        # has to keep looping to find the first object we delete.
        objs.reverse()

        # Turn on mutation in C.__eq__.  The first time through the loop,
        # under the iterkeys() business the first comparison will delete
        # the last item iterkeys() would see, and that causes a
        #     RuntimeError: dictionary changed size during iteration
        # when the iterkeys() loop goes around to try comparing the next
        # key.  After this was fixed, it just deletes the last object *our*
        # "for o in obj" loop would have gotten to.
        mutate = True
        count = 0
        for o in objs:
            count += 1
            del d[o]
        # Four keys were stored but only two loop iterations are expected:
        # presumably each `del d[o]` also shortens `objs` by one via
        # C.__eq__, which removes the remaining keys implicitly.
        self.assertEqual(len(d), 0)
        self.assertEqual(count, 2)
1655
1656    def test_make_weak_valued_dict_repr(self):
1657        dict = weakref.WeakValueDictionary()
1658        self.assertRegex(repr(dict), '<WeakValueDictionary at 0x.*>')
1659
1660    def test_make_weak_keyed_dict_repr(self):
1661        dict = weakref.WeakKeyDictionary()
1662        self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
1663
1664    def test_threaded_weak_valued_setdefault(self):
1665        d = weakref.WeakValueDictionary()
1666        with collect_in_thread():
1667            for i in range(100000):
1668                x = d.setdefault(10, RefCycle())
1669                self.assertIsNot(x, None)  # we never put None in there!
1670                del x
1671
1672    def test_threaded_weak_valued_pop(self):
1673        d = weakref.WeakValueDictionary()
1674        with collect_in_thread():
1675            for i in range(100000):
1676                d[10] = RefCycle()
1677                x = d.pop(10, 10)
1678                self.assertIsNot(x, None)  # we never put None in there!
1679
1680    def test_threaded_weak_valued_consistency(self):
1681        # Issue #28427: old keys should not remove new values from
1682        # WeakValueDictionary when collecting from another thread.
1683        d = weakref.WeakValueDictionary()
1684        with collect_in_thread():
1685            for i in range(200000):
1686                o = RefCycle()
1687                d[10] = o
1688                # o is still alive, so the dict can't be empty
1689                self.assertEqual(len(d), 1)
1690                o = None  # lose ref
1691
1692    def check_threaded_weak_dict_copy(self, type_, deepcopy):
1693        # `type_` should be either WeakKeyDictionary or WeakValueDictionary.
1694        # `deepcopy` should be either True or False.
1695        exc = []
1696
1697        class DummyKey:
1698            def __init__(self, ctr):
1699                self.ctr = ctr
1700
1701        class DummyValue:
1702            def __init__(self, ctr):
1703                self.ctr = ctr
1704
1705        def dict_copy(d, exc):
1706            try:
1707                if deepcopy is True:
1708                    _ = copy.deepcopy(d)
1709                else:
1710                    _ = d.copy()
1711            except Exception as ex:
1712                exc.append(ex)
1713
1714        def pop_and_collect(lst):
1715            gc_ctr = 0
1716            while lst:
1717                i = random.randint(0, len(lst) - 1)
1718                gc_ctr += 1
1719                lst.pop(i)
1720                if gc_ctr % 10000 == 0:
1721                    gc.collect()  # just in case
1722
1723        self.assertIn(type_, (weakref.WeakKeyDictionary, weakref.WeakValueDictionary))
1724
1725        d = type_()
1726        keys = []
1727        values = []
1728        # Initialize d with many entries
1729        for i in range(70000):
1730            k, v = DummyKey(i), DummyValue(i)
1731            keys.append(k)
1732            values.append(v)
1733            d[k] = v
1734            del k
1735            del v
1736
1737        t_copy = threading.Thread(target=dict_copy, args=(d, exc,))
1738        if type_ is weakref.WeakKeyDictionary:
1739            t_collect = threading.Thread(target=pop_and_collect, args=(keys,))
1740        else:  # weakref.WeakValueDictionary
1741            t_collect = threading.Thread(target=pop_and_collect, args=(values,))
1742
1743        t_copy.start()
1744        t_collect.start()
1745
1746        t_copy.join()
1747        t_collect.join()
1748
1749        # Test exceptions
1750        if exc:
1751            raise exc[0]
1752
1753    def test_threaded_weak_key_dict_copy(self):
1754        # Issue #35615: Weakref keys or values getting GC'ed during dict
1755        # copying should not result in a crash.
1756        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, False)
1757
1758    def test_threaded_weak_key_dict_deepcopy(self):
1759        # Issue #35615: Weakref keys or values getting GC'ed during dict
1760        # copying should not result in a crash.
1761        self.check_threaded_weak_dict_copy(weakref.WeakKeyDictionary, True)
1762
1763    def test_threaded_weak_value_dict_copy(self):
1764        # Issue #35615: Weakref keys or values getting GC'ed during dict
1765        # copying should not result in a crash.
1766        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, False)
1767
1768    def test_threaded_weak_value_dict_deepcopy(self):
1769        # Issue #35615: Weakref keys or values getting GC'ed during dict
1770        # copying should not result in a crash.
1771        self.check_threaded_weak_dict_copy(weakref.WeakValueDictionary, True)
1772
1773
1774from test import mapping_tests
1775
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against WeakValueDictionary."""

    type2test = weakref.WeakValueDictionary

    # Held as a class attribute, so these Object values stay strongly
    # referenced for as long as the class exists.
    __ref = {"key1": Object(1), "key2": Object(2), "key3": Object(3)}

    def _reference(self):
        # Return a fresh shallow copy of the class-level reference dict.
        return dict(self.__ref)
1782
class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
    """Run the generic mapping-protocol tests against WeakKeyDictionary."""

    type2test = weakref.WeakKeyDictionary

    # Held as a class attribute, so these Object keys stay strongly
    # referenced for as long as the class exists.
    __ref = {Object("key1"): 1, Object("key2"): 2, Object("key3"): 3}

    def _reference(self):
        # Return a fresh shallow copy of the class-level reference dict.
        return dict(self.__ref)
1789
1790
class FinalizeTestCase(unittest.TestCase):
    # Tests for weakref.finalize: calling, detaching and peeking at
    # finalizers, the order in which they fire, and what happens at
    # interpreter exit.

    class A:
        # Plain weak-referenceable object for the finalizers to track.
        pass

    def _collect_if_necessary(self):
        # These tests build no reference cycles, so CPython needs no
        # collection; every other implementation gets an explicit one.
        if sys.implementation.name == 'cpython':
            return
        support.gc_collect()

    def _check_spent(self, fin):
        # Shared post-condition for a finalizer that has already run or
        # been detached: calling it (twice) yields None, peek() and
        # detach() yield None, and it reports itself as not alive.
        self.assertEqual(fin(), None)
        self.assertEqual(fin(), None)
        self.assertEqual(fin.peek(), None)
        self.assertEqual(fin.detach(), None)
        self.assertEqual(fin.alive, False)

    def test_finalize(self):
        results = []

        def add(x, y, z):
            total = x + y + z
            results.append(total)
            return total

        obj = self.A()

        # Mixed positional/keyword arguments; the finalizer is called by hand.
        fin = weakref.finalize(obj, add, 67, 43, z=89)
        self.assertEqual(fin.alive, True)
        self.assertEqual(fin.peek(), (obj, add, (67, 43), {'z': 89}))
        self.assertEqual(fin(), 199)
        self._check_spent(fin)
        self.assertEqual(results, [199])

        # Detached before being called: the callback never runs.
        # The expected tuples are built inline on purpose; binding one to
        # a local would keep `obj` alive past the `del obj` further down.
        results.clear()
        fin = weakref.finalize(obj, add, 67, 43, 89)
        self.assertEqual(fin.peek(), (obj, add, (67, 43, 89), {}))
        self.assertEqual(fin.detach(), (obj, add, (67, 43, 89), {}))
        self._check_spent(fin)
        self.assertEqual(results, [])

        # Keyword-only arguments; the finalizer fires when `obj` goes away.
        results.clear()
        fin = weakref.finalize(obj, add, x=67, y=43, z=89)
        del obj
        self._collect_if_necessary()
        self._check_spent(fin)
        self.assertEqual(results, [199])

    def test_order(self):
        obj = self.A()
        log = []

        fin1 = weakref.finalize(obj, log.append, 'f1')
        fin2 = weakref.finalize(obj, log.append, 'f2')
        fin3 = weakref.finalize(obj, log.append, 'f3')
        fin4 = weakref.finalize(obj, log.append, 'f4')
        fin5 = weakref.finalize(obj, log.append, 'f5')

        # Finalizers must survive without any outside reference to them.
        del fin1, fin4

        for fin in (fin2, fin3, fin5):
            self.assertTrue(fin.alive)

        self.assertTrue(fin5.detach())
        self.assertFalse(fin5.alive)

        fin5()                     # detached above: appends nothing
        log.append('A')
        fin3()                     # appends 'f3'
        self.assertFalse(fin3.alive)
        log.append('B')
        fin3()                     # already called: appends nothing
        log.append('C')
        del obj
        self._collect_if_necessary()
        # The remaining finalizers fire here, most recently created
        # first: 'f4', then 'f2', then 'f1'.
        self.assertFalse(fin2.alive)
        log.append('D')
        fin2()                     # already run when obj died: nothing

        self.assertEqual(log, ['A', 'f3', 'B', 'C', 'f4', 'f2', 'f1', 'D'])

    def test_all_freed(self):
        # A subclass is used so that the finalizer itself can be the
        # target of a weak reference.
        class MyFinalizer(weakref.finalize):
            pass

        obj = self.A()
        hits = []

        def callback():
            hits.append(123)

        fin = MyFinalizer(obj, callback)

        callback_ref = weakref.ref(callback)
        fin_ref = weakref.ref(fin)
        del callback, fin

        # Both are still reachable while `obj` is alive.
        self.assertIsNotNone(callback_ref())
        self.assertIsNotNone(fin_ref())

        del obj
        self._collect_if_necessary()

        # Once the finalizer has fired, both must have been released.
        self.assertIsNone(callback_ref())
        self.assertIsNone(fin_ref())
        self.assertEqual(hits, [123])

    @classmethod
    def run_in_child(cls):
        # Entry point executed in a child interpreter by test_atexit().
        def error():
            # Register another atexit finalizer from inside a finalizer
            # that is itself running at exit; it should be the next one
            # to run.
            g1 = weakref.finalize(cls, print, 'g1')
            print('f3 error')
            1/0

        # cls should stay alive until the atexit callbacks run
        f1 = weakref.finalize(cls, print, 'f1', _global_var)
        f2 = weakref.finalize(cls, print, 'f2', _global_var)
        f3 = weakref.finalize(cls, error)
        f4 = weakref.finalize(cls, print, 'f4', _global_var)

        # f2 is opted out of running at exit; the others keep the
        # default, which the asserts confirm is True.
        assert f1.atexit == True
        f2.atexit = False
        assert f3.atexit == True
        assert f4.atexit == True

    def test_atexit(self):
        prog = ('from test.test_weakref import FinalizeTestCase;'
                'FinalizeTestCase.run_in_child()')
        rc, out, err = script_helper.assert_python_ok('-c', prog)
        lines = out.decode('ascii').splitlines()
        # f2 opted out, and g1 (registered by f3's callback) runs right
        # after f3.
        self.assertEqual(lines, ['f4 foobar', 'f3 error', 'g1', 'f1 foobar'])
        self.assertIn(b'ZeroDivisionError', err)
1933
1934
# Doctest source mirroring the examples in the weakref library reference,
# with a few extra checks appended after each documented example.
libreftest = """ Doctest for examples in the library reference: weakref.rst

>>> import weakref
>>> class Dict(dict):
...     pass
...
>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
>>> r = weakref.ref(obj)
>>> print(r() is obj)
True

>>> import weakref
>>> class Object:
...     pass
...
>>> o = Object()
>>> r = weakref.ref(o)
>>> o2 = r()
>>> o is o2
True
>>> del o, o2
>>> print(r())
None

>>> import weakref
>>> class ExtendedRef(weakref.ref):
...     def __init__(self, ob, callback=None, **annotations):
...         super().__init__(ob, callback)
...         self.__counter = 0
...         for k, v in annotations.items():
...             setattr(self, k, v)
...     def __call__(self):
...         '''Return a pair containing the referent and the number of
...         times the reference has been called.
...         '''
...         ob = super().__call__()
...         if ob is not None:
...             self.__counter += 1
...             ob = (ob, self.__counter)
...         return ob
...
>>> class A:   # not in docs from here, just testing the ExtendedRef
...     pass
...
>>> a = A()
>>> r = ExtendedRef(a, foo=1, bar="baz")
>>> r.foo
1
>>> r.bar
'baz'
>>> r()[1]
1
>>> r()[1]
2
>>> r()[0] is a
True


>>> import weakref
>>> _id2obj_dict = weakref.WeakValueDictionary()
>>> def remember(obj):
...     oid = id(obj)
...     _id2obj_dict[oid] = obj
...     return oid
...
>>> def id2obj(oid):
...     return _id2obj_dict[oid]
...
>>> a = A()             # from here, just testing
>>> a_id = remember(a)
>>> id2obj(a_id) is a
True
>>> del a
>>> try:
...     id2obj(a_id)
... except KeyError:
...     print('OK')
... else:
...     print('WeakValueDictionary error')
OK

"""

# Expose the string above to doctest under the name 'libreftest'.
__test__ = dict(libreftest=libreftest)
2019
def test_main():
    # Run every unittest case class of this module, then its doctests.
    test_cases = (
        ReferencesTestCase,
        WeakMethodTestCase,
        MappingTestCase,
        WeakValueDictionaryTestCase,
        WeakKeyDictionaryTestCase,
        SubclassableWeakrefTestCase,
        FinalizeTestCase,
    )
    support.run_unittest(*test_cases)
    this_module = sys.modules[__name__]
    support.run_doctest(this_module)


# Allow the file to be run directly as a script.
if __name__ == "__main__":
    test_main()
2035