python (3.12.0)
1 import _thread
2 import contextlib
3 import functools
4 import sys
5 import threading
6 import time
7 import unittest
8
9 from test import support
10
11
12 #=======================================================================
13 # Threading support to prevent reporting refleaks when running regrtest.py -R
14
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its _bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# _thread._count(), on the other hand, only gets decremented *after* the
# _bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
22
23
def threading_setup():
    """Take a snapshot of the current thread state.

    Returns a 2-tuple holding the current ``_thread._count()`` value and a
    copy of ``threading._dangling``.  The tuple is meant to be unpacked
    into threading_cleanup() once the code under test has finished.
    """
    thread_count = _thread._count()
    dangling_snapshot = threading._dangling.copy()
    return thread_count, dangling_snapshot
26
27
28 def threading_cleanup(*original_values):
29 _MAX_COUNT = 100
30
31 for count in range(_MAX_COUNT):
32 values = _thread._count(), threading._dangling
33 if values == original_values:
34 break
35
36 if not count:
37 # Display a warning at the first iteration
38 support.environment_altered = True
39 dangling_threads = values[1]
40 support.print_warning(f"threading_cleanup() failed to cleanup "
41 f"{values[0] - original_values[0]} threads "
42 f"(count: {values[0]}, "
43 f"dangling: {len(dangling_threads)})")
44 for thread in dangling_threads:
45 support.print_warning(f"Dangling thread: {thread!r}")
46
47 # Don't hold references to threads
48 dangling_threads = None
49 values = None
50
51 time.sleep(0.01)
52 support.gc_collect()
53
54
def reap_threads(func):
    """Decorator to use when threads are being used.

    Takes a threading_setup() snapshot before calling *func* and always
    runs threading_cleanup() afterwards, so the threads are cleaned up
    even when the test fails.

    Positional and keyword arguments are both forwarded to *func*, and its
    return value is passed through unchanged.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs on success and on failure alike.
            threading_cleanup(*key)
    return decorator
67
68
@contextlib.contextmanager
def wait_threads_exit(timeout=None):
    """
    bpo-31234: Context manager to wait until all threads created in the with
    statement exit.

    Use _thread._count() to check if threads exited: on exit, the count is
    polled until it is no greater than the value recorded on entry.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to cleanup threads started by _thread.start_new_thread(),
    which doesn't allow to wait for thread exit, whereas threading.Thread
    has a join() method.

    *timeout* defaults to support.SHORT_TIMEOUT.  AssertionError is raised
    if the retry loop finishes without the count dropping back.
    """
    budget = support.SHORT_TIMEOUT if timeout is None else timeout
    baseline = _thread._count()
    try:
        yield
    finally:
        began = time.monotonic()
        for _ in support.sleeping_retry(budget, error=False):
            support.gc_collect()
            current = _thread._count()
            if current <= baseline:
                break
        else:
            # The retry loop ran out without seeing the count drop.
            elapsed = time.monotonic() - began
            raise AssertionError(
                f"wait_threads() failed to cleanup {current - baseline} "
                f"threads after {elapsed:.1f} seconds "
                f"(count: {current}, old count: {baseline})")
102
103
def join_thread(thread, timeout=None):
    """Join *thread*, failing if it does not finish in time.

    *timeout* is in seconds and defaults to support.SHORT_TIMEOUT.
    Raises AssertionError if the thread is still alive after the join.
    """
    if timeout is None:
        timeout = support.SHORT_TIMEOUT
    thread.join(timeout)
    if not thread.is_alive():
        return
    raise AssertionError(f"failed to join the thread in {timeout:.1f} seconds")
114
115
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager that starts *threads* on entry and joins them on exit.

    Every thread in *threads* is started before the with-block runs.  On
    exit, *unlock* (if given and truthy) is called first, then the started
    threads are joined in up to 15 rounds, each round pushing the deadline
    back by another 60 seconds.

    Raises AssertionError if any started thread is still alive afterwards;
    when faulthandler is importable, tracebacks are dumped to sys.stdout
    first.  If starting a thread fails, the error propagates after the
    already started threads have been joined.
    """
    try:
        import faulthandler
    except ImportError:
        # It isn't supported on subinterpreters yet.
        faulthandler = None
    threads = list(threads)
    pending = []
    try:
        try:
            for worker in threads:
                worker.start()
                pending.append(worker)
        except BaseException:
            if support.verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(threads), len(pending)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            deadline = time.monotonic()
            for minute in range(1, 16):
                deadline += 60
                for worker in pending:
                    remaining = deadline - time.monotonic()
                    # Always give join() a small positive timeout.
                    worker.join(max(remaining, 0.01))
                pending = [w for w in pending if w.is_alive()]
                if not pending:
                    break
                if support.verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(pending), minute))
        finally:
            # Re-check liveness even if the join loop itself was interrupted.
            pending = [w for w in pending if w.is_alive()]
            if pending:
                if faulthandler is not None:
                    faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(pending))
157
158
class catch_threading_exception:
    """
    Context manager catching threading.Thread exception using
    threading.excepthook.

    Attributes set when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    These attributes are deleted at the context manager exit.

    Usage:

        with threading_helper.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
        # exists at this point
        # (to avoid reference cycles)
    """

    def __init__(self):
        # All four attributes stay None until the hook is called.
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        # Hook that was installed before __enter__(); restored by __exit__().
        self._old_hook = None

    def _hook(self, args):
        """Replacement for threading.excepthook: record the fields of *args*."""
        self.exc_type = args.exc_type
        self.exc_value = args.exc_value
        self.exc_traceback = args.exc_traceback
        self.thread = args.thread

    def __enter__(self):
        """Install the recording hook and return this object."""
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        """Restore the previous hook and delete the recorded attributes."""
        threading.excepthook = self._old_hook
        # Delete rather than reset to None, to avoid reference cycles.
        del self.exc_type
        del self.exc_value
        del self.exc_traceback
        del self.thread
214
215
def _can_start_thread() -> bool:
    """Detect whether Python can start new threads.

    Some WebAssembly platforms do not provide a working pthread
    implementation. Thread support is stubbed and any attempt
    to create a new thread fails.

    - wasm32-wasi does not have threading.
    - wasm32-emscripten can be compiled with or without pthread
      support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
    """
    platform = sys.platform
    if platform == "emscripten":
        # The answer depends on how this particular build was compiled.
        return sys._emscripten_info.pthreads
    if platform == "wasi":
        return False
    # assume all other platforms have working thread support.
    return True

# Evaluated once, when the module is imported.
can_start_thread = _can_start_thread()
236
def requires_working_threading(*, module=False):
    """Skip tests or modules that require working threading.

    With the default ``module=False``, returns a ``unittest.skipUnless``
    decorator for a test function or class.  With ``module=True``, raises
    ``unittest.SkipTest`` immediately when threads cannot be started, and
    otherwise returns None; this form is for skipping an entire module.
    """
    reason = "requires threading support"
    if not module:
        return unittest.skipUnless(can_start_thread, reason)
    if not can_start_thread:
        raise unittest.SkipTest(reason)
247 return unittest.skipUnless(can_start_thread, msg)