• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import unittest
4import UserList
5import weakref
6import operator
7import contextlib
8import copy
9
10from test import test_support
11
12# Used in ReferencesTestCase.test_ref_created_during_del() .
13ref_from_del = None
14
15class C:
16    def method(self):
17        pass
18
19
20class Callable:
21    bar = None
22
23    def __call__(self, x):
24        self.bar = x
25
26
27def create_function():
28    def f(): pass
29    return f
30
31def create_bound_method():
32    return C().method
33
34def create_unbound_method():
35    return C.method
36
37
38class Object:
39    def __init__(self, arg):
40        self.arg = arg
41    def __repr__(self):
42        return "<Object %r>" % self.arg
43    def __eq__(self, other):
44        if isinstance(other, Object):
45            return self.arg == other.arg
46        return NotImplemented
47    def __ne__(self, other):
48        if isinstance(other, Object):
49            return self.arg != other.arg
50        return NotImplemented
51    def __hash__(self):
52        return hash(self.arg)
53
54class RefCycle:
55    def __init__(self):
56        self.cycle = self
57
58
59class TestBase(unittest.TestCase):
60
61    def setUp(self):
62        self.cbcalled = 0
63
64    def callback(self, ref):
65        self.cbcalled += 1
66
67
68class ReferencesTestCase(TestBase):
69
70    def test_basic_ref(self):
71        self.check_basic_ref(C)
72        self.check_basic_ref(create_function)
73        self.check_basic_ref(create_bound_method)
74        self.check_basic_ref(create_unbound_method)
75
76        # Just make sure the tp_repr handler doesn't raise an exception.
77        # Live reference:
78        o = C()
79        wr = weakref.ref(o)
80        repr(wr)
81        # Dead reference:
82        del o
83        repr(wr)
84
85    def test_basic_callback(self):
86        self.check_basic_callback(C)
87        self.check_basic_callback(create_function)
88        self.check_basic_callback(create_bound_method)
89        self.check_basic_callback(create_unbound_method)
90
91    def test_multiple_callbacks(self):
92        o = C()
93        ref1 = weakref.ref(o, self.callback)
94        ref2 = weakref.ref(o, self.callback)
95        del o
96        self.assertIsNone(ref1(), "expected reference to be invalidated")
97        self.assertIsNone(ref2(), "expected reference to be invalidated")
98        self.assertEqual(self.cbcalled, 2,
99                     "callback not called the right number of times")
100
101    def test_multiple_selfref_callbacks(self):
102        # Make sure all references are invalidated before callbacks are called
103        #
104        # What's important here is that we're using the first
105        # reference in the callback invoked on the second reference
106        # (the most recently created ref is cleaned up first).  This
107        # tests that all references to the object are invalidated
108        # before any of the callbacks are invoked, so that we only
109        # have one invocation of _weakref.c:cleanup_helper() active
110        # for a particular object at a time.
111        #
112        def callback(object, self=self):
113            self.ref()
114        c = C()
115        self.ref = weakref.ref(c, callback)
116        ref1 = weakref.ref(c, callback)
117        del c
118
119    def test_constructor_kwargs(self):
120        c = C()
121        self.assertRaises(TypeError, weakref.ref, c, callback=None)
122
123    def test_proxy_ref(self):
124        o = C()
125        o.bar = 1
126        ref1 = weakref.proxy(o, self.callback)
127        ref2 = weakref.proxy(o, self.callback)
128        del o
129
130        def check(proxy):
131            proxy.bar
132
133        self.assertRaises(weakref.ReferenceError, check, ref1)
134        self.assertRaises(weakref.ReferenceError, check, ref2)
135        self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
136        self.assertEqual(self.cbcalled, 2)
137
138    def check_basic_ref(self, factory):
139        o = factory()
140        ref = weakref.ref(o)
141        self.assertIsNotNone(ref(),
142                     "weak reference to live object should be live")
143        o2 = ref()
144        self.assertIs(o, o2,
145                     "<ref>() should return original object if live")
146
147    def check_basic_callback(self, factory):
148        self.cbcalled = 0
149        o = factory()
150        ref = weakref.ref(o, self.callback)
151        del o
152        self.assertEqual(self.cbcalled, 1,
153                     "callback did not properly set 'cbcalled'")
154        self.assertIsNone(ref(),
155                     "ref2 should be dead after deleting object reference")
156
157    def test_ref_reuse(self):
158        o = C()
159        ref1 = weakref.ref(o)
160        # create a proxy to make sure that there's an intervening creation
161        # between these two; it should make no difference
162        proxy = weakref.proxy(o)
163        ref2 = weakref.ref(o)
164        self.assertIs(ref1, ref2,
165                     "reference object w/out callback should be re-used")
166
167        o = C()
168        proxy = weakref.proxy(o)
169        ref1 = weakref.ref(o)
170        ref2 = weakref.ref(o)
171        self.assertIs(ref1, ref2,
172                     "reference object w/out callback should be re-used")
173        self.assertEqual(weakref.getweakrefcount(o), 2,
174                     "wrong weak ref count for object")
175        del proxy
176        self.assertEqual(weakref.getweakrefcount(o), 1,
177                     "wrong weak ref count for object after deleting proxy")
178
179    def test_proxy_reuse(self):
180        o = C()
181        proxy1 = weakref.proxy(o)
182        ref = weakref.ref(o)
183        proxy2 = weakref.proxy(o)
184        self.assertIs(proxy1, proxy2,
185                     "proxy object w/out callback should have been re-used")
186
187    def test_basic_proxy(self):
188        o = C()
189        self.check_proxy(o, weakref.proxy(o))
190
191        L = UserList.UserList()
192        p = weakref.proxy(L)
193        self.assertFalse(p, "proxy for empty UserList should be false")
194        p.append(12)
195        self.assertEqual(len(L), 1)
196        self.assertTrue(p, "proxy for non-empty UserList should be true")
197        with test_support.check_py3k_warnings():
198            p[:] = [2, 3]
199        self.assertEqual(len(L), 2)
200        self.assertEqual(len(p), 2)
201        self.assertIn(3, p, "proxy didn't support __contains__() properly")
202        p[1] = 5
203        self.assertEqual(L[1], 5)
204        self.assertEqual(p[1], 5)
205        L2 = UserList.UserList(L)
206        p2 = weakref.proxy(L2)
207        self.assertEqual(p, p2)
208        ## self.assertEqual(repr(L2), repr(p2))
209        L3 = UserList.UserList(range(10))
210        p3 = weakref.proxy(L3)
211        with test_support.check_py3k_warnings():
212            self.assertEqual(L3[:], p3[:])
213            self.assertEqual(L3[5:], p3[5:])
214            self.assertEqual(L3[:5], p3[:5])
215            self.assertEqual(L3[2:5], p3[2:5])
216
217    def test_proxy_unicode(self):
218        # See bug 5037
219        class C(object):
220            def __str__(self):
221                return "string"
222            def __unicode__(self):
223                return u"unicode"
224        instance = C()
225        self.assertIn("__unicode__", dir(weakref.proxy(instance)))
226        self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
227
228    def test_proxy_index(self):
229        class C:
230            def __index__(self):
231                return 10
232        o = C()
233        p = weakref.proxy(o)
234        self.assertEqual(operator.index(p), 10)
235
236    def test_proxy_div(self):
237        class C:
238            def __floordiv__(self, other):
239                return 42
240            def __ifloordiv__(self, other):
241                return 21
242        o = C()
243        p = weakref.proxy(o)
244        self.assertEqual(p // 5, 42)
245        p //= 5
246        self.assertEqual(p, 21)
247
248    # The PyWeakref_* C API is documented as allowing either NULL or
249    # None as the value for the callback, where either means "no
250    # callback".  The "no callback" ref and proxy objects are supposed
251    # to be shared so long as they exist by all callers so long as
252    # they are active.  In Python 2.3.3 and earlier, this guarantee
253    # was not honored, and was broken in different ways for
254    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
255
256    def test_shared_ref_without_callback(self):
257        self.check_shared_without_callback(weakref.ref)
258
259    def test_shared_proxy_without_callback(self):
260        self.check_shared_without_callback(weakref.proxy)
261
262    def check_shared_without_callback(self, makeref):
263        o = Object(1)
264        p1 = makeref(o, None)
265        p2 = makeref(o, None)
266        self.assertIs(p1, p2, "both callbacks were None in the C API")
267        del p1, p2
268        p1 = makeref(o)
269        p2 = makeref(o, None)
270        self.assertIs(p1, p2, "callbacks were NULL, None in the C API")
271        del p1, p2
272        p1 = makeref(o)
273        p2 = makeref(o)
274        self.assertIs(p1, p2, "both callbacks were NULL in the C API")
275        del p1, p2
276        p1 = makeref(o, None)
277        p2 = makeref(o)
278        self.assertIs(p1, p2, "callbacks were None, NULL in the C API")
279
280    def test_callable_proxy(self):
281        o = Callable()
282        ref1 = weakref.proxy(o)
283
284        self.check_proxy(o, ref1)
285
286        self.assertIs(type(ref1), weakref.CallableProxyType,
287                     "proxy is not of callable type")
288        ref1('twinkies!')
289        self.assertEqual(o.bar, 'twinkies!',
290                     "call through proxy not passed through to original")
291        ref1(x='Splat.')
292        self.assertEqual(o.bar, 'Splat.',
293                     "call through proxy not passed through to original")
294
295        # expect due to too few args
296        self.assertRaises(TypeError, ref1)
297
298        # expect due to too many args
299        self.assertRaises(TypeError, ref1, 1, 2, 3)
300
301    def check_proxy(self, o, proxy):
302        o.foo = 1
303        self.assertEqual(proxy.foo, 1,
304                     "proxy does not reflect attribute addition")
305        o.foo = 2
306        self.assertEqual(proxy.foo, 2,
307                     "proxy does not reflect attribute modification")
308        del o.foo
309        self.assertFalse(hasattr(proxy, 'foo'),
310                     "proxy does not reflect attribute removal")
311
312        proxy.foo = 1
313        self.assertEqual(o.foo, 1,
314                     "object does not reflect attribute addition via proxy")
315        proxy.foo = 2
316        self.assertEqual(o.foo, 2,
317            "object does not reflect attribute modification via proxy")
318        del proxy.foo
319        self.assertFalse(hasattr(o, 'foo'),
320                     "object does not reflect attribute removal via proxy")
321
322    def test_proxy_deletion(self):
323        # Test clearing of SF bug #762891
324        class Foo:
325            result = None
326            def __delitem__(self, accessor):
327                self.result = accessor
328        g = Foo()
329        f = weakref.proxy(g)
330        del f[0]
331        self.assertEqual(f.result, 0)
332
333    def test_proxy_bool(self):
334        # Test clearing of SF bug #1170766
335        class List(list): pass
336        lyst = List()
337        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
338
339    def test_getweakrefcount(self):
340        o = C()
341        ref1 = weakref.ref(o)
342        ref2 = weakref.ref(o, self.callback)
343        self.assertEqual(weakref.getweakrefcount(o), 2,
344                     "got wrong number of weak reference objects")
345
346        proxy1 = weakref.proxy(o)
347        proxy2 = weakref.proxy(o, self.callback)
348        self.assertEqual(weakref.getweakrefcount(o), 4,
349                     "got wrong number of weak reference objects")
350
351        del ref1, ref2, proxy1, proxy2
352        self.assertEqual(weakref.getweakrefcount(o), 0,
353                     "weak reference objects not unlinked from"
354                     " referent when discarded.")
355
356        # assumes ints do not support weakrefs
357        self.assertEqual(weakref.getweakrefcount(1), 0,
358                     "got wrong number of weak reference objects for int")
359
360    def test_getweakrefs(self):
361        o = C()
362        ref1 = weakref.ref(o, self.callback)
363        ref2 = weakref.ref(o, self.callback)
364        del ref1
365        self.assertEqual(weakref.getweakrefs(o), [ref2],
366                     "list of refs does not match")
367
368        o = C()
369        ref1 = weakref.ref(o, self.callback)
370        ref2 = weakref.ref(o, self.callback)
371        del ref2
372        self.assertEqual(weakref.getweakrefs(o), [ref1],
373                     "list of refs does not match")
374
375        del ref1
376        self.assertEqual(weakref.getweakrefs(o), [],
377                     "list of refs not cleared")
378
379        # assumes ints do not support weakrefs
380        self.assertEqual(weakref.getweakrefs(1), [],
381                     "list of refs does not match for int")
382
383    def test_newstyle_number_ops(self):
384        class F(float):
385            pass
386        f = F(2.0)
387        p = weakref.proxy(f)
388        self.assertEqual(p + 1.0, 3.0)
389        self.assertEqual(1.0 + p, 3.0)  # this used to SEGV
390
391    def test_callbacks_protected(self):
392        # Callbacks protected from already-set exceptions?
393        # Regression test for SF bug #478534.
394        class BogusError(Exception):
395            pass
396        data = {}
397        def remove(k):
398            del data[k]
399        def encapsulate():
400            f = lambda : ()
401            data[weakref.ref(f, remove)] = None
402            raise BogusError
403        try:
404            encapsulate()
405        except BogusError:
406            pass
407        else:
408            self.fail("exception not properly restored")
409        try:
410            encapsulate()
411        except BogusError:
412            pass
413        else:
414            self.fail("exception not properly restored")
415
416    def test_sf_bug_840829(self):
417        # "weakref callbacks and gc corrupt memory"
418        # subtype_dealloc erroneously exposed a new-style instance
419        # already in the process of getting deallocated to gc,
420        # causing double-deallocation if the instance had a weakref
421        # callback that triggered gc.
422        # If the bug exists, there probably won't be an obvious symptom
423        # in a release build.  In a debug build, a segfault will occur
424        # when the second attempt to remove the instance from the "list
425        # of all objects" occurs.
426
427        import gc
428
429        class C(object):
430            pass
431
432        c = C()
433        wr = weakref.ref(c, lambda ignore: gc.collect())
434        del c
435
436        # There endeth the first part.  It gets worse.
437        del wr
438
439        c1 = C()
440        c1.i = C()
441        wr = weakref.ref(c1.i, lambda ignore: gc.collect())
442
443        c2 = C()
444        c2.c1 = c1
445        del c1  # still alive because c2 points to it
446
447        # Now when subtype_dealloc gets called on c2, it's not enough just
448        # that c2 is immune from gc while the weakref callbacks associated
449        # with c2 execute (there are none in this 2nd half of the test, btw).
450        # subtype_dealloc goes on to call the base classes' deallocs too,
451        # so any gc triggered by weakref callbacks associated with anything
452        # torn down by a base class dealloc can also trigger double
453        # deallocation of c2.
454        del c2
455
456    def test_callback_in_cycle_1(self):
457        import gc
458
459        class J(object):
460            pass
461
462        class II(object):
463            def acallback(self, ignore):
464                self.J
465
466        I = II()
467        I.J = J
468        I.wr = weakref.ref(J, I.acallback)
469
470        # Now J and II are each in a self-cycle (as all new-style class
471        # objects are, since their __mro__ points back to them).  I holds
472        # both a weak reference (I.wr) and a strong reference (I.J) to class
473        # J.  I is also in a cycle (I.wr points to a weakref that references
474        # I.acallback).  When we del these three, they all become trash, but
475        # the cycles prevent any of them from getting cleaned up immediately.
476        # Instead they have to wait for cyclic gc to deduce that they're
477        # trash.
478        #
479        # gc used to call tp_clear on all of them, and the order in which
480        # it does that is pretty accidental.  The exact order in which we
481        # built up these things manages to provoke gc into running tp_clear
482        # in just the right order (I last).  Calling tp_clear on II leaves
483        # behind an insane class object (its __mro__ becomes NULL).  Calling
484        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
485        # just then because of the strong reference from I.J.  Calling
486        # tp_clear on I starts to clear I's __dict__, and just happens to
487        # clear I.J first -- I.wr is still intact.  That removes the last
488        # reference to J, which triggers the weakref callback.  The callback
489        # tries to do "self.J", and instances of new-style classes look up
490        # attributes ("J") in the class dict first.  The class (II) wants to
491        # search II.__mro__, but that's NULL.   The result was a segfault in
492        # a release build, and an assert failure in a debug build.
493        del I, J, II
494        gc.collect()
495
496    def test_callback_in_cycle_2(self):
497        import gc
498
499        # This is just like test_callback_in_cycle_1, except that II is an
500        # old-style class.  The symptom is different then:  an instance of an
501        # old-style class looks in its own __dict__ first.  'J' happens to
502        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
503        # __dict__, so the attribute isn't found.  The difference is that
504        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
505        # __mro__), so no segfault occurs.  Instead it got:
506        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
507        #    Exception exceptions.AttributeError:
508        #   "II instance has no attribute 'J'" in <bound method II.acallback
509        #       of <?.II instance at 0x00B9B4B8>> ignored
510
511        class J(object):
512            pass
513
514        class II:
515            def acallback(self, ignore):
516                self.J
517
518        I = II()
519        I.J = J
520        I.wr = weakref.ref(J, I.acallback)
521
522        del I, J, II
523        gc.collect()
524
525    def test_callback_in_cycle_3(self):
526        import gc
527
528        # This one broke the first patch that fixed the last two.  In this
529        # case, the objects reachable from the callback aren't also reachable
530        # from the object (c1) *triggering* the callback:  you can get to
531        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
532        # got tp_clear'ed by the time the c2.cb callback got invoked.
533
534        class C:
535            def cb(self, ignore):
536                self.me
537                self.c1
538                self.wr
539
540        c1, c2 = C(), C()
541
542        c2.me = c2
543        c2.c1 = c1
544        c2.wr = weakref.ref(c1, c2.cb)
545
546        del c1, c2
547        gc.collect()
548
549    def test_callback_in_cycle_4(self):
550        import gc
551
552        # Like test_callback_in_cycle_3, except c2 and c1 have different
553        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
554        # objects reachable from the dying object (c1) isn't enough to stop
555        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
556        # The result was a segfault (C.__mro__ was NULL when the callback
557        # tried to look up self.me).
558
559        class C(object):
560            def cb(self, ignore):
561                self.me
562                self.c1
563                self.wr
564
565        class D:
566            pass
567
568        c1, c2 = D(), C()
569
570        c2.me = c2
571        c2.c1 = c1
572        c2.wr = weakref.ref(c1, c2.cb)
573
574        del c1, c2, C, D
575        gc.collect()
576
577    def test_callback_in_cycle_resurrection(self):
578        import gc
579
580        # Do something nasty in a weakref callback:  resurrect objects
581        # from dead cycles.  For this to be attempted, the weakref and
582        # its callback must also be part of the cyclic trash (else the
583        # objects reachable via the callback couldn't be in cyclic trash
584        # to begin with -- the callback would act like an external root).
585        # But gc clears trash weakrefs with callbacks early now, which
586        # disables the callbacks, so the callbacks shouldn't get called
587        # at all (and so nothing actually gets resurrected).
588
589        alist = []
590        class C(object):
591            def __init__(self, value):
592                self.attribute = value
593
594            def acallback(self, ignore):
595                alist.append(self.c)
596
597        c1, c2 = C(1), C(2)
598        c1.c = c2
599        c2.c = c1
600        c1.wr = weakref.ref(c2, c1.acallback)
601        c2.wr = weakref.ref(c1, c2.acallback)
602
603        def C_went_away(ignore):
604            alist.append("C went away")
605        wr = weakref.ref(C, C_went_away)
606
607        del c1, c2, C   # make them all trash
608        self.assertEqual(alist, [])  # del isn't enough to reclaim anything
609
610        gc.collect()
611        # c1.wr and c2.wr were part of the cyclic trash, so should have
612        # been cleared without their callbacks executing.  OTOH, the weakref
613        # to C is bound to a function local (wr), and wasn't trash, so that
614        # callback should have been invoked when C went away.
615        self.assertEqual(alist, ["C went away"])
616        # The remaining weakref should be dead now (its callback ran).
617        self.assertEqual(wr(), None)
618
619        del alist[:]
620        gc.collect()
621        self.assertEqual(alist, [])
622
623    def test_callbacks_on_callback(self):
624        import gc
625
626        # Set up weakref callbacks *on* weakref callbacks.
627        alist = []
628        def safe_callback(ignore):
629            alist.append("safe_callback called")
630
631        class C(object):
632            def cb(self, ignore):
633                alist.append("cb called")
634
635        c, d = C(), C()
636        c.other = d
637        d.other = c
638        callback = c.cb
639        c.wr = weakref.ref(d, callback)     # this won't trigger
640        d.wr = weakref.ref(callback, d.cb)  # ditto
641        external_wr = weakref.ref(callback, safe_callback)  # but this will
642        self.assertIs(external_wr(), callback)
643
644        # The weakrefs attached to c and d should get cleared, so that
645        # C.cb is never called.  But external_wr isn't part of the cyclic
646        # trash, and no cyclic trash is reachable from it, so safe_callback
647        # should get invoked when the bound method object callback (c.cb)
648        # -- which is itself a callback, and also part of the cyclic trash --
649        # gets reclaimed at the end of gc.
650
651        del callback, c, d, C
652        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
653        gc.collect()
654        self.assertEqual(alist, ["safe_callback called"])
655        self.assertEqual(external_wr(), None)
656
657        del alist[:]
658        gc.collect()
659        self.assertEqual(alist, [])
660
661    def test_gc_during_ref_creation(self):
662        self.check_gc_during_creation(weakref.ref)
663
664    def test_gc_during_proxy_creation(self):
665        self.check_gc_during_creation(weakref.proxy)
666
667    def check_gc_during_creation(self, makeref):
668        thresholds = gc.get_threshold()
669        gc.set_threshold(1, 1, 1)
670        gc.collect()
671        class A:
672            pass
673
674        def callback(*args):
675            pass
676
677        referenced = A()
678
679        a = A()
680        a.a = a
681        a.wr = makeref(referenced)
682
683        try:
684            # now make sure the object and the ref get labeled as
685            # cyclic trash:
686            a = A()
687            weakref.ref(referenced, callback)
688
689        finally:
690            gc.set_threshold(*thresholds)
691
692    def test_ref_created_during_del(self):
693        # Bug #1377858
694        # A weakref created in an object's __del__() would crash the
695        # interpreter when the weakref was cleaned up since it would refer to
696        # non-existent memory.  This test should not segfault the interpreter.
697        class Target(object):
698            def __del__(self):
699                global ref_from_del
700                ref_from_del = weakref.ref(self)
701
702        w = Target()
703
704    def test_init(self):
705        # Issue 3634
706        # <weakref to class>.__init__() doesn't check errors correctly
707        r = weakref.ref(Exception)
708        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
709        # No exception should be raised here
710        gc.collect()
711
712    def test_classes(self):
713        # Check that both old-style classes and new-style classes
714        # are weakrefable.
715        class A(object):
716            pass
717        class B:
718            pass
719        l = []
720        weakref.ref(int)
721        a = weakref.ref(A, l.append)
722        A = None
723        gc.collect()
724        self.assertEqual(a(), None)
725        self.assertEqual(l, [a])
726        b = weakref.ref(B, l.append)
727        B = None
728        gc.collect()
729        self.assertEqual(b(), None)
730        self.assertEqual(l, [a, b])
731
732    def test_equality(self):
733        # Alive weakrefs defer equality testing to their underlying object.
734        x = Object(1)
735        y = Object(1)
736        z = Object(2)
737        a = weakref.ref(x)
738        b = weakref.ref(y)
739        c = weakref.ref(z)
740        d = weakref.ref(x)
741        # Note how we directly test the operators here, to stress both
742        # __eq__ and __ne__.
743        self.assertTrue(a == b)
744        self.assertFalse(a != b)
745        self.assertFalse(a == c)
746        self.assertTrue(a != c)
747        self.assertTrue(a == d)
748        self.assertFalse(a != d)
749        del x, y, z
750        gc.collect()
751        for r in a, b, c:
752            # Sanity check
753            self.assertIs(r(), None)
754        # Dead weakrefs compare by identity: whether `a` and `d` are the
755        # same weakref object is an implementation detail, since they pointed
756        # to the same original object and didn't have a callback.
757        # (see issue #16453).
758        self.assertFalse(a == b)
759        self.assertTrue(a != b)
760        self.assertFalse(a == c)
761        self.assertTrue(a != c)
762        self.assertEqual(a == d, a is d)
763        self.assertEqual(a != d, a is not d)
764
765    def test_hashing(self):
766        # Alive weakrefs hash the same as the underlying object
767        x = Object(42)
768        y = Object(42)
769        a = weakref.ref(x)
770        b = weakref.ref(y)
771        self.assertEqual(hash(a), hash(42))
772        del x, y
773        gc.collect()
774        # Dead weakrefs:
775        # - retain their hash is they were hashed when alive;
776        # - otherwise, cannot be hashed.
777        self.assertEqual(hash(a), hash(42))
778        self.assertRaises(TypeError, hash, b)
779
780    def test_trashcan_16602(self):
781        # Issue #16602: when a weakref's target was part of a long
782        # deallocation chain, the trashcan mechanism could delay clearing
783        # of the weakref and make the target object visible from outside
784        # code even though its refcount had dropped to 0.  A crash ensued.
785        class C(object):
786            def __init__(self, parent):
787                if not parent:
788                    return
789                wself = weakref.ref(self)
790                def cb(wparent):
791                    o = wself()
792                self.wparent = weakref.ref(parent, cb)
793
794        d = weakref.WeakKeyDictionary()
795        root = c = C(None)
796        for n in range(100):
797            d[c] = c = C(c)
798        del root
799        gc.collect()
800
801
802class SubclassableWeakrefTestCase(TestBase):
803
804    def test_subclass_refs(self):
805        class MyRef(weakref.ref):
806            def __init__(self, ob, callback=None, value=42):
807                self.value = value
808                super(MyRef, self).__init__(ob, callback)
809            def __call__(self):
810                self.called = True
811                return super(MyRef, self).__call__()
812        o = Object("foo")
813        mr = MyRef(o, value=24)
814        self.assertIs(mr(), o)
815        self.assertTrue(mr.called)
816        self.assertEqual(mr.value, 24)
817        del o
818        self.assertIsNone(mr())
819        self.assertTrue(mr.called)
820
821    def test_subclass_refs_dont_replace_standard_refs(self):
822        class MyRef(weakref.ref):
823            pass
824        o = Object(42)
825        r1 = MyRef(o)
826        r2 = weakref.ref(o)
827        self.assertIsNot(r1, r2)
828        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
829        self.assertEqual(weakref.getweakrefcount(o), 2)
830        r3 = MyRef(o)
831        self.assertEqual(weakref.getweakrefcount(o), 3)
832        refs = weakref.getweakrefs(o)
833        self.assertEqual(len(refs), 3)
834        self.assertIs(r2, refs[0])
835        self.assertIn(r1, refs[1:])
836        self.assertIn(r3, refs[1:])
837
838    def test_subclass_refs_dont_conflate_callbacks(self):
839        class MyRef(weakref.ref):
840            pass
841        o = Object(42)
842        r1 = MyRef(o, id)
843        r2 = MyRef(o, str)
844        self.assertIsNot(r1, r2)
845        refs = weakref.getweakrefs(o)
846        self.assertIn(r1, refs)
847        self.assertIn(r2, refs)
848
849    def test_subclass_refs_with_slots(self):
850        class MyRef(weakref.ref):
851            __slots__ = "slot1", "slot2"
852            def __new__(type, ob, callback, slot1, slot2):
853                return weakref.ref.__new__(type, ob, callback)
854            def __init__(self, ob, callback, slot1, slot2):
855                self.slot1 = slot1
856                self.slot2 = slot2
857            def meth(self):
858                return self.slot1 + self.slot2
859        o = Object(42)
860        r = MyRef(o, None, "abc", "def")
861        self.assertEqual(r.slot1, "abc")
862        self.assertEqual(r.slot2, "def")
863        self.assertEqual(r.meth(), "abcdef")
864        self.assertFalse(hasattr(r, "__dict__"))
865
866    def test_subclass_refs_with_cycle(self):
867        # Bug #3110
868        # An instance of a weakref subclass can have attributes.
869        # If such a weakref holds the only strong reference to the object,
870        # deleting the weakref will delete the object. In this case,
871        # the callback must not be called, because the ref object is
872        # being deleted.
873        class MyRef(weakref.ref):
874            pass
875
876        # Use a local callback, for "regrtest -R::"
877        # to detect refcounting problems
878        def callback(w):
879            self.cbcalled += 1
880
881        o = C()
882        r1 = MyRef(o, callback)
883        r1.o = o
884        del o
885
886        del r1 # Used to crash here
887
888        self.assertEqual(self.cbcalled, 0)
889
890        # Same test, with two weakrefs to the same object
891        # (since code paths are different)
892        o = C()
893        r1 = MyRef(o, callback)
894        r2 = MyRef(o, callback)
895        r1.r = r2
896        r2.o = o
897        del o
898        del r2
899
900        del r1 # Used to crash here
901
902        self.assertEqual(self.cbcalled, 0)
903
904
905class MappingTestCase(TestBase):
906
907    COUNT = 10
908
909    def check_len_cycles(self, dict_type, cons):
910        N = 20
911        items = [RefCycle() for i in range(N)]
912        dct = dict_type(cons(i, o) for i, o in enumerate(items))
913        # Keep an iterator alive
914        it = dct.iteritems()
915        try:
916            next(it)
917        except StopIteration:
918            pass
919        del items
920        gc.collect()
921        n1 = len(dct)
922        list(it)
923        del it
924        gc.collect()
925        n2 = len(dct)
926        # iteration should prevent garbage collection here
927        # Note that this is a test on an implementation detail.  The requirement
928        # is only to provide stable iteration, not that the size of the container
929        # stay fixed.
930        self.assertEqual(n1, 20)
931        #self.assertIn(n1, (0, 1))
932        self.assertEqual(n2, 0)
933
934    def test_weak_keyed_len_cycles(self):
935        self.check_len_cycles(weakref.WeakKeyDictionary, lambda n, k: (k, n))
936
937    def test_weak_valued_len_cycles(self):
938        self.check_len_cycles(weakref.WeakValueDictionary, lambda n, k: (n, k))
939
940    def check_len_race(self, dict_type, cons):
941        # Extended sanity checks for len() in the face of cyclic collection
942        self.addCleanup(gc.set_threshold, *gc.get_threshold())
943        for th in range(1, 100):
944            N = 20
945            gc.collect(0)
946            gc.set_threshold(th, th, th)
947            items = [RefCycle() for i in range(N)]
948            dct = dict_type(cons(o) for o in items)
949            del items
950            # All items will be collected at next garbage collection pass
951            it = dct.iteritems()
952            try:
953                next(it)
954            except StopIteration:
955                pass
956            n1 = len(dct)
957            del it
958            n2 = len(dct)
959            self.assertGreaterEqual(n1, 0)
960            self.assertLessEqual(n1, N)
961            self.assertGreaterEqual(n2, 0)
962            self.assertLessEqual(n2, n1)
963
964    def test_weak_keyed_len_race(self):
965        self.check_len_race(weakref.WeakKeyDictionary, lambda k: (k, 1))
966
967    def test_weak_valued_len_race(self):
968        self.check_len_race(weakref.WeakValueDictionary, lambda k: (1, k))
969
970    def test_weak_values(self):
971        #
972        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
973        #
974        dict, objects = self.make_weak_valued_dict()
975        for o in objects:
976            self.assertEqual(weakref.getweakrefcount(o), 1,
977                         "wrong number of weak references to %r!" % o)
978            self.assertIs(o, dict[o.arg],
979                         "wrong object returned by weak dict!")
980        items1 = dict.items()
981        items2 = dict.copy().items()
982        items1.sort()
983        items2.sort()
984        self.assertEqual(items1, items2,
985                     "cloning of weak-valued dictionary did not work!")
986        del items1, items2
987        self.assertEqual(len(dict), self.COUNT)
988        del objects[0]
989        self.assertEqual(len(dict), (self.COUNT - 1),
990                     "deleting object did not cause dictionary update")
991        del objects, o
992        self.assertEqual(len(dict), 0,
993                     "deleting the values did not clear the dictionary")
994        # regression on SF bug #447152:
995        dict = weakref.WeakValueDictionary()
996        self.assertRaises(KeyError, dict.__getitem__, 1)
997        dict[2] = C()
998        self.assertRaises(KeyError, dict.__getitem__, 2)
999
1000    def test_weak_keys(self):
1001        #
1002        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
1003        #  len(d), in d.
1004        #
1005        dict, objects = self.make_weak_keyed_dict()
1006        for o in objects:
1007            self.assertEqual(weakref.getweakrefcount(o), 1,
1008                         "wrong number of weak references to %r!" % o)
1009            self.assertIs(o.arg, dict[o],
1010                         "wrong object returned by weak dict!")
1011        items1 = dict.items()
1012        items2 = dict.copy().items()
1013        self.assertEqual(set(items1), set(items2),
1014                     "cloning of weak-keyed dictionary did not work!")
1015        del items1, items2
1016        self.assertEqual(len(dict), self.COUNT)
1017        del objects[0]
1018        self.assertEqual(len(dict), (self.COUNT - 1),
1019                     "deleting object did not cause dictionary update")
1020        del objects, o
1021        self.assertEqual(len(dict), 0,
1022                     "deleting the keys did not clear the dictionary")
1023        o = Object(42)
1024        dict[o] = "What is the meaning of the universe?"
1025        self.assertIn(o, dict)
1026        self.assertNotIn(34, dict)
1027
1028    def test_weak_keyed_iters(self):
1029        dict, objects = self.make_weak_keyed_dict()
1030        self.check_iters(dict)
1031
1032        # Test keyrefs()
1033        refs = dict.keyrefs()
1034        self.assertEqual(len(refs), len(objects))
1035        objects2 = list(objects)
1036        for wr in refs:
1037            ob = wr()
1038            self.assertIn(ob, dict)
1039            self.assertEqual(ob.arg, dict[ob])
1040            objects2.remove(ob)
1041        self.assertEqual(len(objects2), 0)
1042
1043        # Test iterkeyrefs()
1044        objects2 = list(objects)
1045        self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
1046        for wr in dict.iterkeyrefs():
1047            ob = wr()
1048            self.assertIn(ob, dict)
1049            self.assertEqual(ob.arg, dict[ob])
1050            objects2.remove(ob)
1051        self.assertEqual(len(objects2), 0)
1052
1053    def test_weak_valued_iters(self):
1054        dict, objects = self.make_weak_valued_dict()
1055        self.check_iters(dict)
1056
1057        # Test valuerefs()
1058        refs = dict.valuerefs()
1059        self.assertEqual(len(refs), len(objects))
1060        objects2 = list(objects)
1061        for wr in refs:
1062            ob = wr()
1063            self.assertEqual(ob, dict[ob.arg])
1064            self.assertEqual(ob.arg, dict[ob.arg].arg)
1065            objects2.remove(ob)
1066        self.assertEqual(len(objects2), 0)
1067
1068        # Test itervaluerefs()
1069        objects2 = list(objects)
1070        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
1071        for wr in dict.itervaluerefs():
1072            ob = wr()
1073            self.assertEqual(ob, dict[ob.arg])
1074            self.assertEqual(ob.arg, dict[ob.arg].arg)
1075            objects2.remove(ob)
1076        self.assertEqual(len(objects2), 0)
1077
1078    def check_iters(self, dict):
1079        # item iterator:
1080        items = dict.items()
1081        for item in dict.iteritems():
1082            items.remove(item)
1083        self.assertEqual(len(items), 0, "iteritems() did not touch all items")
1084
1085        # key iterator, via __iter__():
1086        keys = dict.keys()
1087        for k in dict:
1088            keys.remove(k)
1089        self.assertEqual(len(keys), 0, "__iter__() did not touch all keys")
1090
1091        # key iterator, via iterkeys():
1092        keys = dict.keys()
1093        for k in dict.iterkeys():
1094            keys.remove(k)
1095        self.assertEqual(len(keys), 0, "iterkeys() did not touch all keys")
1096
1097        # value iterator:
1098        values = dict.values()
1099        for v in dict.itervalues():
1100            values.remove(v)
1101        self.assertEqual(len(values), 0,
1102                     "itervalues() did not touch all values")
1103
1104    def check_weak_destroy_while_iterating(self, dict, objects, iter_name):
1105        n = len(dict)
1106        it = iter(getattr(dict, iter_name)())
1107        next(it)             # Trigger internal iteration
1108        # Destroy an object
1109        del objects[-1]
1110        gc.collect()    # just in case
1111        # We have removed either the first consumed object, or another one
1112        self.assertIn(len(list(it)), [len(objects), len(objects) - 1])
1113        del it
1114        # The removal has been committed
1115        self.assertEqual(len(dict), n - 1)
1116
1117    def check_weak_destroy_and_mutate_while_iterating(self, dict, testcontext):
1118        # Check that we can explicitly mutate the weak dict without
1119        # interfering with delayed removal.
1120        # `testcontext` should create an iterator, destroy one of the
1121        # weakref'ed objects and then return a new key/value pair corresponding
1122        # to the destroyed object.
1123        with testcontext() as (k, v):
1124            self.assertFalse(k in dict)
1125        with testcontext() as (k, v):
1126            self.assertRaises(KeyError, dict.__delitem__, k)
1127        self.assertFalse(k in dict)
1128        with testcontext() as (k, v):
1129            self.assertRaises(KeyError, dict.pop, k)
1130        self.assertFalse(k in dict)
1131        with testcontext() as (k, v):
1132            dict[k] = v
1133        self.assertEqual(dict[k], v)
1134        ddict = copy.copy(dict)
1135        with testcontext() as (k, v):
1136            dict.update(ddict)
1137        self.assertEqual(dict, ddict)
1138        with testcontext() as (k, v):
1139            dict.clear()
1140        self.assertEqual(len(dict), 0)
1141
1142    def test_weak_keys_destroy_while_iterating(self):
1143        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1144        dict, objects = self.make_weak_keyed_dict()
1145        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
1146        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
1147        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
1148        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
1149        dict, objects = self.make_weak_keyed_dict()
1150        @contextlib.contextmanager
1151        def testcontext():
1152            try:
1153                it = iter(dict.iteritems())
1154                next(it)
1155                # Schedule a key/value for removal and recreate it
1156                v = objects.pop().arg
1157                gc.collect()      # just in case
1158                yield Object(v), v
1159            finally:
1160                it = None           # should commit all removals
1161                gc.collect()
1162        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1163
1164    def test_weak_values_destroy_while_iterating(self):
1165        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
1166        dict, objects = self.make_weak_valued_dict()
1167        self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
1168        self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
1169        self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
1170        self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
1171        dict, objects = self.make_weak_valued_dict()
1172        @contextlib.contextmanager
1173        def testcontext():
1174            try:
1175                it = iter(dict.iteritems())
1176                next(it)
1177                # Schedule a key/value for removal and recreate it
1178                k = objects.pop().arg
1179                gc.collect()      # just in case
1180                yield k, Object(k)
1181            finally:
1182                it = None           # should commit all removals
1183                gc.collect()
1184        self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
1185
1186    def test_make_weak_keyed_dict_from_dict(self):
1187        o = Object(3)
1188        dict = weakref.WeakKeyDictionary({o:364})
1189        self.assertEqual(dict[o], 364)
1190
1191    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
1192        o = Object(3)
1193        dict = weakref.WeakKeyDictionary({o:364})
1194        dict2 = weakref.WeakKeyDictionary(dict)
1195        self.assertEqual(dict[o], 364)
1196
1197    def make_weak_keyed_dict(self):
1198        dict = weakref.WeakKeyDictionary()
1199        objects = map(Object, range(self.COUNT))
1200        for o in objects:
1201            dict[o] = o.arg
1202        return dict, objects
1203
1204    def test_make_weak_valued_dict_misc(self):
1205        # errors
1206        self.assertRaises(TypeError, weakref.WeakValueDictionary.__init__)
1207        self.assertRaises(TypeError, weakref.WeakValueDictionary, {}, {})
1208        self.assertRaises(TypeError, weakref.WeakValueDictionary, (), ())
1209        # special keyword arguments
1210        o = Object(3)
1211        for kw in 'self', 'other', 'iterable':
1212            d = weakref.WeakValueDictionary(**{kw: o})
1213            self.assertEqual(list(d.keys()), [kw])
1214            self.assertEqual(d[kw], o)
1215
1216    def make_weak_valued_dict(self):
1217        dict = weakref.WeakValueDictionary()
1218        objects = map(Object, range(self.COUNT))
1219        for o in objects:
1220            dict[o.arg] = o
1221        return dict, objects
1222
1223    def check_popitem(self, klass, key1, value1, key2, value2):
1224        weakdict = klass()
1225        weakdict[key1] = value1
1226        weakdict[key2] = value2
1227        self.assertEqual(len(weakdict), 2)
1228        k, v = weakdict.popitem()
1229        self.assertEqual(len(weakdict), 1)
1230        if k is key1:
1231            self.assertIs(v, value1)
1232        else:
1233            self.assertIs(v, value2)
1234        k, v = weakdict.popitem()
1235        self.assertEqual(len(weakdict), 0)
1236        if k is key1:
1237            self.assertIs(v, value1)
1238        else:
1239            self.assertIs(v, value2)
1240
1241    def test_weak_valued_dict_popitem(self):
1242        self.check_popitem(weakref.WeakValueDictionary,
1243                           "key1", C(), "key2", C())
1244
1245    def test_weak_keyed_dict_popitem(self):
1246        self.check_popitem(weakref.WeakKeyDictionary,
1247                           C(), "value 1", C(), "value 2")
1248
1249    def check_setdefault(self, klass, key, value1, value2):
1250        self.assertIsNot(value1, value2,
1251                     "invalid test"
1252                     " -- value parameters must be distinct objects")
1253        weakdict = klass()
1254        o = weakdict.setdefault(key, value1)
1255        self.assertIs(o, value1)
1256        self.assertIn(key, weakdict)
1257        self.assertIs(weakdict.get(key), value1)
1258        self.assertIs(weakdict[key], value1)
1259
1260        o = weakdict.setdefault(key, value2)
1261        self.assertIs(o, value1)
1262        self.assertIn(key, weakdict)
1263        self.assertIs(weakdict.get(key), value1)
1264        self.assertIs(weakdict[key], value1)
1265
1266    def test_weak_valued_dict_setdefault(self):
1267        self.check_setdefault(weakref.WeakValueDictionary,
1268                              "key", C(), C())
1269
1270    def test_weak_keyed_dict_setdefault(self):
1271        self.check_setdefault(weakref.WeakKeyDictionary,
1272                              C(), "value 1", "value 2")
1273
1274    def check_update(self, klass, dict):
1275        #
1276        #  This exercises d.update(), len(d), d.keys(), in d,
1277        #  d.get(), d[].
1278        #
1279        weakdict = klass()
1280        weakdict.update(dict)
1281        self.assertEqual(len(weakdict), len(dict))
1282        for k in weakdict.keys():
1283            self.assertIn(k, dict,
1284                         "mysterious new key appeared in weak dict")
1285            v = dict.get(k)
1286            self.assertIs(v, weakdict[k])
1287            self.assertIs(v, weakdict.get(k))
1288        for k in dict.keys():
1289            self.assertIn(k, weakdict,
1290                         "original key disappeared in weak dict")
1291            v = dict[k]
1292            self.assertIs(v, weakdict[k])
1293            self.assertIs(v, weakdict.get(k))
1294
1295    def test_weak_valued_dict_update(self):
1296        self.check_update(weakref.WeakValueDictionary,
1297                          {1: C(), 'a': C(), C(): C()})
1298        # errors
1299        self.assertRaises(TypeError, weakref.WeakValueDictionary.update)
1300        d = weakref.WeakValueDictionary()
1301        self.assertRaises(TypeError, d.update, {}, {})
1302        self.assertRaises(TypeError, d.update, (), ())
1303        self.assertEqual(list(d.keys()), [])
1304        # special keyword arguments
1305        o = Object(3)
1306        for kw in 'self', 'dict', 'other', 'iterable':
1307            d = weakref.WeakValueDictionary()
1308            d.update(**{kw: o})
1309            self.assertEqual(list(d.keys()), [kw])
1310            self.assertEqual(d[kw], o)
1311
1312    def test_weak_keyed_dict_update(self):
1313        self.check_update(weakref.WeakKeyDictionary,
1314                          {C(): 1, C(): 2, C(): 3})
1315
1316    def test_weak_keyed_delitem(self):
1317        d = weakref.WeakKeyDictionary()
1318        o1 = Object('1')
1319        o2 = Object('2')
1320        d[o1] = 'something'
1321        d[o2] = 'something'
1322        self.assertEqual(len(d), 2)
1323        del d[o1]
1324        self.assertEqual(len(d), 1)
1325        self.assertEqual(d.keys(), [o2])
1326
1327    def test_weak_valued_delitem(self):
1328        d = weakref.WeakValueDictionary()
1329        o1 = Object('1')
1330        o2 = Object('2')
1331        d['something'] = o1
1332        d['something else'] = o2
1333        self.assertEqual(len(d), 2)
1334        del d['something']
1335        self.assertEqual(len(d), 1)
1336        self.assertEqual(d.items(), [('something else', o2)])
1337
1338    def test_weak_keyed_bad_delitem(self):
1339        d = weakref.WeakKeyDictionary()
1340        o = Object('1')
1341        # An attempt to delete an object that isn't there should raise
1342        # KeyError.  It didn't before 2.3.
1343        self.assertRaises(KeyError, d.__delitem__, o)
1344        self.assertRaises(KeyError, d.__getitem__, o)
1345
1346        # If a key isn't of a weakly referencable type, __getitem__ and
1347        # __setitem__ raise TypeError.  __delitem__ should too.
1348        self.assertRaises(TypeError, d.__delitem__,  13)
1349        self.assertRaises(TypeError, d.__getitem__,  13)
1350        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1351
1352    def test_weak_keyed_cascading_deletes(self):
1353        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1354        # over the keys via self.data.iterkeys().  If things vanished from
1355        # the dict during this (or got added), that caused a RuntimeError.
1356
1357        d = weakref.WeakKeyDictionary()
1358        mutate = False
1359
1360        class C(object):
1361            def __init__(self, i):
1362                self.value = i
1363            def __hash__(self):
1364                return hash(self.value)
1365            def __eq__(self, other):
1366                if mutate:
1367                    # Side effect that mutates the dict, by removing the
1368                    # last strong reference to a key.
1369                    del objs[-1]
1370                return self.value == other.value
1371
1372        objs = [C(i) for i in range(4)]
1373        for o in objs:
1374            d[o] = o.value
1375        del o   # now the only strong references to keys are in objs
1376        # Find the order in which iterkeys sees the keys.
1377        objs = d.keys()
1378        # Reverse it, so that the iteration implementation of __delitem__
1379        # has to keep looping to find the first object we delete.
1380        objs.reverse()
1381
1382        # Turn on mutation in C.__eq__.  The first time thru the loop,
1383        # under the iterkeys() business the first comparison will delete
1384        # the last item iterkeys() would see, and that causes a
1385        #     RuntimeError: dictionary changed size during iteration
1386        # when the iterkeys() loop goes around to try comparing the next
1387        # key.  After this was fixed, it just deletes the last object *our*
1388        # "for o in obj" loop would have gotten to.
1389        mutate = True
1390        count = 0
1391        for o in objs:
1392            count += 1
1393            del d[o]
1394        self.assertEqual(len(d), 0)
1395        self.assertEqual(count, 2)
1396
1397from test import mapping_tests
1398
1399class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1400    """Check that WeakValueDictionary conforms to the mapping protocol"""
1401    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1402    type2test = weakref.WeakValueDictionary
1403    def _reference(self):
1404        return self.__ref.copy()
1405
1406class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1407    """Check that WeakKeyDictionary conforms to the mapping protocol"""
1408    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1409    type2test = weakref.WeakKeyDictionary
1410    def _reference(self):
1411        return self.__ref.copy()
1412
1413libreftest = """ Doctest for examples in the library reference: weakref.rst
1414
1415>>> import weakref
1416>>> class Dict(dict):
1417...     pass
1418...
1419>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
1420>>> r = weakref.ref(obj)
1421>>> print r() is obj
1422True
1423
1424>>> import weakref
1425>>> class Object:
1426...     pass
1427...
1428>>> o = Object()
1429>>> r = weakref.ref(o)
1430>>> o2 = r()
1431>>> o is o2
1432True
1433>>> del o, o2
1434>>> print r()
1435None
1436
1437>>> import weakref
1438>>> class ExtendedRef(weakref.ref):
1439...     def __init__(self, ob, callback=None, **annotations):
1440...         super(ExtendedRef, self).__init__(ob, callback)
1441...         self.__counter = 0
1442...         for k, v in annotations.iteritems():
1443...             setattr(self, k, v)
1444...     def __call__(self):
1445...         '''Return a pair containing the referent and the number of
1446...         times the reference has been called.
1447...         '''
1448...         ob = super(ExtendedRef, self).__call__()
1449...         if ob is not None:
1450...             self.__counter += 1
1451...             ob = (ob, self.__counter)
1452...         return ob
1453...
1454>>> class A:   # not in docs from here, just testing the ExtendedRef
1455...     pass
1456...
1457>>> a = A()
1458>>> r = ExtendedRef(a, foo=1, bar="baz")
1459>>> r.foo
14601
1461>>> r.bar
1462'baz'
1463>>> r()[1]
14641
1465>>> r()[1]
14662
1467>>> r()[0] is a
1468True
1469
1470
1471>>> import weakref
1472>>> _id2obj_dict = weakref.WeakValueDictionary()
1473>>> def remember(obj):
1474...     oid = id(obj)
1475...     _id2obj_dict[oid] = obj
1476...     return oid
1477...
1478>>> def id2obj(oid):
1479...     return _id2obj_dict[oid]
1480...
1481>>> a = A()             # from here, just testing
1482>>> a_id = remember(a)
1483>>> id2obj(a_id) is a
1484True
1485>>> del a
1486>>> try:
1487...     id2obj(a_id)
1488... except KeyError:
1489...     print 'OK'
1490... else:
1491...     print 'WeakValueDictionary error'
1492OK
1493
1494"""
1495
1496__test__ = {'libreftest' : libreftest}
1497
1498def test_main():
1499    test_support.run_unittest(
1500        ReferencesTestCase,
1501        MappingTestCase,
1502        WeakValueDictionaryTestCase,
1503        WeakKeyDictionaryTestCase,
1504        SubclassableWeakrefTestCase,
1505        )
1506    test_support.run_doctest(sys.modules[__name__])
1507
1508
1509if __name__ == "__main__":
1510    test_main()
1511