import os
import sys
import time
import unittest
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

from test import support
from test.support import hashlib_helper

from .executor import ExecutorTest, mul
from .util import (
    ProcessPoolForkMixin, ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
    create_executor_tests, setup_module)


class EventfulGCObj():
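    # Setting the managed Event from __del__ lets tests observe when this
    # object has been garbage-collected inside a worker process.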
    def __init__(self, mgr):
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform == 'win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
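        # Windows can only wait on a limited number of worker process
        # handles at once, so the executor caps max_workers at 61 there.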
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
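        # map() must return the same results as the builtin map() regardless
        # of chunksize; a negative chunksize is rejected with ValueError.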
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that the arguments for a job are correctly gc-ed after the
        # job has finished.
        mgr = self.get_context().Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # Explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while the manager is still running.
        obj = None
        support.gc_collect()

        mgr.shutdown()
        mgr.join()

    def test_saturation(self):
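        # Flood the executor with far more pending jobs than workers and
        # check that it never spawns more than max_workers processes.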
        executor = self.executor
        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        for _ in range(job_count):
            executor.submit(sem.acquire)
        self.assertEqual(len(executor._processes), executor._max_workers)
        for _ in range(job_count):
            sem.release()

    def test_idle_process_reuse_one(self):
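        # Sequential jobs should be handled by a single reused worker rather
        # than a new process being spawned for each submission.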
        executor = self.executor
        assert executor._max_workers >= 4
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._processes), 1)

    def test_idle_process_reuse_multiple(self):
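        # Interleaving waited-on and pending submissions should still reuse
        # idle workers instead of growing the pool to max_workers.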
        executor = self.executor
        assert executor._max_workers <= 5
        if self.get_context().get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        executor.submit(mul, 12, 7).result()
        executor.submit(mul, 33, 25)
        executor.submit(mul, 25, 26).result()
        executor.submit(mul, 18, 29)
        executor.submit(mul, 1, 2).result()
        executor.submit(mul, 0, 9)
        self.assertLessEqual(len(executor._processes), 3)
        executor.shutdown()

    def test_max_tasks_per_child(self):
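        # max_tasks_per_child is incompatible with the "fork" start method;
        # with other start methods each worker is replaced after 3 tasks.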
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            with self.assertRaises(ValueError):
                self.executor_type(1, mp_context=context, max_tasks_per_child=3)
            return
        # Not using self.executor as we need to control construction.
        # Arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
            1, mp_context=context, max_tasks_per_child=3)
        f1 = executor.submit(os.getpid)
        original_pid = f1.result()
        # The worker pid remains the same since the worker is reused until it
        # reaches max_tasks_per_child.
        f2 = executor.submit(os.getpid)
        self.assertEqual(f2.result(), original_pid)
        self.assertEqual(len(executor._processes), 1)
        f3 = executor.submit(os.getpid)
        self.assertEqual(f3.result(), original_pid)

        # A new worker is spawned, with a statistically different pid, while
        # the previous one was reaped.
        f4 = executor.submit(os.getpid)
        new_pid = f4.result()
        self.assertNotEqual(original_pid, new_pid)
        self.assertEqual(len(executor._processes), 1)

        executor.shutdown()

    def test_max_tasks_per_child_defaults_to_spawn_context(self):
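        # When no mp_context is passed, using max_tasks_per_child should make
        # the executor default to the "spawn" start method.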
        # Not using self.executor as we need to control construction.
        # Arguably this could go in another class w/o that mixin.
        executor = self.executor_type(1, max_tasks_per_child=3)
        self.assertEqual(executor._mp_context.get_start_method(), "spawn")

    def test_max_tasks_early_shutdown(self):
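        # Even with workers exiting after every task, shutting down right
        # after submitting must still let all pending futures complete.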
        context = self.get_context()
        if context.get_start_method(allow_none=False) == "fork":
            raise unittest.SkipTest("Incompatible with the fork start method.")
        # Not using self.executor as we need to control construction.
        # Arguably this could go in another class w/o that mixin.
        executor = self.executor_type(
            3, mp_context=context, max_tasks_per_child=1)
        futures = []
        for i in range(6):
            futures.append(executor.submit(mul, i, i))
        executor.shutdown()
        for i, future in enumerate(futures):
            self.assertEqual(future.result(), mul(i, i))


create_executor_tests(globals(), ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()