• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import unittest
3from doctest import DocTestSuite
4from test import support
5from test.support import threading_helper
6import weakref
7import gc
8
9# Modules under test
10import _thread
11import threading
12import _threading_local
13
14
class Weak:
    """Empty placeholder type whose instances can be weakly referenced."""
17
def target(local, weaklist):
    """Thread body: store a fresh ``Weak`` on *local* and record a weakref.

    The new object is assigned to ``local.weak`` and a ``weakref.ref`` to it
    is appended to *weaklist*, so the caller can later check whether the
    object is still alive.
    """
    obj = Weak()
    local.weak = obj
    ref = weakref.ref(obj)
    weaklist.append(ref)
22
23
24class BaseLocalTest:
25
26    def test_local_refs(self):
27        self._local_refs(20)
28        self._local_refs(50)
29        self._local_refs(100)
30
31    def _local_refs(self, n):
32        local = self._local()
33        weaklist = []
34        for i in range(n):
35            t = threading.Thread(target=target, args=(local, weaklist))
36            t.start()
37            t.join()
38        del t
39
40        support.gc_collect()  # For PyPy or other GCs.
41        self.assertEqual(len(weaklist), n)
42
43        # XXX _threading_local keeps the local of the last stopped thread alive.
44        deadlist = [weak for weak in weaklist if weak() is None]
45        self.assertIn(len(deadlist), (n-1, n))
46
47        # Assignment to the same thread local frees it sometimes (!)
48        local.someothervar = None
49        support.gc_collect()  # For PyPy or other GCs.
50        deadlist = [weak for weak in weaklist if weak() is None]
51        self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))
52
53    def test_derived(self):
54        # Issue 3088: if there is a threads switch inside the __init__
55        # of a threading.local derived class, the per-thread dictionary
56        # is created but not correctly set on the object.
57        # The first member set may be bogus.
58        import time
59        class Local(self._local):
60            def __init__(self):
61                time.sleep(0.01)
62        local = Local()
63
64        def f(i):
65            local.x = i
66            # Simply check that the variable is correctly set
67            self.assertEqual(local.x, i)
68
69        with threading_helper.start_threads(threading.Thread(target=f, args=(i,))
70                                            for i in range(10)):
71            pass
72
73    def test_derived_cycle_dealloc(self):
74        # http://bugs.python.org/issue6990
75        class Local(self._local):
76            pass
77        locals = None
78        passed = False
79        e1 = threading.Event()
80        e2 = threading.Event()
81
82        def f():
83            nonlocal passed
84            # 1) Involve Local in a cycle
85            cycle = [Local()]
86            cycle.append(cycle)
87            cycle[0].foo = 'bar'
88
89            # 2) GC the cycle (triggers threadmodule.c::local_clear
90            # before local_dealloc)
91            del cycle
92            support.gc_collect()  # For PyPy or other GCs.
93            e1.set()
94            e2.wait()
95
96            # 4) New Locals should be empty
97            passed = all(not hasattr(local, 'foo') for local in locals)
98
99        t = threading.Thread(target=f)
100        t.start()
101        e1.wait()
102
103        # 3) New Locals should recycle the original's address. Creating
104        # them in the thread overwrites the thread state and avoids the
105        # bug
106        locals = [Local() for i in range(10)]
107        e2.set()
108        t.join()
109
110        self.assertTrue(passed)
111
112    def test_arguments(self):
113        # Issue 1522237
114        class MyLocal(self._local):
115            def __init__(self, *args, **kwargs):
116                pass
117
118        MyLocal(a=1)
119        MyLocal(1)
120        self.assertRaises(TypeError, self._local, a=1)
121        self.assertRaises(TypeError, self._local, 1)
122
123    def _test_one_class(self, c):
124        self._failed = "No error message set or cleared."
125        obj = c()
126        e1 = threading.Event()
127        e2 = threading.Event()
128
129        def f1():
130            obj.x = 'foo'
131            obj.y = 'bar'
132            del obj.y
133            e1.set()
134            e2.wait()
135
136        def f2():
137            try:
138                foo = obj.x
139            except AttributeError:
140                # This is expected -- we haven't set obj.x in this thread yet!
141                self._failed = ""  # passed
142            else:
143                self._failed = ('Incorrectly got value %r from class %r\n' %
144                                (foo, c))
145                sys.stderr.write(self._failed)
146
147        t1 = threading.Thread(target=f1)
148        t1.start()
149        e1.wait()
150        t2 = threading.Thread(target=f2)
151        t2.start()
152        t2.join()
153        # The test is done; just let t1 know it can exit, and wait for it.
154        e2.set()
155        t1.join()
156
157        self.assertFalse(self._failed, self._failed)
158
159    def test_threading_local(self):
160        self._test_one_class(self._local)
161
162    def test_threading_local_subclass(self):
163        class LocalSubclass(self._local):
164            """To test that subclasses behave properly."""
165        self._test_one_class(LocalSubclass)
166
167    def _test_dict_attribute(self, cls):
168        obj = cls()
169        obj.x = 5
170        self.assertEqual(obj.__dict__, {'x': 5})
171        with self.assertRaises(AttributeError):
172            obj.__dict__ = {}
173        with self.assertRaises(AttributeError):
174            del obj.__dict__
175
176    def test_dict_attribute(self):
177        self._test_dict_attribute(self._local)
178
179    def test_dict_attribute_subclass(self):
180        class LocalSubclass(self._local):
181            """To test that subclasses behave properly."""
182        self._test_dict_attribute(LocalSubclass)
183
184    def test_cycle_collection(self):
185        class X:
186            pass
187
188        x = X()
189        x.local = self._local()
190        x.local.x = x
191        wr = weakref.ref(x)
192        del x
193        support.gc_collect()  # For PyPy or other GCs.
194        self.assertIsNone(wr())
195
196
class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
    """Run the shared ``BaseLocalTest`` checks against ``_thread._local``."""

    _local = _thread._local
199
class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
    """Run the shared ``BaseLocalTest`` checks against ``_threading_local.local``."""

    _local = _threading_local.local
202
203
def test_main():
    """Build and run the complete test suite for this module.

    The suite contains, in order: the ``_threading_local`` doctests, the
    ``ThreadLocalTest`` and ``PyThreadingLocalTest`` cases, and the
    ``_threading_local`` doctests a second time with
    ``_threading_local.local`` temporarily replaced by ``_thread._local``.
    """
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in
    # 3.13; the default loader builds the same suite on every version.
    load_case = unittest.defaultTestLoader.loadTestsFromTestCase

    suite = unittest.TestSuite()
    suite.addTest(DocTestSuite('_threading_local'))
    suite.addTest(load_case(ThreadLocalTest))
    suite.addTest(load_case(PyThreadingLocalTest))

    local_orig = _threading_local.local

    def setUp(test):
        # Point the doctests at _thread._local for this run.
        _threading_local.local = _thread._local

    def tearDown(test):
        # Restore the class that was in place before the swap.
        _threading_local.local = local_orig

    suite.addTest(DocTestSuite('_threading_local',
                               setUp=setUp, tearDown=tearDown))

    support.run_unittest(suite)
220
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
223