# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+       +---------+
|          |  => | Work Ids |       |        |     | Call Q    |       | Process |
|          |     +----------+       |        |     +-----------+       |  Pool   |
|          |     | ...      |       |        |     | ...       |       +---------+
|          |     | 6        |    => |        |  => | 5, call() | =>    |         |
|          |     | 7        |       |        |     | ...       |       |         |
| Process  |     | ...      |       | Local  |     +-----------+       | Process |
|  Pool    |     +----------+       | Worker |                         |  #1..n  |
| Executor |                        | Thread |                         |         |
|          |     +------------+     |        |     +-----------+       |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <=    |         |
|          |     +------------+     |        |     +-----------+       |         |
|          |     | 6: call()  |     |        |     | ...       |       |         |
|          |     |    future  |     |        |     | 4, result |       |         |
|          |     | ...        |     |        |     | 3, except |       |         |
+----------+     +------------+     +--------+     +-----------+       +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem in the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception


_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # Please note that we do not take the shutdown lock when
        # calling clear() (to avoid deadlocking) so this method can
        # only be called safely from the same thread as all calls to
        # clear() even if you hold the shutdown lock. Otherwise we
        # might try to read from the closed pipe.
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()

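
# Illustrative sketch (not part of the original module): the wakeup pattern
# above in isolation, assuming nothing beyond mp.Pipe. An empty payload is
# sent only to make the read end pollable; clear() then drains every pending
# signal. _example_wakeup_pattern is a hypothetical helper, never called here.
def _example_wakeup_pattern():
    reader, writer = mp.Pipe(duplex=False)
    writer.send_bytes(b"")        # wakeup(): the data is irrelevant,
    assert reader.poll()          # readability itself is the signal
    while reader.poll():          # clear(): drain all pending signals
        reader.recv_bytes()
    assert not reader.poll()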

def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
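
# Illustrative sketch (not part of the original module): what the constant
# above means for Future.cancel(). Only futures still waiting in the "Work
# Items" stage can be cancelled; once repackaged into the call queue they are
# committed. _example_cancel_window is a hypothetical helper, never called here.
def _example_cancel_window(executor, slow_fn):
    futures = [executor.submit(slow_fn) for _ in range(10)]
    # With a small call queue (max_workers + EXTRA_QUEUED_CALLS slots), most
    # of these futures have not reached the call queue yet, so cancel() can
    # still succeed for them.
    return sum(f.cancel() for f in futures)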


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

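
# Illustrative sketch (not part of the original module): the pickle round trip
# the classes above are designed for. __reduce__ replaces the wrapper with the
# original exception whose __cause__ carries the remote traceback as text, so
# re-raising it locally shows the worker-side traceback.
def _example_remote_traceback_roundtrip():
    import pickle
    try:
        raise ValueError('boom in worker')
    except ValueError as e:
        wrapped = _ExceptionWithTraceback(e, e.__traceback__)
    exc = pickle.loads(pickle.dumps(wrapped))  # unpickles as ValueError
    assert isinstance(exc, ValueError)
    assert isinstance(exc.__cause__, _RemoteTraceback)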
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to a job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            with self.shutdown_lock:
                self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]

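
# Illustrative sketch (not part of the original module): the two helpers above
# form the worker-side half of map(). _get_chunks slices the zipped inputs
# into chunksize-long tuples of argument tuples; _process_chunk applies fn to
# each. _example_chunking is a hypothetical helper, never called here.
def _example_chunking():
    chunks = list(_get_chunks([1, 2, 3], [10, 20, 30], chunksize=2))
    assert chunks == [((1, 10), (2, 20)), ((3, 30),)]
    assert _process_chunk(pow, chunks[0]) == [1, 1048576]  # pow(1,10), pow(2,20)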

def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return


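# Illustrative sketch (not part of the original module): the max_tasks path
# above as seen from the client. With max_tasks_per_child=1 each worker sends
# back its exit_pid after one task and is replaced with a fresh process.
# _example_max_tasks_per_child is a hypothetical helper, never called here.
def _example_max_tasks_per_child():
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=1) as executor:
        first = executor.submit(os.getpid).result()
        second = executor.submit(os.getpid).result()
        assert first != second  # the second task ran in a replacement worker

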
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A list of the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        # No need to hold the _shutdown_lock here because:
        # 1. we're the only thread to use the wakeup reader
        # 2. we're also the only thread to call thread_wakeup.close()
        # 3. we want to avoid a possible deadlock when both reader and writer
        #    would block (gh-105829)
        self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem.

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # Prevent queue writing to a pipe which is no longer read.
        # https://github.com/python/cpython/issues/94777
        self.call_queue._reader.close()

        # gh-107219: Close the connection writer which can unblock
        # Queue._feed() if it was stuck in send_bytes().
        if sys.platform == 'win32':
            self.call_queue._writer.close()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and, if requested, cancel the
        # remaining tasks as early as possible (provided the executor has not
        # been gc-ed yet).
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
               and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

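
# Illustrative sketch (not part of the original module): why the generator
# above reverses each list and pops from the end. Yield order is preserved,
# but each yielded item is dropped from its source list immediately, so an
# early-exiting consumer does not pin completed results in memory.
def _example_chain_from_iterable_of_lists():
    chunks = [[1, 2], [3]]
    flat = _chain_from_iterable_of_lists(chunks)
    assert next(flat) == 1
    assert chunks[0] == [2]   # the yielded item no longer lives in the list
    assert list(flat) == [2, 3]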

class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process. Useful
                to allow specific multiprocessing start methods.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker processes will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
            self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.close() and
        # .wakeup(). Care must also be taken to not call clear or close from
        # more than one thread since _ThreadWakeup.clear() is not protected by
        # the _shutdown_lock
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

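    # Illustrative sketch (not part of the original module): the effect of
    # chunksize on the machinery above. Both calls return the same values, but
    # the second submits ceil(4/2) = 2 chunk tasks instead of 4, trading
    # per-item latency for lower pickling/IPC overhead on large inputs.
    # _example_map_chunksize is a hypothetical helper, never called here.
    def _example_map_chunksize(self):
        assert list(self.map(abs, [-1, 2, -3, 4])) == [1, 2, 3, 4]
        assert list(self.map(abs, [-1, 2, -3, 4], chunksize=2)) == [1, 2, 3, 4]
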
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
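

# Illustrative sketch (not part of the original module): shutdown() from the
# client side. With cancel_futures=True, work items still waiting in the
# "Work Ids"/"Work Items" stage are cancelled; calls already handed to the
# call queue or running in a worker are allowed to finish.
# _example_shutdown_cancel is a hypothetical helper, never called here.
def _example_shutdown_cancel(slow_fn):
    executor = ProcessPoolExecutor(max_workers=1)
    futures = [executor.submit(slow_fn) for _ in range(5)]
    executor.shutdown(wait=True, cancel_futures=True)
    return [f.cancelled() for f in futures]  # tail entries are typically True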