• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import gc
2import sys
3import unittest
4import UserList
5import weakref
6import operator
7
8from test import test_support
9
10# Used in ReferencesTestCase.test_ref_created_during_del() .
11ref_from_del = None
12
13class C:
14    def method(self):
15        pass
16
17
18class Callable:
19    bar = None
20
21    def __call__(self, x):
22        self.bar = x
23
24
25def create_function():
26    def f(): pass
27    return f
28
29def create_bound_method():
30    return C().method
31
32def create_unbound_method():
33    return C.method
34
35
36class TestBase(unittest.TestCase):
37
38    def setUp(self):
39        self.cbcalled = 0
40
41    def callback(self, ref):
42        self.cbcalled += 1
43
44
45class ReferencesTestCase(TestBase):
46
47    def test_basic_ref(self):
48        self.check_basic_ref(C)
49        self.check_basic_ref(create_function)
50        self.check_basic_ref(create_bound_method)
51        self.check_basic_ref(create_unbound_method)
52
53        # Just make sure the tp_repr handler doesn't raise an exception.
54        # Live reference:
55        o = C()
56        wr = weakref.ref(o)
57        repr(wr)
58        # Dead reference:
59        del o
60        repr(wr)
61
62    def test_basic_callback(self):
63        self.check_basic_callback(C)
64        self.check_basic_callback(create_function)
65        self.check_basic_callback(create_bound_method)
66        self.check_basic_callback(create_unbound_method)
67
68    def test_multiple_callbacks(self):
69        o = C()
70        ref1 = weakref.ref(o, self.callback)
71        ref2 = weakref.ref(o, self.callback)
72        del o
73        self.assertTrue(ref1() is None,
74                     "expected reference to be invalidated")
75        self.assertTrue(ref2() is None,
76                     "expected reference to be invalidated")
77        self.assertTrue(self.cbcalled == 2,
78                     "callback not called the right number of times")
79
80    def test_multiple_selfref_callbacks(self):
81        # Make sure all references are invalidated before callbacks are called
82        #
83        # What's important here is that we're using the first
84        # reference in the callback invoked on the second reference
85        # (the most recently created ref is cleaned up first).  This
86        # tests that all references to the object are invalidated
87        # before any of the callbacks are invoked, so that we only
88        # have one invocation of _weakref.c:cleanup_helper() active
89        # for a particular object at a time.
90        #
91        def callback(object, self=self):
92            self.ref()
93        c = C()
94        self.ref = weakref.ref(c, callback)
95        ref1 = weakref.ref(c, callback)
96        del c
97
98    def test_proxy_ref(self):
99        o = C()
100        o.bar = 1
101        ref1 = weakref.proxy(o, self.callback)
102        ref2 = weakref.proxy(o, self.callback)
103        del o
104
105        def check(proxy):
106            proxy.bar
107
108        self.assertRaises(weakref.ReferenceError, check, ref1)
109        self.assertRaises(weakref.ReferenceError, check, ref2)
110        self.assertRaises(weakref.ReferenceError, bool, weakref.proxy(C()))
111        self.assertTrue(self.cbcalled == 2)
112
113    def check_basic_ref(self, factory):
114        o = factory()
115        ref = weakref.ref(o)
116        self.assertTrue(ref() is not None,
117                     "weak reference to live object should be live")
118        o2 = ref()
119        self.assertTrue(o is o2,
120                     "<ref>() should return original object if live")
121
122    def check_basic_callback(self, factory):
123        self.cbcalled = 0
124        o = factory()
125        ref = weakref.ref(o, self.callback)
126        del o
127        self.assertTrue(self.cbcalled == 1,
128                     "callback did not properly set 'cbcalled'")
129        self.assertTrue(ref() is None,
130                     "ref2 should be dead after deleting object reference")
131
132    def test_ref_reuse(self):
133        o = C()
134        ref1 = weakref.ref(o)
135        # create a proxy to make sure that there's an intervening creation
136        # between these two; it should make no difference
137        proxy = weakref.proxy(o)
138        ref2 = weakref.ref(o)
139        self.assertTrue(ref1 is ref2,
140                     "reference object w/out callback should be re-used")
141
142        o = C()
143        proxy = weakref.proxy(o)
144        ref1 = weakref.ref(o)
145        ref2 = weakref.ref(o)
146        self.assertTrue(ref1 is ref2,
147                     "reference object w/out callback should be re-used")
148        self.assertTrue(weakref.getweakrefcount(o) == 2,
149                     "wrong weak ref count for object")
150        del proxy
151        self.assertTrue(weakref.getweakrefcount(o) == 1,
152                     "wrong weak ref count for object after deleting proxy")
153
154    def test_proxy_reuse(self):
155        o = C()
156        proxy1 = weakref.proxy(o)
157        ref = weakref.ref(o)
158        proxy2 = weakref.proxy(o)
159        self.assertTrue(proxy1 is proxy2,
160                     "proxy object w/out callback should have been re-used")
161
162    def test_basic_proxy(self):
163        o = C()
164        self.check_proxy(o, weakref.proxy(o))
165
166        L = UserList.UserList()
167        p = weakref.proxy(L)
168        self.assertFalse(p, "proxy for empty UserList should be false")
169        p.append(12)
170        self.assertEqual(len(L), 1)
171        self.assertTrue(p, "proxy for non-empty UserList should be true")
172        with test_support.check_py3k_warnings():
173            p[:] = [2, 3]
174        self.assertEqual(len(L), 2)
175        self.assertEqual(len(p), 2)
176        self.assertIn(3, p, "proxy didn't support __contains__() properly")
177        p[1] = 5
178        self.assertEqual(L[1], 5)
179        self.assertEqual(p[1], 5)
180        L2 = UserList.UserList(L)
181        p2 = weakref.proxy(L2)
182        self.assertEqual(p, p2)
183        ## self.assertEqual(repr(L2), repr(p2))
184        L3 = UserList.UserList(range(10))
185        p3 = weakref.proxy(L3)
186        with test_support.check_py3k_warnings():
187            self.assertEqual(L3[:], p3[:])
188            self.assertEqual(L3[5:], p3[5:])
189            self.assertEqual(L3[:5], p3[:5])
190            self.assertEqual(L3[2:5], p3[2:5])
191
192    def test_proxy_unicode(self):
193        # See bug 5037
194        class C(object):
195            def __str__(self):
196                return "string"
197            def __unicode__(self):
198                return u"unicode"
199        instance = C()
200        self.assertIn("__unicode__", dir(weakref.proxy(instance)))
201        self.assertEqual(unicode(weakref.proxy(instance)), u"unicode")
202
203    def test_proxy_index(self):
204        class C:
205            def __index__(self):
206                return 10
207        o = C()
208        p = weakref.proxy(o)
209        self.assertEqual(operator.index(p), 10)
210
211    def test_proxy_div(self):
212        class C:
213            def __floordiv__(self, other):
214                return 42
215            def __ifloordiv__(self, other):
216                return 21
217        o = C()
218        p = weakref.proxy(o)
219        self.assertEqual(p // 5, 42)
220        p //= 5
221        self.assertEqual(p, 21)
222
223    # The PyWeakref_* C API is documented as allowing either NULL or
224    # None as the value for the callback, where either means "no
225    # callback".  The "no callback" ref and proxy objects are supposed
226    # to be shared so long as they exist by all callers so long as
227    # they are active.  In Python 2.3.3 and earlier, this guarantee
228    # was not honored, and was broken in different ways for
229    # PyWeakref_NewRef() and PyWeakref_NewProxy().  (Two tests.)
230
231    def test_shared_ref_without_callback(self):
232        self.check_shared_without_callback(weakref.ref)
233
234    def test_shared_proxy_without_callback(self):
235        self.check_shared_without_callback(weakref.proxy)
236
237    def check_shared_without_callback(self, makeref):
238        o = Object(1)
239        p1 = makeref(o, None)
240        p2 = makeref(o, None)
241        self.assertTrue(p1 is p2, "both callbacks were None in the C API")
242        del p1, p2
243        p1 = makeref(o)
244        p2 = makeref(o, None)
245        self.assertTrue(p1 is p2, "callbacks were NULL, None in the C API")
246        del p1, p2
247        p1 = makeref(o)
248        p2 = makeref(o)
249        self.assertTrue(p1 is p2, "both callbacks were NULL in the C API")
250        del p1, p2
251        p1 = makeref(o, None)
252        p2 = makeref(o)
253        self.assertTrue(p1 is p2, "callbacks were None, NULL in the C API")
254
255    def test_callable_proxy(self):
256        o = Callable()
257        ref1 = weakref.proxy(o)
258
259        self.check_proxy(o, ref1)
260
261        self.assertTrue(type(ref1) is weakref.CallableProxyType,
262                     "proxy is not of callable type")
263        ref1('twinkies!')
264        self.assertTrue(o.bar == 'twinkies!',
265                     "call through proxy not passed through to original")
266        ref1(x='Splat.')
267        self.assertTrue(o.bar == 'Splat.',
268                     "call through proxy not passed through to original")
269
270        # expect due to too few args
271        self.assertRaises(TypeError, ref1)
272
273        # expect due to too many args
274        self.assertRaises(TypeError, ref1, 1, 2, 3)
275
276    def check_proxy(self, o, proxy):
277        o.foo = 1
278        self.assertTrue(proxy.foo == 1,
279                     "proxy does not reflect attribute addition")
280        o.foo = 2
281        self.assertTrue(proxy.foo == 2,
282                     "proxy does not reflect attribute modification")
283        del o.foo
284        self.assertTrue(not hasattr(proxy, 'foo'),
285                     "proxy does not reflect attribute removal")
286
287        proxy.foo = 1
288        self.assertTrue(o.foo == 1,
289                     "object does not reflect attribute addition via proxy")
290        proxy.foo = 2
291        self.assertTrue(
292            o.foo == 2,
293            "object does not reflect attribute modification via proxy")
294        del proxy.foo
295        self.assertTrue(not hasattr(o, 'foo'),
296                     "object does not reflect attribute removal via proxy")
297
298    def test_proxy_deletion(self):
299        # Test clearing of SF bug #762891
300        class Foo:
301            result = None
302            def __delitem__(self, accessor):
303                self.result = accessor
304        g = Foo()
305        f = weakref.proxy(g)
306        del f[0]
307        self.assertEqual(f.result, 0)
308
309    def test_proxy_bool(self):
310        # Test clearing of SF bug #1170766
311        class List(list): pass
312        lyst = List()
313        self.assertEqual(bool(weakref.proxy(lyst)), bool(lyst))
314
315    def test_getweakrefcount(self):
316        o = C()
317        ref1 = weakref.ref(o)
318        ref2 = weakref.ref(o, self.callback)
319        self.assertTrue(weakref.getweakrefcount(o) == 2,
320                     "got wrong number of weak reference objects")
321
322        proxy1 = weakref.proxy(o)
323        proxy2 = weakref.proxy(o, self.callback)
324        self.assertTrue(weakref.getweakrefcount(o) == 4,
325                     "got wrong number of weak reference objects")
326
327        del ref1, ref2, proxy1, proxy2
328        self.assertTrue(weakref.getweakrefcount(o) == 0,
329                     "weak reference objects not unlinked from"
330                     " referent when discarded.")
331
332        # assumes ints do not support weakrefs
333        self.assertTrue(weakref.getweakrefcount(1) == 0,
334                     "got wrong number of weak reference objects for int")
335
336    def test_getweakrefs(self):
337        o = C()
338        ref1 = weakref.ref(o, self.callback)
339        ref2 = weakref.ref(o, self.callback)
340        del ref1
341        self.assertTrue(weakref.getweakrefs(o) == [ref2],
342                     "list of refs does not match")
343
344        o = C()
345        ref1 = weakref.ref(o, self.callback)
346        ref2 = weakref.ref(o, self.callback)
347        del ref2
348        self.assertTrue(weakref.getweakrefs(o) == [ref1],
349                     "list of refs does not match")
350
351        del ref1
352        self.assertTrue(weakref.getweakrefs(o) == [],
353                     "list of refs not cleared")
354
355        # assumes ints do not support weakrefs
356        self.assertTrue(weakref.getweakrefs(1) == [],
357                     "list of refs does not match for int")
358
359    def test_newstyle_number_ops(self):
360        class F(float):
361            pass
362        f = F(2.0)
363        p = weakref.proxy(f)
364        self.assertTrue(p + 1.0 == 3.0)
365        self.assertTrue(1.0 + p == 3.0)  # this used to SEGV
366
367    def test_callbacks_protected(self):
368        # Callbacks protected from already-set exceptions?
369        # Regression test for SF bug #478534.
370        class BogusError(Exception):
371            pass
372        data = {}
373        def remove(k):
374            del data[k]
375        def encapsulate():
376            f = lambda : ()
377            data[weakref.ref(f, remove)] = None
378            raise BogusError
379        try:
380            encapsulate()
381        except BogusError:
382            pass
383        else:
384            self.fail("exception not properly restored")
385        try:
386            encapsulate()
387        except BogusError:
388            pass
389        else:
390            self.fail("exception not properly restored")
391
392    def test_sf_bug_840829(self):
393        # "weakref callbacks and gc corrupt memory"
394        # subtype_dealloc erroneously exposed a new-style instance
395        # already in the process of getting deallocated to gc,
396        # causing double-deallocation if the instance had a weakref
397        # callback that triggered gc.
398        # If the bug exists, there probably won't be an obvious symptom
399        # in a release build.  In a debug build, a segfault will occur
400        # when the second attempt to remove the instance from the "list
401        # of all objects" occurs.
402
403        import gc
404
405        class C(object):
406            pass
407
408        c = C()
409        wr = weakref.ref(c, lambda ignore: gc.collect())
410        del c
411
412        # There endeth the first part.  It gets worse.
413        del wr
414
415        c1 = C()
416        c1.i = C()
417        wr = weakref.ref(c1.i, lambda ignore: gc.collect())
418
419        c2 = C()
420        c2.c1 = c1
421        del c1  # still alive because c2 points to it
422
423        # Now when subtype_dealloc gets called on c2, it's not enough just
424        # that c2 is immune from gc while the weakref callbacks associated
425        # with c2 execute (there are none in this 2nd half of the test, btw).
426        # subtype_dealloc goes on to call the base classes' deallocs too,
427        # so any gc triggered by weakref callbacks associated with anything
428        # torn down by a base class dealloc can also trigger double
429        # deallocation of c2.
430        del c2
431
432    def test_callback_in_cycle_1(self):
433        import gc
434
435        class J(object):
436            pass
437
438        class II(object):
439            def acallback(self, ignore):
440                self.J
441
442        I = II()
443        I.J = J
444        I.wr = weakref.ref(J, I.acallback)
445
446        # Now J and II are each in a self-cycle (as all new-style class
447        # objects are, since their __mro__ points back to them).  I holds
448        # both a weak reference (I.wr) and a strong reference (I.J) to class
449        # J.  I is also in a cycle (I.wr points to a weakref that references
450        # I.acallback).  When we del these three, they all become trash, but
451        # the cycles prevent any of them from getting cleaned up immediately.
452        # Instead they have to wait for cyclic gc to deduce that they're
453        # trash.
454        #
455        # gc used to call tp_clear on all of them, and the order in which
456        # it does that is pretty accidental.  The exact order in which we
457        # built up these things manages to provoke gc into running tp_clear
458        # in just the right order (I last).  Calling tp_clear on II leaves
459        # behind an insane class object (its __mro__ becomes NULL).  Calling
460        # tp_clear on J breaks its self-cycle, but J doesn't get deleted
461        # just then because of the strong reference from I.J.  Calling
462        # tp_clear on I starts to clear I's __dict__, and just happens to
463        # clear I.J first -- I.wr is still intact.  That removes the last
464        # reference to J, which triggers the weakref callback.  The callback
465        # tries to do "self.J", and instances of new-style classes look up
466        # attributes ("J") in the class dict first.  The class (II) wants to
467        # search II.__mro__, but that's NULL.   The result was a segfault in
468        # a release build, and an assert failure in a debug build.
469        del I, J, II
470        gc.collect()
471
472    def test_callback_in_cycle_2(self):
473        import gc
474
475        # This is just like test_callback_in_cycle_1, except that II is an
476        # old-style class.  The symptom is different then:  an instance of an
477        # old-style class looks in its own __dict__ first.  'J' happens to
478        # get cleared from I.__dict__ before 'wr', and 'J' was never in II's
479        # __dict__, so the attribute isn't found.  The difference is that
480        # the old-style II doesn't have a NULL __mro__ (it doesn't have any
481        # __mro__), so no segfault occurs.  Instead it got:
482        #    test_callback_in_cycle_2 (__main__.ReferencesTestCase) ...
483        #    Exception exceptions.AttributeError:
484        #   "II instance has no attribute 'J'" in <bound method II.acallback
485        #       of <?.II instance at 0x00B9B4B8>> ignored
486
487        class J(object):
488            pass
489
490        class II:
491            def acallback(self, ignore):
492                self.J
493
494        I = II()
495        I.J = J
496        I.wr = weakref.ref(J, I.acallback)
497
498        del I, J, II
499        gc.collect()
500
501    def test_callback_in_cycle_3(self):
502        import gc
503
504        # This one broke the first patch that fixed the last two.  In this
505        # case, the objects reachable from the callback aren't also reachable
506        # from the object (c1) *triggering* the callback:  you can get to
507        # c1 from c2, but not vice-versa.  The result was that c2's __dict__
508        # got tp_clear'ed by the time the c2.cb callback got invoked.
509
510        class C:
511            def cb(self, ignore):
512                self.me
513                self.c1
514                self.wr
515
516        c1, c2 = C(), C()
517
518        c2.me = c2
519        c2.c1 = c1
520        c2.wr = weakref.ref(c1, c2.cb)
521
522        del c1, c2
523        gc.collect()
524
525    def test_callback_in_cycle_4(self):
526        import gc
527
528        # Like test_callback_in_cycle_3, except c2 and c1 have different
529        # classes.  c2's class (C) isn't reachable from c1 then, so protecting
530        # objects reachable from the dying object (c1) isn't enough to stop
531        # c2's class (C) from getting tp_clear'ed before c2.cb is invoked.
532        # The result was a segfault (C.__mro__ was NULL when the callback
533        # tried to look up self.me).
534
535        class C(object):
536            def cb(self, ignore):
537                self.me
538                self.c1
539                self.wr
540
541        class D:
542            pass
543
544        c1, c2 = D(), C()
545
546        c2.me = c2
547        c2.c1 = c1
548        c2.wr = weakref.ref(c1, c2.cb)
549
550        del c1, c2, C, D
551        gc.collect()
552
553    def test_callback_in_cycle_resurrection(self):
554        import gc
555
556        # Do something nasty in a weakref callback:  resurrect objects
557        # from dead cycles.  For this to be attempted, the weakref and
558        # its callback must also be part of the cyclic trash (else the
559        # objects reachable via the callback couldn't be in cyclic trash
560        # to begin with -- the callback would act like an external root).
561        # But gc clears trash weakrefs with callbacks early now, which
562        # disables the callbacks, so the callbacks shouldn't get called
563        # at all (and so nothing actually gets resurrected).
564
565        alist = []
566        class C(object):
567            def __init__(self, value):
568                self.attribute = value
569
570            def acallback(self, ignore):
571                alist.append(self.c)
572
573        c1, c2 = C(1), C(2)
574        c1.c = c2
575        c2.c = c1
576        c1.wr = weakref.ref(c2, c1.acallback)
577        c2.wr = weakref.ref(c1, c2.acallback)
578
579        def C_went_away(ignore):
580            alist.append("C went away")
581        wr = weakref.ref(C, C_went_away)
582
583        del c1, c2, C   # make them all trash
584        self.assertEqual(alist, [])  # del isn't enough to reclaim anything
585
586        gc.collect()
587        # c1.wr and c2.wr were part of the cyclic trash, so should have
588        # been cleared without their callbacks executing.  OTOH, the weakref
589        # to C is bound to a function local (wr), and wasn't trash, so that
590        # callback should have been invoked when C went away.
591        self.assertEqual(alist, ["C went away"])
592        # The remaining weakref should be dead now (its callback ran).
593        self.assertEqual(wr(), None)
594
595        del alist[:]
596        gc.collect()
597        self.assertEqual(alist, [])
598
599    def test_callbacks_on_callback(self):
600        import gc
601
602        # Set up weakref callbacks *on* weakref callbacks.
603        alist = []
604        def safe_callback(ignore):
605            alist.append("safe_callback called")
606
607        class C(object):
608            def cb(self, ignore):
609                alist.append("cb called")
610
611        c, d = C(), C()
612        c.other = d
613        d.other = c
614        callback = c.cb
615        c.wr = weakref.ref(d, callback)     # this won't trigger
616        d.wr = weakref.ref(callback, d.cb)  # ditto
617        external_wr = weakref.ref(callback, safe_callback)  # but this will
618        self.assertTrue(external_wr() is callback)
619
620        # The weakrefs attached to c and d should get cleared, so that
621        # C.cb is never called.  But external_wr isn't part of the cyclic
622        # trash, and no cyclic trash is reachable from it, so safe_callback
623        # should get invoked when the bound method object callback (c.cb)
624        # -- which is itself a callback, and also part of the cyclic trash --
625        # gets reclaimed at the end of gc.
626
627        del callback, c, d, C
628        self.assertEqual(alist, [])  # del isn't enough to clean up cycles
629        gc.collect()
630        self.assertEqual(alist, ["safe_callback called"])
631        self.assertEqual(external_wr(), None)
632
633        del alist[:]
634        gc.collect()
635        self.assertEqual(alist, [])
636
637    def test_gc_during_ref_creation(self):
638        self.check_gc_during_creation(weakref.ref)
639
640    def test_gc_during_proxy_creation(self):
641        self.check_gc_during_creation(weakref.proxy)
642
643    def check_gc_during_creation(self, makeref):
644        thresholds = gc.get_threshold()
645        gc.set_threshold(1, 1, 1)
646        gc.collect()
647        class A:
648            pass
649
650        def callback(*args):
651            pass
652
653        referenced = A()
654
655        a = A()
656        a.a = a
657        a.wr = makeref(referenced)
658
659        try:
660            # now make sure the object and the ref get labeled as
661            # cyclic trash:
662            a = A()
663            weakref.ref(referenced, callback)
664
665        finally:
666            gc.set_threshold(*thresholds)
667
668    def test_ref_created_during_del(self):
669        # Bug #1377858
670        # A weakref created in an object's __del__() would crash the
671        # interpreter when the weakref was cleaned up since it would refer to
672        # non-existent memory.  This test should not segfault the interpreter.
673        class Target(object):
674            def __del__(self):
675                global ref_from_del
676                ref_from_del = weakref.ref(self)
677
678        w = Target()
679
680    def test_init(self):
681        # Issue 3634
682        # <weakref to class>.__init__() doesn't check errors correctly
683        r = weakref.ref(Exception)
684        self.assertRaises(TypeError, r.__init__, 0, 0, 0, 0, 0)
685        # No exception should be raised here
686        gc.collect()
687
688    def test_classes(self):
689        # Check that both old-style classes and new-style classes
690        # are weakrefable.
691        class A(object):
692            pass
693        class B:
694            pass
695        l = []
696        weakref.ref(int)
697        a = weakref.ref(A, l.append)
698        A = None
699        gc.collect()
700        self.assertEqual(a(), None)
701        self.assertEqual(l, [a])
702        b = weakref.ref(B, l.append)
703        B = None
704        gc.collect()
705        self.assertEqual(b(), None)
706        self.assertEqual(l, [a, b])
707
708
709class SubclassableWeakrefTestCase(TestBase):
710
711    def test_subclass_refs(self):
712        class MyRef(weakref.ref):
713            def __init__(self, ob, callback=None, value=42):
714                self.value = value
715                super(MyRef, self).__init__(ob, callback)
716            def __call__(self):
717                self.called = True
718                return super(MyRef, self).__call__()
719        o = Object("foo")
720        mr = MyRef(o, value=24)
721        self.assertTrue(mr() is o)
722        self.assertTrue(mr.called)
723        self.assertEqual(mr.value, 24)
724        del o
725        self.assertTrue(mr() is None)
726        self.assertTrue(mr.called)
727
728    def test_subclass_refs_dont_replace_standard_refs(self):
729        class MyRef(weakref.ref):
730            pass
731        o = Object(42)
732        r1 = MyRef(o)
733        r2 = weakref.ref(o)
734        self.assertTrue(r1 is not r2)
735        self.assertEqual(weakref.getweakrefs(o), [r2, r1])
736        self.assertEqual(weakref.getweakrefcount(o), 2)
737        r3 = MyRef(o)
738        self.assertEqual(weakref.getweakrefcount(o), 3)
739        refs = weakref.getweakrefs(o)
740        self.assertEqual(len(refs), 3)
741        self.assertTrue(r2 is refs[0])
742        self.assertIn(r1, refs[1:])
743        self.assertIn(r3, refs[1:])
744
745    def test_subclass_refs_dont_conflate_callbacks(self):
746        class MyRef(weakref.ref):
747            pass
748        o = Object(42)
749        r1 = MyRef(o, id)
750        r2 = MyRef(o, str)
751        self.assertTrue(r1 is not r2)
752        refs = weakref.getweakrefs(o)
753        self.assertIn(r1, refs)
754        self.assertIn(r2, refs)
755
756    def test_subclass_refs_with_slots(self):
757        class MyRef(weakref.ref):
758            __slots__ = "slot1", "slot2"
759            def __new__(type, ob, callback, slot1, slot2):
760                return weakref.ref.__new__(type, ob, callback)
761            def __init__(self, ob, callback, slot1, slot2):
762                self.slot1 = slot1
763                self.slot2 = slot2
764            def meth(self):
765                return self.slot1 + self.slot2
766        o = Object(42)
767        r = MyRef(o, None, "abc", "def")
768        self.assertEqual(r.slot1, "abc")
769        self.assertEqual(r.slot2, "def")
770        self.assertEqual(r.meth(), "abcdef")
771        self.assertFalse(hasattr(r, "__dict__"))
772
773    def test_subclass_refs_with_cycle(self):
774        # Bug #3110
775        # An instance of a weakref subclass can have attributes.
776        # If such a weakref holds the only strong reference to the object,
777        # deleting the weakref will delete the object. In this case,
778        # the callback must not be called, because the ref object is
779        # being deleted.
780        class MyRef(weakref.ref):
781            pass
782
783        # Use a local callback, for "regrtest -R::"
784        # to detect refcounting problems
785        def callback(w):
786            self.cbcalled += 1
787
788        o = C()
789        r1 = MyRef(o, callback)
790        r1.o = o
791        del o
792
793        del r1 # Used to crash here
794
795        self.assertEqual(self.cbcalled, 0)
796
797        # Same test, with two weakrefs to the same object
798        # (since code paths are different)
799        o = C()
800        r1 = MyRef(o, callback)
801        r2 = MyRef(o, callback)
802        r1.r = r2
803        r2.o = o
804        del o
805        del r2
806
807        del r1 # Used to crash here
808
809        self.assertEqual(self.cbcalled, 0)
810
811
812class Object:
813    def __init__(self, arg):
814        self.arg = arg
815    def __repr__(self):
816        return "<Object %r>" % self.arg
817
818
819class MappingTestCase(TestBase):
820
821    COUNT = 10
822
823    def test_weak_values(self):
824        #
825        #  This exercises d.copy(), d.items(), d[], del d[], len(d).
826        #
827        dict, objects = self.make_weak_valued_dict()
828        for o in objects:
829            self.assertTrue(weakref.getweakrefcount(o) == 1,
830                         "wrong number of weak references to %r!" % o)
831            self.assertTrue(o is dict[o.arg],
832                         "wrong object returned by weak dict!")
833        items1 = dict.items()
834        items2 = dict.copy().items()
835        items1.sort()
836        items2.sort()
837        self.assertTrue(items1 == items2,
838                     "cloning of weak-valued dictionary did not work!")
839        del items1, items2
840        self.assertTrue(len(dict) == self.COUNT)
841        del objects[0]
842        self.assertTrue(len(dict) == (self.COUNT - 1),
843                     "deleting object did not cause dictionary update")
844        del objects, o
845        self.assertTrue(len(dict) == 0,
846                     "deleting the values did not clear the dictionary")
847        # regression on SF bug #447152:
848        dict = weakref.WeakValueDictionary()
849        self.assertRaises(KeyError, dict.__getitem__, 1)
850        dict[2] = C()
851        self.assertRaises(KeyError, dict.__getitem__, 2)
852
853    def test_weak_keys(self):
854        #
855        #  This exercises d.copy(), d.items(), d[] = v, d[], del d[],
856        #  len(d), in d.
857        #
858        dict, objects = self.make_weak_keyed_dict()
859        for o in objects:
860            self.assertTrue(weakref.getweakrefcount(o) == 1,
861                         "wrong number of weak references to %r!" % o)
862            self.assertTrue(o.arg is dict[o],
863                         "wrong object returned by weak dict!")
864        items1 = dict.items()
865        items2 = dict.copy().items()
866        self.assertTrue(set(items1) == set(items2),
867                     "cloning of weak-keyed dictionary did not work!")
868        del items1, items2
869        self.assertTrue(len(dict) == self.COUNT)
870        del objects[0]
871        self.assertTrue(len(dict) == (self.COUNT - 1),
872                     "deleting object did not cause dictionary update")
873        del objects, o
874        self.assertTrue(len(dict) == 0,
875                     "deleting the keys did not clear the dictionary")
876        o = Object(42)
877        dict[o] = "What is the meaning of the universe?"
878        self.assertIn(o, dict)
879        self.assertNotIn(34, dict)
880
881    def test_weak_keyed_iters(self):
882        dict, objects = self.make_weak_keyed_dict()
883        self.check_iters(dict)
884
885        # Test keyrefs()
886        refs = dict.keyrefs()
887        self.assertEqual(len(refs), len(objects))
888        objects2 = list(objects)
889        for wr in refs:
890            ob = wr()
891            self.assertIn(ob, dict)
892            self.assertEqual(ob.arg, dict[ob])
893            objects2.remove(ob)
894        self.assertEqual(len(objects2), 0)
895
896        # Test iterkeyrefs()
897        objects2 = list(objects)
898        self.assertEqual(len(list(dict.iterkeyrefs())), len(objects))
899        for wr in dict.iterkeyrefs():
900            ob = wr()
901            self.assertIn(ob, dict)
902            self.assertEqual(ob.arg, dict[ob])
903            objects2.remove(ob)
904        self.assertEqual(len(objects2), 0)
905
906    def test_weak_valued_iters(self):
907        dict, objects = self.make_weak_valued_dict()
908        self.check_iters(dict)
909
910        # Test valuerefs()
911        refs = dict.valuerefs()
912        self.assertEqual(len(refs), len(objects))
913        objects2 = list(objects)
914        for wr in refs:
915            ob = wr()
916            self.assertEqual(ob, dict[ob.arg])
917            self.assertEqual(ob.arg, dict[ob.arg].arg)
918            objects2.remove(ob)
919        self.assertEqual(len(objects2), 0)
920
921        # Test itervaluerefs()
922        objects2 = list(objects)
923        self.assertEqual(len(list(dict.itervaluerefs())), len(objects))
924        for wr in dict.itervaluerefs():
925            ob = wr()
926            self.assertEqual(ob, dict[ob.arg])
927            self.assertEqual(ob.arg, dict[ob.arg].arg)
928            objects2.remove(ob)
929        self.assertEqual(len(objects2), 0)
930
931    def check_iters(self, dict):
932        # item iterator:
933        items = dict.items()
934        for item in dict.iteritems():
935            items.remove(item)
936        self.assertTrue(len(items) == 0, "iteritems() did not touch all items")
937
938        # key iterator, via __iter__():
939        keys = dict.keys()
940        for k in dict:
941            keys.remove(k)
942        self.assertTrue(len(keys) == 0, "__iter__() did not touch all keys")
943
944        # key iterator, via iterkeys():
945        keys = dict.keys()
946        for k in dict.iterkeys():
947            keys.remove(k)
948        self.assertTrue(len(keys) == 0, "iterkeys() did not touch all keys")
949
950        # value iterator:
951        values = dict.values()
952        for v in dict.itervalues():
953            values.remove(v)
954        self.assertTrue(len(values) == 0,
955                     "itervalues() did not touch all values")
956
957    def test_make_weak_keyed_dict_from_dict(self):
958        o = Object(3)
959        dict = weakref.WeakKeyDictionary({o:364})
960        self.assertTrue(dict[o] == 364)
961
962    def test_make_weak_keyed_dict_from_weak_keyed_dict(self):
963        o = Object(3)
964        dict = weakref.WeakKeyDictionary({o:364})
965        dict2 = weakref.WeakKeyDictionary(dict)
966        self.assertTrue(dict[o] == 364)
967
968    def make_weak_keyed_dict(self):
969        dict = weakref.WeakKeyDictionary()
970        objects = map(Object, range(self.COUNT))
971        for o in objects:
972            dict[o] = o.arg
973        return dict, objects
974
975    def make_weak_valued_dict(self):
976        dict = weakref.WeakValueDictionary()
977        objects = map(Object, range(self.COUNT))
978        for o in objects:
979            dict[o.arg] = o
980        return dict, objects
981
982    def check_popitem(self, klass, key1, value1, key2, value2):
983        weakdict = klass()
984        weakdict[key1] = value1
985        weakdict[key2] = value2
986        self.assertTrue(len(weakdict) == 2)
987        k, v = weakdict.popitem()
988        self.assertTrue(len(weakdict) == 1)
989        if k is key1:
990            self.assertTrue(v is value1)
991        else:
992            self.assertTrue(v is value2)
993        k, v = weakdict.popitem()
994        self.assertTrue(len(weakdict) == 0)
995        if k is key1:
996            self.assertTrue(v is value1)
997        else:
998            self.assertTrue(v is value2)
999
1000    def test_weak_valued_dict_popitem(self):
1001        self.check_popitem(weakref.WeakValueDictionary,
1002                           "key1", C(), "key2", C())
1003
1004    def test_weak_keyed_dict_popitem(self):
1005        self.check_popitem(weakref.WeakKeyDictionary,
1006                           C(), "value 1", C(), "value 2")
1007
1008    def check_setdefault(self, klass, key, value1, value2):
1009        self.assertTrue(value1 is not value2,
1010                     "invalid test"
1011                     " -- value parameters must be distinct objects")
1012        weakdict = klass()
1013        o = weakdict.setdefault(key, value1)
1014        self.assertIs(o, value1)
1015        self.assertIn(key, weakdict)
1016        self.assertIs(weakdict.get(key), value1)
1017        self.assertIs(weakdict[key], value1)
1018
1019        o = weakdict.setdefault(key, value2)
1020        self.assertIs(o, value1)
1021        self.assertIn(key, weakdict)
1022        self.assertIs(weakdict.get(key), value1)
1023        self.assertIs(weakdict[key], value1)
1024
1025    def test_weak_valued_dict_setdefault(self):
1026        self.check_setdefault(weakref.WeakValueDictionary,
1027                              "key", C(), C())
1028
1029    def test_weak_keyed_dict_setdefault(self):
1030        self.check_setdefault(weakref.WeakKeyDictionary,
1031                              C(), "value 1", "value 2")
1032
1033    def check_update(self, klass, dict):
1034        #
1035        #  This exercises d.update(), len(d), d.keys(), in d,
1036        #  d.get(), d[].
1037        #
1038        weakdict = klass()
1039        weakdict.update(dict)
1040        self.assertEqual(len(weakdict), len(dict))
1041        for k in weakdict.keys():
1042            self.assertIn(k, dict,
1043                         "mysterious new key appeared in weak dict")
1044            v = dict.get(k)
1045            self.assertIs(v, weakdict[k])
1046            self.assertIs(v, weakdict.get(k))
1047        for k in dict.keys():
1048            self.assertIn(k, weakdict,
1049                         "original key disappeared in weak dict")
1050            v = dict[k]
1051            self.assertIs(v, weakdict[k])
1052            self.assertIs(v, weakdict.get(k))
1053
1054    def test_weak_valued_dict_update(self):
1055        self.check_update(weakref.WeakValueDictionary,
1056                          {1: C(), 'a': C(), C(): C()})
1057
1058    def test_weak_keyed_dict_update(self):
1059        self.check_update(weakref.WeakKeyDictionary,
1060                          {C(): 1, C(): 2, C(): 3})
1061
1062    def test_weak_keyed_delitem(self):
1063        d = weakref.WeakKeyDictionary()
1064        o1 = Object('1')
1065        o2 = Object('2')
1066        d[o1] = 'something'
1067        d[o2] = 'something'
1068        self.assertTrue(len(d) == 2)
1069        del d[o1]
1070        self.assertTrue(len(d) == 1)
1071        self.assertTrue(d.keys() == [o2])
1072
1073    def test_weak_valued_delitem(self):
1074        d = weakref.WeakValueDictionary()
1075        o1 = Object('1')
1076        o2 = Object('2')
1077        d['something'] = o1
1078        d['something else'] = o2
1079        self.assertTrue(len(d) == 2)
1080        del d['something']
1081        self.assertTrue(len(d) == 1)
1082        self.assertTrue(d.items() == [('something else', o2)])
1083
1084    def test_weak_keyed_bad_delitem(self):
1085        d = weakref.WeakKeyDictionary()
1086        o = Object('1')
1087        # An attempt to delete an object that isn't there should raise
1088        # KeyError.  It didn't before 2.3.
1089        self.assertRaises(KeyError, d.__delitem__, o)
1090        self.assertRaises(KeyError, d.__getitem__, o)
1091
1092        # If a key isn't of a weakly referencable type, __getitem__ and
1093        # __setitem__ raise TypeError.  __delitem__ should too.
1094        self.assertRaises(TypeError, d.__delitem__,  13)
1095        self.assertRaises(TypeError, d.__getitem__,  13)
1096        self.assertRaises(TypeError, d.__setitem__,  13, 13)
1097
1098    def test_weak_keyed_cascading_deletes(self):
1099        # SF bug 742860.  For some reason, before 2.3 __delitem__ iterated
1100        # over the keys via self.data.iterkeys().  If things vanished from
1101        # the dict during this (or got added), that caused a RuntimeError.
1102
1103        d = weakref.WeakKeyDictionary()
1104        mutate = False
1105
1106        class C(object):
1107            def __init__(self, i):
1108                self.value = i
1109            def __hash__(self):
1110                return hash(self.value)
1111            def __eq__(self, other):
1112                if mutate:
1113                    # Side effect that mutates the dict, by removing the
1114                    # last strong reference to a key.
1115                    del objs[-1]
1116                return self.value == other.value
1117
1118        objs = [C(i) for i in range(4)]
1119        for o in objs:
1120            d[o] = o.value
1121        del o   # now the only strong references to keys are in objs
1122        # Find the order in which iterkeys sees the keys.
1123        objs = d.keys()
1124        # Reverse it, so that the iteration implementation of __delitem__
1125        # has to keep looping to find the first object we delete.
1126        objs.reverse()
1127
1128        # Turn on mutation in C.__eq__.  The first time thru the loop,
1129        # under the iterkeys() business the first comparison will delete
1130        # the last item iterkeys() would see, and that causes a
1131        #     RuntimeError: dictionary changed size during iteration
1132        # when the iterkeys() loop goes around to try comparing the next
1133        # key.  After this was fixed, it just deletes the last object *our*
1134        # "for o in obj" loop would have gotten to.
1135        mutate = True
1136        count = 0
1137        for o in objs:
1138            count += 1
1139            del d[o]
1140        self.assertEqual(len(d), 0)
1141        self.assertEqual(count, 2)
1142
1143from test import mapping_tests
1144
1145class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1146    """Check that WeakValueDictionary conforms to the mapping protocol"""
1147    __ref = {"key1":Object(1), "key2":Object(2), "key3":Object(3)}
1148    type2test = weakref.WeakValueDictionary
1149    def _reference(self):
1150        return self.__ref.copy()
1151
1152class WeakKeyDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
1153    """Check that WeakKeyDictionary conforms to the mapping protocol"""
1154    __ref = {Object("key1"):1, Object("key2"):2, Object("key3"):3}
1155    type2test = weakref.WeakKeyDictionary
1156    def _reference(self):
1157        return self.__ref.copy()
1158
1159libreftest = """ Doctest for examples in the library reference: weakref.rst
1160
1161>>> import weakref
1162>>> class Dict(dict):
1163...     pass
1164...
1165>>> obj = Dict(red=1, green=2, blue=3)   # this object is weak referencable
1166>>> r = weakref.ref(obj)
1167>>> print r() is obj
1168True
1169
1170>>> import weakref
1171>>> class Object:
1172...     pass
1173...
1174>>> o = Object()
1175>>> r = weakref.ref(o)
1176>>> o2 = r()
1177>>> o is o2
1178True
1179>>> del o, o2
1180>>> print r()
1181None
1182
1183>>> import weakref
1184>>> class ExtendedRef(weakref.ref):
1185...     def __init__(self, ob, callback=None, **annotations):
1186...         super(ExtendedRef, self).__init__(ob, callback)
1187...         self.__counter = 0
1188...         for k, v in annotations.iteritems():
1189...             setattr(self, k, v)
1190...     def __call__(self):
1191...         '''Return a pair containing the referent and the number of
1192...         times the reference has been called.
1193...         '''
1194...         ob = super(ExtendedRef, self).__call__()
1195...         if ob is not None:
1196...             self.__counter += 1
1197...             ob = (ob, self.__counter)
1198...         return ob
1199...
1200>>> class A:   # not in docs from here, just testing the ExtendedRef
1201...     pass
1202...
1203>>> a = A()
1204>>> r = ExtendedRef(a, foo=1, bar="baz")
1205>>> r.foo
12061
1207>>> r.bar
1208'baz'
1209>>> r()[1]
12101
1211>>> r()[1]
12122
1213>>> r()[0] is a
1214True
1215
1216
1217>>> import weakref
1218>>> _id2obj_dict = weakref.WeakValueDictionary()
1219>>> def remember(obj):
1220...     oid = id(obj)
1221...     _id2obj_dict[oid] = obj
1222...     return oid
1223...
1224>>> def id2obj(oid):
1225...     return _id2obj_dict[oid]
1226...
1227>>> a = A()             # from here, just testing
1228>>> a_id = remember(a)
1229>>> id2obj(a_id) is a
1230True
1231>>> del a
1232>>> try:
1233...     id2obj(a_id)
1234... except KeyError:
1235...     print 'OK'
1236... else:
1237...     print 'WeakValueDictionary error'
1238OK
1239
1240"""
1241
1242__test__ = {'libreftest' : libreftest}
1243
1244def test_main():
1245    test_support.run_unittest(
1246        ReferencesTestCase,
1247        MappingTestCase,
1248        WeakValueDictionaryTestCase,
1249        WeakKeyDictionaryTestCase,
1250        SubclassableWeakrefTestCase,
1251        )
1252    test_support.run_doctest(sys.modules[__name__])
1253
1254
1255if __name__ == "__main__":
1256    test_main()
1257