1import sys 2import unittest 3from doctest import DocTestSuite 4from test import support 5from test.support import threading_helper 6import weakref 7import gc 8 9# Modules under test 10import _thread 11import threading 12import _threading_local 13 14 15class Weak(object): 16 pass 17 18def target(local, weaklist): 19 weak = Weak() 20 local.weak = weak 21 weaklist.append(weakref.ref(weak)) 22 23 24class BaseLocalTest: 25 26 def test_local_refs(self): 27 self._local_refs(20) 28 self._local_refs(50) 29 self._local_refs(100) 30 31 def _local_refs(self, n): 32 local = self._local() 33 weaklist = [] 34 for i in range(n): 35 t = threading.Thread(target=target, args=(local, weaklist)) 36 t.start() 37 t.join() 38 del t 39 40 support.gc_collect() # For PyPy or other GCs. 41 self.assertEqual(len(weaklist), n) 42 43 # XXX _threading_local keeps the local of the last stopped thread alive. 44 deadlist = [weak for weak in weaklist if weak() is None] 45 self.assertIn(len(deadlist), (n-1, n)) 46 47 # Assignment to the same thread local frees it sometimes (!) 48 local.someothervar = None 49 support.gc_collect() # For PyPy or other GCs. 50 deadlist = [weak for weak in weaklist if weak() is None] 51 self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist))) 52 53 def test_derived(self): 54 # Issue 3088: if there is a threads switch inside the __init__ 55 # of a threading.local derived class, the per-thread dictionary 56 # is created but not correctly set on the object. 57 # The first member set may be bogus. 58 import time 59 class Local(self._local): 60 def __init__(self): 61 time.sleep(0.01) 62 local = Local() 63 64 def f(i): 65 local.x = i 66 # Simply check that the variable is correctly set 67 self.assertEqual(local.x, i) 68 69 with threading_helper.start_threads(threading.Thread(target=f, args=(i,)) 70 for i in range(10)): 71 pass 72 73 def test_derived_cycle_dealloc(self): 74 # http://bugs.python.org/issue6990 75 class Local(self._local): 76 pass 77 locals = None 78 passed = False 79 e1 = threading.Event() 80 e2 = threading.Event() 81 82 def f(): 83 nonlocal passed 84 # 1) Involve Local in a cycle 85 cycle = [Local()] 86 cycle.append(cycle) 87 cycle[0].foo = 'bar' 88 89 # 2) GC the cycle (triggers threadmodule.c::local_clear 90 # before local_dealloc) 91 del cycle 92 support.gc_collect() # For PyPy or other GCs. 93 e1.set() 94 e2.wait() 95 96 # 4) New Locals should be empty 97 passed = all(not hasattr(local, 'foo') for local in locals) 98 99 t = threading.Thread(target=f) 100 t.start() 101 e1.wait() 102 103 # 3) New Locals should recycle the original's address. Creating 104 # them in the thread overwrites the thread state and avoids the 105 # bug 106 locals = [Local() for i in range(10)] 107 e2.set() 108 t.join() 109 110 self.assertTrue(passed) 111 112 def test_arguments(self): 113 # Issue 1522237 114 class MyLocal(self._local): 115 def __init__(self, *args, **kwargs): 116 pass 117 118 MyLocal(a=1) 119 MyLocal(1) 120 self.assertRaises(TypeError, self._local, a=1) 121 self.assertRaises(TypeError, self._local, 1) 122 123 def _test_one_class(self, c): 124 self._failed = "No error message set or cleared." 125 obj = c() 126 e1 = threading.Event() 127 e2 = threading.Event() 128 129 def f1(): 130 obj.x = 'foo' 131 obj.y = 'bar' 132 del obj.y 133 e1.set() 134 e2.wait() 135 136 def f2(): 137 try: 138 foo = obj.x 139 except AttributeError: 140 # This is expected -- we haven't set obj.x in this thread yet! 141 self._failed = "" # passed 142 else: 143 self._failed = ('Incorrectly got value %r from class %r\n' % 144 (foo, c)) 145 sys.stderr.write(self._failed) 146 147 t1 = threading.Thread(target=f1) 148 t1.start() 149 e1.wait() 150 t2 = threading.Thread(target=f2) 151 t2.start() 152 t2.join() 153 # The test is done; just let t1 know it can exit, and wait for it. 154 e2.set() 155 t1.join() 156 157 self.assertFalse(self._failed, self._failed) 158 159 def test_threading_local(self): 160 self._test_one_class(self._local) 161 162 def test_threading_local_subclass(self): 163 class LocalSubclass(self._local): 164 """To test that subclasses behave properly.""" 165 self._test_one_class(LocalSubclass) 166 167 def _test_dict_attribute(self, cls): 168 obj = cls() 169 obj.x = 5 170 self.assertEqual(obj.__dict__, {'x': 5}) 171 with self.assertRaises(AttributeError): 172 obj.__dict__ = {} 173 with self.assertRaises(AttributeError): 174 del obj.__dict__ 175 176 def test_dict_attribute(self): 177 self._test_dict_attribute(self._local) 178 179 def test_dict_attribute_subclass(self): 180 class LocalSubclass(self._local): 181 """To test that subclasses behave properly.""" 182 self._test_dict_attribute(LocalSubclass) 183 184 def test_cycle_collection(self): 185 class X: 186 pass 187 188 x = X() 189 x.local = self._local() 190 x.local.x = x 191 wr = weakref.ref(x) 192 del x 193 support.gc_collect() # For PyPy or other GCs. 194 self.assertIsNone(wr()) 195 196 197class ThreadLocalTest(unittest.TestCase, BaseLocalTest): 198 _local = _thread._local 199 200class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest): 201 _local = _threading_local.local 202 203 204def test_main(): 205 suite = unittest.TestSuite() 206 suite.addTest(DocTestSuite('_threading_local')) 207 suite.addTest(unittest.makeSuite(ThreadLocalTest)) 208 suite.addTest(unittest.makeSuite(PyThreadingLocalTest)) 209 210 local_orig = _threading_local.local 211 def setUp(test): 212 _threading_local.local = _thread._local 213 def tearDown(test): 214 _threading_local.local = local_orig 215 suite.addTest(DocTestSuite('_threading_local', 216 setUp=setUp, tearDown=tearDown) 217 ) 218 219 support.run_unittest(suite) 220 221if __name__ == '__main__': 222 test_main() 223