1 import signal
2 import sys
3 import threading
4 import time
5 import unittest
6 from concurrent import futures
7
8 from test import support
9 from test.support.script_helper import assert_python_ok
10
11 from .util import (
12 BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
13 ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
14 create_executor_tests, setup_module)
15
16
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* and flush standard output.

    Submitted to executors by the subprocess scripts in this module; the
    explicit flush pushes the message out as soon as it is printed.
    """
    time.sleep(t)
    stream = sys.stdout
    print(msg, file=stream)
    stream.flush()
21
22
class ExecutorShutdownTest:
    # Shutdown tests shared by the thread pool and process pool test classes
    # in this module.  The methods read ``self.executor``,
    # ``self.executor_type`` and ``self.worker_count``; ``self.ctx`` and
    # ``self.get_context()`` are only consulted when a multiprocessing
    # context is configured.

    def test_run_after_shutdown(self):
        # Submitting to an executor that was already shut down must fail.
        self.executor.shutdown()
        with self.assertRaises(RuntimeError):
            self.executor.submit(pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        script = """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", ""))
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Submitting from an atexit callback must raise RuntimeError: the
        # callback prints a marker and re-raises so that both stdout and
        # stderr can be checked below.
        script = """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", ""))
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # shutdown() with many queued futures must not hang, and every
        # future must still be resolvable afterwards.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        # Use a unittest assertion rather than a bare ``assert`` so the
        # precondition is still enforced when Python runs with -O.
        self.assertLessEqual(self.worker_count, 5, "test needs few workers")
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        # The script imports multiprocessing itself: without that import the
        # set_start_method() line would raise NameError as soon as a start
        # method context is configured.
        script = """if True:
            import multiprocessing
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                if {context!r}: multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None))
        rc, out, err = assert_python_ok('-c', script)
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        # Run one task to completion before the submit/cancel/shutdown
        # sequence under test.
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            # A hang is turned into a RuntimeError after 5 seconds.
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            # Always disarm the alarm and restore the previous handler.
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)
157
158
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    # Thread-pool specific shutdown tests, on top of the shared ones
    # inherited from ExecutorShutdownTest.

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        # Unblock the three workers so shutdown() can complete.
        for _ in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing full lists (rather than a bare
        # ``assert all(zip(...))``) also fails when results are missing and
        # keeps working under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
251
252
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    # Process-pool specific shutdown tests, on top of the shared ones
    # inherited from ExecutorShutdownTest.  ``self.get_context()`` supplies
    # the multiprocessing context used by each test.

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        # Grab the process table before shutdown() so the workers can still
        # be joined afterwards.
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        # Keep references to the executor's internals so they can be joined
        # after the executor object itself is gone.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.  Comparing full lists (rather than a bare
        # ``assert all(zip(...))``) also fails when results are missing and
        # keeps working under -O.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
330
331
# Pass ProcessPoolShutdownTest together with the fork, forkserver and spawn
# mixins to create_executor_tests().  NOTE(review): it receives this module's
# globals(), presumably to register the generated test classes here -- see
# .util to confirm.
create_executor_tests(
    globals(),
    ProcessPoolShutdownTest,
    executor_mixins=(
        ProcessPoolForkMixin,
        ProcessPoolForkserverMixin,
        ProcessPoolSpawnMixin,
    ),
)
336
337
def setUpModule():
    """Module-level unittest fixture; delegates to ``setup_module()`` from .util."""
    setup_module()
340
341
# Run this module's tests when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()