1 import _thread
2 import contextlib
3 import functools
4 import sys
5 import threading
6 import time
7 import unittest
8
9 from test import support
10
11
12 #=======================================================================
13 # Threading support to prevent reporting refleaks when running regrtest.py -R
14
15 # NOTE: we use thread._count() rather than threading.enumerate() (or the
16 # moral equivalent thereof) because a threading.Thread object is still alive
17 # until its __bootstrap() method has returned, even after it has been
18 # unregistered from the threading module.
19 # thread._count(), on the other hand, only gets decremented *after* the
20 # __bootstrap() method has returned, which gives us reliable reference counts
21 # at the end of a test run.
22
23
24 def threading_setup():
25 return _thread._count(), len(threading._dangling)
26
27
28 def threading_cleanup(*original_values):
29 orig_count, orig_ndangling = original_values
30
31 timeout = 1.0
32 for _ in support.sleeping_retry(timeout, error=False):
33 # Copy the thread list to get a consistent output. threading._dangling
34 # is a WeakSet, its value changes when it's read.
35 dangling_threads = list(threading._dangling)
36 count = _thread._count()
37
38 if count <= orig_count:
39 return
40
41 # Timeout!
42 support.environment_altered = True
43 support.print_warning(
44 f"threading_cleanup() failed to clean up threads "
45 f"in {timeout:.1f} seconds\n"
46 f" before: thread count={orig_count}, dangling={orig_ndangling}\n"
47 f" after: thread count={count}, dangling={len(dangling_threads)}")
48 for thread in dangling_threads:
49 support.print_warning(f"Dangling thread: {thread!r}")
50
51 # The warning happens when a test spawns threads and some of these threads
52 # are still running after the test completes. To fix this warning, join
53 # threads explicitly to wait until they complete.
54 #
55 # To make the warning more likely, reduce the timeout.
56
57
58 def reap_threads(func):
59 """Use this function when threads are being used. This will
60 ensure that the threads are cleaned up even when the test fails.
61 """
62 @functools.wraps(func)
63 def decorator(*args):
64 key = threading_setup()
65 try:
66 return func(*args)
67 finally:
68 threading_cleanup(*key)
69 return decorator
70
71
72 @contextlib.contextmanager
73 def wait_threads_exit(timeout=None):
74 """
75 bpo-31234: Context manager to wait until all threads created in the with
76 statement exit.
77
78 Use _thread.count() to check if threads exited. Indirectly, wait until
79 threads exit the internal t_bootstrap() C function of the _thread module.
80
81 threading_setup() and threading_cleanup() are designed to emit a warning
82 if a test leaves running threads in the background. This context manager
83 is designed to cleanup threads started by the _thread.start_new_thread()
84 which doesn't allow to wait for thread exit, whereas thread.Thread has a
85 join() method.
86 """
87 if timeout is None:
88 timeout = support.SHORT_TIMEOUT
89 old_count = _thread._count()
90 try:
91 yield
92 finally:
93 start_time = time.monotonic()
94 for _ in support.sleeping_retry(timeout, error=False):
95 support.gc_collect()
96 count = _thread._count()
97 if count <= old_count:
98 break
99 else:
100 dt = time.monotonic() - start_time
101 msg = (f"wait_threads() failed to cleanup {count - old_count} "
102 f"threads after {dt:.1f} seconds "
103 f"(count: {count}, old count: {old_count})")
104 raise AssertionError(msg)
105
106
107 def join_thread(thread, timeout=None):
108 """Join a thread. Raise an AssertionError if the thread is still alive
109 after timeout seconds.
110 """
111 if timeout is None:
112 timeout = support.SHORT_TIMEOUT
113 thread.join(timeout)
114 if thread.is_alive():
115 msg = f"failed to join the thread in {timeout:.1f} seconds"
116 raise AssertionError(msg)
117
118
119 @contextlib.contextmanager
120 def start_threads(threads, unlock=None):
121 import faulthandler
122 threads = list(threads)
123 started = []
124 try:
125 try:
126 for t in threads:
127 t.start()
128 started.append(t)
129 except:
130 if support.verbose:
131 print("Can't start %d threads, only %d threads started" %
132 (len(threads), len(started)))
133 raise
134 yield
135 finally:
136 try:
137 if unlock:
138 unlock()
139 endtime = time.monotonic()
140 for timeout in range(1, 16):
141 endtime += 60
142 for t in started:
143 t.join(max(endtime - time.monotonic(), 0.01))
144 started = [t for t in started if t.is_alive()]
145 if not started:
146 break
147 if support.verbose:
148 print('Unable to join %d threads during a period of '
149 '%d minutes' % (len(started), timeout))
150 finally:
151 started = [t for t in started if t.is_alive()]
152 if started:
153 faulthandler.dump_traceback(sys.stdout)
154 raise AssertionError('Unable to join %d threads' % len(started))
155
156
157 class ESC[4;38;5;81mcatch_threading_exception:
158 """
159 Context manager catching threading.Thread exception using
160 threading.excepthook.
161
162 Attributes set when an exception is caught:
163
164 * exc_type
165 * exc_value
166 * exc_traceback
167 * thread
168
169 See threading.excepthook() documentation for these attributes.
170
171 These attributes are deleted at the context manager exit.
172
173 Usage:
174
175 with threading_helper.catch_threading_exception() as cm:
176 # code spawning a thread which raises an exception
177 ...
178
179 # check the thread exception, use cm attributes:
180 # exc_type, exc_value, exc_traceback, thread
181 ...
182
183 # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
184 # exists at this point
185 # (to avoid reference cycles)
186 """
187
188 def __init__(self):
189 self.exc_type = None
190 self.exc_value = None
191 self.exc_traceback = None
192 self.thread = None
193 self._old_hook = None
194
195 def _hook(self, args):
196 self.exc_type = args.exc_type
197 self.exc_value = args.exc_value
198 self.exc_traceback = args.exc_traceback
199 self.thread = args.thread
200
201 def __enter__(self):
202 self._old_hook = threading.excepthook
203 threading.excepthook = self._hook
204 return self
205
206 def __exit__(self, *exc_info):
207 threading.excepthook = self._old_hook
208 del self.exc_type
209 del self.exc_value
210 del self.exc_traceback
211 del self.thread
212
213
214 def _can_start_thread() -> bool:
215 """Detect whether Python can start new threads.
216
217 Some WebAssembly platforms do not provide a working pthread
218 implementation. Thread support is stubbed and any attempt
219 to create a new thread fails.
220
221 - wasm32-wasi does not have threading.
222 - wasm32-emscripten can be compiled with or without pthread
223 support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
224 """
225 if sys.platform == "emscripten":
226 return sys._emscripten_info.pthreads
227 elif sys.platform == "wasi":
228 return False
229 else:
230 # assume all other platforms have working thread support.
231 return True
232
233 can_start_thread = _can_start_thread()
234
235 def requires_working_threading(*, module=False):
236 """Skip tests or modules that require working threading.
237
238 Can be used as a function/class decorator or to skip an entire module.
239 """
240 msg = "requires threading support"
241 if module:
242 if not can_start_thread:
243 raise unittest.SkipTest(msg)
244 else:
245 return unittest.skipUnless(can_start_thread, msg)