• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from doctest import DocTestSuite
3from test import support
4import weakref
5import gc
6
7# Modules under test
8_thread = support.import_module('_thread')
9threading = support.import_module('threading')
10import _threading_local
11
12
13class Weak(object):
14    pass
15
16def target(local, weaklist):
17    weak = Weak()
18    local.weak = weak
19    weaklist.append(weakref.ref(weak))
20
21
22class BaseLocalTest:
23
24    def test_local_refs(self):
25        self._local_refs(20)
26        self._local_refs(50)
27        self._local_refs(100)
28
29    def _local_refs(self, n):
30        local = self._local()
31        weaklist = []
32        for i in range(n):
33            t = threading.Thread(target=target, args=(local, weaklist))
34            t.start()
35            t.join()
36        del t
37
38        gc.collect()
39        self.assertEqual(len(weaklist), n)
40
41        # XXX _threading_local keeps the local of the last stopped thread alive.
42        deadlist = [weak for weak in weaklist if weak() is None]
43        self.assertIn(len(deadlist), (n-1, n))
44
45        # Assignment to the same thread local frees it sometimes (!)
46        local.someothervar = None
47        gc.collect()
48        deadlist = [weak for weak in weaklist if weak() is None]
49        self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))
50
51    def test_derived(self):
52        # Issue 3088: if there is a threads switch inside the __init__
53        # of a threading.local derived class, the per-thread dictionary
54        # is created but not correctly set on the object.
55        # The first member set may be bogus.
56        import time
57        class Local(self._local):
58            def __init__(self):
59                time.sleep(0.01)
60        local = Local()
61
62        def f(i):
63            local.x = i
64            # Simply check that the variable is correctly set
65            self.assertEqual(local.x, i)
66
67        with support.start_threads(threading.Thread(target=f, args=(i,))
68                                   for i in range(10)):
69            pass
70
71    def test_derived_cycle_dealloc(self):
72        # http://bugs.python.org/issue6990
73        class Local(self._local):
74            pass
75        locals = None
76        passed = False
77        e1 = threading.Event()
78        e2 = threading.Event()
79
80        def f():
81            nonlocal passed
82            # 1) Involve Local in a cycle
83            cycle = [Local()]
84            cycle.append(cycle)
85            cycle[0].foo = 'bar'
86
87            # 2) GC the cycle (triggers threadmodule.c::local_clear
88            # before local_dealloc)
89            del cycle
90            gc.collect()
91            e1.set()
92            e2.wait()
93
94            # 4) New Locals should be empty
95            passed = all(not hasattr(local, 'foo') for local in locals)
96
97        t = threading.Thread(target=f)
98        t.start()
99        e1.wait()
100
101        # 3) New Locals should recycle the original's address. Creating
102        # them in the thread overwrites the thread state and avoids the
103        # bug
104        locals = [Local() for i in range(10)]
105        e2.set()
106        t.join()
107
108        self.assertTrue(passed)
109
110    def test_arguments(self):
111        # Issue 1522237
112        class MyLocal(self._local):
113            def __init__(self, *args, **kwargs):
114                pass
115
116        MyLocal(a=1)
117        MyLocal(1)
118        self.assertRaises(TypeError, self._local, a=1)
119        self.assertRaises(TypeError, self._local, 1)
120
121    def _test_one_class(self, c):
122        self._failed = "No error message set or cleared."
123        obj = c()
124        e1 = threading.Event()
125        e2 = threading.Event()
126
127        def f1():
128            obj.x = 'foo'
129            obj.y = 'bar'
130            del obj.y
131            e1.set()
132            e2.wait()
133
134        def f2():
135            try:
136                foo = obj.x
137            except AttributeError:
138                # This is expected -- we haven't set obj.x in this thread yet!
139                self._failed = ""  # passed
140            else:
141                self._failed = ('Incorrectly got value %r from class %r\n' %
142                                (foo, c))
143                sys.stderr.write(self._failed)
144
145        t1 = threading.Thread(target=f1)
146        t1.start()
147        e1.wait()
148        t2 = threading.Thread(target=f2)
149        t2.start()
150        t2.join()
151        # The test is done; just let t1 know it can exit, and wait for it.
152        e2.set()
153        t1.join()
154
155        self.assertFalse(self._failed, self._failed)
156
157    def test_threading_local(self):
158        self._test_one_class(self._local)
159
160    def test_threading_local_subclass(self):
161        class LocalSubclass(self._local):
162            """To test that subclasses behave properly."""
163        self._test_one_class(LocalSubclass)
164
165    def _test_dict_attribute(self, cls):
166        obj = cls()
167        obj.x = 5
168        self.assertEqual(obj.__dict__, {'x': 5})
169        with self.assertRaises(AttributeError):
170            obj.__dict__ = {}
171        with self.assertRaises(AttributeError):
172            del obj.__dict__
173
174    def test_dict_attribute(self):
175        self._test_dict_attribute(self._local)
176
177    def test_dict_attribute_subclass(self):
178        class LocalSubclass(self._local):
179            """To test that subclasses behave properly."""
180        self._test_dict_attribute(LocalSubclass)
181
182    def test_cycle_collection(self):
183        class X:
184            pass
185
186        x = X()
187        x.local = self._local()
188        x.local.x = x
189        wr = weakref.ref(x)
190        del x
191        gc.collect()
192        self.assertIsNone(wr())
193
194
195class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
196    _local = _thread._local
197
198class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
199    _local = _threading_local.local
200
201
202def test_main():
203    suite = unittest.TestSuite()
204    suite.addTest(DocTestSuite('_threading_local'))
205    suite.addTest(unittest.makeSuite(ThreadLocalTest))
206    suite.addTest(unittest.makeSuite(PyThreadingLocalTest))
207
208    local_orig = _threading_local.local
209    def setUp(test):
210        _threading_local.local = _thread._local
211    def tearDown(test):
212        _threading_local.local = local_orig
213    suite.addTest(DocTestSuite('_threading_local',
214                               setUp=setUp, tearDown=tearDown)
215                  )
216
217    support.run_unittest(suite)
218
219if __name__ == '__main__':
220    test_main()
221