import contextlib
import queue
import signal
import sys
import time
import unittest
import unittest.mock
from pickle import PicklingError
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup

from test import support

from .util import (
    create_executor_tests, setup_module,
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin)
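
# The helpers and "bad" picklable classes below crash, exit, or raise at
# well-defined points (task pickling, task unpickling, execution, result
# pickling, result unpickling) so the tests can check that the executor
# breaks cleanly instead of deadlocking.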


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _crash_with_data(data):
    """Induces a segfault; the dummy data argument is unused and only
    forces a large payload into the call pipe (see test_crash_big_data)."""
    _crash()


def _exit():
    """Induces a sys.exit() with exit code 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in a worker process."""
    raise Err()


def _raise_error_ignore_stderr(Err):
    """Function that raises an Exception in a worker process and ignores stderr."""
    import io
    sys.stderr = io.StringIO()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error_ignore_stderr, (UnpicklingError, )


class ExecutorDeadlockTest:
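    """Checks that a ProcessPoolExecutor raises (typically BrokenProcessPool)
    instead of deadlocking when a worker crashes, exits, or fails to pickle
    or unpickle a task or a result.
    """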
    TIMEOUT = support.LONG_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here, as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # test for deadlock caused by crashes in a pool
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=self.get_context())
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread.
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers.
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers.
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers.
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # gh-109832: Restore stderr overridden by _raise_error_ignore_stderr()
        self.addCleanup(setattr, sys, 'stderr', sys.stderr)

        # Check problem occurring while unpickling a task in
        # the result_handler thread.
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread.
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid a dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

            # Make sure the executor is eventually shut down and does not
            # leave dangling threads
            executor_manager.join()

    def test_crash_big_data(self):
        # Test that there is a clean exception instead of a deadlock when a
        # child process crashes while some data is being written into the
        # queue.
        # https://github.com/python/cpython/issues/94777
        self.executor.shutdown(wait=True)
        data = "a" * support.PIPE_MAX_SIZE
        with self.executor_type(max_workers=2,
                                mp_context=self.get_context()) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            with self.assertRaises(BrokenProcessPool):
                list(executor.map(_crash_with_data, [data] * 10))

    def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
        # Issue #105829: The _ExecutorManagerThread wakeup pipe could
        # fill up and block. See: https://github.com/python/cpython/issues/105829

        # Lots of cargo culting while writing this test, apologies if
        # something is really stupid...

        self.executor.shutdown(wait=True)

        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            import faulthandler
            faulthandler.dump_traceback()

            raise RuntimeError("timed out while submitting jobs?")

        thread_run = futures.process._ExecutorManagerThread.run
        def mock_run(self):
            # Delay thread startup so the wakeup pipe can fill up and block
            time.sleep(3)
            thread_run(self)

        class MockWakeup(_ThreadWakeup):
            """Mock wakeup object to force the wakeup to block"""
            def __init__(self):
                super().__init__()
                self._dummy_queue = queue.Queue(maxsize=1)

            def wakeup(self):
                self._dummy_queue.put(None, block=True)
                super().wakeup()

            def clear(self):
                super().clear()
                try:
                    while True:
                        self._dummy_queue.get_nowait()
                except queue.Empty:
                    pass
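
        # Note: with the one-slot dummy queue, a second wakeup() issued
        # before clear() blocks, simulating a wakeup pipe that has filled
        # up; together with the delayed mock_run this reproduces the
        # gh-105829 condition.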

        with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
                                         'run', mock_run),
              unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
                                  MockWakeup)):
            with self.executor_type(max_workers=2,
                                    mp_context=self.get_context()) as executor:
                self.executor = executor  # Allow clean up in fail_on_deadlock

                job_num = 100
                job_data = range(job_num)

                # Need to use sigalarm for timeout detection because
                # Executor.submit is not guarded by any timeout (both
                # self._work_ids.put(self._queue_count) and
                # self._executor_manager_thread_wakeup.wakeup() might
                # block, maybe more?). In this specific case it was
                # the wakeup call that deadlocked on a blocking pipe.
                old_handler = signal.signal(signal.SIGALRM, timeout)
                try:
                    signal.alarm(int(self.TIMEOUT))
                    self.assertEqual(job_num, len(list(executor.map(int, job_data))))
                finally:
                    signal.alarm(0)
                    signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))

def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()