# python (3.12.0)
1 import sys
2 import unittest
3 from doctest import DocTestSuite
4 from test import support
5 from test.support import threading_helper
6 from test.support.import_helper import import_module
7 import weakref
8
9 # Modules under test
10 import _thread
11 import threading
12 import _threading_local
13
14
# NOTE(review): presumably makes the whole module conditional on a platform
# where threads can actually be started -- confirm against
# test.support.threading_helper.requires_working_threading.
threading_helper.requires_working_threading(module=True)
16
17
class Weak:
    """Empty class whose instances can be the target of a weak reference."""
20
def target(local, weaklist):
    """Store a new Weak instance on *local* and log a weak reference to it.

    The instance is assigned to ``local.weak`` and a ``weakref.ref`` to it is
    appended to *weaklist*.
    """
    instance = Weak()
    local.weak = instance
    weaklist.append(weakref.ref(instance))
25
26
27 class ESC[4;38;5;81mBaseLocalTest:
28
29 def test_local_refs(self):
30 self._local_refs(20)
31 self._local_refs(50)
32 self._local_refs(100)
33
34 def _local_refs(self, n):
35 local = self._local()
36 weaklist = []
37 for i in range(n):
38 t = threading.Thread(target=target, args=(local, weaklist))
39 t.start()
40 t.join()
41 del t
42
43 support.gc_collect() # For PyPy or other GCs.
44 self.assertEqual(len(weaklist), n)
45
46 # XXX _threading_local keeps the local of the last stopped thread alive.
47 deadlist = [weak for weak in weaklist if weak() is None]
48 self.assertIn(len(deadlist), (n-1, n))
49
50 # Assignment to the same thread local frees it sometimes (!)
51 local.someothervar = None
52 support.gc_collect() # For PyPy or other GCs.
53 deadlist = [weak for weak in weaklist if weak() is None]
54 self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist)))
55
56 def test_derived(self):
57 # Issue 3088: if there is a threads switch inside the __init__
58 # of a threading.local derived class, the per-thread dictionary
59 # is created but not correctly set on the object.
60 # The first member set may be bogus.
61 import time
62 class ESC[4;38;5;81mLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
63 def __init__(self):
64 time.sleep(0.01)
65 local = Local()
66
67 def f(i):
68 local.x = i
69 # Simply check that the variable is correctly set
70 self.assertEqual(local.x, i)
71
72 with threading_helper.start_threads(threading.Thread(target=f, args=(i,))
73 for i in range(10)):
74 pass
75
76 def test_derived_cycle_dealloc(self):
77 # http://bugs.python.org/issue6990
78 class ESC[4;38;5;81mLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
79 pass
80 locals = None
81 passed = False
82 e1 = threading.Event()
83 e2 = threading.Event()
84
85 def f():
86 nonlocal passed
87 # 1) Involve Local in a cycle
88 cycle = [Local()]
89 cycle.append(cycle)
90 cycle[0].foo = 'bar'
91
92 # 2) GC the cycle (triggers threadmodule.c::local_clear
93 # before local_dealloc)
94 del cycle
95 support.gc_collect() # For PyPy or other GCs.
96 e1.set()
97 e2.wait()
98
99 # 4) New Locals should be empty
100 passed = all(not hasattr(local, 'foo') for local in locals)
101
102 t = threading.Thread(target=f)
103 t.start()
104 e1.wait()
105
106 # 3) New Locals should recycle the original's address. Creating
107 # them in the thread overwrites the thread state and avoids the
108 # bug
109 locals = [Local() for i in range(10)]
110 e2.set()
111 t.join()
112
113 self.assertTrue(passed)
114
115 def test_arguments(self):
116 # Issue 1522237
117 class ESC[4;38;5;81mMyLocal(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
118 def __init__(self, *args, **kwargs):
119 pass
120
121 MyLocal(a=1)
122 MyLocal(1)
123 self.assertRaises(TypeError, self._local, a=1)
124 self.assertRaises(TypeError, self._local, 1)
125
126 def _test_one_class(self, c):
127 self._failed = "No error message set or cleared."
128 obj = c()
129 e1 = threading.Event()
130 e2 = threading.Event()
131
132 def f1():
133 obj.x = 'foo'
134 obj.y = 'bar'
135 del obj.y
136 e1.set()
137 e2.wait()
138
139 def f2():
140 try:
141 foo = obj.x
142 except AttributeError:
143 # This is expected -- we haven't set obj.x in this thread yet!
144 self._failed = "" # passed
145 else:
146 self._failed = ('Incorrectly got value %r from class %r\n' %
147 (foo, c))
148 sys.stderr.write(self._failed)
149
150 t1 = threading.Thread(target=f1)
151 t1.start()
152 e1.wait()
153 t2 = threading.Thread(target=f2)
154 t2.start()
155 t2.join()
156 # The test is done; just let t1 know it can exit, and wait for it.
157 e2.set()
158 t1.join()
159
160 self.assertFalse(self._failed, self._failed)
161
162 def test_threading_local(self):
163 self._test_one_class(self._local)
164
165 def test_threading_local_subclass(self):
166 class ESC[4;38;5;81mLocalSubclass(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
167 """To test that subclasses behave properly."""
168 self._test_one_class(LocalSubclass)
169
170 def _test_dict_attribute(self, cls):
171 obj = cls()
172 obj.x = 5
173 self.assertEqual(obj.__dict__, {'x': 5})
174 with self.assertRaises(AttributeError):
175 obj.__dict__ = {}
176 with self.assertRaises(AttributeError):
177 del obj.__dict__
178
179 def test_dict_attribute(self):
180 self._test_dict_attribute(self._local)
181
182 def test_dict_attribute_subclass(self):
183 class ESC[4;38;5;81mLocalSubclass(ESC[4;38;5;149mselfESC[4;38;5;149m.ESC[4;38;5;149m_local):
184 """To test that subclasses behave properly."""
185 self._test_dict_attribute(LocalSubclass)
186
187 def test_cycle_collection(self):
188 class ESC[4;38;5;81mX:
189 pass
190
191 x = X()
192 x.local = self._local()
193 x.local.x = x
194 wr = weakref.ref(x)
195 del x
196 support.gc_collect() # For PyPy or other GCs.
197 self.assertIsNone(wr())
198
199
200 def test_threading_local_clear_race(self):
201 # See https://github.com/python/cpython/issues/100892
202
203 _testcapi = import_module('_testcapi')
204 _testcapi.call_in_temporary_c_thread(lambda: None, False)
205
206 for _ in range(1000):
207 _ = threading.local()
208
209 _testcapi.join_temporary_c_thread()
210
211
class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
    """Run the BaseLocalTest methods with ``_thread._local`` as ``_local``."""

    _local = _thread._local
214
class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
    """Run the BaseLocalTest methods with ``_threading_local.local`` as ``_local``."""

    _local = _threading_local.local
217
218
def load_tests(loader, tests, pattern):
    """Add the ``_threading_local`` doctests to *tests*, twice, and return it.

    The first suite runs the module's doctests as-is.  The second runs the
    same doctests with ``_threading_local.local`` replaced by
    ``_thread._local`` for the duration of each test.
    """
    module_name = '_threading_local'
    tests.addTest(DocTestSuite(module_name))

    saved_local = _threading_local.local

    def install_thread_local(test):
        # Swap in the _thread implementation before each doctest.
        _threading_local.local = _thread._local

    def restore_saved_local(test):
        # Put back whatever was there when load_tests() was called.
        _threading_local.local = saved_local

    patched_suite = DocTestSuite(
        module_name,
        setUp=install_thread_local,
        tearDown=restore_saved_local,
    )
    tests.addTests(patched_suite)
    return tests
231
232
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()